2 import java.util.HashMap;
3 import java.util.HashSet;
4 import java.util.Arrays;
5 import java.util.Vector;
8 * IoTTable data structure. Provides client inferface.
14 final public class Table {
16 private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
17 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
18 private SlotBuffer buffer;
19 private CloudComm cloud;
20 private long sequencenumber;
21 private long localmachineid;
22 private TableStatus lastTableStatus;
23 static final int FREE_SLOTS = 10;
24 static final int FORCED_RESIZE_INCREMENT = 20;
26 public Table(String baseurl, String password, long _localmachineid) {
27 localmachineid=_localmachineid;
28 buffer = new SlotBuffer();
29 numslots = buffer.capacity();
31 cloud=new CloudComm(baseurl, password);
34 public Table(CloudComm _cloud, long _localmachineid) {
35 localmachineid=_localmachineid;
36 buffer = new SlotBuffer();
37 numslots = buffer.capacity();
42 public void rebuild() {
43 Slot[] newslots=cloud.getSlots(sequencenumber+1);
44 validateandupdate(newslots, true);
47 public void update() {
48 Slot[] newslots=cloud.getSlots(sequencenumber+1);
50 validateandupdate(newslots, false);
53 public IoTString get(IoTString key) {
54 KeyValue kv=table.get(key);
61 public void initTable() {
62 Slot s=new Slot(1, localmachineid);
63 TableStatus status=new TableStatus(s, numslots);
65 Slot[] array=cloud.putSlot(s, numslots);
67 array = new Slot[] {s};
68 /* update data structure */
69 validateandupdate(array, true);
71 throw new Error("Error on initialization");
75 public IoTString put(IoTString key, IoTString value) {
77 KeyValue oldvalue=table.get(key);
78 if (tryput(key, value, false)) {
82 return oldvalue.getValue();
87 private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
88 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
89 long seqn = buffer.getOldestSeqNum();
92 TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
96 if ((numslots - buffer.size()) < FREE_SLOTS) {
97 /* have to check whether we have enough free slots */
98 long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
99 seqn = fullfirstseqn < 1?1:fullfirstseqn;
100 for(int i=0; i < FREE_SLOTS; i++, seqn++) {
101 Slot prevslot=buffer.getSlot(seqn);
102 if (!prevslot.isLive())
104 Vector<Entry> liveentries = prevslot.getLiveEntries();
105 for(Entry liveentry:liveentries) {
106 if (redundant(liveentry))
108 if (s.hasSpace(liveentry))
109 s.addEntry(liveentry);
111 if (s.canFit(liveentry))
112 s.addEntry(liveentry);
113 else if (!forcedresize) {
114 return tryput(key, value, true);
120 KeyValue kv=new KeyValue(s, key, value);
121 boolean insertedkv=false;
122 if (s.hasSpace(kv)) {
127 long newestseqnum=buffer.getNewestSeqNum();
129 for(; seqn<=newestseqnum; seqn++) {
130 Slot prevslot=buffer.getSlot(seqn);
131 if (!prevslot.isLive())
133 Vector<Entry> liveentries = prevslot.getLiveEntries();
134 for(Entry liveentry:liveentries) {
135 if (redundant(liveentry))
137 if (s.hasSpace(liveentry))
138 s.addEntry(liveentry);
146 max = numslots + FORCED_RESIZE_INCREMENT;
147 Slot[] array=cloud.putSlot(s, max);
149 array = new Slot[] {s};
153 /* update data structure */
154 validateandupdate(array, true);
159 boolean redundant(Entry liveentry) {
160 if (liveentry.getType()==Entry.TypeLastMessage) {
161 LastMessage lastmsg=(LastMessage) liveentry;
162 return lastmsg.getMachineID() == localmachineid;
168 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
169 /* The cloud communication layer has checked slot HMACs already
171 if (newslots.length==0)
174 long firstseqnum=newslots[0].getSequenceNumber();
175 if (firstseqnum <= sequencenumber)
176 throw new Error("Server Error: Sent older slots!");
178 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
179 checkHMACChain(indexer, newslots);
181 HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
184 for(Slot slot: newslots) {
185 updateExpectedSize();
186 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
189 /* If there is a gap, check to see if the server sent us everything. */
190 if (firstseqnum != (sequencenumber+1)) {
191 checkNumSlots(newslots.length);
192 if (!machineSet.isEmpty())
193 throw new Error("Missing record for machines: "+machineSet);
198 /* Commit new to slots. */
199 for(Slot slot:newslots) {
200 buffer.putSlot(slot);
202 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
205 private int expectedsize, currmaxsize;
207 private void checkNumSlots(int numslots) {
208 if (numslots != expectedsize)
209 throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
212 private void initExpectedSize() {
213 long prevslots = sequencenumber;
214 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
215 currmaxsize = numslots;
218 private void updateExpectedSize() {
220 if (expectedsize > currmaxsize)
221 expectedsize = currmaxsize;
224 private void updateCurrMaxSize(int newmaxsize) {
225 currmaxsize=newmaxsize;
228 private void commitNewMaxSize() {
229 if (numslots != currmaxsize)
230 buffer.resize(currmaxsize);
232 numslots=currmaxsize;
235 private void processEntry(KeyValue entry, SlotIndexer indexer) {
236 IoTString key=entry.getKey();
237 KeyValue oldvalue=table.get(key);
238 if (oldvalue != null) {
241 table.put(key, entry);
244 private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
245 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
248 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
249 long oldseqnum=entry.getOldSeqNum();
250 long newseqnum=entry.getNewSeqNum();
251 boolean isequal=entry.getEqual();
252 long machineid=entry.getMachineID();
253 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
254 Slot slot=indexer.getSlot(seqnum);
256 long slotmachineid=slot.getMachineID();
257 if (isequal!=(slotmachineid==machineid)) {
258 throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
264 private void processEntry(TableStatus entry, SlotIndexer indexer) {
265 int newnumslots=entry.getMaxSlots();
266 updateCurrMaxSize(newnumslots);
267 if (lastTableStatus != null)
268 lastTableStatus.setDead();
269 lastTableStatus = entry;
272 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
273 machineSet.remove(machineid);
274 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
275 if (lastmsgentry == null)
278 long lastmsgseqnum = lastmsgentry.getFirst();
279 Liveness lastentry = lastmsgentry.getSecond();
280 if (lastentry instanceof LastMessage) {
281 ((LastMessage)lastentry).setDead();
282 } else if (lastentry instanceof Slot) {
283 ((Slot)lastentry).setDead();
285 throw new Error("Unrecognized type");
288 if (machineid == localmachineid) {
289 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
290 throw new Error("Server Error: Mismatch on local machine sequence number");
292 if (lastmsgseqnum > seqnum)
293 throw new Error("Server Error: Rollback on remote machine sequence number");
297 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
298 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
300 for(Entry entry : slot.getEntries()) {
301 switch(entry.getType()) {
302 case Entry.TypeKeyValue:
303 processEntry((KeyValue)entry, indexer);
306 case Entry.TypeLastMessage:
307 processEntry((LastMessage)entry, indexer, machineSet);
310 case Entry.TypeRejectedMessage:
311 processEntry((RejectedMessage)entry, indexer);
314 case Entry.TypeTableStatus:
315 processEntry((TableStatus)entry, indexer);
319 throw new Error("Unrecognized type: "+entry.getType());
324 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
325 for(int i=0; i < newslots.length; i++) {
326 Slot currslot=newslots[i];
327 Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
328 if (prevslot != null &&
329 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
330 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);