forget rejected message list
[iotcloud.git] / src / java / iotcloud / Table.java
index 3afb1a9c061f378d2fc1df54291ee91f24f6e959..cf7095e42c15784277b95e9420792c0d655deded 100644 (file)
@@ -18,6 +18,7 @@ final public class Table {
        private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
        private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
        private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
+       private Vector<Long> rejectedmessagelist=new Vector<Long>();
        private SlotBuffer buffer;
        private CloudComm cloud;
        private long sequencenumber;
@@ -29,6 +30,7 @@ final public class Table {
        private int chance;
        static final double RESIZE_MULTIPLE = 1.2;
        static final double RESIZE_THRESHOLD = 0.75;
+       static final int REJECTED_THRESHOLD = 5;
        private int resizethreshold;
        private long lastliveslotseqn;
        private Random random=new Random();
@@ -125,6 +127,37 @@ final public class Table {
                        s.addEntry(status);
                }
 
+               if (!rejectedmessagelist.isEmpty()) {
+                       long old_seqn=rejectedmessagelist.firstElement();
+                       if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+                               long new_seqn=rejectedmessagelist.lastElement();
+                               RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+                               s.addEntry(rm);
+                       } else {
+                               long prev_seqn=old_seqn;
+                               for(int i=0; i<rejectedmessagelist.size();i++) {
+                                       long curr_seqn=rejectedmessagelist.get(i);
+                                       Slot s_msg=buffer.getSlot(curr_seqn);
+                                       if (s_msg!=null) {
+                                               long machineid=s_msg.getMachineID();
+                                               RejectedMessage rm=new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+                                               s.addEntry(rm);
+                                               if (old_seqn != -1 && old_seqn != curr_seqn) {
+                                                       RejectedMessage rmprev=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+                                                       s.addEntry(rmprev);
+                                               }
+                                               old_seqn = -1;
+                                       } else {
+                                               prev_seqn=curr_seqn;
+                                       }
+                               }
+                               if (old_seqn != -1) {
+                                       RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+                                       s.addEntry(rm);
+                               }
+                       }
+               }
+               
                long newestseqnum = buffer.getNewestSeqNum();
                long oldestseqnum = buffer.getOldestSeqNum();
                if (lastliveslotseqn < oldestseqnum)
@@ -191,11 +224,16 @@ final public class Table {
                if (resize)
                        max = newsize;
                Slot[] array=cloud.putSlot(s, max);
-               if (array == null)
+               if (array == null) {
                        array = new Slot[] {s};
-               else
+                       rejectedmessagelist.clear();
+               }       else {
+                       if (array.length == 0)
+                               throw new Error("Server Error: Did not send any slots");
+                       rejectedmessagelist.add(s.getSequenceNumber());
                        insertedkv=false;
-
+               }
+               
                /* update data structure */
                validateandupdate(array, true);