edits
authorbdemsky <bdemsky@uci.edu>
Sun, 24 Jul 2016 06:58:15 +0000 (23:58 -0700)
committerbdemsky <bdemsky@uci.edu>
Sun, 24 Jul 2016 06:58:15 +0000 (23:58 -0700)
src/java/iotcloud/CloudComm.java
src/java/iotcloud/Pair.java
src/java/iotcloud/RejectedMessage.java
src/java/iotcloud/Slot.java
src/java/iotcloud/SlotBuffer.java
src/java/iotcloud/Table.java
src/java/iotcloud/TableStatus.java
src/java/iotcloud/Test.java
src/java/iotcloud/TestCloudComm.java
src/server/iotquery.cpp

index 6d421b5..84d7ae4 100644 (file)
@@ -12,7 +12,7 @@ class CloudComm {
 
        CloudComm() {
        }
-       
+
        CloudComm(String _baseurl, Cipher _encrypt, Cipher _decrypt, Mac _mac) {
                this.baseurl=_baseurl;
                this.encryptcipher = _encrypt;
index 5cbe1db..73ed6bd 100644 (file)
@@ -3,7 +3,7 @@ package iotcloud;
 class Pair<A,B> {
        private A a;
        private B b;
-  
+
        Pair(A a, B b) {
                this.a=a;
                this.b=b;
index bd52ed0..167521f 100644 (file)
@@ -3,9 +3,9 @@ import java.nio.ByteBuffer;
 
 class RejectedMessage extends Entry {
        private long machineid;
-       private long oldseqnum;                                                                                         //Oldest seqnum in range
-       private long newseqnum;                                                                                         //Newest seqnum in range (inclusive)
-       private boolean equalto;                                                                                                                        //Is message sent or not sent by machineid
+       private long oldseqnum;                                                                                                                                                                                                                                                                                                                                                                                         //Oldest seqnum in range
+       private long newseqnum;                                                                                                                                                                                                                                                                                                                                                                                         //Newest seqnum in range (inclusive)
+       private boolean equalto;                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        //Is message sent or not sent by machineid
 
        RejectedMessage(Slot slot, long _machineid, long _oldseqnum, long _newseqnum, boolean _equalto) {
                super(slot);
index 115c761..ea30ddb 100644 (file)
@@ -17,7 +17,7 @@ class Slot implements Liveness {
        private int livecount;
        private boolean seqnumlive;
        private int freespace;
-       
+
        Slot(long _seqnum, long _machineid, byte[] _prevhmac, byte[] _hmac) {
                seqnum=_seqnum;
                machineid=_machineid;
@@ -60,7 +60,7 @@ class Slot implements Liveness {
                int newfreespace = freespace - e.getSize();
                return newfreespace >= 0;
        }
-       
+
        Vector<Entry> getEntries() {
                return entries;
        }
@@ -92,7 +92,7 @@ class Slot implements Liveness {
        byte[] encode(Mac mac) {
                byte[] array=new byte[SLOT_SIZE];
                ByteBuffer bb=ByteBuffer.wrap(array);
-               bb.position(HMAC_SIZE);                                                                                                                                                                                                                                                 //Leave space for the HMACs
+               bb.position(HMAC_SIZE);                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 //Leave space for the HMACs
                bb.put(prevhmac);
                bb.putLong(seqnum);
                bb.putLong(machineid);
@@ -111,7 +111,7 @@ class Slot implements Liveness {
        int getBaseSize() {
                return 2*HMAC_SIZE+2*Long.BYTES+Integer.BYTES;
        }
-       
+
        Vector<Entry> getLiveEntries() {
                Vector<Entry> liveEntries=new Vector<Entry>();
                for(Entry entry: entries)
@@ -120,10 +120,10 @@ class Slot implements Liveness {
 
                if (seqnumlive)
                        liveEntries.add(new LastMessage(this, machineid, seqnum));
-               
+
                return liveEntries;
        }
-       
+
        long getSequenceNumber() {
                return seqnum;
        }
@@ -140,7 +140,7 @@ class Slot implements Liveness {
                decrementLiveCount();
                seqnumlive=false;
        }
-       
+
        void decrementLiveCount() {
                livecount--;
        }
index 054d4fc..3d5a465 100644 (file)
@@ -23,7 +23,7 @@ class SlotBuffer {
        int capacity() {
                return array.length - 1;
        }
-       
+
        void resize(int newsize) {
                if (newsize == (array.length-1))
                        return;
@@ -40,20 +40,27 @@ class SlotBuffer {
                head = 0;
        }
 
-       void putSlot(Slot s) {
-               array[head]=s;
+       private void incrementHead() {
                head++;
                if (head >= array.length)
                        head=0;
-               
+       }
+
+       private void incrementTail() {
+               tail++;
+               if (tail >= array.length)
+                       tail=0;
+       }
+
+       void putSlot(Slot s) {
+               array[head]=s;
+               incrementHead();
+
                if (oldestseqn==0)
                        oldestseqn = s.getSequenceNumber();
 
                if (head==tail) {
-                       tail++;
-                       if (tail >= array.length)
-                               tail=0;
-                       
+                       incrementTail();
                        oldestseqn++;
                }
        }
@@ -61,24 +68,18 @@ class SlotBuffer {
        Slot getSlot(long seqnum) {
                int diff=(int) (seqnum-oldestseqn);
                int index=diff + tail;
-               if (index > array.length) {
+               if (index >= array.length) {
                        if (head >= tail)
                                return null;
                        index-= array.length;
                }
 
-               if (index >= array.length ||
-                               index >= head)
+               if (index >= array.length)
                        return null;
-               if (index < 0) {
-                       System.out.println("seqnum="+seqnum);
-                       System.out.println("olestseqn="+oldestseqn);
-                       System.out.println("diff="+diff);
-                       System.out.println("tail="+tail);
 
+               if (head >= tail && index >= head)
+                       return null;
 
-               }
-               
                return array[index];
        }
 
index 72b4927..c92aef9 100644 (file)
@@ -14,10 +14,10 @@ final public class Table {
        private Mac hmac;
        private long sequencenumber;
        private long localmachineid;
-  private TableStatus lastTableStatus;
-  static final int FREE_SLOTS = 10;
-  static final int FORCED_RESIZE_INCREMENT = 20;
-  
+       private TableStatus lastTableStatus;
+       static final int FREE_SLOTS = 10;
+       static final int FORCED_RESIZE_INCREMENT = 20;
+
        public Table(String baseurl, String password, long _localmachineid) {
                localmachineid=_localmachineid;
                buffer = new SlotBuffer();
@@ -33,7 +33,7 @@ final public class Table {
                sequencenumber = 0;
                cloud=_cloud;
        }
-       
+
        private void initCloud(String baseurl, String password) {
                try {
                        SecretKeySpec secret=getKey(password);
@@ -62,6 +62,7 @@ final public class Table {
 
        public void update() {
                Slot[] newslots=cloud.getSlots(sequencenumber+1);
+
                validateandupdate(newslots, false);
        }
 
@@ -77,96 +78,96 @@ final public class Table {
                Slot s=new Slot(1, localmachineid);
                TableStatus status=new TableStatus(s, numslots);
                s.addEntry(status);
-    Slot[] array=cloud.putSlot(s, numslots);
-    if (array == null) {
-      array = new Slot[] {s};
-                       validateandupdate(array, true); // update data structure
+               Slot[] array=cloud.putSlot(s, numslots);
+               if (array == null) {
+                       array = new Slot[] {s};
+                       validateandupdate(array, true);                                                                         // update data structure
                } else {
                        throw new Error("Error on initialization");
                }
        }
-       
+
        public IoTString put(IoTString key, IoTString value) {
-    while(true) {
-      KeyValue oldvalue=table.get(key);
-      if (tryput(key, value, false)) {
+               while(true) {
+                       KeyValue oldvalue=table.get(key);
+                       if (tryput(key, value, false)) {
                                if (oldvalue==null)
                                        return null;
                                else
                                        return oldvalue.getValue();
-      }
-    }
-  }
+                       }
+               }
+       }
 
-  private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
+       private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
                Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
-    long seqn = buffer.getOldestSeqNum();
+               long seqn = buffer.getOldestSeqNum();
 
                if (forcedresize) {
                        TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
                        s.addEntry(status);
                }
-               
-    if ((numslots - buffer.size()) < FREE_SLOTS) {
-      //have to check whether we have enough free slots
-      long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
-                       seqn = fullfirstseqn < 1 ? 1: fullfirstseqn;
-      for(int i=0; i < FREE_SLOTS; i++, seqn++) {
-        Slot prevslot=buffer.getSlot(seqn);
-        if (!prevslot.isLive())
-          continue;
-        Vector<Entry> liveentries = prevslot.getLiveEntries();
-        for(Entry liveentry:liveentries) {
+
+               if ((numslots - buffer.size()) < FREE_SLOTS) {
+                       //have to check whether we have enough free slots
+                       long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
+                       seqn = fullfirstseqn < 1?1:fullfirstseqn;
+                       for(int i=0; i < FREE_SLOTS; i++, seqn++) {
+                               Slot prevslot=buffer.getSlot(seqn);
+                               if (!prevslot.isLive())
+                                       continue;
+                               Vector<Entry> liveentries = prevslot.getLiveEntries();
+                               for(Entry liveentry:liveentries) {
                                        if (redundant(liveentry))
                                                continue;
-          if (s.hasSpace(liveentry))
-            s.addEntry(liveentry);
-          else if (i==0) {
-            if (s.canFit(liveentry))
-              s.addEntry(liveentry);
-            else if (!forcedresize) {
-              return tryput(key, value, true);
+                                       if (s.hasSpace(liveentry))
+                                               s.addEntry(liveentry);
+                                       else if (i==0) {
+                                               if (s.canFit(liveentry))
+                                                       s.addEntry(liveentry);
+                                               else if (!forcedresize) {
+                                                       return tryput(key, value, true);
                                                }
                                        }
-        }
-      }
-    }
-    KeyValue kv=new KeyValue(s, key, value);
-    boolean insertedkv=false;
-    if (s.hasSpace(kv)) {
-      s.addEntry(kv);
-      insertedkv=true;
-    }
-
-    long newestseqnum=buffer.getNewestSeqNum();
-    search:
-    for(;seqn<=newestseqnum;seqn++) {
-      Slot prevslot=buffer.getSlot(seqn);
-      if (!prevslot.isLive())
-        continue;
-      Vector<Entry> liveentries = prevslot.getLiveEntries();
-      for(Entry liveentry:liveentries) {
+                               }
+                       }
+               }
+               KeyValue kv=new KeyValue(s, key, value);
+               boolean insertedkv=false;
+               if (s.hasSpace(kv)) {
+                       s.addEntry(kv);
+                       insertedkv=true;
+               }
+
+               long newestseqnum=buffer.getNewestSeqNum();
+search:
+               for(; seqn<=newestseqnum; seqn++) {
+                       Slot prevslot=buffer.getSlot(seqn);
+                       if (!prevslot.isLive())
+                               continue;
+                       Vector<Entry> liveentries = prevslot.getLiveEntries();
+                       for(Entry liveentry:liveentries) {
                                if (redundant(liveentry))
                                        continue;
-        if (s.hasSpace(liveentry))
-          s.addEntry(liveentry);
-        else
-          break search;
-      }
-    }
-    
-    int max=0;
-    if (forcedresize)
-      max = numslots + FORCED_RESIZE_INCREMENT;
-    Slot[] array=cloud.putSlot(s, max);
-    if (array == null)
-      array = new Slot[] {s};
-    else
-      insertedkv=false;
-    
-               validateandupdate(array, true); // update data structure
-    
-    return insertedkv;
+                               if (s.hasSpace(liveentry))
+                                       s.addEntry(liveentry);
+                               else
+                                       break search;
+                       }
+               }
+
+               int max=0;
+               if (forcedresize)
+                       max = numslots + FORCED_RESIZE_INCREMENT;
+               Slot[] array=cloud.putSlot(s, max);
+               if (array == null)
+                       array = new Slot[] {s};
+               else
+                       insertedkv=false;
+
+               validateandupdate(array, true);                                                 // update data structure
+
+               return insertedkv;
        }
 
        boolean redundant(Entry liveentry) {
@@ -177,7 +178,7 @@ final public class Table {
                return false;
        }
 
-       
+
        private void validateandupdate(Slot[] newslots, boolean isput) {
                //The cloud communication layer has checked slot HMACs already
                //before decoding
@@ -191,55 +192,55 @@ final public class Table {
                SlotIndexer indexer = new SlotIndexer(newslots, buffer);
                checkHMACChain(indexer, newslots);
 
-    initExpectedSize();
-    for(Slot slot: newslots) {
-      updateExpectedSize();
+               initExpectedSize();
+               for(Slot slot: newslots) {
+                       updateExpectedSize();
                        processSlot(indexer, slot, isput);
                }
 
                //If there is a gap, check to see if the server sent us everything
                if (firstseqnum != (sequencenumber+1))
                        checkNumSlots(newslots.length);
-               
-    commitNewMaxSize();
 
-    //commit new to slots
-    for(Slot slot:newslots) {
-      buffer.putSlot(slot);
-    }
+               commitNewMaxSize();
+
+               //commit new to slots
+               for(Slot slot:newslots) {
+                       buffer.putSlot(slot);
+               }
                sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
        }
 
-  private int expectedsize, currmaxsize;
+       private int expectedsize, currmaxsize;
+
+       private void checkNumSlots(int numslots) {
+               if (numslots != expectedsize)
+                       throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
+       }
 
-  private void checkNumSlots(int numslots) {
-    if (numslots != expectedsize)
-      throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
-  }
-  
-  private void initExpectedSize() {
+       private void initExpectedSize() {
                long prevslots = sequencenumber;
-    expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
-    currmaxsize = numslots;
-  }
-
-  private void updateExpectedSize() {
-    expectedsize++;
-    if (expectedsize > currmaxsize)
-      expectedsize = currmaxsize;
-  }
-
-  private void updateCurrMaxSize(int newmaxsize) {
-    currmaxsize=newmaxsize;
-  }
-
-  private void commitNewMaxSize() {
-    if (numslots != currmaxsize)
-      buffer.resize(currmaxsize);
-
-    numslots=currmaxsize;
-  }
-  
+               expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
+               currmaxsize = numslots;
+       }
+
+       private void updateExpectedSize() {
+               expectedsize++;
+               if (expectedsize > currmaxsize)
+                       expectedsize = currmaxsize;
+       }
+
+       private void updateCurrMaxSize(int newmaxsize) {
+               currmaxsize=newmaxsize;
+       }
+
+       private void commitNewMaxSize() {
+               if (numslots != currmaxsize)
+                       buffer.resize(currmaxsize);
+
+               numslots=currmaxsize;
+       }
+
        private void processEntry(KeyValue entry, SlotIndexer indexer) {
                IoTString key=entry.getKey();
                KeyValue oldvalue=table.get(key);
@@ -270,11 +271,11 @@ final public class Table {
        }
 
        private void processEntry(TableStatus entry, SlotIndexer indexer) {
-    int newnumslots=entry.getMaxSlots();
-    updateCurrMaxSize(newnumslots);
-    if (lastTableStatus != null)
-      lastTableStatus.setDead();
-    lastTableStatus = entry;
+               int newnumslots=entry.getMaxSlots();
+               updateCurrMaxSize(newnumslots);
+               if (lastTableStatus != null)
+                       lastTableStatus.setDead();
+               lastTableStatus = entry;
        }
 
        private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean isput) {
@@ -303,7 +304,7 @@ final public class Table {
 
        private void processSlot(SlotIndexer indexer, Slot slot, boolean isput) {
                updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, isput);
-    
+
                for(Entry entry : slot.getEntries()) {
                        switch(entry.getType()) {
                        case Entry.TypeKeyValue:
index 07f282c..fb50a7d 100644 (file)
@@ -12,7 +12,7 @@ class TableStatus extends Entry {
        int getMaxSlots() {
                return maxslots;
        }
-  
+
        static Entry decode(Slot slot, ByteBuffer bb) {
                int maxslots=bb.getInt();
                return new TableStatus(slot, maxslots);
index 97057dd..101ca77 100644 (file)
@@ -2,12 +2,38 @@ package iotcloud;
 
 public class Test {
        public static void main(String[] args) {
+               if (args[0].equals("1"))
+                       test1();
+               else if(args[0].equals("2"))
+                       test2();
+
+       }
+
+       static void test2() {
+               Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+               t1.initTable();
+               Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+               t2.update();
+               for(int i=0; i<600; i++) {
+                       String a="STR"+i;
+                       String b="ABR"+i;
+                       IoTString ia=new IoTString(a);
+                       IoTString ib=new IoTString(b);
+                       t1.put(ia, ia);
+                       t2.put(ib, ib);
+                       t1.update();
+                       System.out.println(ib+"->"+t1.get(ib));
+                       System.out.println(ia+"->"+t2.get(ia));
+               }
+       }
+
+       static void test1() {
                TestCloudComm cc=new TestCloudComm();
                Table t1=new Table(cc, 6513);
                t1.initTable();
                Table t2=new Table(cc, 6512);
                t2.update();
-               for(int i=0;i<100;i++) {
+               for(int i=0; i<600; i++) {
                        String a="STR"+i;
                        String b="ABR"+i;
                        IoTString ia=new IoTString(a);
@@ -18,5 +44,16 @@ public class Test {
                        System.out.println(ib+"->"+t1.get(ib));
                        System.out.println(ia+"->"+t2.get(ia));
                }
+               for(int i=0; i<600; i++) {
+                       String a="STR"+i;
+                       String b="ABR"+i;
+                       IoTString ia=new IoTString(a);
+                       IoTString ib=new IoTString(b);
+                       System.out.println(ib+"->"+t1.get(ib));
+                       System.out.println(ia+"->"+t2.get(ia));
+                       System.out.println(ib+"->"+t2.get(ib));
+                       System.out.println(ia+"->"+t1.get(ia));
+               }
+
        }
 }
index 6c18314..b668237 100644 (file)
@@ -29,7 +29,7 @@ class TestCloudComm extends CloudComm {
                        sequencenumber=oldestseqnum;
                int numslots=(int)((newestseqnum - sequencenumber)+1);
                Slot[] slots=new Slot[numslots];
-               for(int i=0;i<numslots;i++,sequencenumber++)
+               for(int i=0; i<numslots; i++,sequencenumber++)
                        slots[i]=buffer.getSlot(sequencenumber);
                return slots;
        }
index e865468..faec3b5 100644 (file)
@@ -144,7 +144,7 @@ void IoTQuery::getSlot() {
                delete filename;
        }
        const char header[]="getslot";
-       long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes;                                                                                                                         //header + payload + file count + sizes
+       long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes;                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         //header + payload + file count + sizes
        char * response = new char[size];
        long long offset=0;
        memcpy(response, header, sizeof(header)-1);
@@ -196,8 +196,8 @@ void IoTQuery::putSlot() {
        doWrite(slotfd, data, length);
        close(slotfd);
        delete filename;
-       newestentry = requestsequencenumber;                                                                                                    // update sequence number
-       updateStatusFile();                                                                                                                                             // update counts
+       newestentry = requestsequencenumber;                                                                                                                                                                                                                                                                                                                                                                                                                                    // update sequence number
+       updateStatusFile();                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             // update counts
        char command[]="putslot";
        sendResponse(command, sizeof(command)-1);
 }
@@ -211,7 +211,7 @@ void IoTQuery::sendResponse(char * bytes, int len) {
 
 char * IoTQuery::getSlotFileName(long long slot) {
        int directorylen=strlen(directory);
-       char * filename=new char[25+directorylen];                                                                                                              //19 digits for long number + 4 characters for SLOT + 1 character for null termination
+       char * filename=new char[25+directorylen];                                                                                                                                                                                                                                                                                                                                                                                                                                                                              //19 digits for long number + 4 characters for SLOT + 1 character for null termination
        snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, slot);
        return filename;
 }