2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
11 * IoTTable data structure. Provides client interface.
12 * @author Brian Demsky
16 final public class Table {
// Key -> KeyValue entry; read by get()/put() and written by processEntry(KeyValue).
18 private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
// Machine id -> (sequence number, Liveness object) last recorded by updateLastMessage.
19 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
// Machine id -> set of RejectedMessage entries; see addWatchList and updateLastMessage.
20 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
21 private SlotBuffer buffer;
22 private CloudComm cloud;
// Sequence number of the last slot committed by validateandupdate.
23 private long sequencenumber;
24 private long localmachineid;
// Most recently processed TableStatus entry; the previous one is marked dead on replacement.
25 private TableStatus lastTableStatus;
// Added to the first-if-full sequence number in tryput to bound its first scan loop.
26 static final int FREE_SLOTS = 10;
// Compared against skipcount in tryput's second scan loop.
27 static final int SKIP_THRESHOLD = 10;
28 private long liveslotcount=0;
// Multiplier applied to numslots when tryput computes a new table size.
30 static final double RESIZE_MULTIPLE = 1.2;
// Fraction of numslots used as the base of the randomized resize threshold.
31 static final double RESIZE_THRESHOLD = 0.75;
// tryput compares liveslotcount against this value.
32 private int resizethreshold;
// Sequence number at which tryput resumes scanning for live slots.
33 private long lastliveslotseqn;
34 private Random random;
/**
 * Creates a table for the given machine, backed by a new SlotBuffer and a
 * CloudComm constructed from the URL and password.
 *
 * @param baseurl forwarded to the CloudComm constructor
 * @param password forwarded to the CloudComm constructor
 * @param _localmachineid id stored as this table's local machine id
 */
36 public Table(String baseurl, String password, long _localmachineid) {
37 localmachineid=_localmachineid;
38 buffer = new SlotBuffer();
// The slot count starts at whatever capacity the new buffer reports.
39 numslots = buffer.capacity();
42 cloud=new CloudComm(this, baseurl, password);
/**
 * Creates a table for the given machine with a new SlotBuffer.
 *
 * @param _cloud cloud communication object; presumably stored as this
 *        table's CloudComm - TODO confirm
 * @param _localmachineid id stored as this table's local machine id
 */
46 public Table(CloudComm _cloud, long _localmachineid) {
47 localmachineid=_localmachineid;
48 buffer = new SlotBuffer();
// The slot count starts at whatever capacity the new buffer reports.
49 numslots = buffer.capacity();
/**
 * Fetches the slots that follow the current sequence number from the cloud
 * and applies them with acceptupdatestolocal set to true.
 */
55 public void rebuild() {
56 Slot[] newslots=cloud.getSlots(sequencenumber+1);
57 validateandupdate(newslots, true);
/**
 * Fetches the slots that follow the current sequence number from the cloud
 * and applies them with acceptupdatestolocal set to false.
 */
60 public void update() {
61 Slot[] newslots=cloud.getSlots(sequencenumber+1);
63 validateandupdate(newslots, false);
/**
 * Looks up the KeyValue entry stored for the key in the local table.
 *
 * @param key key to look up
 * @return presumably the stored value, or null when the key is absent -
 *         TODO confirm
 */
66 public IoTString get(IoTString key) {
67 KeyValue kv=table.get(key);
/**
 * Initializes the table: sets the cloud salt, creates the slot with sequence
 * number 1 and a TableStatus for it sized to numslots, submits the slot via
 * cloud.putSlot and applies the result through validateandupdate.
 *
 * @throws Error with message "Error on initialization"
 */
74 public void initTable() {
75 cloud.setSalt();//Set the salt
76 Slot s=new Slot(this, 1, localmachineid);
77 TableStatus status=new TableStatus(s, numslots);
79 Slot[] array=cloud.putSlot(s, numslots);
// Treat the slot we just submitted as the only new slot to apply.
81 array = new Slot[] {s};
82 /* update data structure */
83 validateandupdate(array, true);
85 throw new Error("Error on initialization");
/** Returns the string form of the underlying key -> KeyValue map. */
89 public String toString() {
90 return table.toString();
/**
 * Stores the key/value pair through tryput (with resize set to false).
 *
 * @param key key to store
 * @param value value to associate with the key
 * @return the value of the entry that was in the local table before the put
 */
93 public IoTString put(IoTString key, IoTString value) {
// Capture the current entry before tryput replaces it.
95 KeyValue oldvalue=table.get(key);
96 if (tryput(key, value, false)) {
100 return oldvalue.getValue();
/** NOTE(review): presumably decrements liveslotcount - confirm against the body. */
105 void decrementLiveCount() {
/**
 * Picks a randomized resize threshold for the current numslots.
 * The threshold is resize_lower-1 plus a random offset in
 * [0, numslots-resize_lower), where resize_lower is RESIZE_THRESHOLD of
 * numslots truncated to an int.
 */
109 private void setResizeThreshold() {
// Cast the product, not the constant: a cast binds tighter than '*', so
// "(int) RESIZE_THRESHOLD * numslots" truncated 0.75 to 0 and always gave 0.
110 int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
111 resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
/**
 * Attempts to publish a new slot containing the given key/value pair.
 * Builds the slot at sequencenumber+1, carries forward live entries from
 * older buffered slots where they fit, submits it through cloud.putSlot and
 * feeds the resulting slots to validateandupdate.
 *
 * @param key key of the pair to store
 * @param value value of the pair to store
 * @param resize passed as true by the retry issued when a live entry from
 *        the first-if-full slot does not fit
 * @return success flag; put() returns the old value when this is true
 */
114 private boolean tryput(IoTString key, IoTString value, boolean resize) {
// The new slot is constructed with the HMAC of the slot at the current sequence number.
115 Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
117 if (liveslotcount > resizethreshold) {
119 newsize = (int) (numslots * RESIZE_MULTIPLE);
124 newsize = (int) (numslots * RESIZE_MULTIPLE);
125 TableStatus status=new TableStatus(s, newsize);
129 long newestseqnum = buffer.getNewestSeqNum();
130 long oldestseqnum = buffer.getOldestSeqNum();
// Never start scanning before the oldest slot still held in the buffer.
131 if (lastliveslotseqn < oldestseqnum)
132 lastliveslotseqn = oldestseqnum;
134 long seqn = lastliveslotseqn;
135 boolean seenliveslot = false;
// First sequence number of a window of numslots slots ending at newestseqnum.
136 long firstiffull = newestseqnum + 1 - numslots;
137 long threshold = firstiffull + FREE_SLOTS;
// First pass: slots below the threshold; copy their live entries into the new slot.
139 for(; seqn < threshold; seqn++) {
140 Slot prevslot=buffer.getSlot(seqn);
141 //Push slot number forward
143 lastliveslotseqn = seqn;
145 if (!prevslot.isLive())
148 Vector<Entry> liveentries = prevslot.getLiveEntries();
149 for(Entry liveentry:liveentries) {
150 if (s.hasSpace(liveentry)) {
151 s.addEntry(liveentry);
152 } else if (seqn==firstiffull) {
// No room for a live entry of the first-if-full slot: retry with resize requested.
154 return tryput(key, value, true);
160 KeyValue kv=new KeyValue(s, key, value);
161 boolean insertedkv=false;
162 if (s.hasSpace(kv)) {
// Second pass: continue through the newest slot, adding live entries that still fit.
169 for(; seqn <= newestseqnum; seqn++) {
170 Slot prevslot=buffer.getSlot(seqn);
171 //Push slot number forward
173 lastliveslotseqn = seqn;
175 if (!prevslot.isLive())
178 Vector<Entry> liveentries = prevslot.getLiveEntries();
179 for(Entry liveentry:liveentries) {
180 if (s.hasSpace(liveentry))
181 s.addEntry(liveentry);
184 if (skipcount > SKIP_THRESHOLD)
193 Slot[] array=cloud.putSlot(s, max);
195 array = new Slot[] {s};
199 /* update data structure */
200 validateandupdate(array, true);
/**
 * Validates a batch of slots received from the cloud and commits them.
 * Checks that the batch is newer than the current sequence number, verifies
 * the HMAC chain, processes every slot, and then stores the slots in the
 * buffer and advances sequencenumber to the last slot of the batch.
 *
 * @param newslots slots to apply, read in array order
 * @param acceptupdatestolocal forwarded to processSlot for every slot
 * @throws Error when the first slot is not newer than sequencenumber, or
 *         when machines remain in machineSet after a batch that does not
 *         start at sequencenumber+1
 */
205 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
206 /* The cloud communication layer has checked slot HMACs already
208 if (newslots.length==0)
211 long firstseqnum=newslots[0].getSequenceNumber();
212 if (firstseqnum <= sequencenumber)
213 throw new Error("Server Error: Sent older slots!");
215 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
216 checkHMACChain(indexer, newslots);
218 HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
221 for(Slot slot: newslots) {
222 updateExpectedSize();
223 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
226 /* If there is a gap, check to see if the server sent us everything. */
227 if (firstseqnum != (sequencenumber+1)) {
228 checkNumSlots(newslots.length);
// updateLastMessage removes each machine it sees, so leftovers were not covered by this batch.
229 if (!machineSet.isEmpty())
230 throw new Error("Missing record for machines: "+machineSet);
235 /* Commit new to slots. */
236 for(Slot slot:newslots) {
237 buffer.putSlot(slot);
// The last slot of the batch becomes the current sequence number.
240 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
// expectedsize: slot count checkNumSlots requires from the server.
// currmaxsize: pending table size, set by updateCurrMaxSize and applied by commitNewMaxSize.
243 private int expectedsize, currmaxsize;
/**
 * Verifies that the number of slots received equals expectedsize.
 *
 * @param numslots number of slots received; the parameter name is also used
 *        for the table's slot-count member elsewhere in this class
 * @throws Error when the count differs from expectedsize
 */
245 private void checkNumSlots(int numslots) {
246 if (numslots != expectedsize)
247 throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
/**
 * Resets expectedsize to the smaller of sequencenumber and numslots, and
 * resets currmaxsize to numslots.
 */
250 private void initExpectedSize() {
251 long prevslots = sequencenumber;
// Compare as long so the int cast is applied only when prevslots is below numslots.
252 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
253 currmaxsize = numslots;
/** Updates expectedsize, capping it at currmaxsize. */
256 private void updateExpectedSize() {
258 if (expectedsize > currmaxsize)
259 expectedsize = currmaxsize;
/**
 * Records a new pending maximum table size.
 *
 * @param newmaxsize value stored in currmaxsize
 */
262 private void updateCurrMaxSize(int newmaxsize) {
263 currmaxsize=newmaxsize;
/**
 * Applies the pending maximum size: resizes the buffer when the size
 * changed, adopts currmaxsize as numslots and recomputes the resize threshold.
 */
266 private void commitNewMaxSize() {
267 if (numslots != currmaxsize)
268 buffer.resize(currmaxsize);
270 numslots=currmaxsize;
// The threshold is derived from numslots, so it must be recomputed after the change.
271 setResizeThreshold();
/**
 * Installs a KeyValue entry as the current entry for its key in the table.
 *
 * @param entry entry to install
 * @param indexer not used in the visible lines of this method
 */
274 private void processEntry(KeyValue entry, SlotIndexer indexer) {
275 IoTString key=entry.getKey();
276 KeyValue oldvalue=table.get(key);
277 if (oldvalue != null) {
280 table.put(key, entry);
/**
 * Handles a LastMessage entry by recording its machine id and sequence
 * number through updateLastMessage, with acceptupdatestolocal set to false.
 *
 * @param entry entry to process; also passed as the Liveness object
 * @param indexer not used by this method
 * @param machineSet forwarded to updateLastMessage
 */
283 private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
284 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
/**
 * Handles a RejectedMessage entry. Checks the slots in the entry's sequence
 * number range against its machine id, then builds the set of machines that
 * are put on the watch list for this entry.
 *
 * @param entry rejected-message entry to process
 * @param indexer used to look up slots by sequence number
 * @throws Error when a slot in the range contradicts the entry
 */
287 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
288 long oldseqnum=entry.getOldSeqNum();
289 long newseqnum=entry.getNewSeqNum();
290 boolean isequal=entry.getEqual();
291 long machineid=entry.getMachineID();
// Walk the inclusive range [oldseqnum, newseqnum].
292 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
293 Slot slot=indexer.getSlot(seqnum);
295 long slotmachineid=slot.getMachineID();
// isequal must agree with whether the slot was written by the entry's machine.
296 if (isequal!=(slotmachineid==machineid)) {
297 throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
302 HashSet<Long> watchset=new HashSet<Long>();
303 for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
304 long entry_mid=lastmsg_entry.getKey();
305 /* We've seen it, don't need to continue to watch. Our next
306 * message will implicitly acknowledge it. */
307 if (entry_mid == localmachineid)
309 Pair<Long, Liveness> v=lastmsg_entry.getValue();
310 long entry_seqn=v.getFirst();
// Machines whose last recorded sequence number is below newseqnum go on the watch list.
311 if (entry_seqn < newseqnum) {
312 addWatchList(entry_mid, entry);
313 watchset.add(entry_mid);
316 if (watchset.isEmpty())
319 entry.setWatchSet(watchset);
/**
 * Registers a RejectedMessage in the watch list of a machine.
 *
 * @param machineid machine whose watch list is looked up
 * @param entry rejected-message entry to register
 */
322 private void addWatchList(long machineid, RejectedMessage entry) {
323 HashSet<RejectedMessage> entries=watchlist.get(machineid);
// Install a fresh set for the machine and keep a reference to it in entries.
325 watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
/**
 * Handles a TableStatus entry: records its max slot count as the pending
 * maximum size, marks the previously stored TableStatus dead and stores
 * this entry as the latest one.
 *
 * @param entry table status entry to process
 * @param indexer not used by this method
 */
329 private void processEntry(TableStatus entry, SlotIndexer indexer) {
330 int newnumslots=entry.getMaxSlots();
331 updateCurrMaxSize(newnumslots);
332 if (lastTableStatus != null)
333 lastTableStatus.setDead();
334 lastTableStatus = entry;
/**
 * Records (seqnum, liveness) as the latest message seen from a machine and
 * checks it against the previously recorded one.
 *
 * @param machineid machine the message came from; removed from machineSet
 * @param seqnum sequence number of the message
 * @param liveness the LastMessage or Slot carrying the information
 * @param acceptupdatestolocal when false, a differing sequence number for
 *        the local machine is reported as an error
 * @param machineSet set of machine ids still to be seen by the caller
 * @throws Error on a Liveness object that is neither LastMessage nor Slot,
 *         on a local sequence-number mismatch, or on a sequence number
 *         older than the one previously recorded
 */
337 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
338 machineSet.remove(machineid);
340 HashSet<RejectedMessage> watchset=watchlist.get(machineid);
341 if (watchset != null) {
// Explicit iterator rather than for-each, over the rejected messages watched for this machine.
342 for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
343 RejectedMessage rm=rmit.next();
344 if (rm.getNewSeqNum() <= seqnum) {
345 /* Remove it from our watchlist */
347 /* Decrement machines that need to see this notification */
348 rm.removeWatcher(machineid);
353 if (machineid == localmachineid) {
354 /* Our own messages are immediately dead. */
355 if (liveness instanceof LastMessage) {
356 ((LastMessage)liveness).setDead();
357 } else if (liveness instanceof Slot) {
358 ((Slot)liveness).setDead();
360 throw new Error("Unrecognized type");
// put() returns the previous pair for this machine, or null if there was none.
365 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
366 if (lastmsgentry == null)
369 long lastmsgseqnum = lastmsgentry.getFirst();
370 Liveness lastentry = lastmsgentry.getSecond();
// For other machines, the superseded record is marked dead.
371 if (machineid != localmachineid) {
372 if (lastentry instanceof LastMessage) {
373 ((LastMessage)lastentry).setDead();
374 } else if (lastentry instanceof Slot) {
375 ((Slot)lastentry).setDead();
377 throw new Error("Unrecognized type");
381 if (machineid == localmachineid) {
382 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
383 throw new Error("Server Error: Mismatch on local machine sequence number");
385 if (lastmsgseqnum > seqnum)
386 throw new Error("Server Error: Rollback on remote machine sequence number");
/**
 * Processes one slot: records it as the latest message from its machine,
 * then dispatches each of its entries to the matching processEntry overload.
 *
 * @param indexer forwarded to the processEntry overloads
 * @param slot slot to process
 * @param acceptupdatestolocal forwarded to updateLastMessage
 * @param machineSet forwarded to updateLastMessage and to LastMessage entries
 * @throws Error with the entry type in the message ("Unrecognized type: ...")
 */
390 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
// The slot itself is passed as the Liveness object for its machine.
391 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
392 for(Entry entry : slot.getEntries()) {
393 switch(entry.getType()) {
394 case Entry.TypeKeyValue:
395 processEntry((KeyValue)entry, indexer);
398 case Entry.TypeLastMessage:
399 processEntry((LastMessage)entry, indexer, machineSet);
402 case Entry.TypeRejectedMessage:
403 processEntry((RejectedMessage)entry, indexer);
406 case Entry.TypeTableStatus:
407 processEntry((TableStatus)entry, indexer);
411 throw new Error("Unrecognized type: "+entry.getType());
/**
 * Verifies that each new slot's previous-HMAC field matches the HMAC of the
 * slot with the preceding sequence number, whenever the indexer can supply
 * that predecessor.
 *
 * @param indexer used to look up the predecessor of each slot
 * @param newslots slots to verify
 * @throws Error when a predecessor exists and the HMACs differ
 */
416 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
417 for(int i=0; i < newslots.length; i++) {
418 Slot currslot=newslots[i];
419 Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
// A missing predecessor (null) is skipped rather than treated as an error.
420 if (prevslot != null &&
421 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
422 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);