import java.util.Queue;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
+import java.util.Collection;
/**
* IoTTable data structure. Provides client inferface.
private int numslots; //number of slots stored in buffer
//table of key-value pairs
- private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
+ //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
// machine id -> (sequence number, Slot or LastMessage); records last message by each client
private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
private List<Commit> commitList = null; // List of all the most recent live commits
+ private Set<Abort> abortSet = null; // Set of the live aborts
private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
+ private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
private List<Transaction> uncommittedTransactionsList = null; //
private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
- private Set<Abort> arbitratorTable = null; // Table of arbitrators
+ // private Set<Abort> arbitratorTable = null; // Table of arbitrators
public Table(String baseurl, String password, long _localmachineid) {
pendingTransQueue = new LinkedList<PendingTransaction>();
commitList = new LinkedList<Commit>();
+ abortSet = new HashSet<Abort>();
commitedTable = new HashMap<IoTString, KeyValue>();
+ speculativeTable = new HashMap<IoTString, KeyValue>();
uncommittedTransactionsList = new LinkedList<Transaction>();
arbitratorTable = new HashMap<IoTString, Long>();
}
pendingTransQueue = new LinkedList<PendingTransaction>();
commitList = new LinkedList<Commit>();
+ abortSet = new HashSet<Abort>();
commitedTable = new HashMap<IoTString, KeyValue>();
+ speculativeTable = new HashMap<IoTString, KeyValue>();
uncommittedTransactionsList = new LinkedList<Transaction>();
arbitratorTable = new HashMap<IoTString, Long>();
}
validateandupdate(newslots, true);
}
- public void update() {
- Slot[] newslots = cloud.getSlots(sequencenumber + 1);
- validateandupdate(newslots, false);
+
+ public IoTString getCommitted(IoTString key) {
+ KeyValue kv = commitedTable.get(key);
+ if (kv != null) {
+ return kv.getValue();
+ } else {
+ return null;
+ }
}
- public IoTString get(IoTString key) {
- KeyValue kv = table.get(key);
- if (kv != null)
+ public IoTString getSpeculative(IoTString key) {
+ KeyValue kv = speculativeTable.get(key);
+ if (kv != null) {
return kv.getValue();
- else
+ } else {
return null;
+ }
}
+
public void initTable() {
cloud.setSalt();//Set the salt
Slot s = new Slot(this, 1, localmachineid);
}
public String toString() {
- return table.toString();
+
+
+ String retString = " Committed Table: \n";
+ retString += "---------------------------\n";
+ retString += commitedTable.toString();
+
+ retString += "\n\n";
+
+ retString += " Speculative Table: \n";
+ retString += "---------------------------\n";
+ retString += speculativeTable.toString();
+
+ return retString;
}
+
+
+
+
+
public void startTransaction() {
// Create a new transaction, invalidates any old pending transactions.
pendingTransBuild = new PendingTransaction();
public void commitTransaction() {
+ if (pendingTransBuild.getKVUpdates().size() == 0) {
+ // If no updates are made then there is no point inserting into the chain
+ return;
+ }
+
// Add the pending transaction to the queue
pendingTransQueue.add(pendingTransBuild);
}
public void addKV(IoTString key, IoTString value) {
+
+ // Make sure new key value pair matches the current arbitrator
+ if (!pendingTransBuild.checkArbitrator( arbitratorTable.get(key))) {
+ // TODO: Maybe not throw and error
+ throw new Error("Not all Key Values match");
+ }
+
+
+
KeyValue kv = new KeyValue(key, value);
pendingTransBuild.addKV(kv);
}
+
+ // TODo: FIx Guard
public void addGuard(IoTString key, IoTString value) {
KeyValue kv = new KeyValue(key, value);
pendingTransBuild.addKV(kv);
}
+ public void update() {
+ Slot[] newslots = cloud.getSlots(sequencenumber + 1);
+
+ validateandupdate(newslots, false);
+ }
+
+ public boolean createNewKey(IoTString keyName, long machineId) {
+
+ while (true) {
+ if (arbitratorTable.get(keyName) != null) {
+ // There is already an arbitrator
+ return false;
+ }
+
+ if (tryput(keyName, machineId, false)) {
+
+ // If successfully inserted
+ return true;
+ }
+ }
+ }
+
void decrementLiveCount() {
liveslotcount--;
}
long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
+
+ // Mandatory Rescue
for (; seqn < threshold; seqn++) {
Slot prevslot = buffer.getSlot(seqn);
//Push slot number forward
}
+ // Arbitrate
+ Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+ for (Transaction ut : uncommittedTransactionsList) {
+
+ KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
+ // Check if this machine arbitrates for this transaction
+ if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+ continue;
+ }
+
+ Entry newEntry = null;
+
+ try {
+ if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+ // Guard evaluated as true
+
+ // update the local tmp current key set
+ for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+ speculativeTableTmp.put(kv.getKey(), kv);
+ }
+
+ // create the commit
+ newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
+ } else {
+ // Guard was false
+
+ // create the abort
+ newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ if ((newEntry != null) && s.hasSpace(newEntry)) {
+ s.addEntry(newEntry);
+ } else {
+ break;
+ }
+ }
+
Transaction trans = new Transaction(s,
s.getSequenceNumber(),
localmachineid,
return insertedTrans;
}
+ private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
+ Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+ int newsize = 0;
+ if (liveslotcount > resizethreshold) {
+ resize = true; //Resize is forced
+ }
+
+ if (resize) {
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ TableStatus status = new TableStatus(s, newsize);
+ s.addEntry(status);
+ }
+
+ if (! rejectedmessagelist.isEmpty()) {
+ /* TODO: We should avoid generating a rejected message entry if
+ * there is already a sufficient entry in the queue (e.g.,
+ * equalsto value of true and same sequence number). */
+
+ long old_seqn = rejectedmessagelist.firstElement();
+ if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+ long new_seqn = rejectedmessagelist.lastElement();
+ RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+ s.addEntry(rm);
+ } else {
+ long prev_seqn = -1;
+ int i = 0;
+ /* Go through list of missing messages */
+ for (; i < rejectedmessagelist.size(); i++) {
+ long curr_seqn = rejectedmessagelist.get(i);
+ Slot s_msg = buffer.getSlot(curr_seqn);
+ if (s_msg != null)
+ break;
+ prev_seqn = curr_seqn;
+ }
+ /* Generate rejected message entry for missing messages */
+ if (prev_seqn != -1) {
+ RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+ s.addEntry(rm);
+ }
+ /* Generate rejected message entries for present messages */
+ for (; i < rejectedmessagelist.size(); i++) {
+ long curr_seqn = rejectedmessagelist.get(i);
+ Slot s_msg = buffer.getSlot(curr_seqn);
+ long machineid = s_msg.getMachineID();
+ RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+ s.addEntry(rm);
+ }
+ }
+ }
+
+ long newestseqnum = buffer.getNewestSeqNum();
+ long oldestseqnum = buffer.getOldestSeqNum();
+ if (lastliveslotseqn < oldestseqnum)
+ lastliveslotseqn = oldestseqnum;
+
+ long seqn = lastliveslotseqn;
+ boolean seenliveslot = false;
+ long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
+ long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
+
+
+ // Mandatory Rescue
+ for (; seqn < threshold; seqn++) {
+ Slot prevslot = buffer.getSlot(seqn);
+ //Push slot number forward
+ if (! seenliveslot)
+ lastliveslotseqn = seqn;
+
+ if (! prevslot.isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+ for (Entry liveentry : liveentries) {
+ if (s.hasSpace(liveentry)) {
+ s.addEntry(liveentry);
+ } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
+ if (!resize) {
+ System.out.print("B"); //?
+ return tryput(keyName, arbMachineid, true);
+ }
+ }
+ }
+ }
+
+
+ // Arbitrate
+ Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+ for (Transaction ut : uncommittedTransactionsList) {
+
+ KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
+ // Check if this machine arbitrates for this transaction
+ if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+ continue;
+ }
+
+ Entry newEntry = null;
+
+ try {
+ if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+ // Guard evaluated as true
+
+ // update the local tmp current key set
+ for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+ speculativeTableTmp.put(kv.getKey(), kv);
+ }
+
+ // create the commit
+ newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
+ } else {
+ // Guard was false
+
+ // create the abort
+ newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ if ((newEntry != null) && s.hasSpace(newEntry)) {
+ s.addEntry(newEntry);
+ } else {
+ break;
+ }
+ }
+
+
+ NewKey newKey = new NewKey(s, keyName, arbMachineid);
+
+ boolean insertedNewKey = false;
+ if (s.hasSpace(newKey)) {
+ s.addEntry(newKey);
+ insertedNewKey = true;
+ }
+
+ /* now go through live entries from least to greatest sequence number until
+ * either all live slots added, or the slot doesn't have enough room
+ * for SKIP_THRESHOLD consecutive entries*/
+ int skipcount = 0;
+ search:
+ for (; seqn <= newestseqnum; seqn++) {
+ Slot prevslot = buffer.getSlot(seqn);
+ //Push slot number forward
+ if (!seenliveslot)
+ lastliveslotseqn = seqn;
+
+ if (!prevslot.isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+ for (Entry liveentry : liveentries) {
+ if (s.hasSpace(liveentry))
+ s.addEntry(liveentry);
+ else {
+ skipcount++;
+ if (skipcount > SKIP_THRESHOLD)
+ break search;
+ }
+ }
+ }
+
+ int max = 0;
+ if (resize)
+ max = newsize;
+ Slot[] array = cloud.putSlot(s, max);
+ if (array == null) {
+ array = new Slot[] {s};
+ rejectedmessagelist.clear();
+ } else {
+ if (array.length == 0)
+ throw new Error("Server Error: Did not send any slots");
+ rejectedmessagelist.add(s.getSequenceNumber());
+ insertedNewKey = false;
+ }
+
+ /* update data structure */
+ validateandupdate(array, true);
+
+ return insertedNewKey;
+ }
+
private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
/* The cloud communication layer has checked slot HMACs already
before decoding */
if (newslots.length == 0) return;
long firstseqnum = newslots[0].getSequenceNumber();
- if (firstseqnum <= sequencenumber)
+ if (firstseqnum <= sequencenumber) {
throw new Error("Server Error: Sent older slots!");
+ }
SlotIndexer indexer = new SlotIndexer(newslots, buffer);
checkHMACChain(indexer, newslots);
/* If there is a gap, check to see if the server sent us everything. */
if (firstseqnum != (sequencenumber + 1)) {
+
+ // TODO: Check size
checkNumSlots(newslots.length);
- if (!machineSet.isEmpty())
+ if (!machineSet.isEmpty()) {
throw new Error("Missing record for machines: " + machineSet);
+ }
}
commitNewMaxSize();
liveslotcount++;
}
sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
+
+ // Speculate on key value pairs
+ createSpeculativeTable();
+ }
+
+ private void createSpeculativeTable() {
+ Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+
+ for (Transaction trans : uncommittedTransactionsList) {
+
+ try {
+ if (trans.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+ for (KeyValue kv : trans.getkeyValueUpdateSet()) {
+ speculativeTableTmp.put(kv.getKey(), kv);
+ }
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ speculativeTable = speculativeTableTmp;
}
private int expectedsize, currmaxsize;
private void checkNumSlots(int numslots) {
- if (numslots != expectedsize)
+ if (numslots != expectedsize) {
throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
+ }
}
private void initExpectedSize(long firstsequencenumber) {
private void updateExpectedSize() {
expectedsize++;
- if (expectedsize > currmaxsize)
+ if (expectedsize > currmaxsize) {
expectedsize = currmaxsize;
+ }
}
private void updateCurrMaxSize(int newmaxsize) {
setResizeThreshold();
}
+
+
+
+
private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
}
}
private void processEntry(Abort entry) {
+
+
+ if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
+ // Abort has not been seen yet so we need to keep track of it
+ abortSet.add(entry);
+ } else {
+ // The machine already saw this so it is dead
+ entry.setDead();
+ }
+
for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
Transaction prevtrans = i.next();
if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
}
}
- private void addWatchList(long machineid, RejectedMessage entry) {
- HashSet<RejectedMessage> entries = watchlist.get(machineid);
- if (entries == null)
- watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
- entries.add(entry);
- }
-
private void processEntry(TableStatus entry) {
int newnumslots = entry.getMaxSlots();
updateCurrMaxSize(newnumslots);
lastTableStatus = entry;
}
+
+ private void addWatchList(long machineid, RejectedMessage entry) {
+ HashSet<RejectedMessage> entries = watchlist.get(machineid);
+ if (entries == null)
+ watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
+ entries.add(entry);
+ }
+
private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
machineSet.remove(machineid);
}
}
+ // Set dead the abort
+ for (Iterator<Abort> ait = abortSet.iterator(); ait.hasNext(); ) {
+ Abort abort = ait.next();
+
+ if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
+ abort.setDead();
+ ait.remove();
+ }
+ }
+
Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
if (lastmsgentry == null)