package iotcloud;
import java.util.HashMap;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.HashSet;
import java.util.Arrays;
import java.util.Vector;
+import java.util.Random;
+
+/**
+ * IoTTable data structure. Provides client inferface.
+ * @author Brian Demsky
+ * @version 1.0
+ */
final public class Table {
private int numslots;
private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
+ private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
private SlotBuffer buffer;
private CloudComm cloud;
private long sequencenumber;
private long localmachineid;
private TableStatus lastTableStatus;
static final int FREE_SLOTS = 10;
- static final int FORCED_RESIZE_INCREMENT = 20;
-
+ static final int SKIP_THRESHOLD = 10;
+ private long liveslotcount=0;
+ private int chance;
+ static final double RESIZE_MULTIPLE = 1.2;
+ static final double RESIZE_THRESHOLD = 0.75;
+ private int resizethreshold;
+ private long lastliveslotseqn;
+ private Random random=new Random();
+
public Table(String baseurl, String password, long _localmachineid) {
localmachineid=_localmachineid;
buffer = new SlotBuffer();
numslots = buffer.capacity();
+ setResizeThreshold();
sequencenumber = 0;
- cloud=new CloudComm(baseurl, password);
+ cloud=new CloudComm(this, baseurl, password);
+ lastliveslotseqn = 1;
}
public Table(CloudComm _cloud, long _localmachineid) {
localmachineid=_localmachineid;
buffer = new SlotBuffer();
numslots = buffer.capacity();
+ setResizeThreshold();
sequencenumber = 0;
cloud=_cloud;
}
Slot[] newslots=cloud.getSlots(sequencenumber+1);
validateandupdate(newslots, true);
}
-
+
public void update() {
Slot[] newslots=cloud.getSlots(sequencenumber+1);
}
public void initTable() {
- Slot s=new Slot(1, localmachineid);
+ cloud.setSalt();//Set the salt
+ Slot s=new Slot(this, 1, localmachineid);
TableStatus status=new TableStatus(s, numslots);
s.addEntry(status);
Slot[] array=cloud.putSlot(s, numslots);
if (array == null) {
array = new Slot[] {s};
- validateandupdate(array, true); // update data structure
+ /* update data structure */
+ validateandupdate(array, true);
} else {
throw new Error("Error on initialization");
}
}
+ public String toString() {
+ return table.toString();
+ }
+
public IoTString put(IoTString key, IoTString value) {
while(true) {
KeyValue oldvalue=table.get(key);
}
}
- private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
- Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
- long seqn = buffer.getOldestSeqNum();
-
- if (forcedresize) {
- TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
+ void decrementLiveCount() {
+ liveslotcount--;
+ }
+
+ private void setResizeThreshold() {
+ int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
+ resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
+ }
+
+ private boolean tryput(IoTString key, IoTString value, boolean resize) {
+ Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+ int newsize = 0;
+ if (liveslotcount > resizethreshold) {
+ resize=true;
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ }
+
+ if (resize) {
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ TableStatus status=new TableStatus(s, newsize);
s.addEntry(status);
}
- if ((numslots - buffer.size()) < FREE_SLOTS) {
- //have to check whether we have enough free slots
- long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
- seqn = fullfirstseqn < 1?1:fullfirstseqn;
- for(int i=0; i < FREE_SLOTS; i++, seqn++) {
- Slot prevslot=buffer.getSlot(seqn);
- if (!prevslot.isLive())
- continue;
- Vector<Entry> liveentries = prevslot.getLiveEntries();
- for(Entry liveentry:liveentries) {
- if (redundant(liveentry))
- continue;
- if (s.hasSpace(liveentry))
- s.addEntry(liveentry);
- else if (i==0) {
- if (s.canFit(liveentry))
- s.addEntry(liveentry);
- else if (!forcedresize) {
- return tryput(key, value, true);
- }
+ long newestseqnum = buffer.getNewestSeqNum();
+ long oldestseqnum = buffer.getOldestSeqNum();
+ if (lastliveslotseqn < oldestseqnum)
+ lastliveslotseqn = oldestseqnum;
+
+ long seqn = lastliveslotseqn;
+ boolean seenliveslot = false;
+ long firstiffull = newestseqnum + 1 - numslots;
+ long threshold = firstiffull + FREE_SLOTS;
+
+ for(; seqn < threshold; seqn++) {
+ Slot prevslot=buffer.getSlot(seqn);
+ //Push slot number forward
+ if (!seenliveslot)
+ lastliveslotseqn = seqn;
+
+ if (!prevslot.isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+ for(Entry liveentry:liveentries) {
+ if (s.hasSpace(liveentry)) {
+ s.addEntry(liveentry);
+ } else if (seqn==firstiffull) {
+ if (!resize) {
+ System.out.print("B");
+ return tryput(key, value, true);
}
}
}
}
+
KeyValue kv=new KeyValue(s, key, value);
boolean insertedkv=false;
if (s.hasSpace(kv)) {
insertedkv=true;
}
- long newestseqnum=buffer.getNewestSeqNum();
-search:
- for(; seqn<=newestseqnum; seqn++) {
+ int skipcount=0;
+ search:
+ for(; seqn <= newestseqnum; seqn++) {
Slot prevslot=buffer.getSlot(seqn);
+ //Push slot number forward
+ if (!seenliveslot)
+ lastliveslotseqn = seqn;
+
if (!prevslot.isLive())
continue;
- Vector<Entry> liveentries = prevslot.getLiveEntries();
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
for(Entry liveentry:liveentries) {
- if (redundant(liveentry))
- continue;
if (s.hasSpace(liveentry))
s.addEntry(liveentry);
- else
- break search;
+ else {
+ skipcount++;
+ if (skipcount > SKIP_THRESHOLD)
+ break search;
+ }
}
}
int max=0;
- if (forcedresize)
- max = numslots + FORCED_RESIZE_INCREMENT;
+ if (resize)
+ max = newsize;
Slot[] array=cloud.putSlot(s, max);
if (array == null)
array = new Slot[] {s};
else
insertedkv=false;
- validateandupdate(array, true); // update data structure
+ /* update data structure */
+ validateandupdate(array, true);
return insertedkv;
}
- boolean redundant(Entry liveentry) {
- if (liveentry.getType()==Entry.TypeLastMessage) {
- LastMessage lastmsg=(LastMessage) liveentry;
- return lastmsg.getMachineID() == localmachineid;
- }
- return false;
- }
-
-
private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
- //The cloud communication layer has checked slot HMACs already
- //before decoding
+ /* The cloud communication layer has checked slot HMACs already
+ before decoding */
if (newslots.length==0)
return;
SlotIndexer indexer = new SlotIndexer(newslots, buffer);
checkHMACChain(indexer, newslots);
- initExpectedSize();
+ HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
+
+ initExpectedSize(firstseqnum);
for(Slot slot: newslots) {
+ processSlot(indexer, slot, acceptupdatestolocal, machineSet);
updateExpectedSize();
- processSlot(indexer, slot, acceptupdatestolocal);
}
- //If there is a gap, check to see if the server sent us everything
- if (firstseqnum != (sequencenumber+1))
+ /* If there is a gap, check to see if the server sent us everything. */
+ if (firstseqnum != (sequencenumber+1)) {
checkNumSlots(newslots.length);
+ if (!machineSet.isEmpty())
+ throw new Error("Missing record for machines: "+machineSet);
+ }
commitNewMaxSize();
- //commit new to slots
+ /* Commit new to slots. */
for(Slot slot:newslots) {
buffer.putSlot(slot);
+ liveslotcount++;
}
sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
}
throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
}
- private void initExpectedSize() {
- long prevslots = sequencenumber;
+ private void initExpectedSize(long firstsequencenumber) {
+ long prevslots = firstsequencenumber;
expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
currmaxsize = numslots;
}
buffer.resize(currmaxsize);
numslots=currmaxsize;
+ setResizeThreshold();
}
private void processEntry(KeyValue entry, SlotIndexer indexer) {
table.put(key, entry);
}
- private void processEntry(LastMessage entry, SlotIndexer indexer) {
- updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
+ private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
+ updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
}
private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
}
}
}
+
+ HashSet<Long> watchset=new HashSet<Long>();
+ for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
+ long entry_mid=lastmsg_entry.getKey();
+ /* We've seen it, don't need to continue to watch. Our next
+ * message will implicitly acknowledge it. */
+ if (entry_mid == localmachineid)
+ continue;
+ Pair<Long, Liveness> v=lastmsg_entry.getValue();
+ long entry_seqn=v.getFirst();
+ if (entry_seqn < newseqnum) {
+ addWatchList(entry_mid, entry);
+ watchset.add(entry_mid);
+ }
+ }
+ if (watchset.isEmpty())
+ entry.setDead();
+ else
+ entry.setWatchSet(watchset);
+ }
+
+ private void addWatchList(long machineid, RejectedMessage entry) {
+ HashSet<RejectedMessage> entries=watchlist.get(machineid);
+ if (entries == null)
+ watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
+ entries.add(entry);
}
private void processEntry(TableStatus entry, SlotIndexer indexer) {
lastTableStatus = entry;
}
- private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal) {
+ private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+ machineSet.remove(machineid);
+
+ HashSet<RejectedMessage> watchset=watchlist.get(machineid);
+ if (watchset != null) {
+ for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
+ RejectedMessage rm=rmit.next();
+ if (rm.getNewSeqNum() <= seqnum) {
+ /* Remove it from our watchlist */
+ rmit.remove();
+ /* Decrement machines that need to see this notification */
+ rm.removeWatcher(machineid);
+ }
+ }
+ }
+
+ if (machineid == localmachineid) {
+ /* Our own messages are immediately dead. */
+ if (liveness instanceof LastMessage) {
+ ((LastMessage)liveness).setDead();
+ } else if (liveness instanceof Slot) {
+ ((Slot)liveness).setDead();
+ } else {
+ throw new Error("Unrecognized type");
+ }
+ }
+
+
Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
if (lastmsgentry == null)
return;
long lastmsgseqnum = lastmsgentry.getFirst();
Liveness lastentry = lastmsgentry.getSecond();
- if (lastentry instanceof LastMessage) {
- ((LastMessage)lastentry).setDead();
- } else if (lastentry instanceof Slot) {
- ((Slot)lastentry).setDead();
- } else {
- throw new Error("Unrecognized type");
+ if (machineid != localmachineid) {
+ if (lastentry instanceof LastMessage) {
+ ((LastMessage)lastentry).setDead();
+ } else if (lastentry instanceof Slot) {
+ ((Slot)lastentry).setDead();
+ } else {
+ throw new Error("Unrecognized type");
+ }
}
-
+
if (machineid == localmachineid) {
if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
throw new Error("Server Error: Mismatch on local machine sequence number");
}
}
- private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal) {
- updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal);
-
+ private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+ updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
for(Entry entry : slot.getEntries()) {
switch(entry.getType()) {
case Entry.TypeKeyValue:
break;
case Entry.TypeLastMessage:
- processEntry((LastMessage)entry, indexer);
+ processEntry((LastMessage)entry, indexer, machineSet);
break;
case Entry.TypeRejectedMessage: