add check for missing messages
[iotcloud.git] / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.HashSet;
4 import java.util.Arrays;
5 import java.util.Vector;
6
7 /**
8  * IoTTable data structure.  Provides client interface.
9  * @author Brian Demsky
10  * @version 1.0
11  */
12
13
14 final public class Table {
        /* Maximum number of slots the table currently allows.  Starts as the
           buffer capacity and is replaced by currmaxsize in commitNewMaxSize(). */
        private int numslots;
        /* Latest KeyValue entry processed for each key. */
        private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
        /* For each machine id: the sequence number last recorded for that machine
           together with the object (a Slot or a LastMessage entry) that recorded it. */
        private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
        /* Local store of the slots accepted so far. */
        private SlotBuffer buffer;
        /* Connection used to fetch slots from and submit slots to the cloud. */
        private CloudComm cloud;
        /* Sequence number of the last slot committed by validateandupdate();
           0 until a slot has been accepted. */
        private long sequencenumber;
        /* Identifier of this client machine. */
        private long localmachineid;
        /* Most recently processed TableStatus entry; the previous one is marked
           dead when a newer one arrives. */
        private TableStatus lastTableStatus;
        /* tryput() starts copying live entries forward out of the oldest slots
           once fewer than this many slots remain free. */
        static final int FREE_SLOTS = 10;
        /* Number of slots added to numslots when tryput() requests a forced resize. */
        static final int FORCED_RESIZE_INCREMENT = 20;
25
26         public Table(String baseurl, String password, long _localmachineid) {
27                 localmachineid=_localmachineid;
28                 buffer = new SlotBuffer();
29                 numslots = buffer.capacity();
30                 sequencenumber = 0;
31                 cloud=new CloudComm(baseurl, password);
32         }
33
34         public Table(CloudComm _cloud, long _localmachineid) {
35                 localmachineid=_localmachineid;
36                 buffer = new SlotBuffer();
37                 numslots = buffer.capacity();
38                 sequencenumber = 0;
39                 cloud=_cloud;
40         }
41
42         public void rebuild() {
43                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
44                 validateandupdate(newslots, true);
45         }
46
47         public void update() {
48                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
49
50                 validateandupdate(newslots, false);
51         }
52
53         public IoTString get(IoTString key) {
54                 KeyValue kv=table.get(key);
55                 if (kv != null)
56                         return kv.getValue();
57                 else
58                         return null;
59         }
60
61         public void initTable() {
62                 Slot s=new Slot(1, localmachineid);
63                 TableStatus status=new TableStatus(s, numslots);
64                 s.addEntry(status);
65                 Slot[] array=cloud.putSlot(s, numslots);
66                 if (array == null) {
67                         array = new Slot[] {s};
68                         /* update data structure */
69                         validateandupdate(array, true);
70                 } else {
71                         throw new Error("Error on initialization");
72                 }
73         }
74
75         public IoTString put(IoTString key, IoTString value) {
76                 while(true) {
77                         KeyValue oldvalue=table.get(key);
78                         if (tryput(key, value, false)) {
79                                 if (oldvalue==null)
80                                         return null;
81                                 else
82                                         return oldvalue.getValue();
83                         }
84                 }
85         }
86
        /**
         * Makes one attempt to publish a key/value pair in a new slot.
         *
         * The new slot gets sequence number sequencenumber+1 and is chained to
         * the HMAC of the slot at sequencenumber.  Besides the key/value entry it
         * may carry a TableStatus entry (forced resize) and live entries copied
         * forward from older slots.  The slot is then submitted through
         * cloud.putSlot() and whatever results is applied with validateandupdate().
         *
         * @param key           key to write
         * @param value         value to write
         * @param forcedresize  when true, the slot also asks for the table to grow
         *                      by FORCED_RESIZE_INCREMENT slots
         * @return true only if the key/value entry fit in the slot and putSlot()
         *         returned null; false otherwise
         */
        private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
                Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
                /* Cursor into the buffer; shared by the free-slot scan and the
                   fill loop below, which resumes where the scan stopped. */
                long seqn = buffer.getOldestSeqNum();

                if (forcedresize) {
                        TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
                        s.addEntry(status);
                }

                if ((numslots - buffer.size()) < FREE_SLOTS) {
                        /* have to check whether we have enough free slots */
                        long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
                        seqn = fullfirstseqn < 1?1:fullfirstseqn;
                        /* Walk FREE_SLOTS consecutive slots starting at seqn and copy
                           their live entries into the new slot. */
                        /* NOTE(review): getSlot(seqn) is dereferenced without a null
                           check; confirm it cannot return null for every seqn visited here. */
                        for(int i=0; i < FREE_SLOTS; i++, seqn++) {
                                Slot prevslot=buffer.getSlot(seqn);
                                if (!prevslot.isLive())
                                        continue;
                                Vector<Entry> liveentries = prevslot.getLiveEntries();
                                for(Entry liveentry:liveentries) {
                                        /* Skip LastMessage entries about this machine. */
                                        if (redundant(liveentry))
                                                continue;
                                        if (s.hasSpace(liveentry))
                                                s.addEntry(liveentry);
                                        else if (i==0) {
                                                /* Only for the first slot scanned: try canFit()
                                                   as a fallback, and if that also fails restart
                                                   the attempt once with a forced resize. */
                                                if (s.canFit(liveentry))
                                                        s.addEntry(liveentry);
                                                else if (!forcedresize) {
                                                        return tryput(key, value, true);
                                                }
                                        }
                                }
                        }
                }
                KeyValue kv=new KeyValue(s, key, value);
                boolean insertedkv=false;
                if (s.hasSpace(kv)) {
                        s.addEntry(kv);
                        insertedkv=true;
                }

                /* Use any remaining space to copy further live entries forward,
                   stopping at the first entry that no longer fits. */
                long newestseqnum=buffer.getNewestSeqNum();
search:
                for(; seqn<=newestseqnum; seqn++) {
                        Slot prevslot=buffer.getSlot(seqn);
                        if (!prevslot.isLive())
                                continue;
                        Vector<Entry> liveentries = prevslot.getLiveEntries();
                        for(Entry liveentry:liveentries) {
                                if (redundant(liveentry))
                                        continue;
                                if (s.hasSpace(liveentry))
                                        s.addEntry(liveentry);
                                else
                                        break search;
                        }
                }

                /* max stays 0 unless this attempt requests a resize. */
                int max=0;
                if (forcedresize)
                        max = numslots + FORCED_RESIZE_INCREMENT;
                Slot[] array=cloud.putSlot(s, max);
                /* A null result is treated as our slot having been accepted;
                   otherwise the returned slots are applied instead and the put is
                   reported as not done. */
                if (array == null)
                        array = new Slot[] {s};
                else
                        insertedkv=false;

                /* update data structure */
                validateandupdate(array, true);

                return insertedkv;
        }
