Commits working, Transactions Working, Arbitrations working, Still needs a lot of...
authorAli Younis <ayounis@uci.edu>
Sun, 25 Dec 2016 09:33:22 +0000 (01:33 -0800)
committerAli Younis <ayounis@uci.edu>
Sun, 25 Dec 2016 09:33:22 +0000 (01:33 -0800)
version2/src/java/iotcloud/Commit.java
version2/src/java/iotcloud/Slot.java
version2/src/java/iotcloud/SlotBuffer.java
version2/src/java/iotcloud/Table.java
version2/src/java/iotcloud/Test.java
version2/src/java/iotcloud/ThreeTuple.java [new file with mode: 0644]

index 1ee044d..86650c9 100644 (file)
@@ -77,6 +77,7 @@ class Commit extends Entry {
        }
 
        public Entry getCopy(Slot s) {
+               // System.out.println("Commit Rescued:  " + this);  // TODO remove
                return new Commit(s, seqnumtrans, keyValueUpdateSet);
        }
 
@@ -89,14 +90,17 @@ class Commit extends Entry {
                        for (Iterator<KeyValue> i = keyValueUpdateSet.iterator(); i.hasNext();) {
                                KeyValue kv2 = i.next();
 
-                               if (kv1.getKey() == kv2.getKey()) {
-                                       keyValueUpdateSet.remove(kv2);
+                               if (kv1.getKey().equals(kv2.getKey())) {
+                                       // keyValueUpdateSet.remove(kv2);
+                                       i.remove();
                                        break;
                                }
                        }
                }
 
-               if (keyValueUpdateSet.size() == 0)
+               if (keyValueUpdateSet.size() == 0) {
+                       // System.out.println("Killed Commit:  " + this); // TODO remove
                        this.setDead();
+               }
        }
 }
