2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
10 * IoTTable data structure. Provides client inferface.
11 * @author Brian Demsky
16 final public class Table {
18 private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
19 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
20 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
21 private SlotBuffer buffer;
22 private CloudComm cloud;
23 private long sequencenumber;
24 private long localmachineid;
25 private TableStatus lastTableStatus;
26 static final int FREE_SLOTS = 10;
27 static final int FORCED_RESIZE_INCREMENT = 20;
29 public Table(String baseurl, String password, long _localmachineid) {
30 localmachineid=_localmachineid;
31 buffer = new SlotBuffer();
32 numslots = buffer.capacity();
34 cloud=new CloudComm(baseurl, password);
37 public Table(CloudComm _cloud, long _localmachineid) {
38 localmachineid=_localmachineid;
39 buffer = new SlotBuffer();
40 numslots = buffer.capacity();
45 public void rebuild() {
46 Slot[] newslots=cloud.getSlots(sequencenumber+1);
47 validateandupdate(newslots, true);
50 public void update() {
51 Slot[] newslots=cloud.getSlots(sequencenumber+1);
53 validateandupdate(newslots, false);
56 public IoTString get(IoTString key) {
57 KeyValue kv=table.get(key);
64 public void initTable() {
65 cloud.setSalt();//Set the salt
66 Slot s=new Slot(1, localmachineid);
67 TableStatus status=new TableStatus(s, numslots);
69 Slot[] array=cloud.putSlot(s, numslots);
71 array = new Slot[] {s};
72 /* update data structure */
73 validateandupdate(array, true);
75 throw new Error("Error on initialization");
79 public String toString() {
80 return table.toString();
83 public IoTString put(IoTString key, IoTString value) {
85 KeyValue oldvalue=table.get(key);
86 if (tryput(key, value, false)) {
90 return oldvalue.getValue();
95 private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
96 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
97 long seqn = buffer.getOldestSeqNum();
100 TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
104 if ((numslots - buffer.size()) < FREE_SLOTS) {
105 /* have to check whether we have enough free slots */
106 long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
107 seqn = fullfirstseqn < 1?1:fullfirstseqn;
108 for(int i=0; i < FREE_SLOTS; i++, seqn++) {
109 Slot prevslot=buffer.getSlot(seqn);
110 if (!prevslot.isLive())
112 Vector<Entry> liveentries = prevslot.getLiveEntries();
113 for(Entry liveentry:liveentries) {
114 if (s.hasSpace(liveentry))
115 s.addEntry(liveentry);
117 if (s.canFit(liveentry))
118 s.addEntry(liveentry);
119 else if (!forcedresize) {
120 return tryput(key, value, true);
126 KeyValue kv=new KeyValue(s, key, value);
127 boolean insertedkv=false;
128 if (s.hasSpace(kv)) {
133 long newestseqnum=buffer.getNewestSeqNum();
135 for(; seqn<=newestseqnum; seqn++) {
136 Slot prevslot=buffer.getSlot(seqn);
137 if (!prevslot.isLive())
139 Vector<Entry> liveentries = prevslot.getLiveEntries();
140 for(Entry liveentry:liveentries) {
141 if (s.hasSpace(liveentry))
142 s.addEntry(liveentry);
150 max = numslots + FORCED_RESIZE_INCREMENT;
151 Slot[] array=cloud.putSlot(s, max);
153 array = new Slot[] {s};
157 /* update data structure */
158 validateandupdate(array, true);
163 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
164 /* The cloud communication layer has checked slot HMACs already
166 if (newslots.length==0)
169 long firstseqnum=newslots[0].getSequenceNumber();
170 if (firstseqnum <= sequencenumber)
171 throw new Error("Server Error: Sent older slots!");
173 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
174 checkHMACChain(indexer, newslots);
176 HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
179 for(Slot slot: newslots) {
180 updateExpectedSize();
181 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
184 /* If there is a gap, check to see if the server sent us everything. */
185 if (firstseqnum != (sequencenumber+1)) {
186 checkNumSlots(newslots.length);
187 if (!machineSet.isEmpty())
188 throw new Error("Missing record for machines: "+machineSet);
193 /* Commit new to slots. */
194 for(Slot slot:newslots) {
195 buffer.putSlot(slot);
197 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
200 private int expectedsize, currmaxsize;
202 private void checkNumSlots(int numslots) {
203 if (numslots != expectedsize)
204 throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
207 private void initExpectedSize() {
208 long prevslots = sequencenumber;
209 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
210 currmaxsize = numslots;
213 private void updateExpectedSize() {
215 if (expectedsize > currmaxsize)
216 expectedsize = currmaxsize;
219 private void updateCurrMaxSize(int newmaxsize) {
220 currmaxsize=newmaxsize;
223 private void commitNewMaxSize() {
224 if (numslots != currmaxsize)
225 buffer.resize(currmaxsize);
227 numslots=currmaxsize;
230 private void processEntry(KeyValue entry, SlotIndexer indexer) {
231 IoTString key=entry.getKey();
232 KeyValue oldvalue=table.get(key);
233 if (oldvalue != null) {
236 table.put(key, entry);
239 private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
240 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
243 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
244 long oldseqnum=entry.getOldSeqNum();
245 long newseqnum=entry.getNewSeqNum();
246 boolean isequal=entry.getEqual();
247 long machineid=entry.getMachineID();
248 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
249 Slot slot=indexer.getSlot(seqnum);
251 long slotmachineid=slot.getMachineID();
252 if (isequal!=(slotmachineid==machineid)) {
253 throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
258 HashSet<Long> watchset=new HashSet<Long>();
259 for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
260 long entry_mid=lastmsg_entry.getKey();
261 /* We've seen it, don't need to continue to watch. Our next
262 * message will implicitly acknowledge it. */
263 if (entry_mid == localmachineid)
265 Pair<Long, Liveness> v=lastmsg_entry.getValue();
266 long entry_seqn=v.getFirst();
267 if (entry_seqn < newseqnum) {
268 addWatchList(entry_mid, entry);
269 watchset.add(entry_mid);
272 if (watchset.isEmpty())
275 entry.setWatchSet(watchset);
278 private void addWatchList(long machineid, RejectedMessage entry) {
279 HashSet<RejectedMessage> entries=watchlist.get(machineid);
281 watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
285 private void processEntry(TableStatus entry, SlotIndexer indexer) {
286 int newnumslots=entry.getMaxSlots();
287 updateCurrMaxSize(newnumslots);
288 if (lastTableStatus != null)
289 lastTableStatus.setDead();
290 lastTableStatus = entry;
293 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
294 machineSet.remove(machineid);
296 HashSet<RejectedMessage> watchset=watchlist.get(machineid);
297 if (watchset != null) {
298 for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
299 RejectedMessage rm=rmit.next();
300 if (rm.getNewSeqNum() <= seqnum) {
301 /* Remove it from our watchlist */
303 /* Decrement machines that need to see this notification */
304 rm.removeWatcher(machineid);
309 if (machineid == localmachineid) {
310 /* Our own messages are immediately dead. */
311 if (liveness instanceof LastMessage) {
312 ((LastMessage)liveness).setDead();
313 } else if (liveness instanceof Slot) {
314 ((Slot)liveness).setDead();
316 throw new Error("Unrecognized type");
321 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
322 if (lastmsgentry == null)
325 long lastmsgseqnum = lastmsgentry.getFirst();
326 Liveness lastentry = lastmsgentry.getSecond();
327 if (machineid != localmachineid) {
328 if (lastentry instanceof LastMessage) {
329 ((LastMessage)lastentry).setDead();
330 } else if (lastentry instanceof Slot) {
331 ((Slot)lastentry).setDead();
333 throw new Error("Unrecognized type");
337 if (machineid == localmachineid) {
338 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
339 throw new Error("Server Error: Mismatch on local machine sequence number");
341 if (lastmsgseqnum > seqnum)
342 throw new Error("Server Error: Rollback on remote machine sequence number");
346 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
347 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
348 for(Entry entry : slot.getEntries()) {
349 switch(entry.getType()) {
350 case Entry.TypeKeyValue:
351 processEntry((KeyValue)entry, indexer);
354 case Entry.TypeLastMessage:
355 processEntry((LastMessage)entry, indexer, machineSet);
358 case Entry.TypeRejectedMessage:
359 processEntry((RejectedMessage)entry, indexer);
362 case Entry.TypeTableStatus:
363 processEntry((TableStatus)entry, indexer);
367 throw new Error("Unrecognized type: "+entry.getType());
372 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
373 for(int i=0; i < newslots.length; i++) {
374 Slot currslot=newslots[i];
375 Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
376 if (prevslot != null &&
377 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
378 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);