From: Ali Younis Date: Tue, 22 Nov 2016 23:52:01 +0000 (-0800) Subject: Tex typo issues, Code X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=commitdiff_plain;h=d6c71259e9c673b829709edb16974f1fe896cb2d Tex typo issues, Code --- diff --git a/doc2/iotcloud.tex b/doc2/iotcloud.tex index 3161202..e3ae679 100644 --- a/doc2/iotcloud.tex +++ b/doc2/iotcloud.tex @@ -1499,12 +1499,12 @@ This rescue is not mandatory. This is trying to fill the remaining portion of t \EndIf\\ \If{$\lnot$\Call{EvaluateGuard}{$Guard_t, CurrKV$}} - \State $abortde \gets $\Call{CreateAbort}{$seq_t, id_t$} + \State $abort_{de} \gets $\Call{CreateAbort}{$seq_t, id_t$} \LeftComment{No more space so we cant arbitrate any further} - \If($lnot$\Call{DeHasSpace}{$DE_a, abortde$}) + \If($\lnot$\Call{DeHasSpace}{$DE_a, abort_{de}$}) \State \Return{$DE_a$} \EndIf - \State $DE_a \gets DE_a \cup abortde$ + \State $DE_a \gets DE_a \cup abort_{de}$ \Else \State $DKV \gets \{\tuple{k,v}| \tuple{k,v} \in KV \land \tuple{k',v'}\in KV_t \land k'=k\}$ \State $KVTmp \gets (KV \setminus DKV) \cup KV'$ diff --git a/src2/java/iotcloud/Abort.java b/src2/java/iotcloud/Abort.java index 8ec1871..c2b57b7 100644 --- a/src2/java/iotcloud/Abort.java +++ b/src2/java/iotcloud/Abort.java @@ -10,12 +10,12 @@ import java.nio.ByteBuffer; class Abort extends Entry { - private long seqnum; + private long seqnumtrans; private long machineid; - Abort(Slot slot, long _seqnum, long _machineid) { + Abort(Slot slot, long _seqnumtrans, long _machineid) { super(slot); - seqnum=_seqnum; + seqnumtrans=_seqnumtrans; machineid=_machineid; } @@ -23,19 +23,19 @@ class Abort extends Entry { return machineid; } - long getSequenceNumber() { - return seqnum; + long getTransSequenceNumber() { + return seqnumtrans; } static Entry decode(Slot slot, ByteBuffer bb) { - long seqnum=bb.getLong(); + long seqnumtrans=bb.getLong(); long machineid=bb.getLong(); - return new Abort(slot, seqnum, machineid); + return new Abort(slot, seqnumtrans, machineid); } void encode(ByteBuffer bb) { bb.put(Entry.TypeAbort); - bb.putLong(seqnum); + bb.putLong(seqnumtrans); bb.putLong(machineid); } @@ -48,6 +48,6 @@ class Abort extends Entry { } Entry getCopy(Slot s) { - return new Abort(s, seqnum, machineid); + return new Abort(s, seqnumtrans, machineid); } } \ No newline at end of file diff --git a/src2/java/iotcloud/Commit.java b/src2/java/iotcloud/Commit.java index 05834f0..c533c36 100644 --- a/src2/java/iotcloud/Commit.java +++ b/src2/java/iotcloud/Commit.java @@ -3,54 +3,103 @@ package iotcloud; import java.nio.ByteBuffer; import java.util.Set; import java.util.HashSet; +import java.util.Iterator; /** - * This Entry records the abort sent by a given machine. + * This Entry records the commit of a transaction. * @author Ali Younis * @version 1.0 */ class Commit extends Entry { - private long seqnum; - private Set keyValueUpdateSet; + private long seqnumtrans; + private Set keyValueUpdateSet = null; + private Set liveValues = null; - public Commit(Slot slot, long _seqnum, long _machineid) { + public Commit(Slot slot, long _seqnumtrans, Set _keyValueUpdateSet) { super(slot); - seqnum=_seqnum; - machineid=_machineid; + seqnumtrans = _seqnumtrans; + + keyValueUpdateSet = new HashSet(); + liveValues = new HashSet(); + + for (KeyValue kv : _keyValueUpdateSet) { + KeyValue kvCopy = kv.getCopy(); + keyValueUpdateSet.add(kvCopy); + liveValues.add(kvCopy); + } } - public long getSequenceNumber() { - return seqnum; + public long getTransSequenceNumber() { + return seqnumtrans; } + public Set getkeyValueUpdateSet() { + return keyValueUpdateSet; + } + + public byte getType() { + return Entry.TypeCommit; + } + public int getSize() { + int size = Long.BYTES + Byte.BYTES; // seq id, entry type + size += Integer.BYTES; // number of KV's + // Size of each KV + for (KeyValue kv : keyValueUpdateSet) { + size += kv.getSize(); + } + return size; + } static Entry decode(Slot slot, ByteBuffer bb) { - long seqnum=bb.getLong(); - long machineid=bb.getLong(); - return new Abort(slot, seqnum, machineid); + long seqnumtrans = bb.getLong(); + int numberOfKeys = bb.getInt(); + + Set kvSet = new HashSet(); + for (int i = 0; i < numberOfKeys; i++) { + KeyValue kv = KeyValue.decode(bb); + kvSet.add(kv); + } + + return new Commit(slot, seqnumtrans, kvSet); } public void encode(ByteBuffer bb) { - bb.put(Entry.TypeAbort); - bb.putLong(seqnum); - bb.putLong(machineid); - } + bb.put(Entry.TypeCommit); + bb.putLong(seqnumtrans); + bb.putInt(keyValueUpdateSet.size()); - public int getSize() { - return 2*Long.BYTES+Byte.BYTES; + for (KeyValue kv : keyValueUpdateSet) { + kv.encode(bb); + } } - public byte getType() { - return Entry.TypeAbort; + public Entry getCopy(Slot s) { + return new Commit(s, seqnumtrans, keyValueUpdateSet); } - public Entry getCopy(Slot s) { - return new Abort(s, seqnum, machineid); + public void updateLiveKeys(Set kvSet) { + + if (!this.isLive()) + return; + + for (KeyValue kv1 : kvSet) { + for (Iterator i = liveValues.iterator(); i.hasNext();) { + KeyValue kv2 = i.next(); + + if (kv1.getKey() == kv2.getKey()) { + liveValues.remove(kv2); + break; + } + } + } + + if (liveValues.size() == 0) + this.setDead(); } } \ No newline at end of file diff --git a/src2/java/iotcloud/Guard.java b/src2/java/iotcloud/Guard.java index aaf312b..2f755cf 100644 --- a/src2/java/iotcloud/Guard.java +++ b/src2/java/iotcloud/Guard.java @@ -98,4 +98,9 @@ class Guard { bb.get(expr); return new Guard(IoTString.shallow(expr)); } + + public Guard getCopy() { + return new Guard(IoTString.shallow(booleanExpression.internalBytes())); + } + } \ No newline at end of file diff --git a/src2/java/iotcloud/KeyValue.java b/src2/java/iotcloud/KeyValue.java index 9fbaf90..79b9e60 100644 --- a/src2/java/iotcloud/KeyValue.java +++ b/src2/java/iotcloud/KeyValue.java @@ -7,13 +7,13 @@ import java.nio.ByteBuffer; * @version 1.0 */ -class KeyValue /*extends Entry */ { +class KeyValue { /*extends Entry */ private IoTString key; private IoTString value; public KeyValue(IoTString _key, IoTString _value) { - key=_key; - value=_value; + key = _key; + value = _value; } public IoTString getKey() { @@ -25,10 +25,10 @@ class KeyValue /*extends Entry */ { } static KeyValue decode(ByteBuffer bb) { - int keylength=bb.getInt(); - int valuelength=bb.getInt(); - byte[] key=new byte[keylength]; - byte[] value=new byte[valuelength]; + int keylength = bb.getInt(); + int valuelength = bb.getInt(); + byte[] key = new byte[keylength]; + byte[] value = new byte[valuelength]; bb.get(key); bb.get(value); return new KeyValue(IoTString.shallow(key), IoTString.shallow(value)); @@ -42,10 +42,14 @@ class KeyValue /*extends Entry */ { } public int getSize() { - return 2*Integer.BYTES+key.length()+value.length(); + return 2 * Integer.BYTES + key.length() + value.length(); } public String toString() { return value.toString(); } + + public KeyValue getCopy() { + return new KeyValue(key, value); + } } diff --git a/src2/java/iotcloud/Table.java b/src2/java/iotcloud/Table.java index 7009ed9..732867f 100644 --- a/src2/java/iotcloud/Table.java +++ b/src2/java/iotcloud/Table.java @@ -8,6 +8,8 @@ import java.util.Vector; import java.util.Random; import java.util.Queue; import java.util.LinkedList; +import java.util.List; + /** * IoTTable data structure. Provides client inferface. * @author Brian Demsky @@ -43,6 +45,11 @@ final public class Table { private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building private Queue pendingTransQueue = null; // Queue of pending transactions + private List commitList = null; // List of all the most recent live commits + private Map commitedTable = null; // Table of committed KV + private List uncommittedTransactionsList = null; // + private Map arbitratorTable = null; // Table of arbitrators + private Set arbitratorTable = null; // Table of arbitrators public Table(String baseurl, String password, long _localmachineid) { @@ -55,6 +62,10 @@ final public class Table { lastliveslotseqn = 1; pendingTransQueue = new LinkedList(); + commitList = new LinkedList(); + commitedTable = new HashMap(); + uncommittedTransactionsList = new LinkedList(); + arbitratorTable = new HashMap(); } public Table(CloudComm _cloud, long _localmachineid) { @@ -66,6 +77,10 @@ final public class Table { cloud = _cloud; pendingTransQueue = new LinkedList(); + commitList = new LinkedList(); + commitedTable = new HashMap(); + uncommittedTransactionsList = new LinkedList(); + arbitratorTable = new HashMap(); } public void rebuild() { @@ -133,11 +148,6 @@ final public class Table { pendingTransBuild.addKV(kv); } - - - - - void decrementLiveCount() { liveslotcount--; } @@ -239,7 +249,7 @@ final public class Table { boolean insertedTrans = false; if (s.hasSpace(trans)) { s.addEntry(trans); - insertedTrans=true; + insertedTrans = true; } /* now go through live entries from least to greatest sequence number until @@ -356,16 +366,7 @@ final public class Table { setResizeThreshold(); } - // private void processEntry(KeyValue entry, SlotIndexer indexer) { - // IoTString key=entry.getKey(); - // KeyValue oldvalue=table.get(key); - // if (oldvalue != null) { - // oldvalue.setDead(); - // } - // table.put(key, entry); - // } - - private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet machineSet) { + private void processEntry(LastMessage entry, HashSet machineSet) { updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet); } @@ -404,6 +405,57 @@ final public class Table { entry.setWatchSet(watchset); } + private void processEntry(NewKey entry) { + arbitratorTable.put(entry.getKey(), entry.getMachineID()); + } + + private void processEntry(Transaction entry) { + uncommittedTransactionsList.add(entry); + } + + private void processEntry(Abort entry) { + for (Iterator i = uncommittedTransactionsList.iterator(); i.hasNext();) { + Transaction prevtrans = i.next(); + if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) { + uncommittedTransactionsList.remove(prevtrans); + prevtrans.setDead(); + return; + } + } + } + + private void processEntry(Commit entry) { + + for (Iterator i = commitList.iterator(); i.hasNext();) { + Commit prevcommit = i.next(); + prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet()); + + if (!prevcommit.isLive()) { + commitList.remove(prevcommit); + } + } + + commitList.add(entry); + + // Update the committed table list + for (KeyValue kv : entry.getkeyValueUpdateSet()) { + IoTString key = kv.getKey(); + commitedTable.put(key, kv); + } + + long committedTransSeq = entry.getTransSequenceNumber(); + + // Make dead the transactions + for (Iterator i = uncommittedTransactionsList.iterator(); i.hasNext();) { + Transaction prevtrans = i.next(); + + if (prevtrans.getSequenceNumber() <= committedTransSeq) { + uncommittedTransactionsList.remove(prevtrans); + prevtrans.setDead(); + } + } + } + private void addWatchList(long machineid, RejectedMessage entry) { HashSet entries = watchlist.get(machineid); if (entries == null) @@ -411,7 +463,7 @@ final public class Table { entries.add(entry); } - private void processEntry(TableStatus entry, SlotIndexer indexer) { + private void processEntry(TableStatus entry) { int newnumslots = entry.getMaxSlots(); updateCurrMaxSize(newnumslots); if (lastTableStatus != null) @@ -476,12 +528,25 @@ final public class Table { updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet); for (Entry entry : slot.getEntries()) { switch (entry.getType()) { - // case Entry.TypeKeyValue: - // processEntry((KeyValue)entry, indexer); - // break; + + case Entry.TypeNewKey: + processEntry((NewKey)entry); + break; + + case Entry.TypeCommit: + processEntry((Commit)entry); + break; + + case Entry.TypeAbort: + processEntry((Abort)entry); + break; + + case Entry.TypeTransaction: + processEntry((Transaction)entry); + break; case Entry.TypeLastMessage: - processEntry((LastMessage)entry, indexer, machineSet); + processEntry((LastMessage)entry, machineSet); break; case Entry.TypeRejectedMessage: @@ -489,7 +554,7 @@ final public class Table { break; case Entry.TypeTableStatus: - processEntry((TableStatus)entry, indexer); + processEntry((TableStatus)entry); break; default: diff --git a/src2/java/iotcloud/Transaction.java b/src2/java/iotcloud/Transaction.java index d1a9933..d25403e 100644 --- a/src2/java/iotcloud/Transaction.java +++ b/src2/java/iotcloud/Transaction.java @@ -8,15 +8,20 @@ class Transaction extends Entry { private long seqnum; private long machineid; - private Set keyValueUpdateSet; + private Set keyValueUpdateSet = null; private Guard guard; public Transaction(Slot slot, long _seqnum, long _machineid, Set _keyValueUpdateSet, Guard _guard) { super(slot); seqnum = _seqnum; machineid = _machineid; - keyValueUpdateSet = _keyValueUpdateSet; - guard = _guard; + + for (KeyValue kv : _keyValueUpdateSet) { + KeyValue kvCopy = kv.getCopy(); + keyValueUpdateSet.add(kvCopy); + } + + guard = _guard.getCopy(); } public long getMachineID() { @@ -27,8 +32,13 @@ class Transaction extends Entry { return seqnum; } + public Set getkeyValueUpdateSet() { + return keyValueUpdateSet; + } + + public byte getType() { - return Entry.TypeLastMessage; + return Entry.TypeTransaction; } public int getSize() { @@ -52,6 +62,7 @@ class Transaction extends Entry { bb.putLong(seqnum); bb.putLong(machineid); + bb.putInt(keyValueUpdateSet.size()); for (KeyValue kv : keyValueUpdateSet) { kv.encode(bb); } @@ -65,7 +76,6 @@ class Transaction extends Entry { int numberOfKeys = bb.getInt(); Set kvSet = new HashSet(); - for (int i = 0; i < numberOfKeys; i++) { KeyValue kv = KeyValue.decode(bb); kvSet.add(kv); @@ -76,8 +86,6 @@ class Transaction extends Entry { return new Transaction(slot, seqnum, machineid, kvSet, guard); } - - public Entry getCopy(Slot s) { return new Transaction(s, seqnum, machineid, keyValueUpdateSet, guard); }