From d9248eb59faf043e544deb7592f8de63b51e7878 Mon Sep 17 00:00:00 2001 From: Ali Younis Date: Sun, 25 Dec 2016 01:33:22 -0800 Subject: [PATCH] Commits working, Transactions Working, Arbitrations working, Still needs a lot of testing --- version2/src/java/iotcloud/Commit.java | 10 +- version2/src/java/iotcloud/Slot.java | 77 ++- version2/src/java/iotcloud/SlotBuffer.java | 60 +- version2/src/java/iotcloud/Table.java | 716 +++++++++------------ version2/src/java/iotcloud/Test.java | 381 ++++++++++- version2/src/java/iotcloud/ThreeTuple.java | 29 + 6 files changed, 780 insertions(+), 493 deletions(-) create mode 100644 version2/src/java/iotcloud/ThreeTuple.java diff --git a/version2/src/java/iotcloud/Commit.java b/version2/src/java/iotcloud/Commit.java index 1ee044d..86650c9 100644 --- a/version2/src/java/iotcloud/Commit.java +++ b/version2/src/java/iotcloud/Commit.java @@ -77,6 +77,7 @@ class Commit extends Entry { } public Entry getCopy(Slot s) { + // System.out.println("Commit Rescued: " + this); // TODO remove return new Commit(s, seqnumtrans, keyValueUpdateSet); } @@ -89,14 +90,17 @@ class Commit extends Entry { for (Iterator i = keyValueUpdateSet.iterator(); i.hasNext();) { KeyValue kv2 = i.next(); - if (kv1.getKey() == kv2.getKey()) { - keyValueUpdateSet.remove(kv2); + if (kv1.getKey().equals(kv2.getKey())) { + // keyValueUpdateSet.remove(kv2); + i.remove(); break; } } } - if (keyValueUpdateSet.size() == 0) + if (keyValueUpdateSet.size() == 0) { + // System.out.println("Killed Commit: " + this); // TODO remove this.setDead(); + } } } \ No newline at end of file diff --git a/version2/src/java/iotcloud/Slot.java b/version2/src/java/iotcloud/Slot.java index 5828fd3..2ceb142 100644 --- a/version2/src/java/iotcloud/Slot.java +++ b/version2/src/java/iotcloud/Slot.java @@ -12,9 +12,9 @@ import java.util.Arrays; class Slot implements Liveness { /** Sets the slot size. */ - static final int SLOT_SIZE=2048; + static final int SLOT_SIZE = 2048; /** Sets the size for the HMAC. */ - static final int HMAC_SIZE=32; + static final int HMAC_SIZE = 32; /** Sequence number of the slot. */ private long seqnum; @@ -37,15 +37,15 @@ class Slot implements Liveness { private Table table; Slot(Table _table, long _seqnum, long _machineid, byte[] _prevhmac, byte[] _hmac) { - seqnum=_seqnum; - machineid=_machineid; - prevhmac=_prevhmac; - hmac=_hmac; - entries=new Vector(); - livecount=1; - seqnumlive=true; + seqnum = _seqnum; + machineid = _machineid; + prevhmac = _prevhmac; + hmac = _hmac; + entries = new Vector(); + livecount = 1; + seqnumlive = true; freespace = SLOT_SIZE - getBaseSize(); - table=_table; + table = _table; } Slot(Table _table, long _seqnum, long _machineid, byte[] _prevhmac) { @@ -64,11 +64,18 @@ class Slot implements Liveness { return prevhmac; } - void addEntry(Entry e) { - e=e.getCopy(this); + Entry addEntry(Entry e) { + e = e.getCopy(this); entries.add(e); livecount++; freespace -= e.getSize(); + return e; + } + + void removeEntry(Entry e) { + entries.remove(e); + livecount--; + freespace += e.getSize(); } private void addShallowEntry(Entry e) { @@ -91,23 +98,23 @@ class Slot implements Liveness { } static Slot decode(Table table, byte[] array, Mac mac) { - mac.update(array, HMAC_SIZE, array.length-HMAC_SIZE); - byte[] realmac=mac.doFinal(); + mac.update(array, HMAC_SIZE, array.length - HMAC_SIZE); + byte[] realmac = mac.doFinal(); - ByteBuffer bb=ByteBuffer.wrap(array); - byte[] hmac=new byte[HMAC_SIZE]; - byte[] prevhmac=new byte[HMAC_SIZE]; + ByteBuffer bb = ByteBuffer.wrap(array); + byte[] hmac = new byte[HMAC_SIZE]; + byte[] prevhmac = new byte[HMAC_SIZE]; bb.get(hmac); bb.get(prevhmac); if (!Arrays.equals(realmac, hmac)) throw new Error("Server Error: Invalid HMAC! Potential Attack!"); - long seqnum=bb.getLong(); - long machineid=bb.getLong(); - int numentries=bb.getInt(); - Slot slot=new Slot(table, seqnum, machineid, prevhmac, hmac); + long seqnum = bb.getLong(); + long machineid = bb.getLong(); + int numentries = bb.getInt(); + Slot slot = new Slot(table, seqnum, machineid, prevhmac, hmac); - for(int i=0; i getLiveEntries(boolean resize) { - Vector liveEntries=new Vector(); - for(Entry entry: entries) { + Vector liveEntries = new Vector(); + for (Entry entry : entries) { if (entry.isLive()) { if (!resize || entry.getType() != Entry.TypeTableStatus) liveEntries.add(entry); } } - + if (seqnumlive && !resize) liveEntries.add(new LastMessage(this, machineid, seqnum)); @@ -186,7 +193,7 @@ class Slot implements Liveness { */ void setDead() { - seqnumlive=false; + seqnumlive = false; decrementLiveCount(); } @@ -196,8 +203,10 @@ class Slot implements Liveness { void decrementLiveCount() { livecount--; - if (livecount==0) + if (livecount == 0) { + // System.out.println("Slot Set Dead"); // TODO: remove table.decrementLiveCount(); + } } /** @@ -209,6 +218,6 @@ class Slot implements Liveness { } public String toString() { - return "<"+getSequenceNumber()+">"; + return "<" + getSequenceNumber() + ">"; } } diff --git a/version2/src/java/iotcloud/SlotBuffer.java b/version2/src/java/iotcloud/SlotBuffer.java index 14bc926..ba6c3de 100644 --- a/version2/src/java/iotcloud/SlotBuffer.java +++ b/version2/src/java/iotcloud/SlotBuffer.java @@ -12,12 +12,12 @@ class SlotBuffer { private Slot[] array; private int head; private int tail; - private long oldestseqn; + public long oldestseqn; SlotBuffer() { - array=new Slot[DEFAULT_SIZE+1]; - head=tail=0; - oldestseqn=0; + array = new Slot[DEFAULT_SIZE + 1]; + head = tail = 0; + oldestseqn = 0; } int size() { @@ -31,12 +31,12 @@ class SlotBuffer { } void resize(int newsize) { - if (newsize == (array.length-1)) + if (newsize == (array.length - 1)) return; - Slot[] newarray = new Slot[newsize+1]; + Slot[] newarray = new Slot[newsize + 1]; int currsize = size(); int index = tail; - for(int i=0; i < currsize; i++) { + for (int i = 0; i < currsize; i++) { newarray[i] = array[index]; if ((++index) == array.length) index = 0; @@ -49,42 +49,64 @@ class SlotBuffer { private void incrementHead() { head++; if (head >= array.length) - head=0; + head = 0; } private void incrementTail() { tail++; if (tail >= array.length) - tail=0; + tail = 0; } void putSlot(Slot s) { - array[head]=s; + + long checkNum = (getNewestSeqNum() + 1); + + if (checkNum != s.getSequenceNumber()) { + // We have a gap so expunge all our slots + oldestseqn = s.getSequenceNumber(); + tail = 0; + head = 1; + array[0] = s; + return; + } + + array[head] = s; incrementHead(); - if (oldestseqn==0) + if (oldestseqn == 0) { oldestseqn = s.getSequenceNumber(); + } - if (head==tail) { + if (head == tail) { incrementTail(); oldestseqn++; } } Slot getSlot(long seqnum) { - int diff=(int) (seqnum-oldestseqn); - int index=diff + tail; + int diff = (int) (seqnum - oldestseqn); + int index = diff + tail; + + if (index < 0) { + // Really old message so we dont have it anymore + return null; + } + if (index >= array.length) { - if (head >= tail) + if (head >= tail) { return null; - index-= array.length; + } + index -= array.length; } - if (index >= array.length) - return null; + if (index >= array.length) { - if (head >= tail && index >= head) return null; + } + if (head >= tail && index >= head) { + return null; + } return array[index]; } diff --git a/version2/src/java/iotcloud/Table.java b/version2/src/java/iotcloud/Table.java index cf21f48..b0b4c1a 100644 --- a/version2/src/java/iotcloud/Table.java +++ b/version2/src/java/iotcloud/Table.java @@ -8,9 +8,12 @@ import java.util.Vector; import java.util.Random; import java.util.Queue; import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.Collection; +import java.util.Collections; + /** * IoTTable data structure. Provides client inferface. @@ -36,24 +39,30 @@ final public class Table { private TableStatus lastTableStatus; static final int FREE_SLOTS = 10; //number of slots that should be kept free static final int SKIP_THRESHOLD = 10; - private long liveslotcount = 0; + public long liveslotcount = 0; // TODO: MAKE PRIVATE private int chance; static final double RESIZE_MULTIPLE = 1.2; static final double RESIZE_THRESHOLD = 0.75; static final int REJECTED_THRESHOLD = 5; - private int resizethreshold; + public int resizethreshold; // TODO: MAKE PRIVATE private long lastliveslotseqn; //smallest sequence number with a live entry private Random random = new Random(); + private long lastCommitSeenSeqNum = 0; // sequence number of the last commit that was seen private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building private Queue pendingTransQueue = null; // Queue of pending transactions private List commitList = null; // List of all the most recent live commits + private List commitListSeqNum = null; // List of all the most recent live commits trans sequence numbers + private Set abortSet = null; // Set of the live aborts - private Map commitedTable = null; // Table of committed KV + public Map commitedTable = null; // Table of committed KV TODO: Make Private private Map speculativeTable = null; // Table of speculative KV - private List uncommittedTransactionsList = null; // + public Map uncommittedTransactionsMap = null; // TODO: make private private Map arbitratorTable = null; // Table of arbitrators + private Map newKeyTable = null; // Table of speculative KV // private Set arbitratorTable = null; // Table of arbitrators + private Map newCommitMap = null; // Map of all the new commits + public Table(String baseurl, String password, long _localmachineid) { @@ -65,13 +74,7 @@ final public class Table { cloud = new CloudComm(this, baseurl, password); lastliveslotseqn = 1; - pendingTransQueue = new LinkedList(); - commitList = new LinkedList(); - abortSet = new HashSet(); - commitedTable = new HashMap(); - speculativeTable = new HashMap(); - uncommittedTransactionsList = new LinkedList(); - arbitratorTable = new HashMap(); + setupDataStructs(); } public Table(CloudComm _cloud, long _localmachineid) { @@ -82,13 +85,19 @@ final public class Table { sequencenumber = 0; cloud = _cloud; + setupDataStructs(); + } + + private void setupDataStructs() { pendingTransQueue = new LinkedList(); commitList = new LinkedList(); abortSet = new HashSet(); commitedTable = new HashMap(); speculativeTable = new HashMap(); - uncommittedTransactionsList = new LinkedList(); + uncommittedTransactionsMap = new HashMap(); arbitratorTable = new HashMap(); + newKeyTable = new HashMap(); + newCommitMap = new HashMap (); } public void rebuild() { @@ -96,7 +105,45 @@ final public class Table { validateandupdate(newslots, true); } + // TODO: delete method + public void printSlots() { + long o = buffer.getOldestSeqNum(); + long n = buffer.getNewestSeqNum(); + int[] types = new int[10]; + + int num = 0; + + int livec = 0; + int deadc = 0; + for (long i = o; i < (n + 1); i++) { + Slot s = buffer.getSlot(i); + + Vector entries = s.getEntries(); + + for (Entry e : entries) { + if (e.isLive()) { + int type = e.getType(); + types[type] = types[type] + 1; + num++; + livec++; + } else { + deadc++; + } + } + } + + for (int i = 0; i < 10; i++) { + System.out.println(i + " " + types[i]); + } + System.out.println("Live count: " + livec); + System.out.println("Dead count: " + deadc); + System.out.println("Old: " + o); + System.out.println("New: " + n); + System.out.println("Size: " + buffer.size()); + System.out.println("Commits Map: " + commitedTable.size()); + System.out.println("Commits List: " + commitList.size()); + } public IoTString getCommitted(IoTString key) { KeyValue kv = commitedTable.get(key); @@ -116,7 +163,6 @@ final public class Table { } } - public void initTable() { cloud.setSalt();//Set the salt Slot s = new Slot(this, 1, localmachineid); @@ -133,8 +179,6 @@ final public class Table { } public String toString() { - - String retString = " Committed Table: \n"; retString += "---------------------------\n"; retString += commitedTable.toString(); @@ -148,11 +192,6 @@ final public class Table { return retString; } - - - - - public void startTransaction() { // Create a new transaction, invalidates any old pending transactions. pendingTransBuild = new PendingTransaction(); @@ -177,14 +216,16 @@ final public class Table { public void addKV(IoTString key, IoTString value) { + if (arbitratorTable.get(key) == null) { + throw new Error("Key not Found."); + } + // Make sure new key value pair matches the current arbitrator if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) { - // TODO: Maybe not throw and error - throw new Error("Not all Key Values match"); + // TODO: Maybe not throw en error + throw new Error("Not all Key Values Match."); } - - KeyValue kv = new KeyValue(key, value); pendingTransBuild.addKV(kv); } @@ -194,25 +235,16 @@ final public class Table { } public void update() { + Slot[] newslots = cloud.getSlots(sequencenumber + 1); validateandupdate(newslots, false); - if (uncommittedTransactionsList.size() > 0) { - List uncommittedTransArb = new LinkedList(); - - for (Transaction ut : uncommittedTransactionsList) { - KeyValue kv = (KeyValue)(ut.getkeyValueUpdateSet().toArray()[0]); - long arb = arbitratorTable.get(kv.getKey()); - - if (arb == localmachineid) { - uncommittedTransArb.add(ut); - } - } - + if (uncommittedTransactionsMap.keySet().size() > 0) { + boolean doEnd = false; boolean needResize = false; - while (uncommittedTransArb.size() > 0) { + while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) { boolean resize = needResize; needResize = false; @@ -228,156 +260,30 @@ final public class Table { s.addEntry(status); } - if (! rejectedmessagelist.isEmpty()) { - /* TODO: We should avoid generating a rejected message entry if - * there is already a sufficient entry in the queue (e.g., - * equalsto value of true and same sequence number). */ - - long old_seqn = rejectedmessagelist.firstElement(); - if (rejectedmessagelist.size() > REJECTED_THRESHOLD) { - long new_seqn = rejectedmessagelist.lastElement(); - RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false); - s.addEntry(rm); - } else { - long prev_seqn = -1; - int i = 0; - /* Go through list of missing messages */ - for (; i < rejectedmessagelist.size(); i++) { - long curr_seqn = rejectedmessagelist.get(i); - Slot s_msg = buffer.getSlot(curr_seqn); - if (s_msg != null) - break; - prev_seqn = curr_seqn; - } - /* Generate rejected message entry for missing messages */ - if (prev_seqn != -1) { - RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false); - s.addEntry(rm); - } - /* Generate rejected message entries for present messages */ - for (; i < rejectedmessagelist.size(); i++) { - long curr_seqn = rejectedmessagelist.get(i); - Slot s_msg = buffer.getSlot(curr_seqn); - long machineid = s_msg.getMachineID(); - RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true); - s.addEntry(rm); - } - } - } - - long newestseqnum = buffer.getNewestSeqNum(); - long oldestseqnum = buffer.getOldestSeqNum(); - if (lastliveslotseqn < oldestseqnum) { - lastliveslotseqn = oldestseqnum; - } + doRejectedMessages(s); - long seqn = lastliveslotseqn; - boolean seenliveslot = false; - long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full - long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point - - boolean tryAgain = false; - - // Mandatory Rescue - for (; seqn < threshold; seqn++) { - Slot prevslot = buffer.getSlot(seqn); - //Push slot number forward - if (! seenliveslot) - lastliveslotseqn = seqn; - - if (! prevslot.isLive()) - continue; - seenliveslot = true; - Vector liveentries = prevslot.getLiveEntries(resize); - for (Entry liveentry : liveentries) { - if (s.hasSpace(liveentry)) { - s.addEntry(liveentry); - } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue - if (!resize) { - System.out.print("B"); //? - tryAgain = true; - needResize = true; - } - } - } - } + ThreeTuple retTup = doMandatoryResuce(s, resize); - if (tryAgain) { + // Resize was needed so redo call + if (retTup.getFirst()) { + needResize = true; continue; } - // Arbitrate - Map speculativeTableTmp = new HashMap(commitedTable); - for (Iterator i = uncommittedTransArb.iterator(); i.hasNext();) { - Transaction ut = i.next(); - - KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0]; - // Check if this machine arbitrates for this transaction - if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) { - continue; - } + // Extract working variables + boolean seenliveslot = retTup.getSecond(); + long seqn = retTup.getThird(); - Entry newEntry = null; + // Did need to arbitrate + doEnd = !doArbitration(s); - try { - if ( ut.getGuard().evaluate(new HashSet(speculativeTableTmp.values()))) { - // Guard evaluated as true - - // update the local tmp current key set - for (KeyValue kv : ut.getkeyValueUpdateSet()) { - speculativeTableTmp.put(kv.getKey(), kv); - } - - // create the commit - newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet()); - } else { - // Guard was false - - // create the abort - newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID()); - } - } catch (Exception e) { - e.printStackTrace(); - } - - if ((newEntry != null) && s.hasSpace(newEntry)) { - s.addEntry(newEntry); - i.remove(); - - } else { - break; - } - } - - /* now go through live entries from least to greatest sequence number until - * either all live slots added, or the slot doesn't have enough room - * for SKIP_THRESHOLD consecutive entries*/ - int skipcount = 0; - search: - for (; seqn <= newestseqnum; seqn++) { - Slot prevslot = buffer.getSlot(seqn); - //Push slot number forward - if (!seenliveslot) - lastliveslotseqn = seqn; - - if (!prevslot.isLive()) - continue; - seenliveslot = true; - Vector liveentries = prevslot.getLiveEntries(resize); - for (Entry liveentry : liveentries) { - if (s.hasSpace(liveentry)) - s.addEntry(liveentry); - else { - skipcount++; - if (skipcount > SKIP_THRESHOLD) - break search; - } - } - } + doOptionalRescue(s, seenliveslot, seqn, resize); int max = 0; - if (resize) + if (resize) { max = newsize; + } + Slot[] array = cloud.putSlot(s, max); if (array == null) { array = new Slot[] {s}; @@ -386,13 +292,12 @@ final public class Table { if (array.length == 0) throw new Error("Server Error: Did not send any slots"); rejectedmessagelist.add(s.getSequenceNumber()); + doEnd = false; } /* update data structure */ validateandupdate(array, true); } - - } } @@ -412,11 +317,11 @@ final public class Table { } } - void decrementLiveCount() { + public void decrementLiveCount() { liveslotcount--; + // System.out.println("Decrement Live Count"); } - private void setResizeThreshold() { int resize_lower = (int) (RESIZE_THRESHOLD * numslots); resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower); @@ -424,128 +329,39 @@ final public class Table { private boolean tryput(PendingTransaction pendingTrans, boolean resize) { Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC()); + int newsize = 0; if (liveslotcount > resizethreshold) { resize = true; //Resize is forced + System.out.println("Live count resize: " + liveslotcount + " " + resizethreshold); + } if (resize) { newsize = (int) (numslots * RESIZE_MULTIPLE); - TableStatus status = new TableStatus(s, newsize); - s.addEntry(status); - } - if (! rejectedmessagelist.isEmpty()) { - /* TODO: We should avoid generating a rejected message entry if - * there is already a sufficient entry in the queue (e.g., - * equalsto value of true and same sequence number). */ + System.out.println("New Size: " + newsize + " old: " + buffer.oldestseqn); // TODO remove - long old_seqn = rejectedmessagelist.firstElement(); - if (rejectedmessagelist.size() > REJECTED_THRESHOLD) { - long new_seqn = rejectedmessagelist.lastElement(); - RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false); - s.addEntry(rm); - } else { - long prev_seqn = -1; - int i = 0; - /* Go through list of missing messages */ - for (; i < rejectedmessagelist.size(); i++) { - long curr_seqn = rejectedmessagelist.get(i); - Slot s_msg = buffer.getSlot(curr_seqn); - if (s_msg != null) - break; - prev_seqn = curr_seqn; - } - /* Generate rejected message entry for missing messages */ - if (prev_seqn != -1) { - RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false); - s.addEntry(rm); - } - /* Generate rejected message entries for present messages */ - for (; i < rejectedmessagelist.size(); i++) { - long curr_seqn = rejectedmessagelist.get(i); - Slot s_msg = buffer.getSlot(curr_seqn); - long machineid = s_msg.getMachineID(); - RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true); - s.addEntry(rm); - } - } + TableStatus status = new TableStatus(s, newsize); + s.addEntry(status); } - long newestseqnum = buffer.getNewestSeqNum(); - long oldestseqnum = buffer.getOldestSeqNum(); - if (lastliveslotseqn < oldestseqnum) - lastliveslotseqn = oldestseqnum; + doRejectedMessages(s); - long seqn = lastliveslotseqn; - boolean seenliveslot = false; - long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full - long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point - // Mandatory Rescue - for (; seqn < threshold; seqn++) { - Slot prevslot = buffer.getSlot(seqn); - //Push slot number forward - if (! seenliveslot) - lastliveslotseqn = seqn; + ThreeTuple retTup = doMandatoryResuce(s, resize); - if (! prevslot.isLive()) - continue; - seenliveslot = true; - Vector liveentries = prevslot.getLiveEntries(resize); - for (Entry liveentry : liveentries) { - if (s.hasSpace(liveentry)) { - s.addEntry(liveentry); - } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue - if (!resize) { - System.out.print("B"); //? - return tryput(pendingTrans, true); - } - } - } + // Resize was needed so redo call + if (retTup.getFirst()) { + return tryput(pendingTrans, true); } + // Extract working variables + boolean seenliveslot = retTup.getSecond(); + long seqn = retTup.getThird(); - // Arbitrate - Map speculativeTableTmp = new HashMap(commitedTable); - for (Transaction ut : uncommittedTransactionsList) { - - KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0]; - // Check if this machine arbitrates for this transaction - if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) { - continue; - } - - Entry newEntry = null; - - try { - if ( ut.getGuard().evaluate(new HashSet(speculativeTableTmp.values()))) { - // Guard evaluated as true - - // update the local tmp current key set - for (KeyValue kv : ut.getkeyValueUpdateSet()) { - speculativeTableTmp.put(kv.getKey(), kv); - } - - // create the commit - newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet()); - } else { - // Guard was false - - // create the abort - newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID()); - } - } catch (Exception e) { - e.printStackTrace(); - } - - if ((newEntry != null) && s.hasSpace(newEntry)) { - s.addEntry(newEntry); - } else { - break; - } - } + doArbitration(s); Transaction trans = new Transaction(s, s.getSequenceNumber(), @@ -558,49 +374,13 @@ final public class Table { insertedTrans = true; } - /* now go through live entries from least to greatest sequence number until - * either all live slots added, or the slot doesn't have enough room - * for SKIP_THRESHOLD consecutive entries*/ - int skipcount = 0; - search: - for (; seqn <= newestseqnum; seqn++) { - Slot prevslot = buffer.getSlot(seqn); - //Push slot number forward - if (!seenliveslot) - lastliveslotseqn = seqn; - - if (!prevslot.isLive()) - continue; - seenliveslot = true; - Vector liveentries = prevslot.getLiveEntries(resize); - for (Entry liveentry : liveentries) { - if (s.hasSpace(liveentry)) - s.addEntry(liveentry); - else { - skipcount++; - if (skipcount > SKIP_THRESHOLD) - break search; - } - } - } + doOptionalRescue(s, seenliveslot, seqn, resize); + insertedTrans = doSendSlotsAndInsert(s, insertedTrans, resize, newsize); - int max = 0; - if (resize) - max = newsize; - Slot[] array = cloud.putSlot(s, max); - if (array == null) { - array = new Slot[] {s}; - rejectedmessagelist.clear(); - } else { - if (array.length == 0) - throw new Error("Server Error: Did not send any slots"); - rejectedmessagelist.add(s.getSequenceNumber()); - insertedTrans = false; + if (insertedTrans) { + // System.out.println("Inserted: " + trans.getSequenceNumber()); } - /* update data structure */ - validateandupdate(array, true); - return insertedTrans; } @@ -617,6 +397,34 @@ final public class Table { s.addEntry(status); } + doRejectedMessages(s); + ThreeTuple retTup = doMandatoryResuce(s, resize); + + // Resize was needed so redo call + if (retTup.getFirst()) { + return tryput(keyName, arbMachineid, true); + } + + // Extract working variables + boolean seenliveslot = retTup.getSecond(); + long seqn = retTup.getThird(); + + + doArbitration(s); + + NewKey newKey = new NewKey(s, keyName, arbMachineid); + + boolean insertedNewKey = false; + if (s.hasSpace(newKey)) { + s.addEntry(newKey); + insertedNewKey = true; + } + + doOptionalRescue(s, seenliveslot, seqn, resize); + return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize); + } + + private void doRejectedMessages(Slot s) { if (! rejectedmessagelist.isEmpty()) { /* TODO: We should avoid generating a rejected message entry if * there is already a sufficient entry in the queue (e.g., @@ -653,7 +461,9 @@ final public class Table { } } } + } + private ThreeTuple doMandatoryResuce(Slot s, boolean resize) { long newestseqnum = buffer.getNewestSeqNum(); long oldestseqnum = buffer.getOldestSeqNum(); if (lastliveslotseqn < oldestseqnum) @@ -661,14 +471,14 @@ final public class Table { long seqn = lastliveslotseqn; boolean seenliveslot = false; - long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full - long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point + long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full + long threshold = firstiffull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point // Mandatory Rescue for (; seqn < threshold; seqn++) { Slot prevslot = buffer.getSlot(seqn); - //Push slot number forward + // Push slot number forward if (! seenliveslot) lastliveslotseqn = seqn; @@ -681,70 +491,81 @@ final public class Table { s.addEntry(liveentry); } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue if (!resize) { - System.out.print("B"); //? - return tryput(keyName, arbMachineid, true); + System.out.println("B"); //? + + System.out.println("==============================NEEEEDDDD RESIZING"); + return new ThreeTuple(true, seenliveslot, seqn); } } } } + // Did not resize + return new ThreeTuple(false, seenliveslot, seqn); + } - // // Arbitrate - // Map speculativeTableTmp = new HashMap(commitedTable); - // for (Transaction ut : uncommittedTransactionsList) { + private boolean doArbitration(Slot s) { + // Arbitrate + Map speculativeTableTmp = new HashMap(commitedTable); - // KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0]; - // // Check if this machine arbitrates for this transaction - // if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) { - // continue; - // } + List transSeqNums = new ArrayList(uncommittedTransactionsMap.keySet()); - // Entry newEntry = null; + // Sort from oldest to newest + Collections.sort(transSeqNums); - // try { - // if ( ut.getGuard().evaluate(new HashSet(speculativeTableTmp.values()))) { - // // Guard evaluated as true - // // update the local tmp current key set - // for (KeyValue kv : ut.getkeyValueUpdateSet()) { - // speculativeTableTmp.put(kv.getKey(), kv); - // } + boolean didNeedArbitration = false; + for (Long transNum : transSeqNums) { + Transaction ut = uncommittedTransactionsMap.get(transNum); - // // create the commit - // newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet()); - // } else { - // // Guard was false + KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0]; + // Check if this machine arbitrates for this transaction + if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) { + continue; + } - // // create the abort - // newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID()); - // } - // } catch (Exception e) { - // e.printStackTrace(); - // } + // we did have something to arbitrate on + didNeedArbitration = true; - // if ((newEntry != null) && s.hasSpace(newEntry)) { + Entry newEntry = null; - // // TODO: Remove print - // System.out.println("Arbitrating..."); - // s.addEntry(newEntry); - // } else { - // break; - // } - // } + try { + if ( ut.getGuard().evaluate(new HashSet(speculativeTableTmp.values()))) { + // Guard evaluated as true + // update the local tmp current key set + for (KeyValue kv : ut.getkeyValueUpdateSet()) { + speculativeTableTmp.put(kv.getKey(), kv); + } - NewKey newKey = new NewKey(s, keyName, arbMachineid); + // create the commit + newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet()); + } else { + // Guard was false - boolean insertedNewKey = false; - if (s.hasSpace(newKey)) { - s.addEntry(newKey); - insertedNewKey = true; + // create the abort + newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID()); + } + } catch (Exception e) { + e.printStackTrace(); + } + + if ((newEntry != null) && s.hasSpace(newEntry)) { + s.addEntry(newEntry); + } else { + break; + } } + return didNeedArbitration; + } + + private void doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) { /* now go through live entries from least to greatest sequence number until * either all live slots added, or the slot doesn't have enough room * for SKIP_THRESHOLD consecutive entries*/ int skipcount = 0; + long newestseqnum = buffer.getNewestSeqNum(); search: for (; seqn <= newestseqnum; seqn++) { Slot prevslot = buffer.getSlot(seqn); @@ -766,7 +587,9 @@ final public class Table { } } } + } + private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) { int max = 0; if (resize) max = newsize; @@ -778,13 +601,13 @@ final public class Table { if (array.length == 0) throw new Error("Server Error: Did not send any slots"); rejectedmessagelist.add(s.getSequenceNumber()); - insertedNewKey = false; + inserted = false; } /* update data structure */ validateandupdate(array, true); - return insertedNewKey; + return inserted; } private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) { @@ -808,6 +631,8 @@ final public class Table { updateExpectedSize(); } + proccessAllNewCommits(); + /* If there is a gap, check to see if the server sent us everything. */ if (firstseqnum != (sequencenumber + 1)) { @@ -831,10 +656,87 @@ final public class Table { createSpeculativeTable(); } + public void proccessAllNewCommits() { + + // Process only if there are commit + if (newCommitMap.keySet().size() == 0) { + return; + } + + List commitSeqNums = new ArrayList(newCommitMap.keySet()); + + // Sort from oldest to newest commit + Collections.sort(commitSeqNums); + + // Go through each new commit one by one + for (Long entrySeqNum : commitSeqNums) { + Commit entry = newCommitMap.get(entrySeqNum); + + if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) { + + // Remove any old commits + for (Iterator i = commitList.iterator(); i.hasNext();) { + Commit prevcommit = i.next(); + + if (entry.getTransSequenceNumber() == prevcommit.getTransSequenceNumber()) { + prevcommit.setDead(); + i.remove(); + } + } + commitList.add(entry); + continue; + } + + // Remove any old commits + for (Iterator i = commitList.iterator(); i.hasNext();) { + Commit prevcommit = i.next(); + prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet()); + + if (!prevcommit.isLive()) { + i.remove(); + } + } + + // Add the new commit + commitList.add(entry); + lastCommitSeenSeqNum = entry.getTransSequenceNumber(); + // System.out.println("Last Seq Num: " + lastCommitSeenSeqNum); + + + // Update the committed table list + for (KeyValue kv : entry.getkeyValueUpdateSet()) { + IoTString key = kv.getKey(); + commitedTable.put(key, kv); + } + + long committedTransSeq = entry.getTransSequenceNumber(); + + // Make dead the transactions + for (Iterator> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) { + Transaction prevtrans = i.next().getValue(); + + if (prevtrans.getSequenceNumber() <= committedTransSeq) { + i.remove(); + prevtrans.setDead(); + } + } + } + + + // Clear the new commits storage so we can use it later + newCommitMap.clear(); + } + private void createSpeculativeTable() { Map speculativeTableTmp = new HashMap(commitedTable); + List utSeqNums = new ArrayList(uncommittedTransactionsMap.keySet()); + + // Sort from oldest to newest commit + Collections.sort(utSeqNums); - for (Transaction trans : uncommittedTransactionsList) { + + for (Long key : utSeqNums) { + Transaction trans = uncommittedTransactionsMap.get(key); try { if (trans.getGuard().evaluate(new HashSet(speculativeTableTmp.values()))) { @@ -877,17 +779,15 @@ final public class Table { } private void commitNewMaxSize() { - if (numslots != currmaxsize) + if (numslots != currmaxsize) { + System.out.println("Resizing the buffer"); // TODO: Remove buffer.resize(currmaxsize); + } numslots = currmaxsize; setResizeThreshold(); } - - - - private void processEntry(LastMessage entry, HashSet machineSet) { updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet); } @@ -929,15 +829,25 @@ final public class Table { private void processEntry(NewKey entry) { arbitratorTable.put(entry.getKey(), entry.getMachineID()); + + NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry); + + if (oldNewKey != null) { + oldNewKey.setDead(); + } } private void processEntry(Transaction entry) { - uncommittedTransactionsList.add(entry); + Transaction prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry); + + // Duplicate so delete old copy + if (prevTrans != null) { + prevTrans.setDead(); + } } private void processEntry(Abort entry) { - if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) { // Abort has not been seen yet so we need to keep track of it abortSet.add(entry); @@ -946,50 +856,24 @@ final public class Table { entry.setDead(); } - for (Iterator i = uncommittedTransactionsList.iterator(); i.hasNext();) { - Transaction prevtrans = i.next(); - if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) { - uncommittedTransactionsList.remove(prevtrans); - prevtrans.setDead(); - return; - } - } - } - - private void processEntry(Commit entry) { - - for (Iterator i = commitList.iterator(); i.hasNext();) { - Commit prevcommit = i.next(); - prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet()); - - if (!prevcommit.isLive()) { - //commitList.remove(prevcommit); - i.remove(); - } - } - - commitList.add(entry); - - // Update the committed table list - for (KeyValue kv : entry.getkeyValueUpdateSet()) { - IoTString key = kv.getKey(); - commitedTable.put(key, kv); - } - - long committedTransSeq = entry.getTransSequenceNumber(); - // Make dead the transactions - for (Iterator i = uncommittedTransactionsList.iterator(); i.hasNext();) { - Transaction prevtrans = i.next(); + for (Iterator> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) { + Transaction prevtrans = i.next().getValue(); - if (prevtrans.getSequenceNumber() <= committedTransSeq) { - // uncommittedTransactionsList.remove(prevtrans); + if (prevtrans.getSequenceNumber() <= entry.getTransSequenceNumber()) { i.remove(); prevtrans.setDead(); } } } + private void processEntry(Commit entry, Slot s) { + Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry); + if (prevCommit != null) { + prevCommit.setDead(); + } + } + private void processEntry(TableStatus entry) { int newnumslots = entry.getMaxSlots(); updateCurrMaxSize(newnumslots); @@ -1079,7 +963,7 @@ final public class Table { break; case Entry.TypeCommit: - processEntry((Commit)entry); + processEntry((Commit)entry, slot); break; case Entry.TypeAbort: diff --git a/version2/src/java/iotcloud/Test.java b/version2/src/java/iotcloud/Test.java index 00d4c7c..24af2f8 100644 --- a/version2/src/java/iotcloud/Test.java +++ b/version2/src/java/iotcloud/Test.java @@ -7,69 +7,408 @@ package iotcloud; */ public class Test { + + public static final int NUMBER_OF_TESTS = 20000; //66 + public static void main(String[] args) { if (args[0].equals("2")) { test2(); + } else if (args[0].equals("3")) { + test3(); + } else if (args[0].equals("4")) { + test4(); } } - static void test2() { + + + static void test5() { Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); - t1.initTable(); - Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); - t2.update(); + t1.rebuild(); + System.out.println(t1); + // // Print the results + // for (int i = 0; i < NUMBER_OF_TESTS; i++) { + // String a = "a" + i; + // String b = "b" + i; + // IoTString ia = new IoTString(a); + // IoTString ib = new IoTString(b); + // System.out.println(ib + " -> " + t1.getCommitted(ib)); + // System.out.println(ia + " -> " + t2.getCommitted(ia)); + // System.out.println(); + // } + } - final int NUMBER_OF_TESTS = 200; + static Thread buildThreadTest4(String prefix, Table t) { + return new Thread() { + public void run() { + for (int i = 0; i < (NUMBER_OF_TESTS * 3); i++) { - for (int i = 0; i < NUMBER_OF_TESTS; i++) { + int num = i % NUMBER_OF_TESTS; + String key = prefix + num; + String value = prefix + (num + 2000); + IoTString iKey = new IoTString(key); + IoTString iValue = new IoTString(value); - System.out.println("Doing: " + i); + t.startTransaction(); + t.addKV(iKey, iValue); + t.commitTransaction(); + } + } + }; + } + static void test4() { + Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + t1.rebuild(); + t2.rebuild(); + + Thread thr1 = buildThreadTest4("b", t1); + Thread thr2 = buildThreadTest4("a", t2); + thr1.start(); + thr2.start(); + try { + thr1.join(); + thr2.join(); + } catch (Exception e) { + e.printStackTrace(); + } + + t1.update(); + t2.update(); + // t1.update(); + + // Print the results + for (int i = 0; i < NUMBER_OF_TESTS; i++) { String a = "a" + i; String b = "b" + i; IoTString ia = new IoTString(a); IoTString ib = new IoTString(b); + System.out.println(ib + " -> " + t1.getCommitted(ib)); + System.out.println(ia + " -> " + t2.getCommitted(ia)); + System.out.println(); + } + } + + static void test3() { + Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + t1.rebuild(); + t2.rebuild(); + - t1.createNewKey(ia, 351); - t2.createNewKey(ib, 321); + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + String key = "a" + i; + String value = "a" + (i + 1000); + IoTString iKey = new IoTString(key); + IoTString iValue = new IoTString(value); t1.startTransaction(); - t1.addKV(ia, ia); + t1.addKV(iKey, iValue); t1.commitTransaction(); } for (int i = 0; i < NUMBER_OF_TESTS; i++) { + String key = "b" + i; + String value = "b" + (i + 1000); + IoTString iKey = new IoTString(key); + IoTString iValue = new IoTString(value); - System.out.println("Doing: " + i); + t2.startTransaction(); + t2.addKV(iKey, iValue); + t2.commitTransaction(); + } + + // Make sure t1 sees the new updates from t2 + t1.update(); + // Print the results + for (int i = 0; i < NUMBER_OF_TESTS; i++) { String a = "a" + i; String b = "b" + i; IoTString ia = new IoTString(a); IoTString ib = new IoTString(b); - t2.startTransaction(); - t2.addKV(ib, ib); - t2.commitTransaction(); + System.out.println(ib + " -> " + t1.getCommitted(ib)); + System.out.println(ia + " -> " + t2.getCommitted(ia)); + System.out.println(); } + } + static void test2() { + Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + t1.initTable(); + Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + t2.update(); - t1.update(); - // t2.update(); - // t1.update(); - + // Make the Keys + System.out.println("Setting up keys"); for (int i = 0; i < NUMBER_OF_TESTS; i++) { String a = "a" + i; String b = "b" + i; IoTString ia = new IoTString(a); IoTString ib = new IoTString(b); + t1.createNewKey(ia, 321); + t1.createNewKey(ib, 321); + } - System.out.println(ib + " -> " + t1.getCommitted(ib)); - System.out.println(ia + " -> " + t2.getCommitted(ia)); - System.out.println(); + // System.out.println("=========t1 live" + t1.liveslotcount + " thresh: " + t1.resizethreshold); + // System.out.println("=========t2 live" + t2.liveslotcount + " thresh: " + t2.resizethreshold); + // System.out.println(); + + + + // Do Updates for the keys + System.out.println("Writing Keys a..."); + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + System.out.println(i); + + String key = "a" + i; + String value = "a" + (i + 10000); + IoTString iKey = new IoTString(key); + IoTString iValue = new IoTString(value); + + + t1.startTransaction(); + t1.addKV(iKey, iValue); + t1.commitTransaction(); + } + + // Do Updates for the keys + System.out.println("Writing Keys a..."); + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + System.out.println(i); + + String key = "a" + i; + String value = "a" + (i + 10000); + IoTString iKey = new IoTString(key); + IoTString iValue = new IoTString(value); + + t1.startTransaction(); + t1.addKV(iKey, iValue); + t1.commitTransaction(); } + + + t2.update(); + System.out.println("Writing Keys b..."); + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + System.out.println(i); + + String key = "b" + i; + String value = "b" + (i + 10000); + IoTString iKey = new IoTString(key); + IoTString iValue = new IoTString(value); + + + t2.startTransaction(); + t2.addKV(iKey, iValue); + t2.commitTransaction(); + } + + + // Do Updates for the keys + System.out.println("Writing Keys a..."); + for (int i = 0; i < NUMBER_OF_TESTS; i += 2) { + System.out.println(i); + + String key = "a" + i; + String value = "a" + (i + 10000); + IoTString iKey = new IoTString(key); + IoTString iValue = new IoTString(value); + + t1.startTransaction(); + t1.addKV(iKey, iValue); + t1.commitTransaction(); + } + + + t1.update(); + t2.update(); + + System.out.println("Checking a keys..."); + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + + String key = "a" + i; + String value = "a" + (i + 10000); + IoTString iKey = new IoTString(key); + IoTString iValue = new IoTString(value); + + IoTString testVal = t1.getCommitted(iKey); + + if ((testVal == null) || (testVal.equals(iValue) == false)) { + System.out.println("Key val incorrect: " + key); + } + + key = "b" + i; + value = "b" + (i + 10000); + iKey = new IoTString(key); + iValue = new IoTString(value); + + testVal = t1.getCommitted(iKey); + + if ((testVal == null) || (testVal.equals(iValue) == false)) { + System.out.println("Key val incorrect: " + key); + } + } + + System.out.println("Checking b keys..."); + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + + String key = "a" + i; + String value = "a" + (i + 10000); + IoTString iKey = new IoTString(key); + IoTString iValue = new IoTString(value); + + IoTString testVal = t2.getCommitted(iKey); + + if ((testVal == null) || (testVal.equals(iValue) == false)) { + System.out.println("Key val incorrect: " + key); + } + + key = "b" + i; + value = "b" + (i + 10000); + iKey = new IoTString(key); + iValue = new IoTString(value); + + testVal = t2.getCommitted(iKey); + + if ((testVal == null) || (testVal.equals(iValue) == false)) { + System.out.println("Key val incorrect: " + key); + } + } + + + + + System.out.println(); + System.out.println(); + System.out.println("Update"); + // Make sure t1 sees the new updates from t2 + t1.update(); + t2.update(); + t1.update(); + System.out.println(); + System.out.println(); + System.out.println(); + System.out.println(); + System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-"); + System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-"); + System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-"); + t2.update(); + System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-"); + System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-"); + System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-"); + + + // System.out.println("=========t1 live" + t1.liveslotcount + " thresh: " + t1.resizethreshold + " commits: " + t1.commitedTable.size() + " uncommits: " + t1.uncommittedTransactionsList.size()); + // System.out.println("=========t2 live" + t2.liveslotcount + " thresh: " + t2.resizethreshold + " commits: " + t2.commitedTable.size() + " uncommits: " + t2.uncommittedTransactionsList.size() ); + System.out.println(); + + t1.printSlots(); + System.out.println(); + System.out.println(); + System.out.println(); + System.out.println(); + t2.printSlots(); + System.out.println(); + + + + // // Do Updates for the keys + // System.out.println("Writing Keys a (actual)"); + // for (int i = 0; i < NUMBER_OF_TESTS; i++) { + // String key = "a" + i; + // String value = "a" + i; + // IoTString iKey = new IoTString(key); + // IoTString iValue = new IoTString(value); + + // t1.startTransaction(); + // t1.addKV(iKey, iValue); + // t1.commitTransaction(); + // } + + // System.out.println("=========t1 live" + t1.liveslotcount + " thresh: " + t1.resizethreshold + " commits: " + t1.commitedTable.size() + " uncommits: " + t1.uncommittedTransactionsList.size()); + // System.out.println("=========t2 live" + t2.liveslotcount + " thresh: " + t2.resizethreshold + " commits: " + t2.commitedTable.size() + " uncommits: " + t2.uncommittedTransactionsList.size()); + // System.out.println(); + + + + // System.out.println("Writing Keys b (actual)"); + // for (int i = 0; i < NUMBER_OF_TESTS; i++) { + // String key = "b" + i; + // String value = "b" + i; + // IoTString iKey = new IoTString(key); + // IoTString iValue = new IoTString(value); + + // t2.startTransaction(); + // t2.addKV(iKey, iValue); + // t2.commitTransaction(); + // } + + // System.out.println("=========t1 live" + t1.liveslotcount + " thresh: " + t1.resizethreshold + " commits: " + t1.commitedTable.size() + " uncommits: " + t1.uncommittedTransactionsList.size() ); + // System.out.println("=========t2 live" + t2.liveslotcount + " thresh: " + t2.resizethreshold + " commits: " + t2.commitedTable.size() + " uncommits: " + t2.uncommittedTransactionsList.size() ); + // System.out.println(); + + + // // Do Updates for the keys + // System.out.println("Writing Keys a (actual)"); + // for (int i = 0; i < NUMBER_OF_TESTS; i++) { + // String key = "a" + i; + // String value = "a" + i; + // IoTString iKey = new IoTString(key); + // IoTString iValue = new IoTString(value); + + // t1.startTransaction(); + // t1.addKV(iKey, iValue); + // t1.commitTransaction(); + // } + + // System.out.println("=========t1 live" + t1.liveslotcount + " thresh: " + t1.resizethreshold + " commits: " + t1.commitedTable.size() + " uncommits: " + t1.uncommittedTransactionsList.size()); + // System.out.println("=========t2 live" + t2.liveslotcount + " thresh: " + t2.resizethreshold + " commits: " + t2.commitedTable.size() + " uncommits: " + t2.uncommittedTransactionsList.size()); + // System.out.println(); + + + + // System.out.println("Writing Keys b (actual)"); + // for (int i = 0; i < NUMBER_OF_TESTS; i++) { + // String key = "b" + i; + // String value = "b" + i; + // IoTString iKey = new IoTString(key); + // IoTString iValue = new IoTString(value); + + // t2.startTransaction(); + // t2.addKV(iKey, iValue); + // t2.commitTransaction(); + // } + + // System.out.println("=========t1 live" + t1.liveslotcount + " thresh: " + t1.resizethreshold + " commits: " + t1.commitedTable.size() + " uncommits: " + t1.uncommittedTransactionsList.size() ); + // System.out.println("=========t2 live" + t2.liveslotcount + " thresh: " + t2.resizethreshold + " commits: " + t2.commitedTable.size() + " uncommits: " + t2.uncommittedTransactionsList.size() ); + // System.out.println(); + + // t1.printSlots(); + // System.out.println(); + // t2.printSlots(); + + + + // Make sure t1 sees the new updates from t2 + // t1.update(); + + // // Print the results + // for (int i = NUMBER_OF_TESTS - 10; i < NUMBER_OF_TESTS; i++) { + // String a = "a" + i; + // String b = "b" + i; + // IoTString ia = new IoTString(a); + // IoTString ib = new IoTString(b); + + // System.out.println(ib + " -> " + t1.getCommitted(ib)); + // System.out.println(ia + " -> " + t2.getCommitted(ia)); + // System.out.println(); + // } } } diff --git a/version2/src/java/iotcloud/ThreeTuple.java b/version2/src/java/iotcloud/ThreeTuple.java new file mode 100644 index 0000000..8a882a4 --- /dev/null +++ b/version2/src/java/iotcloud/ThreeTuple.java @@ -0,0 +1,29 @@ +package iotcloud; + +class ThreeTuple { + private A a; + private B b; + private C c; + + ThreeTuple(A a, B b, C c) { + this.a = a; + this.b = b; + this.c = c; + } + + A getFirst() { + return a; + } + + B getSecond() { + return b; + } + + C getThird() { + return c; + } + + public String toString() { + return "<" + a + "," + b + "," + c + ">"; + } +} -- 2.34.1