158
159         boolean redundant(Entry liveentry) {
160                 if (liveentry.getType()==Entry.TypeLastMessage) {
161                         LastMessage lastmsg=(LastMessage) liveentry;
162                         return lastmsg.getMachineID() == localmachineid;
163                 }
164                 return false;
165         }
166
167
168         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
169                 /* The cloud communication layer has checked slot HMACs already
170                          before decoding */
171                 if (newslots.length==0)
172                         return;
173
174                 long firstseqnum=newslots[0].getSequenceNumber();
175                 if (firstseqnum <= sequencenumber)
176                         throw new Error("Server Error: Sent older slots!");
177
178                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
179                 checkHMACChain(indexer, newslots);
180
181                 HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
182                 
183                 initExpectedSize();
184                 for(Slot slot: newslots) {
185                         updateExpectedSize();
186                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
187                 }
188
189                 /* If there is a gap, check to see if the server sent us everything. */
190                 if (firstseqnum != (sequencenumber+1)) {
191                         checkNumSlots(newslots.length);
192                         if (!machineSet.isEmpty())
193                                 throw new Error("Missing record for machines: "+machineSet);
194                 }
195
196                 commitNewMaxSize();
197
198                 /* Commit new to slots. */
199                 for(Slot slot:newslots) {
200                         buffer.putSlot(slot);
201                 }
202                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
203         }
204
        /* Scratch state for one validateandupdate() pass.
           expectedsize: running count of slots expected to exist, capped at
           currmaxsize; compared with the received count by checkNumSlots().
           currmaxsize: slot limit seen so far in the pass; starts at numslots,
           is replaced by any TableStatus entry, and is committed by
           commitNewMaxSize(). */
        private int expectedsize, currmaxsize;
