\EndIf\\\r
\r
\If{$\lnot$\Call{EvaluateGuard}{$Guard_t, CurrKV$}}\r
- \State $abortde \gets $\Call{CreateAbort}{$seq_t, id_t$}\r
+ \State $abort_{de} \gets $\Call{CreateAbort}{$seq_t, id_t$}\r
\LeftComment{No more space so we cant arbitrate any further}\r
- \If($lnot$\Call{DeHasSpace}{$DE_a, abortde$})\r
+ \If($\lnot$\Call{DeHasSpace}{$DE_a, abort_{de}$})\r
\State \Return{$DE_a$}\r
\EndIf\r
- \State $DE_a \gets DE_a \cup abortde$\r
+ \State $DE_a \gets DE_a \cup abort_{de}$\r
\Else\r
\State $DKV \gets \{\tuple{k,v}| \tuple{k,v} \in KV \land \tuple{k',v'}\in KV_t \land k'=k\}$\r
\State $KVTmp \gets (KV \setminus DKV) \cup KV'$\r
class Abort extends Entry {
- private long seqnum;
+ private long seqnumtrans;
private long machineid;
- Abort(Slot slot, long _seqnum, long _machineid) {
+ Abort(Slot slot, long _seqnumtrans, long _machineid) {
super(slot);
- seqnum=_seqnum;
+ seqnumtrans=_seqnumtrans;
machineid=_machineid;
}
return machineid;
}
- long getSequenceNumber() {
- return seqnum;
+ long getTransSequenceNumber() {
+ return seqnumtrans;
}
static Entry decode(Slot slot, ByteBuffer bb) {
- long seqnum=bb.getLong();
+ long seqnumtrans=bb.getLong();
long machineid=bb.getLong();
- return new Abort(slot, seqnum, machineid);
+ return new Abort(slot, seqnumtrans, machineid);
}
void encode(ByteBuffer bb) {
bb.put(Entry.TypeAbort);
- bb.putLong(seqnum);
+ bb.putLong(seqnumtrans);
bb.putLong(machineid);
}
}
Entry getCopy(Slot s) {
- return new Abort(s, seqnum, machineid);
+ return new Abort(s, seqnumtrans, machineid);
}
}
\ No newline at end of file
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.HashSet;
+import java.util.Iterator;
/**
- * This Entry records the abort sent by a given machine.
+ * This Entry records the commit of a transaction.
* @author Ali Younis <ayounis@uci.edu>
* @version 1.0
*/
class Commit extends Entry {
- private long seqnum;
- private Set<KeyValue> keyValueUpdateSet;
+ private long seqnumtrans;
+ private Set<KeyValue> keyValueUpdateSet = null;
+ private Set<KeyValue> liveValues = null;
- public Commit(Slot slot, long _seqnum, long _machineid) {
+ public Commit(Slot slot, long _seqnumtrans, Set<KeyValue> _keyValueUpdateSet) {
super(slot);
- seqnum=_seqnum;
- machineid=_machineid;
+ seqnumtrans = _seqnumtrans;
+
+ keyValueUpdateSet = new HashSet<KeyValue>();
+ liveValues = new HashSet<KeyValue>();
+
+ for (KeyValue kv : _keyValueUpdateSet) {
+ KeyValue kvCopy = kv.getCopy();
+ keyValueUpdateSet.add(kvCopy);
+ liveValues.add(kvCopy);
+ }
}
- public long getSequenceNumber() {
- return seqnum;
+ public long getTransSequenceNumber() {
+ return seqnumtrans;
}
+ public Set<KeyValue> getkeyValueUpdateSet() {
+ return keyValueUpdateSet;
+ }
+
+ public byte getType() {
+ return Entry.TypeCommit;
+ }
+ public int getSize() {
+ int size = Long.BYTES + Byte.BYTES; // seq id, entry type
+ size += Integer.BYTES; // number of KV's
+ // Size of each KV
+ for (KeyValue kv : keyValueUpdateSet) {
+ size += kv.getSize();
+ }
+ return size;
+ }
static Entry decode(Slot slot, ByteBuffer bb) {
- long seqnum=bb.getLong();
- long machineid=bb.getLong();
- return new Abort(slot, seqnum, machineid);
+ long seqnumtrans = bb.getLong();
+ int numberOfKeys = bb.getInt();
+
+ Set<KeyValue> kvSet = new HashSet<KeyValue>();
+ for (int i = 0; i < numberOfKeys; i++) {
+ KeyValue kv = KeyValue.decode(bb);
+ kvSet.add(kv);
+ }
+
+ return new Commit(slot, seqnumtrans, kvSet);
}
public void encode(ByteBuffer bb) {
- bb.put(Entry.TypeAbort);
- bb.putLong(seqnum);
- bb.putLong(machineid);
- }
+ bb.put(Entry.TypeCommit);
+ bb.putLong(seqnumtrans);
+ bb.putInt(keyValueUpdateSet.size());
- public int getSize() {
- return 2*Long.BYTES+Byte.BYTES;
+ for (KeyValue kv : keyValueUpdateSet) {
+ kv.encode(bb);
+ }
}
- public byte getType() {
- return Entry.TypeAbort;
+ public Entry getCopy(Slot s) {
+ return new Commit(s, seqnumtrans, keyValueUpdateSet);
}
- public Entry getCopy(Slot s) {
- return new Abort(s, seqnum, machineid);
+ public void updateLiveKeys(Set<KeyValue> kvSet) {
+
+ if (!this.isLive())
+ return;
+
+ for (KeyValue kv1 : kvSet) {
+ for (Iterator<KeyValue> i = liveValues.iterator(); i.hasNext();) {
+ KeyValue kv2 = i.next();
+
+ if (kv1.getKey() == kv2.getKey()) {
+ liveValues.remove(kv2);
+ break;
+ }
+ }
+ }
+
+ if (liveValues.size() == 0)
+ this.setDead();
}
}
\ No newline at end of file
bb.get(expr);
return new Guard(IoTString.shallow(expr));
}
+
+ public Guard getCopy() {
+ return new Guard(IoTString.shallow(booleanExpression.internalBytes()));
+ }
+
}
\ No newline at end of file
* @version 1.0
*/
-class KeyValue /*extends Entry */ {
+class KeyValue { /*extends Entry */
private IoTString key;
private IoTString value;
public KeyValue(IoTString _key, IoTString _value) {
- key=_key;
- value=_value;
+ key = _key;
+ value = _value;
}
public IoTString getKey() {
}
static KeyValue decode(ByteBuffer bb) {
- int keylength=bb.getInt();
- int valuelength=bb.getInt();
- byte[] key=new byte[keylength];
- byte[] value=new byte[valuelength];
+ int keylength = bb.getInt();
+ int valuelength = bb.getInt();
+ byte[] key = new byte[keylength];
+ byte[] value = new byte[valuelength];
bb.get(key);
bb.get(value);
return new KeyValue(IoTString.shallow(key), IoTString.shallow(value));
}
public int getSize() {
- return 2*Integer.BYTES+key.length()+value.length();
+ return 2 * Integer.BYTES + key.length() + value.length();
}
public String toString() {
return value.toString();
}
+
+ public KeyValue getCopy() {
+ return new KeyValue(key, value);
+ }
}
import java.util.Random;
import java.util.Queue;
import java.util.LinkedList;
+import java.util.List;
+
/**
* IoTTable data structure. Provides client inferface.
* @author Brian Demsky
private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
+ private List<Commit> commitList = null; // List of all the most recent live commits
+ private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
+ private List<Transaction> uncommittedTransactionsList = null; //
+ private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
+ private Set<Abort> arbitratorTable = null; // Table of arbitrators
public Table(String baseurl, String password, long _localmachineid) {
lastliveslotseqn = 1;
pendingTransQueue = new LinkedList<PendingTransaction>();
+ commitList = new LinkedList<Commit>();
+ commitedTable = new HashMap<IoTString, KeyValue>();
+ uncommittedTransactionsList = new LinkedList<Transaction>();
+ arbitratorTable = new HashMap<IoTString, Long>();
}
public Table(CloudComm _cloud, long _localmachineid) {
cloud = _cloud;
pendingTransQueue = new LinkedList<PendingTransaction>();
+ commitList = new LinkedList<Commit>();
+ commitedTable = new HashMap<IoTString, KeyValue>();
+ uncommittedTransactionsList = new LinkedList<Transaction>();
+ arbitratorTable = new HashMap<IoTString, Long>();
}
public void rebuild() {
pendingTransBuild.addKV(kv);
}
-
-
-
-
-
void decrementLiveCount() {
liveslotcount--;
}
boolean insertedTrans = false;
if (s.hasSpace(trans)) {
s.addEntry(trans);
- insertedTrans=true;
+ insertedTrans = true;
}
/* now go through live entries from least to greatest sequence number until
setResizeThreshold();
}
- // private void processEntry(KeyValue entry, SlotIndexer indexer) {
- // IoTString key=entry.getKey();
- // KeyValue oldvalue=table.get(key);
- // if (oldvalue != null) {
- // oldvalue.setDead();
- // }
- // table.put(key, entry);
- // }
-
- private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
+ private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
}
entry.setWatchSet(watchset);
}
+ private void processEntry(NewKey entry) {
+ arbitratorTable.put(entry.getKey(), entry.getMachineID());
+ }
+
+ private void processEntry(Transaction entry) {
+ uncommittedTransactionsList.add(entry);
+ }
+
+ private void processEntry(Abort entry) {
+ for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
+ Transaction prevtrans = i.next();
+ if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
+ uncommittedTransactionsList.remove(prevtrans);
+ prevtrans.setDead();
+ return;
+ }
+ }
+ }
+
+ private void processEntry(Commit entry) {
+
+ for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
+ Commit prevcommit = i.next();
+ prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+
+ if (!prevcommit.isLive()) {
+ commitList.remove(prevcommit);
+ }
+ }
+
+ commitList.add(entry);
+
+ // Update the committed table list
+ for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+ IoTString key = kv.getKey();
+ commitedTable.put(key, kv);
+ }
+
+ long committedTransSeq = entry.getTransSequenceNumber();
+
+ // Make dead the transactions
+ for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
+ Transaction prevtrans = i.next();
+
+ if (prevtrans.getSequenceNumber() <= committedTransSeq) {
+ uncommittedTransactionsList.remove(prevtrans);
+ prevtrans.setDead();
+ }
+ }
+ }
+
private void addWatchList(long machineid, RejectedMessage entry) {
HashSet<RejectedMessage> entries = watchlist.get(machineid);
if (entries == null)
entries.add(entry);
}
- private void processEntry(TableStatus entry, SlotIndexer indexer) {
+ private void processEntry(TableStatus entry) {
int newnumslots = entry.getMaxSlots();
updateCurrMaxSize(newnumslots);
if (lastTableStatus != null)
updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
for (Entry entry : slot.getEntries()) {
switch (entry.getType()) {
- // case Entry.TypeKeyValue:
- // processEntry((KeyValue)entry, indexer);
- // break;
+
+ case Entry.TypeNewKey:
+ processEntry((NewKey)entry);
+ break;
+
+ case Entry.TypeCommit:
+ processEntry((Commit)entry);
+ break;
+
+ case Entry.TypeAbort:
+ processEntry((Abort)entry);
+ break;
+
+ case Entry.TypeTransaction:
+ processEntry((Transaction)entry);
+ break;
case Entry.TypeLastMessage:
- processEntry((LastMessage)entry, indexer, machineSet);
+ processEntry((LastMessage)entry, machineSet);
break;
case Entry.TypeRejectedMessage:
break;
case Entry.TypeTableStatus:
- processEntry((TableStatus)entry, indexer);
+ processEntry((TableStatus)entry);
break;
default:
private long seqnum;
private long machineid;
- private Set<KeyValue> keyValueUpdateSet;
+ private Set<KeyValue> keyValueUpdateSet = null;
private Guard guard;
public Transaction(Slot slot, long _seqnum, long _machineid, Set<KeyValue> _keyValueUpdateSet, Guard _guard) {
super(slot);
seqnum = _seqnum;
machineid = _machineid;
- keyValueUpdateSet = _keyValueUpdateSet;
- guard = _guard;
+
+ for (KeyValue kv : _keyValueUpdateSet) {
+ KeyValue kvCopy = kv.getCopy();
+ keyValueUpdateSet.add(kvCopy);
+ }
+
+ guard = _guard.getCopy();
}
public long getMachineID() {
return seqnum;
}
+ public Set<KeyValue> getkeyValueUpdateSet() {
+ return keyValueUpdateSet;
+ }
+
+
public byte getType() {
- return Entry.TypeLastMessage;
+ return Entry.TypeTransaction;
}
public int getSize() {
bb.putLong(seqnum);
bb.putLong(machineid);
+ bb.putInt(keyValueUpdateSet.size());
for (KeyValue kv : keyValueUpdateSet) {
kv.encode(bb);
}
int numberOfKeys = bb.getInt();
Set<KeyValue> kvSet = new HashSet<KeyValue>();
-
for (int i = 0; i < numberOfKeys; i++) {
KeyValue kv = KeyValue.decode(bb);
kvSet.add(kv);
return new Transaction(slot, seqnum, machineid, kvSet, guard);
}
-
-
public Entry getCopy(Slot s) {
return new Transaction(s, seqnum, machineid, keyValueUpdateSet, guard);
}