Cleaned up git
[iotcloud.git] / src / java / iotcloud / Table.java
diff --git a/src/java/iotcloud/Table.java b/src/java/iotcloud/Table.java
deleted file mode 100644 (file)
index 1d6259a..0000000
+++ /dev/null
@@ -1,476 +0,0 @@
-package iotcloud;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Iterator;
-import java.util.HashSet;
-import java.util.Arrays;
-import java.util.Vector;
-import java.util.Random;
-
-/**
- * IoTTable data structure.  Provides client inferface.
- * @author Brian Demsky
- * @version 1.0
- */
-
-final public class Table {
-       private int numslots;   //number of slots stored in buffer
-
-       //table of key-value pairs
-       private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
-
-       // machine id -> (sequence number, Slot or LastMessage); records last message by each client
-       private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
-       // machine id -> ...
-       private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
-       private Vector<Long> rejectedmessagelist=new Vector<Long>();
-       private SlotBuffer buffer;
-       private CloudComm cloud;
-       private long sequencenumber; //Largest sequence number a client has received
-       private long localmachineid;
-       private TableStatus lastTableStatus;
-       static final int FREE_SLOTS = 10; //number of slots that should be kept free
-       static final int SKIP_THRESHOLD = 10;
-       private long liveslotcount=0;
-       private int chance;
-       static final double RESIZE_MULTIPLE = 1.2;
-       static final double RESIZE_THRESHOLD = 0.75;
-       static final int REJECTED_THRESHOLD = 5;
-       private int resizethreshold;
-       private long lastliveslotseqn;  //smallest sequence number with a live entry
-       private Random random=new Random();
-       
-       public Table(String baseurl, String password, long _localmachineid) {
-               localmachineid=_localmachineid;
-               buffer = new SlotBuffer();
-               numslots = buffer.capacity();
-               setResizeThreshold();
-               sequencenumber = 0;
-               cloud=new CloudComm(this, baseurl, password);
-               lastliveslotseqn = 1;
-       }
-
-       public Table(CloudComm _cloud, long _localmachineid) {
-               localmachineid=_localmachineid;
-               buffer = new SlotBuffer();
-               numslots = buffer.capacity();
-               setResizeThreshold();
-               sequencenumber = 0;
-               cloud=_cloud;
-       }
-
-       public void rebuild() {
-               Slot[] newslots=cloud.getSlots(sequencenumber+1);
-               validateandupdate(newslots, true);
-       }
-
-       public void update() {
-               Slot[] newslots=cloud.getSlots(sequencenumber+1);
-
-               validateandupdate(newslots, false);
-       }
-
-       public IoTString get(IoTString key) {
-               KeyValue kv=table.get(key);
-               if (kv != null)
-                       return kv.getValue();
-               else
-                       return null;
-       }
-
-       public void initTable() {
-               cloud.setSalt();//Set the salt
-               Slot s=new Slot(this, 1, localmachineid);
-               TableStatus status=new TableStatus(s, numslots);
-               s.addEntry(status);
-               Slot[] array=cloud.putSlot(s, numslots);
-               if (array == null) {
-                       array = new Slot[] {s};
-                       /* update data structure */
-                       validateandupdate(array, true);
-               } else {
-                       throw new Error("Error on initialization");
-               }
-       }
-
-       public String toString() {
-               return table.toString();
-       }
-       
-       public IoTString put(IoTString key, IoTString value) {
-               while(true) {
-                       KeyValue oldvalue=table.get(key);
-                       if (tryput(key, value, false)) {
-                               if (oldvalue==null)
-                                       return null;
-                               else
-                                       return oldvalue.getValue();
-                       }
-               }
-       }
-
-       void decrementLiveCount() {
-               liveslotcount--;
-       }
-       
-       
-       private void setResizeThreshold() {
-               int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
-               resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
-       }
-       
-       private boolean tryput(IoTString key, IoTString value, boolean resize) {
-               Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
-               int newsize = 0;
-               if (liveslotcount > resizethreshold) { 
-                       resize=true; //Resize is forced
-               }
-               
-               if (resize) {
-                       newsize = (int) (numslots * RESIZE_MULTIPLE);
-                       TableStatus status=new TableStatus(s, newsize);
-                       s.addEntry(status);
-               }
-
-               if (! rejectedmessagelist.isEmpty()) {
-                       /* TODO: We should avoid generating a rejected message entry if
-                        * there is already a sufficient entry in the queue (e.g.,
-                        * equalsto value of true and same sequence number).  */
-
-                       long old_seqn=rejectedmessagelist.firstElement();
-                       if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
-                               long new_seqn=rejectedmessagelist.lastElement();
-                               RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
-                               s.addEntry(rm);
-                       } else {
-                               long prev_seqn = -1;
-                               int i=0;
-                               /* Go through list of missing messages */
-                               for(; i<rejectedmessagelist.size(); i++) {
-                                       long curr_seqn = rejectedmessagelist.get(i);
-                                       Slot s_msg = buffer.getSlot(curr_seqn);
-                                       if (s_msg != null)
-                                               break;
-                                       prev_seqn=curr_seqn;
-                               }
-                               /* Generate rejected message entry for missing messages */
-                               if (prev_seqn != -1) {
-                                       RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
-                                       s.addEntry(rm);
-                               }
-                               /* Generate rejected message entries for present messages */
-                               for(; i<rejectedmessagelist.size(); i++) {
-                                       long curr_seqn=rejectedmessagelist.get(i);
-                                       Slot s_msg=buffer.getSlot(curr_seqn);
-                                       long machineid=s_msg.getMachineID();
-                                       RejectedMessage rm=new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
-                                       s.addEntry(rm);
-                               }
-                       }
-               }
-               
-               long newestseqnum = buffer.getNewestSeqNum();
-               long oldestseqnum = buffer.getOldestSeqNum();
-               if (lastliveslotseqn < oldestseqnum)
-                       lastliveslotseqn = oldestseqnum;
-
-               long seqn = lastliveslotseqn;
-               boolean seenliveslot = false;
-               long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
-               long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
-               
-               for(; seqn < threshold; seqn++) {
-                       Slot prevslot=buffer.getSlot(seqn);
-                       //Push slot number forward
-                       if (! seenliveslot)
-                               lastliveslotseqn = seqn;
-
-                       if (! prevslot.isLive())
-                               continue;
-                       seenliveslot = true;
-                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
-                       for(Entry liveentry:liveentries) {
-                               if (s.hasSpace(liveentry)) {
-                                       s.addEntry(liveentry);
-                               } else if (seqn==firstiffull) {   //if there's no space but the entry is about to fall off the queue
-                                       if (!resize) {
-                                               System.out.print("B"); //?
-                                               return tryput(key, value, true);
-                                       }
-                               }
-                       }
-               }
-
-               KeyValue kv=new KeyValue(s, key, value);
-               boolean insertedkv=false;
-               if (s.hasSpace(kv)) {
-                       s.addEntry(kv);
-                       insertedkv=true;
-               }
-
-               /* now go through live entries from least to greatest sequence number until
-                * either all live slots added, or the slot doesn't have enough room
-                * for SKIP_THRESHOLD consecutive entries*/
-               int skipcount=0;
-               search:
-               for(; seqn <= newestseqnum; seqn++) {
-                       Slot prevslot=buffer.getSlot(seqn);
-                       //Push slot number forward
-                       if (!seenliveslot)
-                               lastliveslotseqn = seqn;
-                       
-                       if (!prevslot.isLive())
-                               continue;
-                       seenliveslot = true;
-                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
-                       for(Entry liveentry:liveentries) {
-                               if (s.hasSpace(liveentry))
-                                       s.addEntry(liveentry);
-                               else {
-                                       skipcount++;
-                                       if (skipcount > SKIP_THRESHOLD)
-                                               break search;
-                               }
-                       }
-               }
-
-               int max=0;
-               if (resize)
-                       max = newsize;
-               Slot[] array=cloud.putSlot(s, max);
-               if (array == null) {
-                       array = new Slot[] {s};
-                       rejectedmessagelist.clear();
-               }       else {
-                       if (array.length == 0)
-                               throw new Error("Server Error: Did not send any slots");
-                       rejectedmessagelist.add(s.getSequenceNumber());
-                       insertedkv=false;
-               }
-               
-               /* update data structure */
-               validateandupdate(array, true);
-
-               return insertedkv;
-       }
-
-       private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
-               /* The cloud communication layer has checked slot HMACs already
-                        before decoding */
-               if (newslots.length==0) return;
-
-               long firstseqnum=newslots[0].getSequenceNumber();
-               if (firstseqnum <= sequencenumber)
-                       throw new Error("Server Error: Sent older slots!");
-
-               SlotIndexer indexer = new SlotIndexer(newslots, buffer);
-               checkHMACChain(indexer, newslots);
-
-               HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());  //
-
-               initExpectedSize(firstseqnum);
-               for(Slot slot: newslots) {
-                       processSlot(indexer, slot, acceptupdatestolocal, machineSet);
-                       updateExpectedSize();
-               }
-
-               /* If there is a gap, check to see if the server sent us everything. */
-               if (firstseqnum != (sequencenumber+1)) {
-                       checkNumSlots(newslots.length);
-                       if (!machineSet.isEmpty())
-                               throw new Error("Missing record for machines: "+machineSet);
-               }
-
-               commitNewMaxSize();
-
-               /* Commit new to slots. */
-               for(Slot slot:newslots) {
-                       buffer.putSlot(slot);
-                       liveslotcount++;
-               }
-               sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
-       }
-
-       private int expectedsize, currmaxsize;
-
-       private void checkNumSlots(int numslots) {
-               if (numslots != expectedsize)
-                       throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
-       }
-
-       private void initExpectedSize(long firstsequencenumber) {
-               long prevslots = firstsequencenumber;
-               expectedsize = (prevslots < ((long) numslots))? (int) prevslots : numslots;
-               currmaxsize = numslots;
-       }
-
-       private void updateExpectedSize() {
-               expectedsize++;
-               if (expectedsize > currmaxsize)
-                       expectedsize = currmaxsize;
-       }
-
-       private void updateCurrMaxSize(int newmaxsize) {
-               currmaxsize=newmaxsize;
-       }
-
-       private void commitNewMaxSize() {
-               if (numslots != currmaxsize)
-                       buffer.resize(currmaxsize);
-
-               numslots=currmaxsize;
-               setResizeThreshold();
-       }
-
-       private void processEntry(KeyValue entry, SlotIndexer indexer) {
-               IoTString key=entry.getKey();
-               KeyValue oldvalue=table.get(key);
-               if (oldvalue != null) {
-                       oldvalue.setDead();
-               }
-               table.put(key, entry);
-       }
-
-       private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
-               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
-       }
-
-       private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
-               long oldseqnum=entry.getOldSeqNum();
-               long newseqnum=entry.getNewSeqNum();
-               boolean isequal=entry.getEqual();
-               long machineid=entry.getMachineID();
-               for(long seqnum=oldseqnum; seqnum <= newseqnum; seqnum++) {
-                       Slot slot=indexer.getSlot(seqnum);
-                       if (slot != null) {
-                               long slotmachineid=slot.getMachineID();
-                               if (isequal != (slotmachineid==machineid)) {
-                                       throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
-                               }
-                       }
-               }
-
-               HashSet<Long> watchset=new HashSet<Long>();
-               for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
-                       long entry_mid=lastmsg_entry.getKey();
-                       /* We've seen it, don't need to continue to watch.  Our next
-                        * message will implicitly acknowledge it. */
-                       if (entry_mid == localmachineid)
-                               continue;
-                       Pair<Long, Liveness> v=lastmsg_entry.getValue();
-                       long entry_seqn=v.getFirst();
-                       if (entry_seqn < newseqnum) {
-                               addWatchList(entry_mid, entry);
-                               watchset.add(entry_mid);
-                       }
-               }
-               if (watchset.isEmpty())
-                       entry.setDead();
-               else
-                       entry.setWatchSet(watchset);
-       }
-
-       private void addWatchList(long machineid, RejectedMessage entry) {
-               HashSet<RejectedMessage> entries=watchlist.get(machineid);
-               if (entries == null)
-                       watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
-               entries.add(entry);
-       }
-
-       private void processEntry(TableStatus entry, SlotIndexer indexer) {
-               int newnumslots=entry.getMaxSlots();
-               updateCurrMaxSize(newnumslots);
-               if (lastTableStatus != null)
-                       lastTableStatus.setDead();
-               lastTableStatus = entry;
-       }
-
-       private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
-               machineSet.remove(machineid);
-
-               HashSet<RejectedMessage> watchset=watchlist.get(machineid);
-               if (watchset != null) {
-                       for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
-                               RejectedMessage rm=rmit.next();
-                               if (rm.getNewSeqNum() <= seqnum) {
-                                       /* Remove it from our watchlist */
-                                       rmit.remove();
-                                       /* Decrement machines that need to see this notification */
-                                       rm.removeWatcher(machineid);
-                               }
-                       }
-               }
-               
-               if (machineid == localmachineid) {
-                       /* Our own messages are immediately dead. */
-                       if (liveness instanceof LastMessage) {
-                               ((LastMessage)liveness).setDead();
-                       } else if (liveness instanceof Slot) {
-                               ((Slot)liveness).setDead();
-                       } else {
-                               throw new Error("Unrecognized type");
-                       }
-               }
-               
-               
-               Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
-               if (lastmsgentry == null)
-                       return;
-
-               long lastmsgseqnum = lastmsgentry.getFirst();
-               Liveness lastentry = lastmsgentry.getSecond();
-               if (machineid != localmachineid) {
-                       if (lastentry instanceof LastMessage) {
-                               ((LastMessage)lastentry).setDead();
-                       } else if (lastentry instanceof Slot) {
-                               ((Slot)lastentry).setDead();
-                       } else {
-                               throw new Error("Unrecognized type");
-                       }
-               }
-               
-               if (machineid == localmachineid) {
-                       if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
-                               throw new Error("Server Error: Mismatch on local machine sequence number");
-               } else {
-                       if (lastmsgseqnum > seqnum)
-                               throw new Error("Server Error: Rollback on remote machine sequence number");
-               }
-       }
-
-       private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
-               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
-               for(Entry entry : slot.getEntries()) {
-                       switch(entry.getType()) {
-                       case Entry.TypeKeyValue:
-                               processEntry((KeyValue)entry, indexer);
-                               break;
-
-                       case Entry.TypeLastMessage:
-                               processEntry((LastMessage)entry, indexer, machineSet);
-                               break;
-
-                       case Entry.TypeRejectedMessage:
-                               processEntry((RejectedMessage)entry, indexer);
-                               break;
-
-                       case Entry.TypeTableStatus:
-                               processEntry((TableStatus)entry, indexer);
-                               break;
-
-                       default:
-                               throw new Error("Unrecognized type: "+entry.getType());
-                       }
-               }
-       }
-
-       private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
-               for(int i=0; i < newslots.length; i++) {
-                       Slot currslot=newslots[i];
-                       Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
-                       if (prevslot != null &&
-                                       !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
-                               throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
-               }
-       }
-}