*/
final public class Table {
- private int numslots;
+ private int numslots; //number of slots stored in buffer
+
+ //table of key-value pairs
private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
+
+ // machine id -> (sequence number, Slot or LastMessage); records last message by each client
private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
+ // machine id -> ...
private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
+ private Vector<Long> rejectedmessagelist=new Vector<Long>();
private SlotBuffer buffer;
private CloudComm cloud;
- private long sequencenumber;
+ private long sequencenumber; //Largest sequence number a client has received
private long localmachineid;
private TableStatus lastTableStatus;
- static final int FREE_SLOTS = 10;
+ static final int FREE_SLOTS = 10; //number of slots that should be kept free
static final int SKIP_THRESHOLD = 10;
private long liveslotcount=0;
private int chance;
static final double RESIZE_MULTIPLE = 1.2;
static final double RESIZE_THRESHOLD = 0.75;
+ static final int REJECTED_THRESHOLD = 5;
private int resizethreshold;
- private long lastliveslotseqn;
+ private long lastliveslotseqn; //smallest sequence number with a live entry
private Random random=new Random();
public Table(String baseurl, String password, long _localmachineid) {
liveslotcount--;
}
+
private void setResizeThreshold() {
int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
private boolean tryput(IoTString key, IoTString value, boolean resize) {
Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
int newsize = 0;
- if (liveslotcount > resizethreshold) {
- System.out.print("A");
- resize=true;
- newsize = (int) (numslots * RESIZE_MULTIPLE);
+ if (liveslotcount > resizethreshold) {
+ resize=true; //Resize is forced
}
if (resize) {
s.addEntry(status);
}
+ if (! rejectedmessagelist.isEmpty()) {
+ /* TODO: We should avoid generating a rejected message entry if
+ * there is already a sufficient entry in the queue (e.g.,
+ * equalsto value of true and same sequence number). */
+
+ long old_seqn=rejectedmessagelist.firstElement();
+ if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+ long new_seqn=rejectedmessagelist.lastElement();
+ RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+ s.addEntry(rm);
+ } else {
+ long prev_seqn = -1;
+ int i=0;
+ /* Go through list of missing messages */
+ for(; i<rejectedmessagelist.size(); i++) {
+ long curr_seqn = rejectedmessagelist.get(i);
+ Slot s_msg = buffer.getSlot(curr_seqn);
+ if (s_msg != null)
+ break;
+ prev_seqn=curr_seqn;
+ }
+ /* Generate rejected message entry for missing messages */
+ if (prev_seqn != -1) {
+ RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+ s.addEntry(rm);
+ }
+ /* Generate rejected message entries for present messages */
+ for(; i<rejectedmessagelist.size(); i++) {
+ long curr_seqn=rejectedmessagelist.get(i);
+ Slot s_msg=buffer.getSlot(curr_seqn);
+ long machineid=s_msg.getMachineID();
+ RejectedMessage rm=new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+ s.addEntry(rm);
+ }
+ }
+ }
+
long newestseqnum = buffer.getNewestSeqNum();
long oldestseqnum = buffer.getOldestSeqNum();
if (lastliveslotseqn < oldestseqnum)
long seqn = lastliveslotseqn;
boolean seenliveslot = false;
- long firstiffull = newestseqnum + 1 - numslots;
- long threshold = firstiffull + FREE_SLOTS;
+ long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
+ long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
for(; seqn < threshold; seqn++) {
Slot prevslot=buffer.getSlot(seqn);
//Push slot number forward
- if (!seenliveslot)
+ if (! seenliveslot)
lastliveslotseqn = seqn;
- if (!prevslot.isLive())
+ if (! prevslot.isLive())
continue;
seenliveslot = true;
Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
for(Entry liveentry:liveentries) {
if (s.hasSpace(liveentry)) {
s.addEntry(liveentry);
- } else if (seqn==firstiffull) {
+ } else if (seqn==firstiffull) { //if there's no space but the entry is about to fall off the queue
if (!resize) {
- System.out.print("B");
+ System.out.print("B"); //?
return tryput(key, value, true);
}
}
insertedkv=true;
}
+ /* now go through live entries from least to greatest sequence number until
+ * either all live slots added, or the slot doesn't have enough room
+ * for SKIP_THRESHOLD consecutive entries*/
int skipcount=0;
search:
for(; seqn <= newestseqnum; seqn++) {
if (resize)
max = newsize;
Slot[] array=cloud.putSlot(s, max);
- if (array == null)
+ if (array == null) {
array = new Slot[] {s};
- else
+ rejectedmessagelist.clear();
+ } else {
+ if (array.length == 0)
+ throw new Error("Server Error: Did not send any slots");
+ rejectedmessagelist.add(s.getSequenceNumber());
insertedkv=false;
-
+ }
+
/* update data structure */
validateandupdate(array, true);
private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
/* The cloud communication layer has checked slot HMACs already
before decoding */
- if (newslots.length==0)
- return;
+ if (newslots.length==0) return;
long firstseqnum=newslots[0].getSequenceNumber();
if (firstseqnum <= sequencenumber)
SlotIndexer indexer = new SlotIndexer(newslots, buffer);
checkHMACChain(indexer, newslots);
- HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
+ HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet()); //
initExpectedSize(firstseqnum);
for(Slot slot: newslots) {
private void initExpectedSize(long firstsequencenumber) {
long prevslots = firstsequencenumber;
- expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
+ expectedsize = (prevslots < ((long) numslots))? (int) prevslots : numslots;
currmaxsize = numslots;
}
long newseqnum=entry.getNewSeqNum();
boolean isequal=entry.getEqual();
long machineid=entry.getMachineID();
- for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
+ for(long seqnum=oldseqnum; seqnum <= newseqnum; seqnum++) {
Slot slot=indexer.getSlot(seqnum);
if (slot != null) {
long slotmachineid=slot.getMachineID();
- if (isequal!=(slotmachineid==machineid)) {
+ if (isequal != (slotmachineid==machineid)) {
throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
}
}
}
HashSet<Long> watchset=new HashSet<Long>();
- for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
+ for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
long entry_mid=lastmsg_entry.getKey();
/* We've seen it, don't need to continue to watch. Our next
* message will implicitly acknowledge it. */