From: Ali Younis Date: Thu, 1 Dec 2016 07:08:32 +0000 (-0800) Subject: Fixes to tex doc; Code Updates X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=commitdiff_plain;h=52f0181d22e9868bebf074148a775394eb9293a0 Fixes to tex doc; Code Updates --- diff --git a/doc2/iotcloud.tex b/doc2/iotcloud.tex index e3ae679..1d760d7 100644 --- a/doc2/iotcloud.tex +++ b/doc2/iotcloud.tex @@ -1499,12 +1499,12 @@ This rescue is not mandatory. This is trying to fill the remaining portion of t \EndIf\\ \If{$\lnot$\Call{EvaluateGuard}{$Guard_t, CurrKV$}} - \State $abort_{de} \gets $\Call{CreateAbort}{$seq_t, id_t$} + \State $abortde \gets $\Call{CreateAbort}{$seq_t, id_t$} \LeftComment{No more space so we cant arbitrate any further} - \If($\lnot$\Call{DeHasSpace}{$DE_a, abort_{de}$}) + \If($lnot$\Call{DeHasSpace}{$DE_a, abortde$}) \State \Return{$DE_a$} \EndIf - \State $DE_a \gets DE_a \cup abort_{de}$ + \State $DE_a \gets DE_a \cup abortde$ \Else \State $DKV \gets \{\tuple{k,v}| \tuple{k,v} \in KV \land \tuple{k',v'}\in KV_t \land k'=k\}$ \State $KVTmp \gets (KV \setminus DKV) \cup KV'$ @@ -1847,7 +1847,7 @@ Creates a new key and specifies which machine ID is the arbitrator. If there is \State \Return{$false$} \Comment{Key already created} \EndIf\\ - \State $success \gets$ \Call{TryInsertNewKey}{$k_a, id_a$} + \State $success \gets$ \Call{TryInsertNewKey}{$k_a, id_a, false$} \EndWhile \State \Return{$true$} \Comment{If got here then insertion was correct} diff --git a/src2/java/iotcloud/Commit.java b/src2/java/iotcloud/Commit.java index c533c36..1ee044d 100644 --- a/src2/java/iotcloud/Commit.java +++ b/src2/java/iotcloud/Commit.java @@ -15,7 +15,6 @@ import java.util.Iterator; class Commit extends Entry { private long seqnumtrans; private Set keyValueUpdateSet = null; - private Set liveValues = null; public Commit(Slot slot, long _seqnumtrans, Set _keyValueUpdateSet) { @@ -23,12 +22,10 @@ class Commit extends Entry { seqnumtrans = _seqnumtrans; keyValueUpdateSet = new HashSet(); - liveValues = new HashSet(); for (KeyValue kv : _keyValueUpdateSet) { KeyValue kvCopy = kv.getCopy(); keyValueUpdateSet.add(kvCopy); - liveValues.add(kvCopy); } } @@ -89,17 +86,17 @@ class Commit extends Entry { return; for (KeyValue kv1 : kvSet) { - for (Iterator i = liveValues.iterator(); i.hasNext();) { + for (Iterator i = keyValueUpdateSet.iterator(); i.hasNext();) { KeyValue kv2 = i.next(); if (kv1.getKey() == kv2.getKey()) { - liveValues.remove(kv2); + keyValueUpdateSet.remove(kv2); break; } } } - if (liveValues.size() == 0) + if (keyValueUpdateSet.size() == 0) this.setDead(); } } \ No newline at end of file diff --git a/src2/java/iotcloud/Entry.java b/src2/java/iotcloud/Entry.java index af99192..8734bb1 100644 --- a/src2/java/iotcloud/Entry.java +++ b/src2/java/iotcloud/Entry.java @@ -68,6 +68,11 @@ abstract class Entry implements Liveness { */ public void setDead() { + + if (!islive ) { + return; // already dead + } + islive = false; parentslot.decrementLiveCount(); } diff --git a/src2/java/iotcloud/PendingTransaction.java b/src2/java/iotcloud/PendingTransaction.java index 5253a94..3d39e12 100644 --- a/src2/java/iotcloud/PendingTransaction.java +++ b/src2/java/iotcloud/PendingTransaction.java @@ -14,6 +14,7 @@ class PendingTransaction { private Set keyValueUpdateSet; private Guard guard; + private long arbitrator = -1; public PendingTransaction() { keyValueUpdateSet = new HashSet(); @@ -40,6 +41,17 @@ class PendingTransaction { keyValueUpdateSet.add(newKV); } + public boolean checkArbitrator(long arb) { + if (arbitrator == -1) { + arbitrator = arb; + return true; + } + + return arb == arbitrator; + } + + + /** * Get the key value update set * diff --git a/src2/java/iotcloud/Table.java b/src2/java/iotcloud/Table.java index 732867f..1cdb3d5 100644 --- a/src2/java/iotcloud/Table.java +++ b/src2/java/iotcloud/Table.java @@ -9,6 +9,8 @@ import java.util.Random; import java.util.Queue; import java.util.LinkedList; import java.util.List; +import java.util.Set; +import java.util.Collection; /** * IoTTable data structure. Provides client inferface. @@ -20,7 +22,7 @@ final public class Table { private int numslots; //number of slots stored in buffer //table of key-value pairs - private HashMap table = new HashMap(); + //private HashMap table = new HashMap(); // machine id -> (sequence number, Slot or LastMessage); records last message by each client private HashMap > lastmessagetable = new HashMap >(); @@ -46,10 +48,12 @@ final public class Table { private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building private Queue pendingTransQueue = null; // Queue of pending transactions private List commitList = null; // List of all the most recent live commits + private Set abortSet = null; // Set of the live aborts private Map commitedTable = null; // Table of committed KV + private Map speculativeTable = null; // Table of speculative KV private List uncommittedTransactionsList = null; // private Map arbitratorTable = null; // Table of arbitrators - private Set arbitratorTable = null; // Table of arbitrators + // private Set arbitratorTable = null; // Table of arbitrators public Table(String baseurl, String password, long _localmachineid) { @@ -63,7 +67,9 @@ final public class Table { pendingTransQueue = new LinkedList(); commitList = new LinkedList(); + abortSet = new HashSet(); commitedTable = new HashMap(); + speculativeTable = new HashMap(); uncommittedTransactionsList = new LinkedList(); arbitratorTable = new HashMap(); } @@ -78,7 +84,9 @@ final public class Table { pendingTransQueue = new LinkedList(); commitList = new LinkedList(); + abortSet = new HashSet(); commitedTable = new HashMap(); + speculativeTable = new HashMap(); uncommittedTransactionsList = new LinkedList(); arbitratorTable = new HashMap(); } @@ -88,20 +96,27 @@ final public class Table { validateandupdate(newslots, true); } - public void update() { - Slot[] newslots = cloud.getSlots(sequencenumber + 1); - validateandupdate(newslots, false); + + public IoTString getCommitted(IoTString key) { + KeyValue kv = commitedTable.get(key); + if (kv != null) { + return kv.getValue(); + } else { + return null; + } } - public IoTString get(IoTString key) { - KeyValue kv = table.get(key); - if (kv != null) + public IoTString getSpeculative(IoTString key) { + KeyValue kv = speculativeTable.get(key); + if (kv != null) { return kv.getValue(); - else + } else { return null; + } } + public void initTable() { cloud.setSalt();//Set the salt Slot s = new Slot(this, 1, localmachineid); @@ -118,9 +133,26 @@ final public class Table { } public String toString() { - return table.toString(); + + + String retString = " Committed Table: \n"; + retString += "---------------------------\n"; + retString += commitedTable.toString(); + + retString += "\n\n"; + + retString += " Speculative Table: \n"; + retString += "---------------------------\n"; + retString += speculativeTable.toString(); + + return retString; } + + + + + public void startTransaction() { // Create a new transaction, invalidates any old pending transactions. pendingTransBuild = new PendingTransaction(); @@ -128,6 +160,11 @@ final public class Table { public void commitTransaction() { + if (pendingTransBuild.getKVUpdates().size() == 0) { + // If no updates are made then there is no point inserting into the chain + return; + } + // Add the pending transaction to the queue pendingTransQueue.add(pendingTransBuild); @@ -139,15 +176,48 @@ final public class Table { } public void addKV(IoTString key, IoTString value) { + + // Make sure new key value pair matches the current arbitrator + if (!pendingTransBuild.checkArbitrator( arbitratorTable.get(key))) { + // TODO: Maybe not throw and error + throw new Error("Not all Key Values match"); + } + + + KeyValue kv = new KeyValue(key, value); pendingTransBuild.addKV(kv); } + + // TODo: FIx Guard public void addGuard(IoTString key, IoTString value) { KeyValue kv = new KeyValue(key, value); pendingTransBuild.addKV(kv); } + public void update() { + Slot[] newslots = cloud.getSlots(sequencenumber + 1); + + validateandupdate(newslots, false); + } + + public boolean createNewKey(IoTString keyName, long machineId) { + + while (true) { + if (arbitratorTable.get(keyName) != null) { + // There is already an arbitrator + return false; + } + + if (tryput(keyName, machineId, false)) { + + // If successfully inserted + return true; + } + } + } + void decrementLiveCount() { liveslotcount--; } @@ -218,6 +288,8 @@ final public class Table { long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point + + // Mandatory Rescue for (; seqn < threshold; seqn++) { Slot prevslot = buffer.getSlot(seqn); //Push slot number forward @@ -241,6 +313,46 @@ final public class Table { } + // Arbitrate + Map speculativeTableTmp = new HashMap(commitedTable); + for (Transaction ut : uncommittedTransactionsList) { + + KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0]; + // Check if this machine arbitrates for this transaction + if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) { + continue; + } + + Entry newEntry = null; + + try { + if ( ut.getGuard().evaluate(new HashSet(speculativeTableTmp.values()))) { + // Guard evaluated as true + + // update the local tmp current key set + for (KeyValue kv : ut.getkeyValueUpdateSet()) { + speculativeTableTmp.put(kv.getKey(), kv); + } + + // create the commit + newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet()); + } else { + // Guard was false + + // create the abort + newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID()); + } + } catch (Exception e) { + e.printStackTrace(); + } + + if ((newEntry != null) && s.hasSpace(newEntry)) { + s.addEntry(newEntry); + } else { + break; + } + } + Transaction trans = new Transaction(s, s.getSequenceNumber(), localmachineid, @@ -298,14 +410,195 @@ final public class Table { return insertedTrans; } + private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) { + Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC()); + int newsize = 0; + if (liveslotcount > resizethreshold) { + resize = true; //Resize is forced + } + + if (resize) { + newsize = (int) (numslots * RESIZE_MULTIPLE); + TableStatus status = new TableStatus(s, newsize); + s.addEntry(status); + } + + if (! rejectedmessagelist.isEmpty()) { + /* TODO: We should avoid generating a rejected message entry if + * there is already a sufficient entry in the queue (e.g., + * equalsto value of true and same sequence number). */ + + long old_seqn = rejectedmessagelist.firstElement(); + if (rejectedmessagelist.size() > REJECTED_THRESHOLD) { + long new_seqn = rejectedmessagelist.lastElement(); + RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false); + s.addEntry(rm); + } else { + long prev_seqn = -1; + int i = 0; + /* Go through list of missing messages */ + for (; i < rejectedmessagelist.size(); i++) { + long curr_seqn = rejectedmessagelist.get(i); + Slot s_msg = buffer.getSlot(curr_seqn); + if (s_msg != null) + break; + prev_seqn = curr_seqn; + } + /* Generate rejected message entry for missing messages */ + if (prev_seqn != -1) { + RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false); + s.addEntry(rm); + } + /* Generate rejected message entries for present messages */ + for (; i < rejectedmessagelist.size(); i++) { + long curr_seqn = rejectedmessagelist.get(i); + Slot s_msg = buffer.getSlot(curr_seqn); + long machineid = s_msg.getMachineID(); + RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true); + s.addEntry(rm); + } + } + } + + long newestseqnum = buffer.getNewestSeqNum(); + long oldestseqnum = buffer.getOldestSeqNum(); + if (lastliveslotseqn < oldestseqnum) + lastliveslotseqn = oldestseqnum; + + long seqn = lastliveslotseqn; + boolean seenliveslot = false; + long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full + long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point + + + // Mandatory Rescue + for (; seqn < threshold; seqn++) { + Slot prevslot = buffer.getSlot(seqn); + //Push slot number forward + if (! seenliveslot) + lastliveslotseqn = seqn; + + if (! prevslot.isLive()) + continue; + seenliveslot = true; + Vector liveentries = prevslot.getLiveEntries(resize); + for (Entry liveentry : liveentries) { + if (s.hasSpace(liveentry)) { + s.addEntry(liveentry); + } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue + if (!resize) { + System.out.print("B"); //? + return tryput(keyName, arbMachineid, true); + } + } + } + } + + + // Arbitrate + Map speculativeTableTmp = new HashMap(commitedTable); + for (Transaction ut : uncommittedTransactionsList) { + + KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0]; + // Check if this machine arbitrates for this transaction + if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) { + continue; + } + + Entry newEntry = null; + + try { + if ( ut.getGuard().evaluate(new HashSet(speculativeTableTmp.values()))) { + // Guard evaluated as true + + // update the local tmp current key set + for (KeyValue kv : ut.getkeyValueUpdateSet()) { + speculativeTableTmp.put(kv.getKey(), kv); + } + + // create the commit + newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet()); + } else { + // Guard was false + + // create the abort + newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID()); + } + } catch (Exception e) { + e.printStackTrace(); + } + + if ((newEntry != null) && s.hasSpace(newEntry)) { + s.addEntry(newEntry); + } else { + break; + } + } + + + NewKey newKey = new NewKey(s, keyName, arbMachineid); + + boolean insertedNewKey = false; + if (s.hasSpace(newKey)) { + s.addEntry(newKey); + insertedNewKey = true; + } + + /* now go through live entries from least to greatest sequence number until + * either all live slots added, or the slot doesn't have enough room + * for SKIP_THRESHOLD consecutive entries*/ + int skipcount = 0; + search: + for (; seqn <= newestseqnum; seqn++) { + Slot prevslot = buffer.getSlot(seqn); + //Push slot number forward + if (!seenliveslot) + lastliveslotseqn = seqn; + + if (!prevslot.isLive()) + continue; + seenliveslot = true; + Vector liveentries = prevslot.getLiveEntries(resize); + for (Entry liveentry : liveentries) { + if (s.hasSpace(liveentry)) + s.addEntry(liveentry); + else { + skipcount++; + if (skipcount > SKIP_THRESHOLD) + break search; + } + } + } + + int max = 0; + if (resize) + max = newsize; + Slot[] array = cloud.putSlot(s, max); + if (array == null) { + array = new Slot[] {s}; + rejectedmessagelist.clear(); + } else { + if (array.length == 0) + throw new Error("Server Error: Did not send any slots"); + rejectedmessagelist.add(s.getSequenceNumber()); + insertedNewKey = false; + } + + /* update data structure */ + validateandupdate(array, true); + + return insertedNewKey; + } + private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) { /* The cloud communication layer has checked slot HMACs already before decoding */ if (newslots.length == 0) return; long firstseqnum = newslots[0].getSequenceNumber(); - if (firstseqnum <= sequencenumber) + if (firstseqnum <= sequencenumber) { throw new Error("Server Error: Sent older slots!"); + } SlotIndexer indexer = new SlotIndexer(newslots, buffer); checkHMACChain(indexer, newslots); @@ -320,9 +613,12 @@ final public class Table { /* If there is a gap, check to see if the server sent us everything. */ if (firstseqnum != (sequencenumber + 1)) { + + // TODO: Check size checkNumSlots(newslots.length); - if (!machineSet.isEmpty()) + if (!machineSet.isEmpty()) { throw new Error("Missing record for machines: " + machineSet); + } } commitNewMaxSize(); @@ -333,13 +629,37 @@ final public class Table { liveslotcount++; } sequencenumber = newslots[newslots.length - 1].getSequenceNumber(); + + // Speculate on key value pairs + createSpeculativeTable(); + } + + private void createSpeculativeTable() { + Map speculativeTableTmp = new HashMap(commitedTable); + + for (Transaction trans : uncommittedTransactionsList) { + + try { + if (trans.getGuard().evaluate(new HashSet(speculativeTableTmp.values()))) { + for (KeyValue kv : trans.getkeyValueUpdateSet()) { + speculativeTableTmp.put(kv.getKey(), kv); + } + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + + speculativeTable = speculativeTableTmp; } private int expectedsize, currmaxsize; private void checkNumSlots(int numslots) { - if (numslots != expectedsize) + if (numslots != expectedsize) { throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots); + } } private void initExpectedSize(long firstsequencenumber) { @@ -350,8 +670,9 @@ final public class Table { private void updateExpectedSize() { expectedsize++; - if (expectedsize > currmaxsize) + if (expectedsize > currmaxsize) { expectedsize = currmaxsize; + } } private void updateCurrMaxSize(int newmaxsize) { @@ -366,6 +687,10 @@ final public class Table { setResizeThreshold(); } + + + + private void processEntry(LastMessage entry, HashSet machineSet) { updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet); } @@ -414,6 +739,16 @@ final public class Table { } private void processEntry(Abort entry) { + + + if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) { + // Abort has not been seen yet so we need to keep track of it + abortSet.add(entry); + } else { + // The machine already saw this so it is dead + entry.setDead(); + } + for (Iterator i = uncommittedTransactionsList.iterator(); i.hasNext();) { Transaction prevtrans = i.next(); if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) { @@ -456,13 +791,6 @@ final public class Table { } } - private void addWatchList(long machineid, RejectedMessage entry) { - HashSet entries = watchlist.get(machineid); - if (entries == null) - watchlist.put(machineid, entries = new HashSet()); - entries.add(entry); - } - private void processEntry(TableStatus entry) { int newnumslots = entry.getMaxSlots(); updateCurrMaxSize(newnumslots); @@ -471,6 +799,14 @@ final public class Table { lastTableStatus = entry; } + + private void addWatchList(long machineid, RejectedMessage entry) { + HashSet entries = watchlist.get(machineid); + if (entries == null) + watchlist.put(machineid, entries = new HashSet()); + entries.add(entry); + } + private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet machineSet) { machineSet.remove(machineid); @@ -498,6 +834,16 @@ final public class Table { } } + // Set dead the abort + for (Iterator ait = abortSet.iterator(); ait.hasNext(); ) { + Abort abort = ait.next(); + + if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) { + abort.setDead(); + ait.remove(); + } + } + Pair lastmsgentry = lastmessagetable.put(machineid, new Pair(seqnum, liveness)); if (lastmsgentry == null) diff --git a/src2/java/iotcloud/Transaction.java b/src2/java/iotcloud/Transaction.java index d25403e..18b8721 100644 --- a/src2/java/iotcloud/Transaction.java +++ b/src2/java/iotcloud/Transaction.java @@ -36,6 +36,9 @@ class Transaction extends Entry { return keyValueUpdateSet; } + public Guard getGuard() { + return guard; + } public byte getType() { return Entry.TypeTransaction;