3afb1a9c061f378d2fc1df54291ee91f24f6e959
[iotcloud.git] / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9
10 /**
11  * IoTTable data structure.  Provides client inferface.
12  * @author Brian Demsky
13  * @version 1.0
14  */
15
16 final public class Table {
17         private int numslots;
18         private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
19         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
20         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
21         private SlotBuffer buffer;
22         private CloudComm cloud;
23         private long sequencenumber;
24         private long localmachineid;
25         private TableStatus lastTableStatus;
26         static final int FREE_SLOTS = 10;
27         static final int SKIP_THRESHOLD = 10;
28         private long liveslotcount=0;
29         private int chance;
30         static final double RESIZE_MULTIPLE = 1.2;
31         static final double RESIZE_THRESHOLD = 0.75;
32         private int resizethreshold;
33         private long lastliveslotseqn;
34         private Random random=new Random();
35         
36         public Table(String baseurl, String password, long _localmachineid) {
37                 localmachineid=_localmachineid;
38                 buffer = new SlotBuffer();
39                 numslots = buffer.capacity();
40                 setResizeThreshold();
41                 sequencenumber = 0;
42                 cloud=new CloudComm(this, baseurl, password);
43                 lastliveslotseqn = 1;
44         }
45
46         public Table(CloudComm _cloud, long _localmachineid) {
47                 localmachineid=_localmachineid;
48                 buffer = new SlotBuffer();
49                 numslots = buffer.capacity();
50                 setResizeThreshold();
51                 sequencenumber = 0;
52                 cloud=_cloud;
53         }
54
55         public void rebuild() {
56                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
57                 validateandupdate(newslots, true);
58         }
59
60         public void update() {
61                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
62
63                 validateandupdate(newslots, false);
64         }
65
66         public IoTString get(IoTString key) {
67                 KeyValue kv=table.get(key);
68                 if (kv != null)
69                         return kv.getValue();
70                 else
71                         return null;
72         }
73
74         public void initTable() {
75                 cloud.setSalt();//Set the salt
76                 Slot s=new Slot(this, 1, localmachineid);
77                 TableStatus status=new TableStatus(s, numslots);
78                 s.addEntry(status);
79                 Slot[] array=cloud.putSlot(s, numslots);
80                 if (array == null) {
81                         array = new Slot[] {s};
82                         /* update data structure */
83                         validateandupdate(array, true);
84                 } else {
85                         throw new Error("Error on initialization");
86                 }
87         }
88
89         public String toString() {
90                 return table.toString();
91         }
92         
93         public IoTString put(IoTString key, IoTString value) {
94                 while(true) {
95                         KeyValue oldvalue=table.get(key);
96                         if (tryput(key, value, false)) {
97                                 if (oldvalue==null)
98                                         return null;
99                                 else
100                                         return oldvalue.getValue();
101                         }
102                 }
103         }
104
105         void decrementLiveCount() {
106                 liveslotcount--;
107         }
108         
109         private void setResizeThreshold() {
110                 int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
111                 resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
112         }
113         
114         private boolean tryput(IoTString key, IoTString value, boolean resize) {
115                 Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
116                 int newsize = 0;
117                 if (liveslotcount > resizethreshold) {
118                         resize=true;
119                         newsize = (int) (numslots * RESIZE_MULTIPLE);
120                 }
121                 
122                 if (resize) {
123                         newsize = (int) (numslots * RESIZE_MULTIPLE);
124                         TableStatus status=new TableStatus(s, newsize);
125                         s.addEntry(status);
126                 }
127
128                 long newestseqnum = buffer.getNewestSeqNum();
129                 long oldestseqnum = buffer.getOldestSeqNum();
130                 if (lastliveslotseqn < oldestseqnum)
131                         lastliveslotseqn = oldestseqnum;
132
133                 long seqn = lastliveslotseqn;
134                 boolean seenliveslot = false;
135                 long firstiffull = newestseqnum + 1 - numslots;
136                 long threshold = firstiffull + FREE_SLOTS;
137                 
138                 for(; seqn < threshold; seqn++) {
139                         Slot prevslot=buffer.getSlot(seqn);
140                         //Push slot number forward
141                         if (!seenliveslot)
142                                 lastliveslotseqn = seqn;
143
144                         if (!prevslot.isLive())
145                                 continue;
146                         seenliveslot = true;
147                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
148                         for(Entry liveentry:liveentries) {
149                                 if (s.hasSpace(liveentry)) {
150                                         s.addEntry(liveentry);
151                                 } else if (seqn==firstiffull) {
152                                         if (!resize) {
153                                                 System.out.print("B");
154                                                 return tryput(key, value, true);
155                                         }
156                                 }
157                         }
158                 }
159
160                 KeyValue kv=new KeyValue(s, key, value);
161                 boolean insertedkv=false;
162                 if (s.hasSpace(kv)) {
163                         s.addEntry(kv);
164                         insertedkv=true;
165                 }
166
167                 int skipcount=0;
168                 search:
169                 for(; seqn <= newestseqnum; seqn++) {
170                         Slot prevslot=buffer.getSlot(seqn);
171                         //Push slot number forward
172                         if (!seenliveslot)
173                                 lastliveslotseqn = seqn;
174                         
175                         if (!prevslot.isLive())
176                                 continue;
177                         seenliveslot = true;
178                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
179                         for(Entry liveentry:liveentries) {
180                                 if (s.hasSpace(liveentry))
181                                         s.addEntry(liveentry);
182                                 else {
183                                         skipcount++;
184                                         if (skipcount > SKIP_THRESHOLD)
185                                                 break search;
186                                 }
187                         }
188                 }
189
190                 int max=0;
191                 if (resize)
192                         max = newsize;
193                 Slot[] array=cloud.putSlot(s, max);
194                 if (array == null)
195                         array = new Slot[] {s};
196                 else
197                         insertedkv=false;
198
199                 /* update data structure */
200                 validateandupdate(array, true);
201
202                 return insertedkv;
203         }
204
205         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
206                 /* The cloud communication layer has checked slot HMACs already
207                          before decoding */
208                 if (newslots.length==0)
209                         return;
210
211                 long firstseqnum=newslots[0].getSequenceNumber();
212                 if (firstseqnum <= sequencenumber)
213                         throw new Error("Server Error: Sent older slots!");
214
215                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
216                 checkHMACChain(indexer, newslots);
217
218                 HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
219
220                 initExpectedSize(firstseqnum);
221                 for(Slot slot: newslots) {
222                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
223                         updateExpectedSize();
224                 }
225
226                 /* If there is a gap, check to see if the server sent us everything. */
227                 if (firstseqnum != (sequencenumber+1)) {
228                         checkNumSlots(newslots.length);
229                         if (!machineSet.isEmpty())
230                                 throw new Error("Missing record for machines: "+machineSet);
231                 }
232
233                 commitNewMaxSize();
234
235                 /* Commit new to slots. */
236                 for(Slot slot:newslots) {
237                         buffer.putSlot(slot);
238                         liveslotcount++;
239                 }
240                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
241         }
242
243         private int expectedsize, currmaxsize;
244
245         private void checkNumSlots(int numslots) {
246                 if (numslots != expectedsize)
247                         throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
248         }
249
250         private void initExpectedSize(long firstsequencenumber) {
251                 long prevslots = firstsequencenumber;
252                 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
253                 currmaxsize = numslots;
254         }
255
256         private void updateExpectedSize() {
257                 expectedsize++;
258                 if (expectedsize > currmaxsize)
259                         expectedsize = currmaxsize;
260         }
261
262         private void updateCurrMaxSize(int newmaxsize) {
263                 currmaxsize=newmaxsize;
264         }
265
266         private void commitNewMaxSize() {
267                 if (numslots != currmaxsize)
268                         buffer.resize(currmaxsize);
269
270                 numslots=currmaxsize;
271                 setResizeThreshold();
272         }
273
274         private void processEntry(KeyValue entry, SlotIndexer indexer) {
275                 IoTString key=entry.getKey();
276                 KeyValue oldvalue=table.get(key);
277                 if (oldvalue != null) {
278                         oldvalue.setDead();
279                 }
280                 table.put(key, entry);
281         }
282
283         private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
284                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
285         }
286
287         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
288                 long oldseqnum=entry.getOldSeqNum();
289                 long newseqnum=entry.getNewSeqNum();
290                 boolean isequal=entry.getEqual();
291                 long machineid=entry.getMachineID();
292                 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
293                         Slot slot=indexer.getSlot(seqnum);
294                         if (slot != null) {
295                                 long slotmachineid=slot.getMachineID();
296                                 if (isequal!=(slotmachineid==machineid)) {
297                                         throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
298                                 }
299                         }
300                 }
301
302                 HashSet<Long> watchset=new HashSet<Long>();
303                 for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
304                         long entry_mid=lastmsg_entry.getKey();
305                         /* We've seen it, don't need to continue to watch.  Our next
306                          * message will implicitly acknowledge it. */
307                         if (entry_mid == localmachineid)
308                                 continue;
309                         Pair<Long, Liveness> v=lastmsg_entry.getValue();
310                         long entry_seqn=v.getFirst();
311                         if (entry_seqn < newseqnum) {
312                                 addWatchList(entry_mid, entry);
313                                 watchset.add(entry_mid);
314                         }
315                 }
316                 if (watchset.isEmpty())
317                         entry.setDead();
318                 else
319                         entry.setWatchSet(watchset);
320         }
321
322         private void addWatchList(long machineid, RejectedMessage entry) {
323                 HashSet<RejectedMessage> entries=watchlist.get(machineid);
324                 if (entries == null)
325                         watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
326                 entries.add(entry);
327         }
328
329         private void processEntry(TableStatus entry, SlotIndexer indexer) {
330                 int newnumslots=entry.getMaxSlots();
331                 updateCurrMaxSize(newnumslots);
332                 if (lastTableStatus != null)
333                         lastTableStatus.setDead();
334                 lastTableStatus = entry;
335         }
336
337         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
338                 machineSet.remove(machineid);
339
340                 HashSet<RejectedMessage> watchset=watchlist.get(machineid);
341                 if (watchset != null) {
342                         for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
343                                 RejectedMessage rm=rmit.next();
344                                 if (rm.getNewSeqNum() <= seqnum) {
345                                         /* Remove it from our watchlist */
346                                         rmit.remove();
347                                         /* Decrement machines that need to see this notification */
348                                         rm.removeWatcher(machineid);
349                                 }
350                         }
351                 }
352                 
353                 if (machineid == localmachineid) {
354                         /* Our own messages are immediately dead. */
355                         if (liveness instanceof LastMessage) {
356                                 ((LastMessage)liveness).setDead();
357                         } else if (liveness instanceof Slot) {
358                                 ((Slot)liveness).setDead();
359                         } else {
360                                 throw new Error("Unrecognized type");
361                         }
362                 }
363                 
364                 
365                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
366                 if (lastmsgentry == null)
367                         return;
368
369                 long lastmsgseqnum = lastmsgentry.getFirst();
370                 Liveness lastentry = lastmsgentry.getSecond();
371                 if (machineid != localmachineid) {
372                         if (lastentry instanceof LastMessage) {
373                                 ((LastMessage)lastentry).setDead();
374                         } else if (lastentry instanceof Slot) {
375                                 ((Slot)lastentry).setDead();
376                         } else {
377                                 throw new Error("Unrecognized type");
378                         }
379                 }
380                 
381                 if (machineid == localmachineid) {
382                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
383                                 throw new Error("Server Error: Mismatch on local machine sequence number");
384                 } else {
385                         if (lastmsgseqnum > seqnum)
386                                 throw new Error("Server Error: Rollback on remote machine sequence number");
387                 }
388         }
389
390         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
391                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
392                 for(Entry entry : slot.getEntries()) {
393                         switch(entry.getType()) {
394                         case Entry.TypeKeyValue:
395                                 processEntry((KeyValue)entry, indexer);
396                                 break;
397
398                         case Entry.TypeLastMessage:
399                                 processEntry((LastMessage)entry, indexer, machineSet);
400                                 break;
401
402                         case Entry.TypeRejectedMessage:
403                                 processEntry((RejectedMessage)entry, indexer);
404                                 break;
405
406                         case Entry.TypeTableStatus:
407                                 processEntry((TableStatus)entry, indexer);
408                                 break;
409
410                         default:
411                                 throw new Error("Unrecognized type: "+entry.getType());
412                         }
413                 }
414         }
415
416         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
417                 for(int i=0; i < newslots.length; i++) {
418                         Slot currslot=newslots[i];
419                         Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
420                         if (prevslot != null &&
421                                         !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
422                                 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
423                 }
424         }
425 }