028025f04ffcb8720f05e623ea55dc8d0efaa361
[iotcloud.git] / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8
9 /**
10  * IoTTable data structure.  Provides client interface.
11  * @author Brian Demsky
12  * @version 1.0
13  */
14
15
16 final public class Table {
17         private int numslots;
18         private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
19         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
20         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
21         private SlotBuffer buffer;
22         private CloudComm cloud;
23         private long sequencenumber;
24         private long localmachineid;
25         private TableStatus lastTableStatus;
26         static final int FREE_SLOTS = 10;
27         static final int FORCED_RESIZE_INCREMENT = 20;
28
29         public Table(String baseurl, String password, long _localmachineid) {
30                 localmachineid=_localmachineid;
31                 buffer = new SlotBuffer();
32                 numslots = buffer.capacity();
33                 sequencenumber = 0;
34                 cloud=new CloudComm(baseurl, password);
35         }
36
37         public Table(CloudComm _cloud, long _localmachineid) {
38                 localmachineid=_localmachineid;
39                 buffer = new SlotBuffer();
40                 numslots = buffer.capacity();
41                 sequencenumber = 0;
42                 cloud=_cloud;
43         }
44
45         public void rebuild() {
46                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
47                 validateandupdate(newslots, true);
48         }
49
50         public void update() {
51                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
52
53                 validateandupdate(newslots, false);
54         }
55
56         public IoTString get(IoTString key) {
57                 KeyValue kv=table.get(key);
58                 if (kv != null)
59                         return kv.getValue();
60                 else
61                         return null;
62         }
63
64         public void initTable() {
65                 cloud.setSalt();//Set the salt
66                 Slot s=new Slot(1, localmachineid);
67                 TableStatus status=new TableStatus(s, numslots);
68                 s.addEntry(status);
69                 Slot[] array=cloud.putSlot(s, numslots);
70                 if (array == null) {
71                         array = new Slot[] {s};
72                         /* update data structure */
73                         validateandupdate(array, true);
74                 } else {
75                         throw new Error("Error on initialization");
76                 }
77         }
78
79         public String toString() {
80                 return table.toString();
81         }
82         
83         public IoTString put(IoTString key, IoTString value) {
84                 while(true) {
85                         KeyValue oldvalue=table.get(key);
86                         if (tryput(key, value, false)) {
87                                 if (oldvalue==null)
88                                         return null;
89                                 else
90                                         return oldvalue.getValue();
91                         }
92                 }
93         }
94
95         private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
96                 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
97                 long seqn = buffer.getOldestSeqNum();
98
99                 if (forcedresize) {
100                         TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
101                         s.addEntry(status);
102                 }
103
104                 if ((numslots - buffer.size()) < FREE_SLOTS) {
105                         /* have to check whether we have enough free slots */
106                         long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
107                         seqn = fullfirstseqn < 1?1:fullfirstseqn;
108                         for(int i=0; i < FREE_SLOTS; i++, seqn++) {
109                                 Slot prevslot=buffer.getSlot(seqn);
110                                 if (!prevslot.isLive())
111                                         continue;
112                                 Vector<Entry> liveentries = prevslot.getLiveEntries();
113                                 for(Entry liveentry:liveentries) {
114                                         if (s.hasSpace(liveentry))
115                                                 s.addEntry(liveentry);
116                                         else if (i==0) {
117                                                 if (s.canFit(liveentry))
118                                                         s.addEntry(liveentry);
119                                                 else if (!forcedresize) {
120                                                         return tryput(key, value, true);
121                                                 }
122                                         }
123                                 }
124                         }
125                 }
126                 KeyValue kv=new KeyValue(s, key, value);
127                 boolean insertedkv=false;
128                 if (s.hasSpace(kv)) {
129                         s.addEntry(kv);
130                         insertedkv=true;
131                 }
132
133                 long newestseqnum=buffer.getNewestSeqNum();
134 search:
135                 for(; seqn<=newestseqnum; seqn++) {
136                         Slot prevslot=buffer.getSlot(seqn);
137                         if (!prevslot.isLive())
138                                 continue;
139                         Vector<Entry> liveentries = prevslot.getLiveEntries();
140                         for(Entry liveentry:liveentries) {
141                                 if (s.hasSpace(liveentry))
142                                         s.addEntry(liveentry);
143                                 else
144                                         break search;
145                         }
146                 }
147
148                 int max=0;
149                 if (forcedresize)
150                         max = numslots + FORCED_RESIZE_INCREMENT;
151                 Slot[] array=cloud.putSlot(s, max);
152                 if (array == null)
153                         array = new Slot[] {s};
154                 else
155                         insertedkv=false;
156
157                 /* update data structure */
158                 validateandupdate(array, true);
159
160                 return insertedkv;
161         }
162
163         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
164                 /* The cloud communication layer has checked slot HMACs already
165                          before decoding */
166                 if (newslots.length==0)
167                         return;
168
169                 long firstseqnum=newslots[0].getSequenceNumber();
170                 if (firstseqnum <= sequencenumber)
171                         throw new Error("Server Error: Sent older slots!");
172
173                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
174                 checkHMACChain(indexer, newslots);
175
176                 HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
177
178                 initExpectedSize();
179                 for(Slot slot: newslots) {
180                         updateExpectedSize();
181                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
182                 }
183
184                 /* If there is a gap, check to see if the server sent us everything. */
185                 if (firstseqnum != (sequencenumber+1)) {
186                         checkNumSlots(newslots.length);
187                         if (!machineSet.isEmpty())
188                                 throw new Error("Missing record for machines: "+machineSet);
189                 }
190
191                 commitNewMaxSize();
192
193                 /* Commit new to slots. */
194                 for(Slot slot:newslots) {
195                         buffer.putSlot(slot);
196                 }
197                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
198         }
199
200         private int expectedsize, currmaxsize;
201
202         private void checkNumSlots(int numslots) {
203                 if (numslots != expectedsize)
204                         throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
205         }
206
207         private void initExpectedSize() {
208                 long prevslots = sequencenumber;
209                 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
210                 currmaxsize = numslots;
211         }
212
213         private void updateExpectedSize() {
214                 expectedsize++;
215                 if (expectedsize > currmaxsize)
216                         expectedsize = currmaxsize;
217         }
218
219         private void updateCurrMaxSize(int newmaxsize) {
220                 currmaxsize=newmaxsize;
221         }
222
223         private void commitNewMaxSize() {
224                 if (numslots != currmaxsize)
225                         buffer.resize(currmaxsize);
226
227                 numslots=currmaxsize;
228         }
229
230         private void processEntry(KeyValue entry, SlotIndexer indexer) {
231                 IoTString key=entry.getKey();
232                 KeyValue oldvalue=table.get(key);
233                 if (oldvalue != null) {
234                         oldvalue.setDead();
235                 }
236                 table.put(key, entry);
237         }
238
239         private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
240                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
241         }
242
243         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
244                 long oldseqnum=entry.getOldSeqNum();
245                 long newseqnum=entry.getNewSeqNum();
246                 boolean isequal=entry.getEqual();
247                 long machineid=entry.getMachineID();
248                 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
249                         Slot slot=indexer.getSlot(seqnum);
250                         if (slot != null) {
251                                 long slotmachineid=slot.getMachineID();
252                                 if (isequal!=(slotmachineid==machineid)) {
253                                         throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
254                                 }
255                         }
256                 }
257
258                 HashSet<Long> watchset=new HashSet<Long>();
259                 for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
260                         long entry_mid=lastmsg_entry.getKey();
261                         /* We've seen it, don't need to continue to watch.  Our next
262                          * message will implicitly acknowledge it. */
263                         if (entry_mid == localmachineid)
264                                 continue;
265                         Pair<Long, Liveness> v=lastmsg_entry.getValue();
266                         long entry_seqn=v.getFirst();
267                         if (entry_seqn < newseqnum) {
268                                 addWatchList(entry_mid, entry);
269                                 watchset.add(entry_mid);
270                         }
271                 }
272                 if (watchset.isEmpty())
273                         entry.setDead();
274                 else
275                         entry.setWatchSet(watchset);
276         }
277
278         private void addWatchList(long machineid, RejectedMessage entry) {
279                 HashSet<RejectedMessage> entries=watchlist.get(machineid);
280                 if (entries == null)
281                         watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
282                 entries.add(entry);
283         }
284
285         private void processEntry(TableStatus entry, SlotIndexer indexer) {
286                 int newnumslots=entry.getMaxSlots();
287                 updateCurrMaxSize(newnumslots);
288                 if (lastTableStatus != null)
289                         lastTableStatus.setDead();
290                 lastTableStatus = entry;
291         }
292
293         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
294                 machineSet.remove(machineid);
295
296                 HashSet<RejectedMessage> watchset=watchlist.get(machineid);
297                 if (watchset != null) {
298                         for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
299                                 RejectedMessage rm=rmit.next();
300                                 if (rm.getNewSeqNum() <= seqnum) {
301                                         /* Remove it from our watchlist */
302                                         rmit.remove();
303                                         /* Decrement machines that need to see this notification */
304                                         rm.removeWatcher(machineid);
305                                 }
306                         }
307                 }
308                 
309                 if (machineid == localmachineid) {
310                         /* Our own messages are immediately dead. */
311                         if (liveness instanceof LastMessage) {
312                                 ((LastMessage)liveness).setDead();
313                         } else if (liveness instanceof Slot) {
314                                 ((Slot)liveness).setDead();
315                         } else {
316                                 throw new Error("Unrecognized type");
317                         }
318                 }
319                 
320                 
321                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
322                 if (lastmsgentry == null)
323                         return;
324
325                 long lastmsgseqnum = lastmsgentry.getFirst();
326                 Liveness lastentry = lastmsgentry.getSecond();
327                 if (machineid != localmachineid) {
328                         if (lastentry instanceof LastMessage) {
329                                 ((LastMessage)lastentry).setDead();
330                         } else if (lastentry instanceof Slot) {
331                                 ((Slot)lastentry).setDead();
332                         } else {
333                                 throw new Error("Unrecognized type");
334                         }
335                 }
336                 
337                 if (machineid == localmachineid) {
338                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
339                                 throw new Error("Server Error: Mismatch on local machine sequence number");
340                 } else {
341                         if (lastmsgseqnum > seqnum)
342                                 throw new Error("Server Error: Rollback on remote machine sequence number");
343                 }
344         }
345
346         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
347                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
348                 for(Entry entry : slot.getEntries()) {
349                         switch(entry.getType()) {
350                         case Entry.TypeKeyValue:
351                                 processEntry((KeyValue)entry, indexer);
352                                 break;
353
354                         case Entry.TypeLastMessage:
355                                 processEntry((LastMessage)entry, indexer, machineSet);
356                                 break;
357
358                         case Entry.TypeRejectedMessage:
359                                 processEntry((RejectedMessage)entry, indexer);
360                                 break;
361
362                         case Entry.TypeTableStatus:
363                                 processEntry((TableStatus)entry, indexer);
364                                 break;
365
366                         default:
367                                 throw new Error("Unrecognized type: "+entry.getType());
368                         }
369                 }
370         }
371
372         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
373                 for(int i=0; i < newslots.length; i++) {
374                         Slot currslot=newslots[i];
375                         Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
376                         if (prevslot != null &&
377                                         !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
378                                 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
379                 }
380         }
381 }