Adding missing instructions to load missing Apache2 modules for Fidelius to work!
[iotcloud.git] / version1 / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9
10 /**
11  * IoTTable data structure.  Provides client inferface.
12  * @author Brian Demsky
13  * @version 1.0
14  */
15
16 final public class Table {
17         private int numslots;   //number of slots stored in buffer
18
19         //table of key-value pairs
20         private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
21
22         // machine id -> (sequence number, Slot or LastMessage); records last message by each client
23         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
24         // machine id -> ...
25         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
26         private Vector<Long> rejectedmessagelist=new Vector<Long>();
27         private SlotBuffer buffer;
28         private CloudComm cloud;
29         private long sequencenumber; //Largest sequence number a client has received
30         private long localmachineid;
31         private TableStatus lastTableStatus;
32         static final int FREE_SLOTS = 10; //number of slots that should be kept free
33         static final int SKIP_THRESHOLD = 10;
34         private long liveslotcount=0;
35         private int chance;
36         static final double RESIZE_MULTIPLE = 1.2;
37         static final double RESIZE_THRESHOLD = 0.75;
38         static final int REJECTED_THRESHOLD = 5;
39         private int resizethreshold;
40         private long lastliveslotseqn;  //smallest sequence number with a live entry
41         private Random random=new Random();
42         
43         public Table(String baseurl, String password, long _localmachineid) {
44                 localmachineid=_localmachineid;
45                 buffer = new SlotBuffer();
46                 numslots = buffer.capacity();
47                 setResizeThreshold();
48                 sequencenumber = 0;
49                 cloud=new CloudComm(this, baseurl, password);
50                 lastliveslotseqn = 1;
51         }
52
53         public Table(CloudComm _cloud, long _localmachineid) {
54                 localmachineid=_localmachineid;
55                 buffer = new SlotBuffer();
56                 numslots = buffer.capacity();
57                 setResizeThreshold();
58                 sequencenumber = 0;
59                 cloud=_cloud;
60         }
61
62         public void rebuild() {
63                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
64                 validateandupdate(newslots, true);
65         }
66
67         public void update() {
68                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
69
70                 validateandupdate(newslots, false);
71         }
72
73         public IoTString get(IoTString key) {
74                 KeyValue kv=table.get(key);
75                 if (kv != null)
76                         return kv.getValue();
77                 else
78                         return null;
79         }
80
81         public void initTable() {
82                 cloud.setSalt();//Set the salt
83                 Slot s=new Slot(this, 1, localmachineid);
84                 TableStatus status=new TableStatus(s, numslots);
85                 s.addEntry(status);
86                 Slot[] array=cloud.putSlot(s, numslots);
87                 if (array == null) {
88                         array = new Slot[] {s};
89                         /* update data structure */
90                         validateandupdate(array, true);
91                 } else {
92                         throw new Error("Error on initialization");
93                 }
94         }
95
96         public String toString() {
97                 return table.toString();
98         }
99         
100         public IoTString put(IoTString key, IoTString value) {
101                 while(true) {
102                         KeyValue oldvalue=table.get(key);
103                         if (tryput(key, value, false)) {
104                                 if (oldvalue==null)
105                                         return null;
106                                 else
107                                         return oldvalue.getValue();
108                         }
109                 }
110         }
111
112         void decrementLiveCount() {
113                 liveslotcount--;
114         }
115         
116         
117         private void setResizeThreshold() {
118                 int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
119                 resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
120         }
121         
122         private boolean tryput(IoTString key, IoTString value, boolean resize) {
123                 Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
124                 int newsize = 0;
125                 if (liveslotcount > resizethreshold) { 
126                         resize=true; //Resize is forced
127                 }
128                 
129                 if (resize) {
130                         newsize = (int) (numslots * RESIZE_MULTIPLE);
131                         TableStatus status=new TableStatus(s, newsize);
132                         s.addEntry(status);
133                 }
134
135                 if (! rejectedmessagelist.isEmpty()) {
136                         /* TODO: We should avoid generating a rejected message entry if
137                          * there is already a sufficient entry in the queue (e.g.,
138                          * equalsto value of true and same sequence number).  */
139
140                         long old_seqn=rejectedmessagelist.firstElement();
141                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
142                                 long new_seqn=rejectedmessagelist.lastElement();
143                                 RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
144                                 s.addEntry(rm);
145                         } else {
146                                 long prev_seqn = -1;
147                                 int i=0;
148                                 /* Go through list of missing messages */
149                                 for(; i<rejectedmessagelist.size(); i++) {
150                                         long curr_seqn = rejectedmessagelist.get(i);
151                                         Slot s_msg = buffer.getSlot(curr_seqn);
152                                         if (s_msg != null)
153                                                 break;
154                                         prev_seqn=curr_seqn;
155                                 }
156                                 /* Generate rejected message entry for missing messages */
157                                 if (prev_seqn != -1) {
158                                         RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
159                                         s.addEntry(rm);
160                                 }
161                                 /* Generate rejected message entries for present messages */
162                                 for(; i<rejectedmessagelist.size(); i++) {
163                                         long curr_seqn=rejectedmessagelist.get(i);
164                                         Slot s_msg=buffer.getSlot(curr_seqn);
165                                         long machineid=s_msg.getMachineID();
166                                         RejectedMessage rm=new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
167                                         s.addEntry(rm);
168                                 }
169                         }
170                 }
171                 
172                 long newestseqnum = buffer.getNewestSeqNum();
173                 long oldestseqnum = buffer.getOldestSeqNum();
174                 if (lastliveslotseqn < oldestseqnum)
175                         lastliveslotseqn = oldestseqnum;
176
177                 long seqn = lastliveslotseqn;
178                 boolean seenliveslot = false;
179                 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
180                 long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
181                 
182                 for(; seqn < threshold; seqn++) {
183                         Slot prevslot=buffer.getSlot(seqn);
184                         //Push slot number forward
185                         if (! seenliveslot)
186                                 lastliveslotseqn = seqn;
187
188                         if (! prevslot.isLive())
189                                 continue;
190                         seenliveslot = true;
191                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
192                         for(Entry liveentry:liveentries) {
193                                 if (s.hasSpace(liveentry)) {
194                                         s.addEntry(liveentry);
195                                 } else if (seqn==firstiffull) {   //if there's no space but the entry is about to fall off the queue
196                                         if (!resize) {
197                                                 System.out.print("B"); //?
198                                                 return tryput(key, value, true);
199                                         }
200                                 }
201                         }
202                 }
203
204                 KeyValue kv=new KeyValue(s, key, value);
205                 boolean insertedkv=false;
206                 if (s.hasSpace(kv)) {
207                         s.addEntry(kv);
208                         insertedkv=true;
209                 }
210
211                 /* now go through live entries from least to greatest sequence number until
212                  * either all live slots added, or the slot doesn't have enough room
213                  * for SKIP_THRESHOLD consecutive entries*/
214                 int skipcount=0;
215                 search:
216                 for(; seqn <= newestseqnum; seqn++) {
217                         Slot prevslot=buffer.getSlot(seqn);
218                         //Push slot number forward
219                         if (!seenliveslot)
220                                 lastliveslotseqn = seqn;
221                         
222                         if (!prevslot.isLive())
223                                 continue;
224                         seenliveslot = true;
225                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
226                         for(Entry liveentry:liveentries) {
227                                 if (s.hasSpace(liveentry))
228                                         s.addEntry(liveentry);
229                                 else {
230                                         skipcount++;
231                                         if (skipcount > SKIP_THRESHOLD)
232                                                 break search;
233                                 }
234                         }
235                 }
236
237                 int max=0;
238                 if (resize)
239                         max = newsize;
240                 Slot[] array=cloud.putSlot(s, max);
241                 if (array == null) {
242                         array = new Slot[] {s};
243                         rejectedmessagelist.clear();
244                 }       else {
245                         if (array.length == 0)
246                                 throw new Error("Server Error: Did not send any slots");
247                         rejectedmessagelist.add(s.getSequenceNumber());
248                         insertedkv=false;
249                 }
250                 
251                 /* update data structure */
252                 validateandupdate(array, true);
253
254                 return insertedkv;
255         }
256
257         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
258                 /* The cloud communication layer has checked slot HMACs already
259                          before decoding */
260                 if (newslots.length==0) return;
261
262                 long firstseqnum=newslots[0].getSequenceNumber();
263                 if (firstseqnum <= sequencenumber)
264                         throw new Error("Server Error: Sent older slots!");
265
266                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
267                 checkHMACChain(indexer, newslots);
268
269                 HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());  //
270
271                 initExpectedSize(firstseqnum);
272                 for(Slot slot: newslots) {
273                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
274                         updateExpectedSize();
275                 }
276
277                 /* If there is a gap, check to see if the server sent us everything. */
278                 if (firstseqnum != (sequencenumber+1)) {
279                         checkNumSlots(newslots.length);
280                         if (!machineSet.isEmpty())
281                                 throw new Error("Missing record for machines: "+machineSet);
282                 }
283
284                 commitNewMaxSize();
285
286                 /* Commit new to slots. */
287                 for(Slot slot:newslots) {
288                         buffer.putSlot(slot);
289                         liveslotcount++;
290                 }
291                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
292         }
293
294         private int expectedsize, currmaxsize;
295
296         private void checkNumSlots(int numslots) {
297                 if (numslots != expectedsize)
298                         throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
299         }
300
301         private void initExpectedSize(long firstsequencenumber) {
302                 long prevslots = firstsequencenumber;
303                 expectedsize = (prevslots < ((long) numslots))? (int) prevslots : numslots;
304                 currmaxsize = numslots;
305         }
306
307         private void updateExpectedSize() {
308                 expectedsize++;
309                 if (expectedsize > currmaxsize)
310                         expectedsize = currmaxsize;
311         }
312
313         private void updateCurrMaxSize(int newmaxsize) {
314                 currmaxsize=newmaxsize;
315         }
316
317         private void commitNewMaxSize() {
318                 if (numslots != currmaxsize)
319                         buffer.resize(currmaxsize);
320
321                 numslots=currmaxsize;
322                 setResizeThreshold();
323         }
324
325         private void processEntry(KeyValue entry, SlotIndexer indexer) {
326                 IoTString key=entry.getKey();
327                 KeyValue oldvalue=table.get(key);
328                 if (oldvalue != null) {
329                         oldvalue.setDead();
330                 }
331                 table.put(key, entry);
332         }
333
334         private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
335                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
336         }
337
338         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
339                 long oldseqnum=entry.getOldSeqNum();
340                 long newseqnum=entry.getNewSeqNum();
341                 boolean isequal=entry.getEqual();
342                 long machineid=entry.getMachineID();
343                 for(long seqnum=oldseqnum; seqnum <= newseqnum; seqnum++) {
344                         Slot slot=indexer.getSlot(seqnum);
345                         if (slot != null) {
346                                 long slotmachineid=slot.getMachineID();
347                                 if (isequal != (slotmachineid==machineid)) {
348                                         throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
349                                 }
350                         }
351                 }
352
353                 HashSet<Long> watchset=new HashSet<Long>();
354                 for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
355                         long entry_mid=lastmsg_entry.getKey();
356                         /* We've seen it, don't need to continue to watch.  Our next
357                          * message will implicitly acknowledge it. */
358                         if (entry_mid == localmachineid)
359                                 continue;
360                         Pair<Long, Liveness> v=lastmsg_entry.getValue();
361                         long entry_seqn=v.getFirst();
362                         if (entry_seqn < newseqnum) {
363                                 addWatchList(entry_mid, entry);
364                                 watchset.add(entry_mid);
365                         }
366                 }
367                 if (watchset.isEmpty())
368                         entry.setDead();
369                 else
370                         entry.setWatchSet(watchset);
371         }
372
373         private void addWatchList(long machineid, RejectedMessage entry) {
374                 HashSet<RejectedMessage> entries=watchlist.get(machineid);
375                 if (entries == null)
376                         watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
377                 entries.add(entry);
378         }
379
380         private void processEntry(TableStatus entry, SlotIndexer indexer) {
381                 int newnumslots=entry.getMaxSlots();
382                 updateCurrMaxSize(newnumslots);
383                 if (lastTableStatus != null)
384                         lastTableStatus.setDead();
385                 lastTableStatus = entry;
386         }
387
388         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
389                 machineSet.remove(machineid);
390
391                 HashSet<RejectedMessage> watchset=watchlist.get(machineid);
392                 if (watchset != null) {
393                         for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
394                                 RejectedMessage rm=rmit.next();
395                                 if (rm.getNewSeqNum() <= seqnum) {
396                                         /* Remove it from our watchlist */
397                                         rmit.remove();
398                                         /* Decrement machines that need to see this notification */
399                                         rm.removeWatcher(machineid);
400                                 }
401                         }
402                 }
403                 
404                 if (machineid == localmachineid) {
405                         /* Our own messages are immediately dead. */
406                         if (liveness instanceof LastMessage) {
407                                 ((LastMessage)liveness).setDead();
408                         } else if (liveness instanceof Slot) {
409                                 ((Slot)liveness).setDead();
410                         } else {
411                                 throw new Error("Unrecognized type");
412                         }
413                 }
414                 
415                 
416                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
417                 if (lastmsgentry == null)
418                         return;
419
420                 long lastmsgseqnum = lastmsgentry.getFirst();
421                 Liveness lastentry = lastmsgentry.getSecond();
422                 if (machineid != localmachineid) {
423                         if (lastentry instanceof LastMessage) {
424                                 ((LastMessage)lastentry).setDead();
425                         } else if (lastentry instanceof Slot) {
426                                 ((Slot)lastentry).setDead();
427                         } else {
428                                 throw new Error("Unrecognized type");
429                         }
430                 }
431                 
432                 if (machineid == localmachineid) {
433                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
434                                 throw new Error("Server Error: Mismatch on local machine sequence number");
435                 } else {
436                         if (lastmsgseqnum > seqnum)
437                                 throw new Error("Server Error: Rollback on remote machine sequence number");
438                 }
439         }
440
441         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
442                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
443                 for(Entry entry : slot.getEntries()) {
444                         switch(entry.getType()) {
445                         case Entry.TypeKeyValue:
446                                 processEntry((KeyValue)entry, indexer);
447                                 break;
448
449                         case Entry.TypeLastMessage:
450                                 processEntry((LastMessage)entry, indexer, machineSet);
451                                 break;
452
453                         case Entry.TypeRejectedMessage:
454                                 processEntry((RejectedMessage)entry, indexer);
455                                 break;
456
457                         case Entry.TypeTableStatus:
458                                 processEntry((TableStatus)entry, indexer);
459                                 break;
460
461                         default:
462                                 throw new Error("Unrecognized type: "+entry.getType());
463                         }
464                 }
465         }
466
467         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
468                 for(int i=0; i < newslots.length; i++) {
469                         Slot currslot=newslots[i];
470                         Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
471                         if (prevslot != null &&
472                                         !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
473                                 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
474                 }
475         }
476 }