206
207         private void checkNumSlots(int numslots) {
208                 if (numslots != expectedsize)
209                         throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
210         }
211
212         private void initExpectedSize() {
213                 long prevslots = sequencenumber;
214                 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
215                 currmaxsize = numslots;
216         }
217
218         private void updateExpectedSize() {
219                 expectedsize++;
220                 if (expectedsize > currmaxsize)
221                         expectedsize = currmaxsize;
222         }
223
224         private void updateCurrMaxSize(int newmaxsize) {
225                 currmaxsize=newmaxsize;
226         }
227
228         private void commitNewMaxSize() {
229                 if (numslots != currmaxsize)
230                         buffer.resize(currmaxsize);
231
232                 numslots=currmaxsize;
233         }
234
235         private void processEntry(KeyValue entry, SlotIndexer indexer) {
236                 IoTString key=entry.getKey();
237                 KeyValue oldvalue=table.get(key);
238                 if (oldvalue != null) {
239                         oldvalue.setDead();
240                 }
241                 table.put(key, entry);
242         }
243
244         private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
245                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
246         }
247
248         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
249                 long oldseqnum=entry.getOldSeqNum();
250                 long newseqnum=entry.getNewSeqNum();
251                 boolean isequal=entry.getEqual();
252                 long machineid=entry.getMachineID();
253                 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
254                         Slot slot=indexer.getSlot(seqnum);
255                         if (slot != null) {
256                                 long slotmachineid=slot.getMachineID();
257                                 if (isequal!=(slotmachineid==machineid)) {
258                                         throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
259                                 }
260                         }
261                 }
262         }
263
264         private void processEntry(TableStatus entry, SlotIndexer indexer) {
265                 int newnumslots=entry.getMaxSlots();
266                 updateCurrMaxSize(newnumslots);
267                 if (lastTableStatus != null)
268                         lastTableStatus.setDead();
269                 lastTableStatus = entry;
270         }
271
272         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
273                 machineSet.remove(machineid);
274                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
275                 if (lastmsgentry == null)
276                         return;
277
278                 long lastmsgseqnum = lastmsgentry.getFirst();
279                 Liveness lastentry = lastmsgentry.getSecond();
280                 if (lastentry instanceof LastMessage) {
281                         ((LastMessage)lastentry).setDead();
282                 } else if (lastentry instanceof Slot) {
283                         ((Slot)lastentry).setDead();
284                 } else {
285                         throw new Error("Unrecognized type");
286                 }
287
288                 if (machineid == localmachineid) {
289                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
290                                 throw new Error("Server Error: Mismatch on local machine sequence number");
291                 } else {
292                         if (lastmsgseqnum > seqnum)
293                                 throw new Error("Server Error: Rollback on remote machine sequence number");
294                 }
295         }
296
        /**
         * Applies one slot to the local state.  The slot itself is first
         * recorded as the latest message from its machine, then each of its
         * entries is dispatched to the processEntry overload for its type.
         *
         * @param indexer               lookup of slots by sequence number, passed on to the entry handlers
         * @param slot                  slot to process
         * @param acceptupdatestolocal  forwarded to updateLastMessage() for the slot itself
         * @param machineSet            machines not yet mentioned in the current batch
         * @throws Error if an entry has a type not handled here
         */
        private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
                updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);

                for(Entry entry : slot.getEntries()) {
                        switch(entry.getType()) {
                        case Entry.TypeKeyValue:
                                processEntry((KeyValue)entry, indexer);
                                break;

                        case Entry.TypeLastMessage:
                                processEntry((LastMessage)entry, indexer, machineSet);
                                break;

                        case Entry.TypeRejectedMessage:
                                processEntry((RejectedMessage)entry, indexer);
                                break;

                        case Entry.TypeTableStatus:
                                processEntry((TableStatus)entry, indexer);
                                break;

                        default:
                                throw new Error("Unrecognized type: "+entry.getType());
                        }
                }
        }
323
324         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
325                 for(int i=0; i < newslots.length; i++) {
326                         Slot currslot=newslots[i];
327                         Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
328                         if (prevslot != null &&
329                                         !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
330                                 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
331                 }
332         }
333 }