bug fixes
[iotcloud.git] / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9
10 /**
11  * IoTTable data structure.  Provides client interface.
12  * @author Brian Demsky
13  * @version 1.0
14  */
15
16 final public class Table {
17         private int numslots;
18         private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
19         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
20         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
21         private SlotBuffer buffer;
22         private CloudComm cloud;
23         private long sequencenumber;
24         private long localmachineid;
25         private TableStatus lastTableStatus;
26         static final int FREE_SLOTS = 10;
27         static final int SKIP_THRESHOLD = 10;
28         private long liveslotcount=0;
29         private int chance;
30         static final double RESIZE_MULTIPLE = 1.2;
31         static final double RESIZE_THRESHOLD = 0.75;
32         private int resizethreshold;
33         private long lastliveslotseqn;
34         private Random random=new Random();
35         
/**
 * Creates a table whose cloud connection is a new CloudComm built from
 * the given URL and password.
 *
 * @param baseurl          URL handed to the CloudComm constructor
 * @param password         password handed to the CloudComm constructor
 * @param _localmachineid  machine id of this client
 */
public Table(String baseurl, String password, long _localmachineid) {
    this.localmachineid = _localmachineid;
    this.buffer = new SlotBuffer();
    /* The initial table size is whatever the fresh buffer can hold. */
    this.numslots = this.buffer.capacity();
    setResizeThreshold();
    this.sequencenumber = 0;
    this.cloud = new CloudComm(this, baseurl, password);
    this.lastliveslotseqn = 1;
}
45
/**
 * Creates a table that uses an already constructed cloud connection.
 *
 * @param _cloud           communication layer to use
 * @param _localmachineid  machine id of this client
 */
public Table(CloudComm _cloud, long _localmachineid) {
    localmachineid = _localmachineid;
    buffer = new SlotBuffer();
    numslots = buffer.capacity();
    setResizeThreshold();
    sequencenumber = 0;
    cloud = _cloud;
    /* Keep both constructors in agreement: the live-slot scan in tryput()
     * starts at sequence number 1, not at the default value 0. */
    lastliveslotseqn = 1;
}
54
55         public void rebuild() {
56                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
57                 validateandupdate(newslots, true);
58         }
59
60         public void update() {
61                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
62
63                 validateandupdate(newslots, false);
64         }
65
66         public IoTString get(IoTString key) {
67                 KeyValue kv=table.get(key);
68                 if (kv != null)
69                         return kv.getValue();
70                 else
71                         return null;
72         }
73
74         public void initTable() {
75                 cloud.setSalt();//Set the salt
76                 Slot s=new Slot(this, 1, localmachineid);
77                 TableStatus status=new TableStatus(s, numslots);
78                 s.addEntry(status);
79                 Slot[] array=cloud.putSlot(s, numslots);
80                 if (array == null) {
81                         array = new Slot[] {s};
82                         /* update data structure */
83                         validateandupdate(array, true);
84                 } else {
85                         throw new Error("Error on initialization");
86                 }
87         }
88
89         public String toString() {
90                 return table.toString();
91         }
92         
93         public IoTString put(IoTString key, IoTString value) {
94                 while(true) {
95                         KeyValue oldvalue=table.get(key);
96                         if (tryput(key, value, false)) {
97                                 if (oldvalue==null)
98                                         return null;
99                                 else
100                                         return oldvalue.getValue();
101                         }
102                 }
103         }
104
105         void decrementLiveCount() {
106                 liveslotcount--;
107         }
108         
109         private void setResizeThreshold() {
110                 int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
111                 resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
112         }
113         
114         private boolean tryput(IoTString key, IoTString value, boolean resize) {
115                 Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
116                 int newsize = 0;
117                 if (liveslotcount > resizethreshold) {
118                         System.out.print("A");
119                         resize=true;
120                         newsize = (int) (numslots * RESIZE_MULTIPLE);
121                 }
122                 
123                 if (resize) {
124                         newsize = (int) (numslots * RESIZE_MULTIPLE);
125                         TableStatus status=new TableStatus(s, newsize);
126                         s.addEntry(status);
127                 }
128
129                 long newestseqnum = buffer.getNewestSeqNum();
130                 long oldestseqnum = buffer.getOldestSeqNum();
131                 if (lastliveslotseqn < oldestseqnum)
132                         lastliveslotseqn = oldestseqnum;
133
134                 long seqn = lastliveslotseqn;
135                 boolean seenliveslot = false;
136                 long firstiffull = newestseqnum + 1 - numslots;
137                 long threshold = firstiffull + FREE_SLOTS;
138                 
139                 for(; seqn < threshold; seqn++) {
140                         Slot prevslot=buffer.getSlot(seqn);
141                         //Push slot number forward
142                         if (!seenliveslot)
143                                 lastliveslotseqn = seqn;
144
145                         if (!prevslot.isLive())
146                                 continue;
147                         seenliveslot = true;
148                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
149                         for(Entry liveentry:liveentries) {
150                                 if (s.hasSpace(liveentry)) {
151                                         s.addEntry(liveentry);
152                                 } else if (seqn==firstiffull) {
153                                         if (!resize) {
154                                                 System.out.print("B");
155                                                 return tryput(key, value, true);
156                                         }
157                                 }
158                         }
159                 }
160
161                 KeyValue kv=new KeyValue(s, key, value);
162                 boolean insertedkv=false;
163                 if (s.hasSpace(kv)) {
164                         s.addEntry(kv);
165                         insertedkv=true;
166                 }
167
168                 int skipcount=0;
169                 search:
170                 for(; seqn <= newestseqnum; seqn++) {
171                         Slot prevslot=buffer.getSlot(seqn);
172                         //Push slot number forward
173                         if (!seenliveslot)
174                                 lastliveslotseqn = seqn;
175                         
176                         if (!prevslot.isLive())
177                                 continue;
178                         seenliveslot = true;
179                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
180                         for(Entry liveentry:liveentries) {
181                                 if (s.hasSpace(liveentry))
182                                         s.addEntry(liveentry);
183                                 else {
184                                         skipcount++;
185                                         if (skipcount > SKIP_THRESHOLD)
186                                                 break search;
187                                 }
188                         }
189                 }
190
191                 int max=0;
192                 if (resize)
193                         max = newsize;
194                 Slot[] array=cloud.putSlot(s, max);
195                 if (array == null)
196                         array = new Slot[] {s};
197                 else
198                         insertedkv=false;
199
200                 /* update data structure */
201                 validateandupdate(array, true);
202
203                 return insertedkv;
204         }
205
206         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
207                 /* The cloud communication layer has checked slot HMACs already
208                          before decoding */
209                 if (newslots.length==0)
210                         return;
211
212                 long firstseqnum=newslots[0].getSequenceNumber();
213                 if (firstseqnum <= sequencenumber)
214                         throw new Error("Server Error: Sent older slots!");
215
216                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
217                 checkHMACChain(indexer, newslots);
218
219                 HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
220
221                 initExpectedSize(firstseqnum);
222                 for(Slot slot: newslots) {
223                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
224                         updateExpectedSize();
225                 }
226
227                 /* If there is a gap, check to see if the server sent us everything. */
228                 if (firstseqnum != (sequencenumber+1)) {
229                         checkNumSlots(newslots.length);
230                         if (!machineSet.isEmpty())
231                                 throw new Error("Missing record for machines: "+machineSet);
232                 }
233
234                 commitNewMaxSize();
235
236                 /* Commit new to slots. */
237                 for(Slot slot:newslots) {
238                         buffer.putSlot(slot);
239                         liveslotcount++;
240                 }
241                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
242         }
243
244         private int expectedsize, currmaxsize;
245
246         private void checkNumSlots(int numslots) {
247                 if (numslots != expectedsize)
248                         throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
249         }
250
251         private void initExpectedSize(long firstsequencenumber) {
252                 long prevslots = firstsequencenumber;
253                 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
254                 currmaxsize = numslots;
255         }
256
257         private void updateExpectedSize() {
258                 expectedsize++;
259                 if (expectedsize > currmaxsize)
260                         expectedsize = currmaxsize;
261         }
262
263         private void updateCurrMaxSize(int newmaxsize) {
264                 currmaxsize=newmaxsize;
265         }
266
267         private void commitNewMaxSize() {
268                 if (numslots != currmaxsize)
269                         buffer.resize(currmaxsize);
270
271                 numslots=currmaxsize;
272                 setResizeThreshold();
273         }
274
275         private void processEntry(KeyValue entry, SlotIndexer indexer) {
276                 IoTString key=entry.getKey();
277                 KeyValue oldvalue=table.get(key);
278                 if (oldvalue != null) {
279                         oldvalue.setDead();
280                 }
281                 table.put(key, entry);
282         }
283
284         private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
285                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
286         }
287
288         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
289                 long oldseqnum=entry.getOldSeqNum();
290                 long newseqnum=entry.getNewSeqNum();
291                 boolean isequal=entry.getEqual();
292                 long machineid=entry.getMachineID();
293                 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
294                         Slot slot=indexer.getSlot(seqnum);
295                         if (slot != null) {
296                                 long slotmachineid=slot.getMachineID();
297                                 if (isequal!=(slotmachineid==machineid)) {
298                                         throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
299                                 }
300                         }
301                 }
302
303                 HashSet<Long> watchset=new HashSet<Long>();
304                 for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
305                         long entry_mid=lastmsg_entry.getKey();
306                         /* We've seen it, don't need to continue to watch.  Our next
307                          * message will implicitly acknowledge it. */
308                         if (entry_mid == localmachineid)
309                                 continue;
310                         Pair<Long, Liveness> v=lastmsg_entry.getValue();
311                         long entry_seqn=v.getFirst();
312                         if (entry_seqn < newseqnum) {
313                                 addWatchList(entry_mid, entry);
314                                 watchset.add(entry_mid);
315                         }
316                 }
317                 if (watchset.isEmpty())
318                         entry.setDead();
319                 else
320                         entry.setWatchSet(watchset);
321         }
322
323         private void addWatchList(long machineid, RejectedMessage entry) {
324                 HashSet<RejectedMessage> entries=watchlist.get(machineid);
325                 if (entries == null)
326                         watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
327                 entries.add(entry);
328         }
329
330         private void processEntry(TableStatus entry, SlotIndexer indexer) {
331                 int newnumslots=entry.getMaxSlots();
332                 updateCurrMaxSize(newnumslots);
333                 if (lastTableStatus != null)
334                         lastTableStatus.setDead();
335                 lastTableStatus = entry;
336         }
337
338         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
339                 machineSet.remove(machineid);
340
341                 HashSet<RejectedMessage> watchset=watchlist.get(machineid);
342                 if (watchset != null) {
343                         for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
344                                 RejectedMessage rm=rmit.next();
345                                 if (rm.getNewSeqNum() <= seqnum) {
346                                         /* Remove it from our watchlist */
347                                         rmit.remove();
348                                         /* Decrement machines that need to see this notification */
349                                         rm.removeWatcher(machineid);
350                                 }
351                         }
352                 }
353                 
354                 if (machineid == localmachineid) {
355                         /* Our own messages are immediately dead. */
356                         if (liveness instanceof LastMessage) {
357                                 ((LastMessage)liveness).setDead();
358                         } else if (liveness instanceof Slot) {
359                                 ((Slot)liveness).setDead();
360                         } else {
361                                 throw new Error("Unrecognized type");
362                         }
363                 }
364                 
365                 
366                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
367                 if (lastmsgentry == null)
368                         return;
369
370                 long lastmsgseqnum = lastmsgentry.getFirst();
371                 Liveness lastentry = lastmsgentry.getSecond();
372                 if (machineid != localmachineid) {
373                         if (lastentry instanceof LastMessage) {
374                                 ((LastMessage)lastentry).setDead();
375                         } else if (lastentry instanceof Slot) {
376                                 ((Slot)lastentry).setDead();
377                         } else {
378                                 throw new Error("Unrecognized type");
379                         }
380                 }
381                 
382                 if (machineid == localmachineid) {
383                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
384                                 throw new Error("Server Error: Mismatch on local machine sequence number");
385                 } else {
386                         if (lastmsgseqnum > seqnum)
387                                 throw new Error("Server Error: Rollback on remote machine sequence number");
388                 }
389         }
390
391         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
392                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
393                 for(Entry entry : slot.getEntries()) {
394                         switch(entry.getType()) {
395                         case Entry.TypeKeyValue:
396                                 processEntry((KeyValue)entry, indexer);
397                                 break;
398
399                         case Entry.TypeLastMessage:
400                                 processEntry((LastMessage)entry, indexer, machineSet);
401                                 break;
402
403                         case Entry.TypeRejectedMessage:
404                                 processEntry((RejectedMessage)entry, indexer);
405                                 break;
406
407                         case Entry.TypeTableStatus:
408                                 processEntry((TableStatus)entry, indexer);
409                                 break;
410
411                         default:
412                                 throw new Error("Unrecognized type: "+entry.getType());
413                         }
414                 }
415         }
416
417         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
418                 for(int i=0; i < newslots.length; i++) {
419                         Slot currslot=newslots[i];
420                         Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
421                         if (prevslot != null &&
422                                         !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
423                                 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
424                 }
425         }
426 }