debugging code
[iotcloud.git] / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8
9 /**
10  * IoTTable data structure.  Provides client interface.
11  * @author Brian Demsky
12  * @version 1.0
13  */
14
15
16 final public class Table {
17         private int numslots;
18         private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
19         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
20         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
21         private SlotBuffer buffer;
22         private CloudComm cloud;
23         private long sequencenumber;
24         private long localmachineid;
25         private TableStatus lastTableStatus;
26         static final int FREE_SLOTS = 10;
27         static final int FORCED_RESIZE_INCREMENT = 20;
28
29         public Table(String baseurl, String password, long _localmachineid) {
30                 localmachineid=_localmachineid;
31                 buffer = new SlotBuffer();
32                 numslots = buffer.capacity();
33                 sequencenumber = 0;
34                 cloud=new CloudComm(baseurl, password);
35         }
36
37         public Table(CloudComm _cloud, long _localmachineid) {
38                 localmachineid=_localmachineid;
39                 buffer = new SlotBuffer();
40                 numslots = buffer.capacity();
41                 sequencenumber = 0;
42                 cloud=_cloud;
43         }
44
45         public void rebuild() {
46                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
47                 validateandupdate(newslots, true);
48         }
49
50         public void update() {
51                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
52
53                 validateandupdate(newslots, false);
54         }
55
56         public IoTString get(IoTString key) {
57                 KeyValue kv=table.get(key);
58                 if (kv != null)
59                         return kv.getValue();
60                 else
61                         return null;
62         }
63
64         public void initTable() {
65                 Slot s=new Slot(1, localmachineid);
66                 TableStatus status=new TableStatus(s, numslots);
67                 s.addEntry(status);
68                 Slot[] array=cloud.putSlot(s, numslots);
69                 if (array == null) {
70                         array = new Slot[] {s};
71                         /* update data structure */
72                         validateandupdate(array, true);
73                 } else {
74                         throw new Error("Error on initialization");
75                 }
76         }
77
78         public IoTString put(IoTString key, IoTString value) {
79                 while(true) {
80                         KeyValue oldvalue=table.get(key);
81                         if (tryput(key, value, false)) {
82                                 if (oldvalue==null)
83                                         return null;
84                                 else
85                                         return oldvalue.getValue();
86                         }
87                 }
88         }
89
90         private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
91                 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
92                 long seqn = buffer.getOldestSeqNum();
93
94                 if (forcedresize) {
95                         System.out.println("A");
96                         TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
97                         s.addEntry(status);
98                 }
99
100                 if ((numslots - buffer.size()) < FREE_SLOTS) {
101                         /* have to check whether we have enough free slots */
102                         System.out.println("B");
103                         long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
104                         seqn = fullfirstseqn < 1?1:fullfirstseqn;
105                         for(int i=0; i < FREE_SLOTS; i++, seqn++) {
106                                 Slot prevslot=buffer.getSlot(seqn);
107                                 System.out.println(i);
108                                 if (!prevslot.isLive())
109                                         continue;
110                                 System.out.println("islive");
111                                 Vector<Entry> liveentries = prevslot.getLiveEntries();
112                                 for(Entry liveentry:liveentries) {
113                                         if (redundant(liveentry))
114                                                 continue;
115                                         if (s.hasSpace(liveentry))
116                                                 s.addEntry(liveentry);
117                                         else if (i==0) {
118                                                 if (s.canFit(liveentry))
119                                                         s.addEntry(liveentry);
120                                                 else if (!forcedresize) {
121                                                         return tryput(key, value, true);
122                                                 }
123                                         }
124                                 }
125                         }
126                 }
127                 KeyValue kv=new KeyValue(s, key, value);
128                 boolean insertedkv=false;
129                 if (s.hasSpace(kv)) {
130                         s.addEntry(kv);
131                         insertedkv=true;
132                 }
133
134                 long newestseqnum=buffer.getNewestSeqNum();
135 search:
136                 for(; seqn<=newestseqnum; seqn++) {
137                         Slot prevslot=buffer.getSlot(seqn);
138                         if (!prevslot.isLive())
139                                 continue;
140                         Vector<Entry> liveentries = prevslot.getLiveEntries();
141                         for(Entry liveentry:liveentries) {
142                                 if (redundant(liveentry))
143                                         continue;
144                                 if (s.hasSpace(liveentry))
145                                         s.addEntry(liveentry);
146                                 else
147                                         break search;
148                         }
149                 }
150
151                 int max=0;
152                 if (forcedresize)
153                         max = numslots + FORCED_RESIZE_INCREMENT;
154                 Slot[] array=cloud.putSlot(s, max);
155                 if (array == null)
156                         array = new Slot[] {s};
157                 else
158                         insertedkv=false;
159
160                 /* update data structure */
161                 validateandupdate(array, true);
162
163                 return insertedkv;
164         }
165
166         boolean redundant(Entry liveentry) {
167                 if (liveentry.getType()==Entry.TypeLastMessage) {
168                         LastMessage lastmsg=(LastMessage) liveentry;
169                         return lastmsg.getMachineID() == localmachineid;
170                 }
171                 return false;
172         }
173
174
175         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
176                 /* The cloud communication layer has checked slot HMACs already
177                          before decoding */
178                 if (newslots.length==0)
179                         return;
180
181                 long firstseqnum=newslots[0].getSequenceNumber();
182                 if (firstseqnum <= sequencenumber)
183                         throw new Error("Server Error: Sent older slots!");
184
185                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
186                 checkHMACChain(indexer, newslots);
187
188                 HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
189
190                 initExpectedSize();
191                 for(Slot slot: newslots) {
192                         updateExpectedSize();
193                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
194                 }
195
196                 /* If there is a gap, check to see if the server sent us everything. */
197                 if (firstseqnum != (sequencenumber+1)) {
198                         checkNumSlots(newslots.length);
199                         if (!machineSet.isEmpty())
200                                 throw new Error("Missing record for machines: "+machineSet);
201                 }
202
203                 commitNewMaxSize();
204
205                 /* Commit new to slots. */
206                 for(Slot slot:newslots) {
207                         buffer.putSlot(slot);
208                 }
209                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
210         }
211
212         private int expectedsize, currmaxsize;
213
214         private void checkNumSlots(int numslots) {
215                 if (numslots != expectedsize)
216                         throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
217         }
218
219         private void initExpectedSize() {
220                 long prevslots = sequencenumber;
221                 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
222                 currmaxsize = numslots;
223         }
224
225         private void updateExpectedSize() {
226                 expectedsize++;
227                 if (expectedsize > currmaxsize)
228                         expectedsize = currmaxsize;
229         }
230
231         private void updateCurrMaxSize(int newmaxsize) {
232                 currmaxsize=newmaxsize;
233         }
234
235         private void commitNewMaxSize() {
236                 if (numslots != currmaxsize)
237                         buffer.resize(currmaxsize);
238
239                 numslots=currmaxsize;
240         }
241
242         private void processEntry(KeyValue entry, SlotIndexer indexer) {
243                 IoTString key=entry.getKey();
244                 KeyValue oldvalue=table.get(key);
245                 if (oldvalue != null) {
246                         oldvalue.setDead();
247                 }
248                 table.put(key, entry);
249         }
250
251         private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
252                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
253         }
254
255         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
256                 long oldseqnum=entry.getOldSeqNum();
257                 long newseqnum=entry.getNewSeqNum();
258                 boolean isequal=entry.getEqual();
259                 long machineid=entry.getMachineID();
260                 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
261                         Slot slot=indexer.getSlot(seqnum);
262                         if (slot != null) {
263                                 long slotmachineid=slot.getMachineID();
264                                 if (isequal!=(slotmachineid==machineid)) {
265                                         throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
266                                 }
267                         }
268                 }
269
270                 HashSet<Long> watchset=new HashSet<Long>();
271                 for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
272                         long entry_mid=lastmsg_entry.getKey();
273                         /* We've seen it, don't need to continue to watch.  Our next
274                          * message will implicitly acknowledge it. */
275                         if (entry_mid == localmachineid)
276                                 continue;
277                         Pair<Long, Liveness> v=lastmsg_entry.getValue();
278                         long entry_seqn=v.getFirst();
279                         if (entry_seqn < newseqnum) {
280                                 addWatchList(entry_mid, entry);
281                                 watchset.add(entry_mid);
282                         }
283                 }
284                 if (watchset.isEmpty())
285                         entry.setDead();
286                 else
287                         entry.setWatchSet(watchset);
288         }
289
290         private void addWatchList(long machineid, RejectedMessage entry) {
291                 HashSet<RejectedMessage> entries=watchlist.get(machineid);
292                 if (entries == null)
293                         watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
294                 entries.add(entry);
295         }
296
297         private void processEntry(TableStatus entry, SlotIndexer indexer) {
298                 int newnumslots=entry.getMaxSlots();
299                 updateCurrMaxSize(newnumslots);
300                 if (lastTableStatus != null)
301                         lastTableStatus.setDead();
302                 lastTableStatus = entry;
303         }
304
305         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
306                 machineSet.remove(machineid);
307
308                 HashSet<RejectedMessage> watchset=watchlist.get(machineid);
309                 if (watchset != null) {
310                         for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
311                                 RejectedMessage rm=rmit.next();
312                                 if (rm.getNewSeqNum() <= seqnum) {
313                                         /* Remove it from our watchlist */
314                                         rmit.remove();
315                                         /* Decrement machines that need to see this notification */
316                                         rm.removeWatcher(machineid);
317                                 }
318                         }
319                 }
320
321                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
322                 if (lastmsgentry == null)
323                         return;
324
325                 long lastmsgseqnum = lastmsgentry.getFirst();
326                 Liveness lastentry = lastmsgentry.getSecond();
327                 if (lastentry instanceof LastMessage) {
328                         ((LastMessage)lastentry).setDead();
329                 } else if (lastentry instanceof Slot) {
330                         ((Slot)lastentry).setDead();
331                 } else {
332                         throw new Error("Unrecognized type");
333                 }
334
335                 if (machineid == localmachineid) {
336                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
337                                 throw new Error("Server Error: Mismatch on local machine sequence number");
338                 } else {
339                         if (lastmsgseqnum > seqnum)
340                                 throw new Error("Server Error: Rollback on remote machine sequence number");
341                 }
342         }
343
344         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
345                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
346
347                 for(Entry entry : slot.getEntries()) {
348                         switch(entry.getType()) {
349                         case Entry.TypeKeyValue:
350                                 processEntry((KeyValue)entry, indexer);
351                                 break;
352
353                         case Entry.TypeLastMessage:
354                                 processEntry((LastMessage)entry, indexer, machineSet);
355                                 break;
356
357                         case Entry.TypeRejectedMessage:
358                                 processEntry((RejectedMessage)entry, indexer);
359                                 break;
360
361                         case Entry.TypeTableStatus:
362                                 processEntry((TableStatus)entry, indexer);
363                                 break;
364
365                         default:
366                                 throw new Error("Unrecognized type: "+entry.getType());
367                         }
368                 }
369         }
370
371         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
372                 for(int i=0; i < newslots.length; i++) {
373                         Slot currslot=newslots[i];
374                         Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
375                         if (prevslot != null &&
376                                         !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
377                                 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
378                 }
379         }
380 }