2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
11 * IoTTable data structure. Provides client inferface.
12 * @author Brian Demsky
16 final public class Table {
18 private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
19 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
20 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
21 private SlotBuffer buffer;
22 private CloudComm cloud;
23 private long sequencenumber;
24 private long localmachineid;
25 private TableStatus lastTableStatus;
26 static final int FREE_SLOTS = 10;
27 static final int SKIP_THRESHOLD = 10;
28 private long liveslotcount=0;
30 static final double RESIZE_MULTIPLE = 1.2;
31 static final double RESIZE_THRESHOLD = 0.75;
32 private int resizethreshold;
33 private long lastliveslotseqn;
34 private Random random=new Random();
36 public Table(String baseurl, String password, long _localmachineid) {
37 localmachineid=_localmachineid;
38 buffer = new SlotBuffer();
39 numslots = buffer.capacity();
42 cloud=new CloudComm(this, baseurl, password);
46 public Table(CloudComm _cloud, long _localmachineid) {
47 localmachineid=_localmachineid;
48 buffer = new SlotBuffer();
49 numslots = buffer.capacity();
55 public void rebuild() {
56 Slot[] newslots=cloud.getSlots(sequencenumber+1);
57 validateandupdate(newslots, true);
60 public void update() {
61 Slot[] newslots=cloud.getSlots(sequencenumber+1);
63 validateandupdate(newslots, false);
66 public IoTString get(IoTString key) {
67 KeyValue kv=table.get(key);
74 public void initTable() {
75 cloud.setSalt();//Set the salt
76 Slot s=new Slot(this, 1, localmachineid);
77 TableStatus status=new TableStatus(s, numslots);
79 Slot[] array=cloud.putSlot(s, numslots);
81 array = new Slot[] {s};
82 /* update data structure */
83 validateandupdate(array, true);
85 throw new Error("Error on initialization");
89 public String toString() {
90 return table.toString();
93 public IoTString put(IoTString key, IoTString value) {
95 KeyValue oldvalue=table.get(key);
96 if (tryput(key, value, false)) {
100 return oldvalue.getValue();
105 void decrementLiveCount() {
109 private void setResizeThreshold() {
110 int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
111 resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
114 private boolean tryput(IoTString key, IoTString value, boolean resize) {
115 Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
117 if (liveslotcount > resizethreshold) {
118 System.out.print("A");
120 newsize = (int) (numslots * RESIZE_MULTIPLE);
124 newsize = (int) (numslots * RESIZE_MULTIPLE);
125 TableStatus status=new TableStatus(s, newsize);
129 long newestseqnum = buffer.getNewestSeqNum();
130 long oldestseqnum = buffer.getOldestSeqNum();
131 if (lastliveslotseqn < oldestseqnum)
132 lastliveslotseqn = oldestseqnum;
134 long seqn = lastliveslotseqn;
135 boolean seenliveslot = false;
136 long firstiffull = newestseqnum + 1 - numslots;
137 long threshold = firstiffull + FREE_SLOTS;
139 for(; seqn < threshold; seqn++) {
140 Slot prevslot=buffer.getSlot(seqn);
141 //Push slot number forward
143 lastliveslotseqn = seqn;
145 if (!prevslot.isLive())
148 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
149 for(Entry liveentry:liveentries) {
150 if (s.hasSpace(liveentry)) {
151 s.addEntry(liveentry);
152 } else if (seqn==firstiffull) {
154 System.out.print("B");
155 return tryput(key, value, true);
161 KeyValue kv=new KeyValue(s, key, value);
162 boolean insertedkv=false;
163 if (s.hasSpace(kv)) {
170 for(; seqn <= newestseqnum; seqn++) {
171 Slot prevslot=buffer.getSlot(seqn);
172 //Push slot number forward
174 lastliveslotseqn = seqn;
176 if (!prevslot.isLive())
179 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
180 for(Entry liveentry:liveentries) {
181 if (s.hasSpace(liveentry))
182 s.addEntry(liveentry);
185 if (skipcount > SKIP_THRESHOLD)
194 Slot[] array=cloud.putSlot(s, max);
196 array = new Slot[] {s};
200 /* update data structure */
201 validateandupdate(array, true);
206 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
207 /* The cloud communication layer has checked slot HMACs already
209 if (newslots.length==0)
212 long firstseqnum=newslots[0].getSequenceNumber();
213 if (firstseqnum <= sequencenumber)
214 throw new Error("Server Error: Sent older slots!");
216 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
217 checkHMACChain(indexer, newslots);
219 HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
221 initExpectedSize(firstseqnum);
222 for(Slot slot: newslots) {
223 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
224 updateExpectedSize();
227 /* If there is a gap, check to see if the server sent us everything. */
228 if (firstseqnum != (sequencenumber+1)) {
229 checkNumSlots(newslots.length);
230 if (!machineSet.isEmpty())
231 throw new Error("Missing record for machines: "+machineSet);
236 /* Commit new to slots. */
237 for(Slot slot:newslots) {
238 buffer.putSlot(slot);
241 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
244 private int expectedsize, currmaxsize;
246 private void checkNumSlots(int numslots) {
247 if (numslots != expectedsize)
248 throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
251 private void initExpectedSize(long firstsequencenumber) {
252 long prevslots = firstsequencenumber;
253 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
254 currmaxsize = numslots;
257 private void updateExpectedSize() {
259 if (expectedsize > currmaxsize)
260 expectedsize = currmaxsize;
263 private void updateCurrMaxSize(int newmaxsize) {
264 currmaxsize=newmaxsize;
267 private void commitNewMaxSize() {
268 if (numslots != currmaxsize)
269 buffer.resize(currmaxsize);
271 numslots=currmaxsize;
272 setResizeThreshold();
275 private void processEntry(KeyValue entry, SlotIndexer indexer) {
276 IoTString key=entry.getKey();
277 KeyValue oldvalue=table.get(key);
278 if (oldvalue != null) {
281 table.put(key, entry);
284 private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
285 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
288 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
289 long oldseqnum=entry.getOldSeqNum();
290 long newseqnum=entry.getNewSeqNum();
291 boolean isequal=entry.getEqual();
292 long machineid=entry.getMachineID();
293 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
294 Slot slot=indexer.getSlot(seqnum);
296 long slotmachineid=slot.getMachineID();
297 if (isequal!=(slotmachineid==machineid)) {
298 throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
303 HashSet<Long> watchset=new HashSet<Long>();
304 for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
305 long entry_mid=lastmsg_entry.getKey();
306 /* We've seen it, don't need to continue to watch. Our next
307 * message will implicitly acknowledge it. */
308 if (entry_mid == localmachineid)
310 Pair<Long, Liveness> v=lastmsg_entry.getValue();
311 long entry_seqn=v.getFirst();
312 if (entry_seqn < newseqnum) {
313 addWatchList(entry_mid, entry);
314 watchset.add(entry_mid);
317 if (watchset.isEmpty())
320 entry.setWatchSet(watchset);
323 private void addWatchList(long machineid, RejectedMessage entry) {
324 HashSet<RejectedMessage> entries=watchlist.get(machineid);
326 watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
330 private void processEntry(TableStatus entry, SlotIndexer indexer) {
331 int newnumslots=entry.getMaxSlots();
332 updateCurrMaxSize(newnumslots);
333 if (lastTableStatus != null)
334 lastTableStatus.setDead();
335 lastTableStatus = entry;
338 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
339 machineSet.remove(machineid);
341 HashSet<RejectedMessage> watchset=watchlist.get(machineid);
342 if (watchset != null) {
343 for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
344 RejectedMessage rm=rmit.next();
345 if (rm.getNewSeqNum() <= seqnum) {
346 /* Remove it from our watchlist */
348 /* Decrement machines that need to see this notification */
349 rm.removeWatcher(machineid);
354 if (machineid == localmachineid) {
355 /* Our own messages are immediately dead. */
356 if (liveness instanceof LastMessage) {
357 ((LastMessage)liveness).setDead();
358 } else if (liveness instanceof Slot) {
359 ((Slot)liveness).setDead();
361 throw new Error("Unrecognized type");
366 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
367 if (lastmsgentry == null)
370 long lastmsgseqnum = lastmsgentry.getFirst();
371 Liveness lastentry = lastmsgentry.getSecond();
372 if (machineid != localmachineid) {
373 if (lastentry instanceof LastMessage) {
374 ((LastMessage)lastentry).setDead();
375 } else if (lastentry instanceof Slot) {
376 ((Slot)lastentry).setDead();
378 throw new Error("Unrecognized type");
382 if (machineid == localmachineid) {
383 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
384 throw new Error("Server Error: Mismatch on local machine sequence number");
386 if (lastmsgseqnum > seqnum)
387 throw new Error("Server Error: Rollback on remote machine sequence number");
391 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
392 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
393 for(Entry entry : slot.getEntries()) {
394 switch(entry.getType()) {
395 case Entry.TypeKeyValue:
396 processEntry((KeyValue)entry, indexer);
399 case Entry.TypeLastMessage:
400 processEntry((LastMessage)entry, indexer, machineSet);
403 case Entry.TypeRejectedMessage:
404 processEntry((RejectedMessage)entry, indexer);
407 case Entry.TypeTableStatus:
408 processEntry((TableStatus)entry, indexer);
412 throw new Error("Unrecognized type: "+entry.getType());
417 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
418 for(int i=0; i < newslots.length; i++) {
419 Slot currslot=newslots[i];
420 Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
421 if (prevslot != null &&
422 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
423 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);