+++ /dev/null
-package iotcloud;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Iterator;
-import java.util.HashSet;
-import java.util.Arrays;
-import java.util.Vector;
-import java.util.Random;
-
-/**
- * IoTTable data structure. Provides client inferface.
- * @author Brian Demsky
- * @version 1.0
- */
-
-final public class Table {
- private int numslots; //number of slots stored in buffer
-
- //table of key-value pairs
- private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
-
- // machine id -> (sequence number, Slot or LastMessage); records last message by each client
- private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
- // machine id -> ...
- private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
- private Vector<Long> rejectedmessagelist=new Vector<Long>();
- private SlotBuffer buffer;
- private CloudComm cloud;
- private long sequencenumber; //Largest sequence number a client has received
- private long localmachineid;
- private TableStatus lastTableStatus;
- static final int FREE_SLOTS = 10; //number of slots that should be kept free
- static final int SKIP_THRESHOLD = 10;
- private long liveslotcount=0;
- private int chance;
- static final double RESIZE_MULTIPLE = 1.2;
- static final double RESIZE_THRESHOLD = 0.75;
- static final int REJECTED_THRESHOLD = 5;
- private int resizethreshold;
- private long lastliveslotseqn; //smallest sequence number with a live entry
- private Random random=new Random();
-
- public Table(String baseurl, String password, long _localmachineid) {
- localmachineid=_localmachineid;
- buffer = new SlotBuffer();
- numslots = buffer.capacity();
- setResizeThreshold();
- sequencenumber = 0;
- cloud=new CloudComm(this, baseurl, password);
- lastliveslotseqn = 1;
- }
-
- public Table(CloudComm _cloud, long _localmachineid) {
- localmachineid=_localmachineid;
- buffer = new SlotBuffer();
- numslots = buffer.capacity();
- setResizeThreshold();
- sequencenumber = 0;
- cloud=_cloud;
- }
-
- public void rebuild() {
- Slot[] newslots=cloud.getSlots(sequencenumber+1);
- validateandupdate(newslots, true);
- }
-
- public void update() {
- Slot[] newslots=cloud.getSlots(sequencenumber+1);
-
- validateandupdate(newslots, false);
- }
-
- public IoTString get(IoTString key) {
- KeyValue kv=table.get(key);
- if (kv != null)
- return kv.getValue();
- else
- return null;
- }
-
- public void initTable() {
- cloud.setSalt();//Set the salt
- Slot s=new Slot(this, 1, localmachineid);
- TableStatus status=new TableStatus(s, numslots);
- s.addEntry(status);
- Slot[] array=cloud.putSlot(s, numslots);
- if (array == null) {
- array = new Slot[] {s};
- /* update data structure */
- validateandupdate(array, true);
- } else {
- throw new Error("Error on initialization");
- }
- }
-
- public String toString() {
- return table.toString();
- }
-
- public IoTString put(IoTString key, IoTString value) {
- while(true) {
- KeyValue oldvalue=table.get(key);
- if (tryput(key, value, false)) {
- if (oldvalue==null)
- return null;
- else
- return oldvalue.getValue();
- }
- }
- }
-
- void decrementLiveCount() {
- liveslotcount--;
- }
-
-
- private void setResizeThreshold() {
- int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
- resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
- }
-
- private boolean tryput(IoTString key, IoTString value, boolean resize) {
- Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
- int newsize = 0;
- if (liveslotcount > resizethreshold) {
- resize=true; //Resize is forced
- }
-
- if (resize) {
- newsize = (int) (numslots * RESIZE_MULTIPLE);
- TableStatus status=new TableStatus(s, newsize);
- s.addEntry(status);
- }
-
- if (! rejectedmessagelist.isEmpty()) {
- /* TODO: We should avoid generating a rejected message entry if
- * there is already a sufficient entry in the queue (e.g.,
- * equalsto value of true and same sequence number). */
-
- long old_seqn=rejectedmessagelist.firstElement();
- if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
- long new_seqn=rejectedmessagelist.lastElement();
- RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
- s.addEntry(rm);
- } else {
- long prev_seqn = -1;
- int i=0;
- /* Go through list of missing messages */
- for(; i<rejectedmessagelist.size(); i++) {
- long curr_seqn = rejectedmessagelist.get(i);
- Slot s_msg = buffer.getSlot(curr_seqn);
- if (s_msg != null)
- break;
- prev_seqn=curr_seqn;
- }
- /* Generate rejected message entry for missing messages */
- if (prev_seqn != -1) {
- RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
- s.addEntry(rm);
- }
- /* Generate rejected message entries for present messages */
- for(; i<rejectedmessagelist.size(); i++) {
- long curr_seqn=rejectedmessagelist.get(i);
- Slot s_msg=buffer.getSlot(curr_seqn);
- long machineid=s_msg.getMachineID();
- RejectedMessage rm=new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
- s.addEntry(rm);
- }
- }
- }
-
- long newestseqnum = buffer.getNewestSeqNum();
- long oldestseqnum = buffer.getOldestSeqNum();
- if (lastliveslotseqn < oldestseqnum)
- lastliveslotseqn = oldestseqnum;
-
- long seqn = lastliveslotseqn;
- boolean seenliveslot = false;
- long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
- long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
-
- for(; seqn < threshold; seqn++) {
- Slot prevslot=buffer.getSlot(seqn);
- //Push slot number forward
- if (! seenliveslot)
- lastliveslotseqn = seqn;
-
- if (! prevslot.isLive())
- continue;
- seenliveslot = true;
- Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
- for(Entry liveentry:liveentries) {
- if (s.hasSpace(liveentry)) {
- s.addEntry(liveentry);
- } else if (seqn==firstiffull) { //if there's no space but the entry is about to fall off the queue
- if (!resize) {
- System.out.print("B"); //?
- return tryput(key, value, true);
- }
- }
- }
- }
-
- KeyValue kv=new KeyValue(s, key, value);
- boolean insertedkv=false;
- if (s.hasSpace(kv)) {
- s.addEntry(kv);
- insertedkv=true;
- }
-
- /* now go through live entries from least to greatest sequence number until
- * either all live slots added, or the slot doesn't have enough room
- * for SKIP_THRESHOLD consecutive entries*/
- int skipcount=0;
- search:
- for(; seqn <= newestseqnum; seqn++) {
- Slot prevslot=buffer.getSlot(seqn);
- //Push slot number forward
- if (!seenliveslot)
- lastliveslotseqn = seqn;
-
- if (!prevslot.isLive())
- continue;
- seenliveslot = true;
- Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
- for(Entry liveentry:liveentries) {
- if (s.hasSpace(liveentry))
- s.addEntry(liveentry);
- else {
- skipcount++;
- if (skipcount > SKIP_THRESHOLD)
- break search;
- }
- }
- }
-
- int max=0;
- if (resize)
- max = newsize;
- Slot[] array=cloud.putSlot(s, max);
- if (array == null) {
- array = new Slot[] {s};
- rejectedmessagelist.clear();
- } else {
- if (array.length == 0)
- throw new Error("Server Error: Did not send any slots");
- rejectedmessagelist.add(s.getSequenceNumber());
- insertedkv=false;
- }
-
- /* update data structure */
- validateandupdate(array, true);
-
- return insertedkv;
- }
-
- private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
- /* The cloud communication layer has checked slot HMACs already
- before decoding */
- if (newslots.length==0) return;
-
- long firstseqnum=newslots[0].getSequenceNumber();
- if (firstseqnum <= sequencenumber)
- throw new Error("Server Error: Sent older slots!");
-
- SlotIndexer indexer = new SlotIndexer(newslots, buffer);
- checkHMACChain(indexer, newslots);
-
- HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet()); //
-
- initExpectedSize(firstseqnum);
- for(Slot slot: newslots) {
- processSlot(indexer, slot, acceptupdatestolocal, machineSet);
- updateExpectedSize();
- }
-
- /* If there is a gap, check to see if the server sent us everything. */
- if (firstseqnum != (sequencenumber+1)) {
- checkNumSlots(newslots.length);
- if (!machineSet.isEmpty())
- throw new Error("Missing record for machines: "+machineSet);
- }
-
- commitNewMaxSize();
-
- /* Commit new to slots. */
- for(Slot slot:newslots) {
- buffer.putSlot(slot);
- liveslotcount++;
- }
- sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
- }
-
- private int expectedsize, currmaxsize;
-
- private void checkNumSlots(int numslots) {
- if (numslots != expectedsize)
- throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
- }
-
- private void initExpectedSize(long firstsequencenumber) {
- long prevslots = firstsequencenumber;
- expectedsize = (prevslots < ((long) numslots))? (int) prevslots : numslots;
- currmaxsize = numslots;
- }
-
- private void updateExpectedSize() {
- expectedsize++;
- if (expectedsize > currmaxsize)
- expectedsize = currmaxsize;
- }
-
- private void updateCurrMaxSize(int newmaxsize) {
- currmaxsize=newmaxsize;
- }
-
- private void commitNewMaxSize() {
- if (numslots != currmaxsize)
- buffer.resize(currmaxsize);
-
- numslots=currmaxsize;
- setResizeThreshold();
- }
-
- private void processEntry(KeyValue entry, SlotIndexer indexer) {
- IoTString key=entry.getKey();
- KeyValue oldvalue=table.get(key);
- if (oldvalue != null) {
- oldvalue.setDead();
- }
- table.put(key, entry);
- }
-
- private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
- updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
- }
-
- private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
- long oldseqnum=entry.getOldSeqNum();
- long newseqnum=entry.getNewSeqNum();
- boolean isequal=entry.getEqual();
- long machineid=entry.getMachineID();
- for(long seqnum=oldseqnum; seqnum <= newseqnum; seqnum++) {
- Slot slot=indexer.getSlot(seqnum);
- if (slot != null) {
- long slotmachineid=slot.getMachineID();
- if (isequal != (slotmachineid==machineid)) {
- throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
- }
- }
- }
-
- HashSet<Long> watchset=new HashSet<Long>();
- for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
- long entry_mid=lastmsg_entry.getKey();
- /* We've seen it, don't need to continue to watch. Our next
- * message will implicitly acknowledge it. */
- if (entry_mid == localmachineid)
- continue;
- Pair<Long, Liveness> v=lastmsg_entry.getValue();
- long entry_seqn=v.getFirst();
- if (entry_seqn < newseqnum) {
- addWatchList(entry_mid, entry);
- watchset.add(entry_mid);
- }
- }
- if (watchset.isEmpty())
- entry.setDead();
- else
- entry.setWatchSet(watchset);
- }
-
- private void addWatchList(long machineid, RejectedMessage entry) {
- HashSet<RejectedMessage> entries=watchlist.get(machineid);
- if (entries == null)
- watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
- entries.add(entry);
- }
-
- private void processEntry(TableStatus entry, SlotIndexer indexer) {
- int newnumslots=entry.getMaxSlots();
- updateCurrMaxSize(newnumslots);
- if (lastTableStatus != null)
- lastTableStatus.setDead();
- lastTableStatus = entry;
- }
-
- private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
- machineSet.remove(machineid);
-
- HashSet<RejectedMessage> watchset=watchlist.get(machineid);
- if (watchset != null) {
- for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
- RejectedMessage rm=rmit.next();
- if (rm.getNewSeqNum() <= seqnum) {
- /* Remove it from our watchlist */
- rmit.remove();
- /* Decrement machines that need to see this notification */
- rm.removeWatcher(machineid);
- }
- }
- }
-
- if (machineid == localmachineid) {
- /* Our own messages are immediately dead. */
- if (liveness instanceof LastMessage) {
- ((LastMessage)liveness).setDead();
- } else if (liveness instanceof Slot) {
- ((Slot)liveness).setDead();
- } else {
- throw new Error("Unrecognized type");
- }
- }
-
-
- Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
- if (lastmsgentry == null)
- return;
-
- long lastmsgseqnum = lastmsgentry.getFirst();
- Liveness lastentry = lastmsgentry.getSecond();
- if (machineid != localmachineid) {
- if (lastentry instanceof LastMessage) {
- ((LastMessage)lastentry).setDead();
- } else if (lastentry instanceof Slot) {
- ((Slot)lastentry).setDead();
- } else {
- throw new Error("Unrecognized type");
- }
- }
-
- if (machineid == localmachineid) {
- if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
- throw new Error("Server Error: Mismatch on local machine sequence number");
- } else {
- if (lastmsgseqnum > seqnum)
- throw new Error("Server Error: Rollback on remote machine sequence number");
- }
- }
-
- private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
- updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
- for(Entry entry : slot.getEntries()) {
- switch(entry.getType()) {
- case Entry.TypeKeyValue:
- processEntry((KeyValue)entry, indexer);
- break;
-
- case Entry.TypeLastMessage:
- processEntry((LastMessage)entry, indexer, machineSet);
- break;
-
- case Entry.TypeRejectedMessage:
- processEntry((RejectedMessage)entry, indexer);
- break;
-
- case Entry.TypeTableStatus:
- processEntry((TableStatus)entry, indexer);
- break;
-
- default:
- throw new Error("Unrecognized type: "+entry.getType());
- }
- }
- }
-
- private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
- for(int i=0; i < newslots.length; i++) {
- Slot currslot=newslots[i];
- Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
- if (prevslot != null &&
- !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
- throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
- }
- }
-}