\ No newline at end of file
index 5828fd3..2ceb142 100644 (file)
@@ -12,9 +12,9 @@ import java.util.Arrays;
 
 class Slot implements Liveness {
        /** Sets the slot size. */
-       static final int SLOT_SIZE=2048;
+       static final int SLOT_SIZE = 2048;
        /** Sets the size for the HMAC. */
-       static final int HMAC_SIZE=32;
+       static final int HMAC_SIZE = 32;
 
        /** Sequence number of the slot. */
        private long seqnum;
@@ -37,15 +37,15 @@ class Slot implements Liveness {
        private Table table;
 
        Slot(Table _table, long _seqnum, long _machineid, byte[] _prevhmac, byte[] _hmac) {
-               seqnum=_seqnum;
-               machineid=_machineid;
-               prevhmac=_prevhmac;
-               hmac=_hmac;
-               entries=new Vector<Entry>();
-               livecount=1;
-               seqnumlive=true;
+               seqnum = _seqnum;
+               machineid = _machineid;
+               prevhmac = _prevhmac;
+               hmac = _hmac;
+               entries = new Vector<Entry>();
+               livecount = 1;
+               seqnumlive = true;
                freespace = SLOT_SIZE - getBaseSize();
-               table=_table;
+               table = _table;
        }
 
        Slot(Table _table, long _seqnum, long _machineid, byte[] _prevhmac) {
@@ -64,11 +64,18 @@ class Slot implements Liveness {
                return prevhmac;
        }
 
-       void addEntry(Entry e) {
-               e=e.getCopy(this);
+       Entry addEntry(Entry e) {
+               e = e.getCopy(this);
                entries.add(e);
                livecount++;
                freespace -= e.getSize();
+               return e;
+       }
+
+       void removeEntry(Entry e) {
+               entries.remove(e);
+               livecount--;
+               freespace += e.getSize();
        }
 
        private void addShallowEntry(Entry e) {
@@ -91,23 +98,23 @@ class Slot implements Liveness {
        }
 
        static Slot decode(Table table, byte[] array, Mac mac) {
-               mac.update(array, HMAC_SIZE, array.length-HMAC_SIZE);
-               byte[] realmac=mac.doFinal();
+               mac.update(array, HMAC_SIZE, array.length - HMAC_SIZE);
+               byte[] realmac = mac.doFinal();
 
-               ByteBuffer bb=ByteBuffer.wrap(array);
-               byte[] hmac=new byte[HMAC_SIZE];
-               byte[] prevhmac=new byte[HMAC_SIZE];
+               ByteBuffer bb = ByteBuffer.wrap(array);
+               byte[] hmac = new byte[HMAC_SIZE];
+               byte[] prevhmac = new byte[HMAC_SIZE];
                bb.get(hmac);
                bb.get(prevhmac);
                if (!Arrays.equals(realmac, hmac))
                        throw new Error("Server Error: Invalid HMAC!  Potential Attack!");
 
-               long seqnum=bb.getLong();
-               long machineid=bb.getLong();
-               int numentries=bb.getInt();
-               Slot slot=new Slot(table, seqnum, machineid, prevhmac, hmac);
+               long seqnum = bb.getLong();
+               long machineid = bb.getLong();
+               int numentries = bb.getInt();
+               Slot slot = new Slot(table, seqnum, machineid, prevhmac, hmac);
 
-               for(int i=0; i<numentries; i++) {
+               for (int i = 0; i < numentries; i++) {
                        slot.addShallowEntry(Entry.decode(slot, bb));
                }
 
@@ -115,20 +122,20 @@ class Slot implements Liveness {
        }
 
        byte[] encode(Mac mac) {
-               byte[] array=new byte[SLOT_SIZE];
-               ByteBuffer bb=ByteBuffer.wrap(array);
+               byte[] array = new byte[SLOT_SIZE];
+               ByteBuffer bb = ByteBuffer.wrap(array);
                /* Leave space for the slot HMAC.  */
                bb.position(HMAC_SIZE);
                bb.put(prevhmac);
                bb.putLong(seqnum);
                bb.putLong(machineid);
                bb.putInt(entries.size());
-               for(Entry entry:entries) {
+               for (Entry entry : entries) {
                        entry.encode(bb);
                }
                /* Compute our HMAC */
-               mac.update(array, HMAC_SIZE, array.length-HMAC_SIZE);
-               byte[] realmac=mac.doFinal();
+               mac.update(array, HMAC_SIZE, array.length - HMAC_SIZE);
+               byte[] realmac = mac.doFinal();
                hmac = realmac;
                bb.position(0);
                bb.put(realmac);
@@ -140,7 +147,7 @@ class Slot implements Liveness {
         * identifier, the sequence number, and the number of entries.
         */
        int getBaseSize() {
-               return 2*HMAC_SIZE+2*Long.BYTES+Integer.BYTES;
+               return 2 * HMAC_SIZE + 2 * Long.BYTES + Integer.BYTES;
        }
 
        /**
@@ -150,14 +157,14 @@ class Slot implements Liveness {
         */
 
        Vector<Entry> getLiveEntries(boolean resize) {
-               Vector<Entry> liveEntries=new Vector<Entry>();
-               for(Entry entry: entries) {
+               Vector<Entry> liveEntries = new Vector<Entry>();
+               for (Entry entry : entries) {
                        if (entry.isLive()) {
                                if (!resize || entry.getType() != Entry.TypeTableStatus)
                                        liveEntries.add(entry);
                        }
                }
-                       
+
                if (seqnumlive && !resize)
                        liveEntries.add(new LastMessage(this, machineid, seqnum));
 
@@ -186,7 +193,7 @@ class Slot implements Liveness {
         */
 
        void setDead() {
-               seqnumlive=false;
+               seqnumlive = false;
                decrementLiveCount();
        }
 
@@ -196,8 +203,10 @@ class Slot implements Liveness {
 
        void decrementLiveCount() {
                livecount--;
-               if (livecount==0)
+               if (livecount == 0) {
+                       // System.out.println("Slot Set Dead"); // TODO: remove
                        table.decrementLiveCount();
+               }
        }
 
        /**
@@ -209,6 +218,6 @@ class Slot implements Liveness {
        }
 
        public String toString() {
-               return "<"+getSequenceNumber()+">";
+               return "<" + getSequenceNumber() + ">";
        }
 }
index 14bc926..ba6c3de 100644 (file)
@@ -12,12 +12,12 @@ class SlotBuffer {
        private Slot[] array;
        private int head;
        private int tail;
-       private long oldestseqn;
+       public long oldestseqn;
 
        SlotBuffer() {
-               array=new Slot[DEFAULT_SIZE+1];
-               head=tail=0;
-               oldestseqn=0;
+               array = new Slot[DEFAULT_SIZE + 1];
+               head = tail = 0;
+               oldestseqn = 0;
        }
 
        int size() {
@@ -31,12 +31,12 @@ class SlotBuffer {
        }
 
        void resize(int newsize) {
-               if (newsize == (array.length-1))
+               if (newsize == (array.length - 1))
                        return;
-               Slot[] newarray = new Slot[newsize+1];
+               Slot[] newarray = new Slot[newsize + 1];
                int currsize = size();
                int index = tail;
-               for(int i=0; i < currsize; i++) {
+               for (int i = 0; i < currsize; i++) {
                        newarray[i] = array[index];
                        if ((++index) == array.length)
                                index = 0;
@@ -49,42 +49,64 @@ class SlotBuffer {
        private void incrementHead() {
                head++;
                if (head >= array.length)
-                       head=0;
+                       head = 0;
        }
 
        private void incrementTail() {
                tail++;
                if (tail >= array.length)
-                       tail=0;
+                       tail = 0;
        }
 
        void putSlot(Slot s) {
-               array[head]=s;
+
+               long checkNum = (getNewestSeqNum() + 1);
+
+               if (checkNum != s.getSequenceNumber()) {
+                       // We have a gap so expunge all our slots
+                       oldestseqn = s.getSequenceNumber();
+                       tail = 0;
+                       head = 1;
+                       array[0] = s;
+                       return;
+               }
+
+               array[head] = s;
                incrementHead();
 
-               if (oldestseqn==0)
+               if (oldestseqn == 0) {
                        oldestseqn = s.getSequenceNumber();
+               }
 
-               if (head==tail) {
+               if (head == tail) {
                        incrementTail();
                        oldestseqn++;
                }
        }
 
        Slot getSlot(long seqnum) {
-               int diff=(int) (seqnum-oldestseqn);
-               int index=diff + tail;
+               int diff = (int) (seqnum - oldestseqn);
+               int index = diff + tail;
+
+               if (index < 0) {
+                       // Really old message so we dont have it anymore
+                       return null;
+               }
+
                if (index >= array.length) {
-                       if (head >= tail)
+                       if (head >= tail) {
                                return null;
-                       index-= array.length;
+                       }
+                       index -= array.length;
                }
 
-               if (index >= array.length)
-                       return null;
+               if (index >= array.length) {
 
-               if (head >= tail && index >= head)
                        return null;
+               }
+               if (head >= tail && index >= head) {
+                       return null;
+               }
 
                return array[index];
        }
index cf21f48..b0b4c1a 100644 (file)
@@ -8,9 +8,12 @@ import java.util.Vector;
 import java.util.Random;
 import java.util.Queue;
 import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.Collection;
+import java.util.Collections;
+
 
 /**
  * IoTTable data structure.  Provides client inferface.
@@ -36,24 +39,30 @@ final public class Table {
        private TableStatus lastTableStatus;
        static final int FREE_SLOTS = 10; //number of slots that should be kept free
        static final int SKIP_THRESHOLD = 10;
-       private long liveslotcount = 0;
+       public long liveslotcount = 0;  // TODO:  MAKE PRIVATE
        private int chance;
        static final double RESIZE_MULTIPLE = 1.2;
        static final double RESIZE_THRESHOLD = 0.75;
        static final int REJECTED_THRESHOLD = 5;
-       private int resizethreshold;
+       public int resizethreshold;     // TODO:  MAKE PRIVATE
        private long lastliveslotseqn;  //smallest sequence number with a live entry
        private Random random = new Random();
+       private long lastCommitSeenSeqNum = 0; // sequence number of the last commit that was seen
 
        private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
        private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
        private List<Commit> commitList = null; // List of all the most recent live commits
+       private List<Long> commitListSeqNum = null; // List of all the most recent live commits trans sequence numbers
+
        private Set<Abort> abortSet = null; // Set of the live aborts
-       private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
+       public  Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV       TODO: Make Private
        private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
-       private List<Transaction> uncommittedTransactionsList = null; //
+       public Map<Long, Transaction> uncommittedTransactionsMap = null; // TODO: make private
        private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
+       private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
        // private Set<Abort> arbitratorTable = null; // Table of arbitrators
+       private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
+
 
 
        public Table(String baseurl, String password, long _localmachineid) {
@@ -65,13 +74,7 @@ final public class Table {
                cloud = new CloudComm(this, baseurl, password);
                lastliveslotseqn = 1;
 
-               pendingTransQueue = new LinkedList<PendingTransaction>();
-               commitList = new LinkedList<Commit>();
-               abortSet = new HashSet<Abort>();
-               commitedTable = new HashMap<IoTString, KeyValue>();
-               speculativeTable = new HashMap<IoTString, KeyValue>();
-               uncommittedTransactionsList = new LinkedList<Transaction>();
-               arbitratorTable = new HashMap<IoTString, Long>();
+               setupDataStructs();
        }
 
        public Table(CloudComm _cloud, long _localmachineid) {
@@ -82,13 +85,19 @@ final public class Table {
                sequencenumber = 0;
                cloud = _cloud;
 
+               setupDataStructs();
+       }
+
+       private void setupDataStructs() {
                pendingTransQueue = new LinkedList<PendingTransaction>();
                commitList = new LinkedList<Commit>();
                abortSet = new HashSet<Abort>();
                commitedTable = new HashMap<IoTString, KeyValue>();
                speculativeTable = new HashMap<IoTString, KeyValue>();
-               uncommittedTransactionsList = new LinkedList<Transaction>();
+               uncommittedTransactionsMap = new HashMap<Long, Transaction>();
                arbitratorTable = new HashMap<IoTString, Long>();
+               newKeyTable = new HashMap<IoTString, NewKey>();
+               newCommitMap = new HashMap<Long, Commit> ();
        }
 
        public void rebuild() {
@@ -96,7 +105,45 @@ final public class Table {
                validateandupdate(newslots, true);
        }
 
+       // TODO: delete method
+       public void printSlots() {
+               long o = buffer.getOldestSeqNum();
+               long n = buffer.getNewestSeqNum();
 
+               int[] types = new int[10];
+
+               int num = 0;
+
+               int livec = 0;
+               int deadc = 0;
+               for (long i = o; i < (n + 1); i++) {
+                       Slot s = buffer.getSlot(i);
+
+                       Vector<Entry> entries = s.getEntries();
+
+                       for (Entry e : entries) {
+                               if (e.isLive()) {
+                                       int type = e.getType();
+                                       types[type] = types[type] + 1;
+                                       num++;
+                                       livec++;
+                               } else {
+                                       deadc++;
+                               }
+                       }
+               }
+
+               for (int i = 0; i < 10; i++) {
+                       System.out.println(i + "    " + types[i]);
+               }
+               System.out.println("Live count:   " + livec);
+               System.out.println("Dead count:   " + deadc);
+               System.out.println("Old:   " + o);
+               System.out.println("New:   " + n);
+               System.out.println("Size:   " + buffer.size());
+               System.out.println("Commits Map:   " + commitedTable.size());
+               System.out.println("Commits List:   " + commitList.size());
+       }
 
        public IoTString getCommitted(IoTString key) {
                KeyValue kv = commitedTable.get(key);
@@ -116,7 +163,6 @@ final public class Table {
                }
        }
 
-
        public void initTable() {
                cloud.setSalt();//Set the salt
                Slot s = new Slot(this, 1, localmachineid);
@@ -133,8 +179,6 @@ final public class Table {
        }
 
        public String toString() {
-
-
                String retString = " Committed Table: \n";
                retString += "---------------------------\n";
                retString += commitedTable.toString();
@@ -148,11 +192,6 @@ final public class Table {
                return retString;
        }
 
-
-
-
-
-
        public void startTransaction() {
                // Create a new transaction, invalidates any old pending transactions.
                pendingTransBuild = new PendingTransaction();
@@ -177,14 +216,16 @@ final public class Table {
 
        public void addKV(IoTString key, IoTString value) {
 
+               if (arbitratorTable.get(key) == null) {
+                       throw new Error("Key not Found.");
+               }
+
                // Make sure new key value pair matches the current arbitrator
                if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
-                       // TODO: Maybe not throw and error
-                       throw new Error("Not all Key Values match");
+                       // TODO: Maybe not throw en error
+                       throw new Error("Not all Key Values Match.");
                }
 
-
-
                KeyValue kv = new KeyValue(key, value);
                pendingTransBuild.addKV(kv);
        }
@@ -194,25 +235,16 @@ final public class Table {
        }
 
        public void update() {
+
                Slot[] newslots = cloud.getSlots(sequencenumber + 1);
 
                validateandupdate(newslots, false);
 
-               if (uncommittedTransactionsList.size() > 0) {
-                       List<Transaction> uncommittedTransArb = new LinkedList<Transaction>();
-
-                       for (Transaction ut : uncommittedTransactionsList) {
-                               KeyValue kv = (KeyValue)(ut.getkeyValueUpdateSet().toArray()[0]);
-                               long arb = arbitratorTable.get(kv.getKey());
-
-                               if (arb == localmachineid) {
-                                       uncommittedTransArb.add(ut);
-                               }
-                       }
-
+               if (uncommittedTransactionsMap.keySet().size() > 0) {
 
+                       boolean doEnd = false;
                        boolean needResize = false;
-                       while (uncommittedTransArb.size() > 0) {
+                       while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
                                boolean resize = needResize;
                                needResize = false;
 
@@ -228,156 +260,30 @@ final public class Table {
                                        s.addEntry(status);
                                }
 
-                               if (! rejectedmessagelist.isEmpty()) {
-                                       /* TODO: We should avoid generating a rejected message entry if
-                                        * there is already a sufficient entry in the queue (e.g.,
-                                        * equalsto value of true and same sequence number).  */
-
-                                       long old_seqn = rejectedmessagelist.firstElement();
-                                       if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
-                                               long new_seqn = rejectedmessagelist.lastElement();
-                                               RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
-                                               s.addEntry(rm);
-                                       } else {
-                                               long prev_seqn = -1;
-                                               int i = 0;
-                                               /* Go through list of missing messages */
-                                               for (; i < rejectedmessagelist.size(); i++) {
-                                                       long curr_seqn = rejectedmessagelist.get(i);
-                                                       Slot s_msg = buffer.getSlot(curr_seqn);
-                                                       if (s_msg != null)
-                                                               break;
-                                                       prev_seqn = curr_seqn;
-                                               }
-                                               /* Generate rejected message entry for missing messages */
-                                               if (prev_seqn != -1) {
-                                                       RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
-                                                       s.addEntry(rm);
-                                               }
-                                               /* Generate rejected message entries for present messages */
-                                               for (; i < rejectedmessagelist.size(); i++) {
-                                                       long curr_seqn = rejectedmessagelist.get(i);
-                                                       Slot s_msg = buffer.getSlot(curr_seqn);
-                                                       long machineid = s_msg.getMachineID();
-                                                       RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
-                                                       s.addEntry(rm);
-                                               }
-                                       }
-                               }
-
-                               long newestseqnum = buffer.getNewestSeqNum();
-                               long oldestseqnum = buffer.getOldestSeqNum();
-                               if (lastliveslotseqn < oldestseqnum) {
-                                       lastliveslotseqn = oldestseqnum;
-                               }
+                               doRejectedMessages(s);
 
-                               long seqn = lastliveslotseqn;
-                               boolean seenliveslot = false;
-                               long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
-                               long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
-
-                               boolean tryAgain = false;
-
-                               // Mandatory Rescue
-                               for (; seqn < threshold; seqn++) {
-                                       Slot prevslot = buffer.getSlot(seqn);
-                                       //Push slot number forward
-                                       if (! seenliveslot)
-                                               lastliveslotseqn = seqn;
-
-                                       if (! prevslot.isLive())
-                                               continue;
-                                       seenliveslot = true;
-                                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
-                                       for (Entry liveentry : liveentries) {
-                                               if (s.hasSpace(liveentry)) {
-                                                       s.addEntry(liveentry);
-                                               } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
-                                                       if (!resize) {
-                                                               System.out.print("B"); //?
-                                                               tryAgain = true;
-                                                               needResize = true;
-                                                       }
-                                               }
-                                       }
-                               }
+                               ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
 
-                               if (tryAgain) {
+                               // Resize was needed so redo call
+                               if (retTup.getFirst()) {
+                                       needResize = true;
                                        continue;
                                }
 
-                               // Arbitrate
-                               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
-                               for (Iterator<Transaction> i = uncommittedTransArb.iterator(); i.hasNext();) {
-                                       Transaction ut = i.next();
-
-                                       KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
-                                       // Check if this machine arbitrates for this transaction
-                                       if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
-                                               continue;
-                                       }
+                               // Extract working variables
+                               boolean seenliveslot = retTup.getSecond();
+                               long seqn = retTup.getThird();
 
-                                       Entry newEntry = null;
+                               // Did need to arbitrate
+                               doEnd = !doArbitration(s);
 
-                                       try {
-                                               if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
-                                                       // Guard evaluated as true
-
-                                                       // update the local tmp current key set
-                                                       for (KeyValue kv : ut.getkeyValueUpdateSet()) {
-                                                               speculativeTableTmp.put(kv.getKey(), kv);
-                                                       }
-
-                                                       // create the commit
-                                                       newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
-                                               } else {
-                                                       // Guard was false
-
-                                                       // create the abort
-                                                       newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
-                                               }
-                                       } catch (Exception e) {
-                                               e.printStackTrace();
-                                       }
-
-                                       if ((newEntry != null) && s.hasSpace(newEntry)) {
-                                               s.addEntry(newEntry);
-                                               i.remove();
-
-                                       } else {
-                                               break;
-                                       }
-                               }
-
-                               /* now go through live entries from least to greatest sequence number until
-                                * either all live slots added, or the slot doesn't have enough room
-                                * for SKIP_THRESHOLD consecutive entries*/
-                               int skipcount = 0;
-                               search:
-                               for (; seqn <= newestseqnum; seqn++) {
-                                       Slot prevslot = buffer.getSlot(seqn);
-                                       //Push slot number forward
-                                       if (!seenliveslot)
-                                               lastliveslotseqn = seqn;
-
-                                       if (!prevslot.isLive())
-                                               continue;
-                                       seenliveslot = true;
-                                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
-                                       for (Entry liveentry : liveentries) {
-                                               if (s.hasSpace(liveentry))
-                                                       s.addEntry(liveentry);
-                                               else {
-                                                       skipcount++;
-                                                       if (skipcount > SKIP_THRESHOLD)
-                                                               break search;
-                                               }
-                                       }
-                               }
+                               doOptionalRescue(s, seenliveslot, seqn, resize);
 
                                int max = 0;
-                               if (resize)
+                               if (resize) {
                                        max = newsize;
+                               }
+
                                Slot[] array = cloud.putSlot(s, max);
                                if (array == null) {
                                        array = new Slot[] {s};
@@ -386,13 +292,12 @@ final public class Table {
                                        if (array.length == 0)
                                                throw new Error("Server Error: Did not send any slots");
                                        rejectedmessagelist.add(s.getSequenceNumber());
+                                       doEnd = false;
                                }
 
                                /* update data structure */
                                validateandupdate(array, true);
                        }
-
-
                }
        }
 
@@ -412,11 +317,11 @@ final public class Table {
                }
        }
 
-       void decrementLiveCount() {
+       public void decrementLiveCount() {
                liveslotcount--;
+               // System.out.println("Decrement Live Count");
        }
 
-
        private void setResizeThreshold() {
                int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
                resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
@@ -424,128 +329,39 @@ final public class Table {
 
        private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
                Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+
                int newsize = 0;
                if (liveslotcount > resizethreshold) {
                        resize = true; //Resize is forced
+                       System.out.println("Live count resize: " + liveslotcount + "   " + resizethreshold);
+
                }
 
                if (resize) {
                        newsize = (int) (numslots * RESIZE_MULTIPLE);
-                       TableStatus status = new TableStatus(s, newsize);
-                       s.addEntry(status);
-               }
 
-               if (! rejectedmessagelist.isEmpty()) {
-                       /* TODO: We should avoid generating a rejected message entry if
-                        * there is already a sufficient entry in the queue (e.g.,
-                        * equalsto value of true and same sequence number).  */
+                       System.out.println("New Size:  " + newsize + "  old: " + buffer.oldestseqn); // TODO remove
 
-                       long old_seqn = rejectedmessagelist.firstElement();
-                       if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
-                               long new_seqn = rejectedmessagelist.lastElement();
-                               RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
-                               s.addEntry(rm);
-                       } else {
-                               long prev_seqn = -1;
-                               int i = 0;
-                               /* Go through list of missing messages */
-                               for (; i < rejectedmessagelist.size(); i++) {
-                                       long curr_seqn = rejectedmessagelist.get(i);
-                                       Slot s_msg = buffer.getSlot(curr_seqn);
-                                       if (s_msg != null)
-                                               break;
-                                       prev_seqn = curr_seqn;
-                               }
-                               /* Generate rejected message entry for missing messages */
-                               if (prev_seqn != -1) {
-                                       RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
-                                       s.addEntry(rm);
-                               }
-                               /* Generate rejected message entries for present messages */
-                               for (; i < rejectedmessagelist.size(); i++) {
-                                       long curr_seqn = rejectedmessagelist.get(i);
-                                       Slot s_msg = buffer.getSlot(curr_seqn);
-                                       long machineid = s_msg.getMachineID();
-                                       RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
-                                       s.addEntry(rm);
-                               }
-                       }
+                       TableStatus status = new TableStatus(s, newsize);
+                       s.addEntry(status);
                }
 
-               long newestseqnum = buffer.getNewestSeqNum();
-               long oldestseqnum = buffer.getOldestSeqNum();
-               if (lastliveslotseqn < oldestseqnum)
-                       lastliveslotseqn = oldestseqnum;
+               doRejectedMessages(s);
 
-               long seqn = lastliveslotseqn;
-               boolean seenliveslot = false;
-               long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
-               long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
 
 
-               // Mandatory Rescue
-               for (; seqn < threshold; seqn++) {
-                       Slot prevslot = buffer.getSlot(seqn);
-                       //Push slot number forward
-                       if (! seenliveslot)
-                               lastliveslotseqn = seqn;
+               ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
 
-                       if (! prevslot.isLive())
-                               continue;
-                       seenliveslot = true;
-                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
-                       for (Entry liveentry : liveentries) {
-                               if (s.hasSpace(liveentry)) {
-                                       s.addEntry(liveentry);
-                               } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
-                                       if (!resize) {
-                                               System.out.print("B"); //?
-                                               return tryput(pendingTrans, true);
-                                       }
-                               }
-                       }
+               // Resize was needed so redo call
+               if (retTup.getFirst()) {
+                       return tryput(pendingTrans, true);
                }
 
+               // Extract working variables
+               boolean seenliveslot = retTup.getSecond();
+               long seqn = retTup.getThird();
 
-               // Arbitrate
-               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
-               for (Transaction ut : uncommittedTransactionsList) {
-
-                       KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
-                       // Check if this machine arbitrates for this transaction
-                       if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
-                               continue;
-                       }
-
-                       Entry newEntry = null;
-
-                       try {
-                               if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
-                                       // Guard evaluated as true
-
-                                       // update the local tmp current key set
-                                       for (KeyValue kv : ut.getkeyValueUpdateSet()) {
-                                               speculativeTableTmp.put(kv.getKey(), kv);
-                                       }
-
-                                       // create the commit
-                                       newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
-                               } else {
-                                       // Guard was false
-
-                                       // create the abort
-                                       newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
-                               }
-                       } catch (Exception e) {
-                               e.printStackTrace();
-                       }
-
-                       if ((newEntry != null) && s.hasSpace(newEntry)) {
-                               s.addEntry(newEntry);
-                       } else {
-                               break;
-                       }
-               }
+               doArbitration(s);
 
                Transaction trans = new Transaction(s,
                                                    s.getSequenceNumber(),
@@ -558,49 +374,13 @@ final public class Table {
                        insertedTrans = true;
                }
 
-               /* now go through live entries from least to greatest sequence number until
-                * either all live slots added, or the slot doesn't have enough room
-                * for SKIP_THRESHOLD consecutive entries*/
-               int skipcount = 0;
-               search:
-               for (; seqn <= newestseqnum; seqn++) {
-                       Slot prevslot = buffer.getSlot(seqn);
-                       //Push slot number forward
-                       if (!seenliveslot)
-                               lastliveslotseqn = seqn;
-
-                       if (!prevslot.isLive())
-                               continue;
-                       seenliveslot = true;
-                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
-                       for (Entry liveentry : liveentries) {
-                               if (s.hasSpace(liveentry))
-                                       s.addEntry(liveentry);
-                               else {
-                                       skipcount++;
-                                       if (skipcount > SKIP_THRESHOLD)
-                                               break search;
-                               }
-                       }
-               }
+               doOptionalRescue(s, seenliveslot, seqn, resize);
+               insertedTrans = doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
 
-               int max = 0;
-               if (resize)
-                       max = newsize;
-               Slot[] array = cloud.putSlot(s, max);
-               if (array == null) {
-                       array = new Slot[] {s};
-                       rejectedmessagelist.clear();
-               }       else {
-                       if (array.length == 0)
-                               throw new Error("Server Error: Did not send any slots");
-                       rejectedmessagelist.add(s.getSequenceNumber());
-                       insertedTrans = false;
+               if (insertedTrans) {
+                       // System.out.println("Inserted: " + trans.getSequenceNumber());
                }
 
-               /* update data structure */
-               validateandupdate(array, true);
-
                return insertedTrans;
        }
 
@@ -617,6 +397,34 @@ final public class Table {
                        s.addEntry(status);
                }
 
+               doRejectedMessages(s);
+               ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+
+               // Resize was needed so redo call
+               if (retTup.getFirst()) {
+                       return tryput(keyName, arbMachineid, true);
+               }
+
+               // Extract working variables
+               boolean seenliveslot = retTup.getSecond();
+               long seqn = retTup.getThird();
+
+
+               doArbitration(s);
+
+               NewKey newKey = new NewKey(s, keyName, arbMachineid);
+
+               boolean insertedNewKey = false;
+               if (s.hasSpace(newKey)) {
+                       s.addEntry(newKey);
+                       insertedNewKey = true;
+               }
+
+               doOptionalRescue(s, seenliveslot, seqn, resize);
+               return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize);
+       }
+
+       private void doRejectedMessages(Slot s) {
                if (! rejectedmessagelist.isEmpty()) {
                        /* TODO: We should avoid generating a rejected message entry if
                         * there is already a sufficient entry in the queue (e.g.,
@@ -653,7 +461,9 @@ final public class Table {
                                }
                        }
                }
+       }
 
+       private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
                long newestseqnum = buffer.getNewestSeqNum();
                long oldestseqnum = buffer.getOldestSeqNum();
                if (lastliveslotseqn < oldestseqnum)
@@ -661,14 +471,14 @@ final public class Table {
 
                long seqn = lastliveslotseqn;
                boolean seenliveslot = false;
-               long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
-               long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
+               long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
+               long threshold = firstiffull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
 
 
                // Mandatory Rescue
                for (; seqn < threshold; seqn++) {
                        Slot prevslot = buffer.getSlot(seqn);
-                       //Push slot number forward
+                       // Push slot number forward
                        if (! seenliveslot)
                                lastliveslotseqn = seqn;
 
@@ -681,70 +491,81 @@ final public class Table {
                                        s.addEntry(liveentry);
                                } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
                                        if (!resize) {
-                                               System.out.print("B"); //?
-                                               return tryput(keyName, arbMachineid, true);
+                                               System.out.println("B"); //?
+
+                                               System.out.println("==============================NEEEEDDDD RESIZING");
+                                               return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
                                        }
                                }
                        }
                }
 
+               // Did not resize
+               return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
+       }
 
-               // // Arbitrate
-               // Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
-               // for (Transaction ut : uncommittedTransactionsList) {
+       private boolean doArbitration(Slot s) {
+               // Arbitrate
+               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
 
-               //      KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
-               //      // Check if this machine arbitrates for this transaction
-               //      if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
-               //              continue;
-               //      }
+               List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
 
-               //      Entry newEntry = null;
+               // Sort from oldest to newest
+               Collections.sort(transSeqNums);
 
-               //      try {
-               //              if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
-               //                      // Guard evaluated as true
 
-               //                      // update the local tmp current key set
-               //                      for (KeyValue kv : ut.getkeyValueUpdateSet()) {
-               //                              speculativeTableTmp.put(kv.getKey(), kv);
-               //                      }
+               boolean didNeedArbitration = false;
+               for (Long transNum : transSeqNums) {
+                       Transaction ut = uncommittedTransactionsMap.get(transNum);
 
-               //                      // create the commit
-               //                      newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
-               //              } else {
-               //                      // Guard was false
+                       KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
+                       // Check if this machine arbitrates for this transaction
+                       if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+                               continue;
+                       }
 
-               //                      // create the abort
-               //                      newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
-               //              }
-               //      } catch (Exception e) {
-               //              e.printStackTrace();
-               //      }
+                       // we did have something to arbitrate on
+                       didNeedArbitration = true;
 
-               //      if ((newEntry != null) && s.hasSpace(newEntry)) {
+                       Entry newEntry = null;
 
-               //              // TODO: Remove print
-               //              System.out.println("Arbitrating...");
-               //              s.addEntry(newEntry);
-               //      } else {
-               //              break;
-               //      }
-               // }
+                       try {
+                               if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+                                       // Guard evaluated as true
 
+                                       // update the local tmp current key set
+                                       for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+                                               speculativeTableTmp.put(kv.getKey(), kv);
+                                       }
 
-               NewKey newKey = new NewKey(s, keyName, arbMachineid);
+                                       // create the commit
+                                       newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
+                               } else {
+                                       // Guard was false
 
-               boolean insertedNewKey = false;
-               if (s.hasSpace(newKey)) {
-                       s.addEntry(newKey);
-                       insertedNewKey = true;
+                                       // create the abort
+                                       newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
+                               }
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
+
+                       if ((newEntry != null) && s.hasSpace(newEntry)) {
+                               s.addEntry(newEntry);
+                       } else {
+                               break;
+                       }
                }
 
+               return didNeedArbitration;
+       }
+
+       private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
                /* now go through live entries from least to greatest sequence number until
                 * either all live slots added, or the slot doesn't have enough room
                 * for SKIP_THRESHOLD consecutive entries*/
                int skipcount = 0;
+               long newestseqnum = buffer.getNewestSeqNum();
                search:
                for (; seqn <= newestseqnum; seqn++) {
                        Slot prevslot = buffer.getSlot(seqn);
@@ -766,7 +587,9 @@ final public class Table {
                                }
                        }
                }
+       }
 
+       private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) {
                int max = 0;
                if (resize)
                        max = newsize;
@@ -778,13 +601,13 @@ final public class Table {
                        if (array.length == 0)
                                throw new Error("Server Error: Did not send any slots");
                        rejectedmessagelist.add(s.getSequenceNumber());
-                       insertedNewKey = false;
+                       inserted = false;
                }
 
                /* update data structure */
                validateandupdate(array, true);
 
-               return insertedNewKey;
+               return inserted;
        }
 
        private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
@@ -808,6 +631,8 @@ final public class Table {
                        updateExpectedSize();
                }
 
+               proccessAllNewCommits();
+
                /* If there is a gap, check to see if the server sent us everything. */
                if (firstseqnum != (sequencenumber + 1)) {
 
@@ -831,10 +656,87 @@ final public class Table {
                createSpeculativeTable();
        }
 
+       public void proccessAllNewCommits() {
+
+               // Process only if there are commit
+               if (newCommitMap.keySet().size() == 0) {
+                       return;
+               }
+
+               List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.keySet());
+
+               // Sort from oldest to newest commit
+               Collections.sort(commitSeqNums);
+
+               // Go through each new commit one by one
+               for (Long entrySeqNum : commitSeqNums) {
+                       Commit entry = newCommitMap.get(entrySeqNum);
+
+                       if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
+
+                               // Remove any old commits
+                               for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
+                                       Commit prevcommit = i.next();
+
+                                       if (entry.getTransSequenceNumber() == prevcommit.getTransSequenceNumber()) {
+                                               prevcommit.setDead();
+                                               i.remove();
+                                       }
+                               }
+                               commitList.add(entry);
+                               continue;
+                       }
+
+                       // Remove any old commits
+                       for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
+                               Commit prevcommit = i.next();
+                               prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+
+                               if (!prevcommit.isLive()) {
+                                       i.remove();
+                               }
+                       }
+
+                       // Add the new commit
+                       commitList.add(entry);
+                       lastCommitSeenSeqNum = entry.getTransSequenceNumber();
+                       // System.out.println("Last Seq Num: " + lastCommitSeenSeqNum);
+
+
+                       // Update the committed table list
+                       for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+                               IoTString key = kv.getKey();
+                               commitedTable.put(key, kv);
+                       }
+
+                       long committedTransSeq = entry.getTransSequenceNumber();
+
+                       // Make dead the transactions
+                       for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
+                               Transaction prevtrans = i.next().getValue();
+
+                               if (prevtrans.getSequenceNumber() <= committedTransSeq) {
+                                       i.remove();
+                                       prevtrans.setDead();
+                               }
+                       }
+               }
+
+
+               // Clear the new commits storage so we can use it later
+               newCommitMap.clear();
+       }
+
        private void createSpeculativeTable() {
                Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+               List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
+
+               // Sort from oldest to newest commit
+               Collections.sort(utSeqNums);
 
-               for (Transaction trans : uncommittedTransactionsList) {
+
+               for (Long key : utSeqNums) {
+                       Transaction trans = uncommittedTransactionsMap.get(key);
 
                        try {
                                if (trans.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
@@ -877,17 +779,15 @@ final public class Table {
        }
 
        private void commitNewMaxSize() {
-               if (numslots != currmaxsize)
+               if (numslots != currmaxsize) {
+                       System.out.println("Resizing the buffer"); // TODO: Remove
                        buffer.resize(currmaxsize);
+               }
 
                numslots = currmaxsize;
                setResizeThreshold();
        }
 
-
-
-
-
        private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
                updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
        }
@@ -929,15 +829,25 @@ final public class Table {
 
        private void processEntry(NewKey entry) {
                arbitratorTable.put(entry.getKey(), entry.getMachineID());
+
+               NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
+
+               if (oldNewKey != null) {
+                       oldNewKey.setDead();
+               }
        }
 
        private void processEntry(Transaction entry) {
-               uncommittedTransactionsList.add(entry);
+               Transaction prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
+
+               // Duplicate so delete old copy
+               if (prevTrans != null) {
+                       prevTrans.setDead();
+               }
        }
 
        private void processEntry(Abort entry) {
 
-
                if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
                        // Abort has not been seen yet so we need to keep track of it
                        abortSet.add(entry);
@@ -946,50 +856,24 @@ final public class Table {
                        entry.setDead();
                }
 
-               for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
-                       Transaction prevtrans = i.next();
-                       if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
-                               uncommittedTransactionsList.remove(prevtrans);
-                               prevtrans.setDead();
-                               return;
-                       }
-               }
-       }
-
-       private void processEntry(Commit entry) {
-
-               for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
-                       Commit prevcommit = i.next();
-                       prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
-
-                       if (!prevcommit.isLive()) {
-                               //commitList.remove(prevcommit);
-                               i.remove();
-                       }
-               }
-
-               commitList.add(entry);
-
-               // Update the committed table list
-               for (KeyValue kv : entry.getkeyValueUpdateSet()) {
-                       IoTString key = kv.getKey();
-                       commitedTable.put(key, kv);
-               }
-
-               long committedTransSeq = entry.getTransSequenceNumber();
-
                // Make dead the transactions
-               for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
-                       Transaction prevtrans = i.next();
+               for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
+                       Transaction prevtrans = i.next().getValue();
 
-                       if (prevtrans.getSequenceNumber() <= committedTransSeq) {
-                               // uncommittedTransactionsList.remove(prevtrans);
+                       if (prevtrans.getSequenceNumber() <= entry.getTransSequenceNumber()) {
                                i.remove();
                                prevtrans.setDead();
                        }
                }
        }
 
+       private void processEntry(Commit entry, Slot s) {
+               Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry);
+               if (prevCommit != null) {
+                       prevCommit.setDead();
+               }
+       }
+
        private void processEntry(TableStatus entry) {
                int newnumslots = entry.getMaxSlots();
                updateCurrMaxSize(newnumslots);
@@ -1079,7 +963,7 @@ final public class Table {
                                break;
 
                        case Entry.TypeCommit:
-                               processEntry((Commit)entry);
+                               processEntry((Commit)entry, slot);
                                break;
 
                        case Entry.TypeAbort:
index 00d4c7c..24af2f8 100644 (file)
@@ -7,69 +7,408 @@ package iotcloud;
  */
 
 public class Test {
+
+       public static final  int NUMBER_OF_TESTS = 20000; //66
+
        public static void main(String[] args) {
                if (args[0].equals("2")) {
                        test2();
+               } else if (args[0].equals("3")) {
+                       test3();
+               } else if (args[0].equals("4")) {
+                       test4();
                }
        }
 
-       static void test2() {
+
+
+       static void test5() {
                Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
-               t1.initTable();
-               Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
-               t2.update();
+               t1.rebuild();
+               System.out.println(t1);
 
+               // // Print the results
+               // for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+               //      String a = "a" + i;
+               //      String b = "b" + i;
+               //      IoTString ia = new IoTString(a);
+               //      IoTString ib = new IoTString(b);
 
+               //      System.out.println(ib + " -> " + t1.getCommitted(ib));
+               //      System.out.println(ia + " -> " + t2.getCommitted(ia));
+               //      System.out.println();
+               // }
+       }
 
-               final int NUMBER_OF_TESTS = 200;
+       static Thread buildThreadTest4(String prefix, Table t) {
+               return new Thread() {
+                       public void run() {
+                               for (int i = 0; i < (NUMBER_OF_TESTS * 3); i++) {
 
-               for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+                                       int num = i % NUMBER_OF_TESTS;
+                                       String key = prefix + num;
+                                       String value = prefix + (num + 2000);
+                                       IoTString iKey = new IoTString(key);
+                                       IoTString iValue = new IoTString(value);
 
-                       System.out.println("Doing: " + i);
+                                       t.startTransaction();
+                                       t.addKV(iKey, iValue);
+                                       t.commitTransaction();
+                               }
+                       }
+               };
+       }
 
+       static void test4() {
+               Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+               Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+               t1.rebuild();
+               t2.rebuild();
+
+               Thread thr1 = buildThreadTest4("b", t1);
+               Thread thr2 = buildThreadTest4("a", t2);
+               thr1.start();
+               thr2.start();
+               try {
+                       thr1.join();
+                       thr2.join();
+               } catch (Exception e) {
+                       e.printStackTrace();
+               }
+
+               t1.update();
+               t2.update();
+               // t1.update();
+
+               // Print the results
+               for (int i = 0; i < NUMBER_OF_TESTS; i++) {
                        String a = "a" + i;
                        String b = "b" + i;
                        IoTString ia = new IoTString(a);
                        IoTString ib = new IoTString(b);
 
+                       System.out.println(ib + " -> " + t1.getCommitted(ib));
+                       System.out.println(ia + " -> " + t2.getCommitted(ia));
+                       System.out.println();
+               }
+       }
+
+       static void test3() {
+               Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+               Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+               t1.rebuild();
+               t2.rebuild();
+
 
-                       t1.createNewKey(ia, 351);
-                       t2.createNewKey(ib, 321);
+               for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+                       String key = "a" + i;
+                       String value = "a" + (i + 1000);
+                       IoTString iKey = new IoTString(key);
+                       IoTString iValue = new IoTString(value);
 
                        t1.startTransaction();
-                       t1.addKV(ia, ia);
+                       t1.addKV(iKey, iValue);
                        t1.commitTransaction();
                }
 
                for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+                       String key = "b" + i;
+                       String value = "b" + (i + 1000);
+                       IoTString iKey = new IoTString(key);
+                       IoTString iValue = new IoTString(value);
 
-                       System.out.println("Doing: " + i);
+                       t2.startTransaction();
+                       t2.addKV(iKey, iValue);
+                       t2.commitTransaction();
+               }
+
+               // Make sure t1 sees the new updates from t2
+               t1.update();
 
+               // Print the results
+               for (int i = 0; i < NUMBER_OF_TESTS; i++) {
                        String a = "a" + i;
                        String b = "b" + i;
                        IoTString ia = new IoTString(a);
                        IoTString ib = new IoTString(b);
 
-                       t2.startTransaction();
-                       t2.addKV(ib, ib);
-                       t2.commitTransaction();
+                       System.out.println(ib + " -> " + t1.getCommitted(ib));
+                       System.out.println(ia + " -> " + t2.getCommitted(ia));
+                       System.out.println();
                }
+       }
 
+       static void test2() {
+               Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+               t1.initTable();
+               Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+               t2.update();
 
 
-               t1.update();
-               // t2.update();
-               // t1.update();
-
+               // Make the Keys
+               System.out.println("Setting up keys");
                for (int i = 0; i < NUMBER_OF_TESTS; i++) {
                        String a = "a" + i;
                        String b = "b" + i;
                        IoTString ia = new IoTString(a);
                        IoTString ib = new IoTString(b);
+                       t1.createNewKey(ia, 321);
+                       t1.createNewKey(ib, 321);
+               }
 
-                       System.out.println(ib + " -> " + t1.getCommitted(ib));
-                       System.out.println(ia + " -> " + t2.getCommitted(ia));
-                       System.out.println();
+               // System.out.println("=========t1 live" + t1.liveslotcount + "     thresh: " + t1.resizethreshold);
+               // System.out.println("=========t2 live" + t2.liveslotcount + "     thresh: " + t2.resizethreshold);
+               // System.out.println();
+
+
+
+               // Do Updates for the keys
+               System.out.println("Writing Keys a...");
+               for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+                       System.out.println(i);
+
+                       String key = "a" + i;
+                       String value = "a" + (i + 10000);
+                       IoTString iKey = new IoTString(key);
+                       IoTString iValue = new IoTString(value);
+
+
+                       t1.startTransaction();
+                       t1.addKV(iKey, iValue);
+                       t1.commitTransaction();
+               }
+
+               // Do Updates for the keys
+               System.out.println("Writing Keys a...");
+               for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+                       System.out.println(i);
+
+                       String key = "a" + i;
+                       String value = "a" + (i + 10000);
+                       IoTString iKey = new IoTString(key);
+                       IoTString iValue = new IoTString(value);
+
+                       t1.startTransaction();
+                       t1.addKV(iKey, iValue);
+                       t1.commitTransaction();
                }
+
+
+               t2.update();
+               System.out.println("Writing Keys b...");
+               for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+                       System.out.println(i);
+
+                       String key = "b" + i;
+                       String value = "b" + (i + 10000);
+                       IoTString iKey = new IoTString(key);
+                       IoTString iValue = new IoTString(value);
+
+
+                       t2.startTransaction();
+                       t2.addKV(iKey, iValue);
+                       t2.commitTransaction();
+               }
+
+
+               // Do Updates for the keys
+               System.out.println("Writing Keys a...");
+               for (int i = 0; i < NUMBER_OF_TESTS; i += 2) {
+                       System.out.println(i);
+
+                       String key = "a" + i;
+                       String value = "a" + (i + 10000);
+                       IoTString iKey = new IoTString(key);
+                       IoTString iValue = new IoTString(value);
+
+                       t1.startTransaction();
+                       t1.addKV(iKey, iValue);
+                       t1.commitTransaction();
+               }
+
+
+               t1.update();
+               t2.update();
+
+               System.out.println("Checking a keys...");
+               for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+
+                       String key = "a" + i;
+                       String value = "a" + (i + 10000);
+                       IoTString iKey = new IoTString(key);
+                       IoTString iValue = new IoTString(value);
+
+                       IoTString testVal = t1.getCommitted(iKey);
+
+                       if ((testVal == null) || (testVal.equals(iValue) == false)) {
+                               System.out.println("Key val incorrect: " + key);
+                       }
+
+                       key = "b" + i;
+                       value = "b" + (i + 10000);
+                       iKey = new IoTString(key);
+                       iValue = new IoTString(value);
+
+                       testVal = t1.getCommitted(iKey);
+
+                       if ((testVal == null) || (testVal.equals(iValue) == false)) {
+                               System.out.println("Key val incorrect: " + key);
+                       }
+               }
+
+               System.out.println("Checking b keys...");
+               for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+
+                       String key = "a" + i;
+                       String value = "a" + (i + 10000);
+                       IoTString iKey = new IoTString(key);
+                       IoTString iValue = new IoTString(value);
+
+                       IoTString testVal = t2.getCommitted(iKey);
+
+                       if ((testVal == null) || (testVal.equals(iValue) == false)) {
+                               System.out.println("Key val incorrect: " + key);
+                       }
+
+                       key = "b" + i;
+                       value = "b" + (i + 10000);
+                       iKey = new IoTString(key);
+                       iValue = new IoTString(value);
+
+                       testVal = t2.getCommitted(iKey);
+
+                       if ((testVal == null) || (testVal.equals(iValue) == false)) {
+                               System.out.println("Key val incorrect: " + key);
+                       }
+               }
+
+
+
+
+               System.out.println();
+               System.out.println();
+               System.out.println("Update");
+               // Make sure t1 sees the new updates from t2
+               t1.update();
+               t2.update();
+               t1.update();
+               System.out.println();
+               System.out.println();
+               System.out.println();
+               System.out.println();
+               System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-");
+               System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-");
+               System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-");
+               t2.update();
+               System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-");
+               System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-");
+               System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-");
+
+
+               // System.out.println("=========t1 live" + t1.liveslotcount + "     thresh: " + t1.resizethreshold + "   commits: " + t1.commitedTable.size() + "   uncommits: " + t1.uncommittedTransactionsList.size());
+               // System.out.println("=========t2 live" + t2.liveslotcount + "     thresh: " + t2.resizethreshold + "   commits: " + t2.commitedTable.size() + "   uncommits: " + t2.uncommittedTransactionsList.size() );
+               System.out.println();
+
+               t1.printSlots();
+               System.out.println();
+               System.out.println();
+               System.out.println();
+               System.out.println();
+               t2.printSlots();
+               System.out.println();
+
+
+
+               // // Do Updates for the keys
+               // System.out.println("Writing Keys a (actual)");
+               // for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+               //      String key = "a" + i;
+               //      String value = "a" + i;
+               //      IoTString iKey = new IoTString(key);
+               //      IoTString iValue = new IoTString(value);
+
+               //      t1.startTransaction();
+               //      t1.addKV(iKey, iValue);
+               //      t1.commitTransaction();
+               // }
+
+               // System.out.println("=========t1 live" + t1.liveslotcount + "     thresh: " + t1.resizethreshold + "   commits: " + t1.commitedTable.size() + "   uncommits: " + t1.uncommittedTransactionsList.size());
+               // System.out.println("=========t2 live" + t2.liveslotcount + "     thresh: " + t2.resizethreshold + "   commits: " + t2.commitedTable.size() + "   uncommits: " + t2.uncommittedTransactionsList.size());
+               // System.out.println();
+
+
+
+               // System.out.println("Writing Keys b (actual)");
+               // for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+               //      String key = "b" + i;
+               //      String value = "b" + i;
+               //      IoTString iKey = new IoTString(key);
+               //      IoTString iValue = new IoTString(value);
+
+               //      t2.startTransaction();
+               //      t2.addKV(iKey, iValue);
+               //      t2.commitTransaction();
+               // }
+
+               // System.out.println("=========t1 live" + t1.liveslotcount + "     thresh: " + t1.resizethreshold + "   commits: " + t1.commitedTable.size() + "   uncommits: " + t1.uncommittedTransactionsList.size() );
+               // System.out.println("=========t2 live" + t2.liveslotcount + "     thresh: " + t2.resizethreshold + "   commits: " + t2.commitedTable.size() + "   uncommits: " + t2.uncommittedTransactionsList.size() );
+               // System.out.println();
+
+
+               // // Do Updates for the keys
+               // System.out.println("Writing Keys a (actual)");
+               // for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+               //      String key = "a" + i;
+               //      String value = "a" + i;
+               //      IoTString iKey = new IoTString(key);
+               //      IoTString iValue = new IoTString(value);
+
+               //      t1.startTransaction();
+               //      t1.addKV(iKey, iValue);
+               //      t1.commitTransaction();
+               // }
+
+               // System.out.println("=========t1 live" + t1.liveslotcount + "     thresh: " + t1.resizethreshold + "   commits: " + t1.commitedTable.size() + "   uncommits: " + t1.uncommittedTransactionsList.size());
+               // System.out.println("=========t2 live" + t2.liveslotcount + "     thresh: " + t2.resizethreshold + "   commits: " + t2.commitedTable.size() + "   uncommits: " + t2.uncommittedTransactionsList.size());
+               // System.out.println();
+
+
+
+               // System.out.println("Writing Keys b (actual)");
+               // for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+               //      String key = "b" + i;
+               //      String value = "b" + i;
+               //      IoTString iKey = new IoTString(key);
+               //      IoTString iValue = new IoTString(value);
+
+               //      t2.startTransaction();
+               //      t2.addKV(iKey, iValue);
+               //      t2.commitTransaction();
+               // }
+
+               // System.out.println("=========t1 live" + t1.liveslotcount + "     thresh: " + t1.resizethreshold + "   commits: " + t1.commitedTable.size() + "   uncommits: " + t1.uncommittedTransactionsList.size() );
+               // System.out.println("=========t2 live" + t2.liveslotcount + "     thresh: " + t2.resizethreshold + "   commits: " + t2.commitedTable.size() + "   uncommits: " + t2.uncommittedTransactionsList.size() );
+               // System.out.println();
+
+               // t1.printSlots();
+               // System.out.println();
+               // t2.printSlots();
+
+
+
+               // Make sure t1 sees the new updates from t2
+               // t1.update();
+
+               // // Print the results
+               // for (int i = NUMBER_OF_TESTS - 10; i < NUMBER_OF_TESTS; i++) {
+               // String a = "a" + i;
+               // String b = "b" + i;
+               // IoTString ia = new IoTString(a);
+               //      IoTString ib = new IoTString(b);
+
+               //      System.out.println(ib + " -> " + t1.getCommitted(ib));
+               //      System.out.println(ia + " -> " + t2.getCommitted(ia));
+               //      System.out.println();
+               // }
        }
 }
diff --git a/version2/src/java/iotcloud/ThreeTuple.java b/version2/src/java/iotcloud/ThreeTuple.java
new file mode 100644 (file)
index 0000000..8a882a4
--- /dev/null
@@ -0,0 +1,29 @@
+package iotcloud;
+
+class ThreeTuple<A, B, C> {
+    private A a;
+    private B b;
+    private C c;
+
+    ThreeTuple(A a, B b, C c) {
+        this.a = a;
+        this.b = b;
+        this.c = c;
+    }
+
+    A getFirst() {
+        return a;
+    }
+
+    B getSecond() {
+        return b;
+    }
+
+    C getThird() {
+        return c;
+    }
+
+    public String toString() {
+        return "<" + a + "," + b + "," + c + ">";
+    }
+}