2 import java.util.HashMap;
3 import java.util.Arrays;
4 import java.util.Vector;
7 * IoTTable data structure. Provides client interface.
13 final public class Table {
/* Current key/value state: maps each key to the KeyValue entry stored for it. */
15 private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
/* Per machine id: the last sequence number recorded for that machine, paired
 * with the Liveness object (a Slot or a LastMessage entry) that recorded it. */
16 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
/* Local buffer into which validated slots are committed. */
17 private SlotBuffer buffer;
/* Cloud communication layer used to fetch (getSlots) and submit (putSlot) slots. */
18 private CloudComm cloud;
/* Sequence number of the last slot accepted by validateandupdate. */
19 private long sequencenumber;
/* Machine id of this client; set once in the constructors. */
20 private long localmachineid;
/* Most recently processed TableStatus entry; the previous one is marked dead when it is replaced. */
21 private TableStatus lastTableStatus;
/* When fewer than this many slots are unused (numslots - buffer.size()), tryput
 * copies live entries out of the oldest FREE_SLOTS slots into the new slot. */
22 static final int FREE_SLOTS = 10;
/* Added to numslots in tryput to form the enlarged slot count. */
23 static final int FORCED_RESIZE_INCREMENT = 20;
/**
 * Creates a table with a fresh SlotBuffer and a new CloudComm built from
 * baseurl and password.
 *
 * @param baseurl passed to the CloudComm constructor
 * @param password passed to the CloudComm constructor
 * @param _localmachineid stored as this client's machine id
 */
25 public Table(String baseurl, String password, long _localmachineid) {
26 localmachineid=_localmachineid;
27 buffer = new SlotBuffer();
/* The initial slot count is whatever capacity the fresh buffer reports. */
28 numslots = buffer.capacity();
30 cloud=new CloudComm(baseurl, password);
/**
 * Creates a table with a fresh SlotBuffer for a caller-supplied CloudComm.
 *
 * NOTE(review): the assignment of _cloud to the cloud field is not visible in
 * this excerpt; presumably it follows the lines shown here -- confirm.
 *
 * @param _cloud communication layer supplied by the caller
 * @param _localmachineid stored as this client's machine id
 */
33 public Table(CloudComm _cloud, long _localmachineid) {
34 localmachineid=_localmachineid;
35 buffer = new SlotBuffer();
/* The initial slot count is whatever capacity the fresh buffer reports. */
36 numslots = buffer.capacity();
/**
 * Fetches all slots after the current sequence number from the cloud and
 * applies them with acceptupdatestolocal=true, so a changed sequence number
 * for the local machine is tolerated by updateLastMessage.
 */
41 public void rebuild() {
42 Slot[] newslots=cloud.getSlots(sequencenumber+1);
43 validateandupdate(newslots, true);
/**
 * Fetches all slots after the current sequence number from the cloud and
 * applies them with acceptupdatestolocal=false, so updateLastMessage raises
 * an Error if a slot changes the recorded sequence number of the local machine.
 */
46 public void update() {
47 Slot[] newslots=cloud.getSlots(sequencenumber+1);
49 validateandupdate(newslots, false);
/**
 * Looks up the KeyValue entry currently stored for key in the local table.
 *
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably the entry's value is returned, with some handling for a missing
 * key -- confirm.
 *
 * @param key key to look up
 */
52 public IoTString get(IoTString key) {
53 KeyValue kv=table.get(key);
/**
 * Creates the first slot (sequence number 1, owned by the local machine)
 * with a TableStatus built from the current numslots, submits it to the
 * cloud and applies it locally.
 *
 * NOTE(review): the conditions selecting between the local-apply path and
 * the Error below are not visible in this excerpt; presumably they depend on
 * what cloud.putSlot returns -- confirm.
 */
60 public void initTable() {
61 Slot s=new Slot(1, localmachineid);
62 TableStatus status=new TableStatus(s, numslots);
64 Slot[] array=cloud.putSlot(s, numslots);
/* Replace the server response with an array holding only our own slot. */
66 array = new Slot[] {s};
67 /* update data structure */
68 validateandupdate(array, true);
70 throw new Error("Error on initialization");
/**
 * Stores value under key by calling tryput with forcedresize=false.
 *
 * NOTE(review): the visible return dereferences oldvalue, which table.get
 * leaves null for a key not yet present; the lines that may guard this, and
 * the path taken when tryput returns false, are not visible in this excerpt
 * -- confirm.
 *
 * @param key key to store under
 * @param value value to store
 * @return on the visible path, the value held by the entry that was stored
 *         for key before this call
 */
74 public IoTString put(IoTString key, IoTString value) {
/* Capture the previous entry before tryput replaces it. */
76 KeyValue oldvalue=table.get(key);
77 if (tryput(key, value, false)) {
81 return oldvalue.getValue();
/**
 * Builds the next slot (sequence number sequencenumber+1) containing the new
 * key/value entry, submits it via cloud.putSlot and applies slots locally
 * through validateandupdate.
 *
 * Visible steps: (1) when fewer than FREE_SLOTS slots are unused, copy live,
 * non-redundant entries out of the oldest slots into the new slot, retrying
 * once with forcedresize=true if one does not fit; (2) create the KeyValue
 * entry; (3) use remaining space to copy further live entries from older
 * slots; (4) send the slot to the cloud.
 *
 * NOTE(review): many lines of this method (branch conditions, continue/else
 * lines, the return statements) are not visible in this excerpt; comments
 * marked "presumably" below are unconfirmed.
 *
 * @param key key to store under
 * @param value value to store
 * @param forcedresize true on the retry issued when a live entry did not fit
 * @return not visible in this excerpt; put() treats true as success
 */
86 private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
/* New slot is built with the HMAC of the newest committed slot (presumably as its previous-HMAC link). */
87 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
88 long seqn = buffer.getOldestSeqNum();
/* TableStatus carrying the enlarged slot count; presumably only created when forcedresize is set. */
91 TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
95 if ((numslots - buffer.size()) < FREE_SLOTS) {
96 /* have to check whether we have enough free slots */
/* Sequence number the oldest slot would have if the buffer held numslots slots, clamped to 1. */
97 long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
98 seqn = fullfirstseqn < 1?1:fullfirstseqn;
99 for(int i=0; i < FREE_SLOTS; i++, seqn++) {
100 Slot prevslot=buffer.getSlot(seqn);
/* Slots that are no longer live are presumably skipped (statement not visible). */
101 if (!prevslot.isLive())
103 Vector<Entry> liveentries = prevslot.getLiveEntries();
104 for(Entry liveentry:liveentries) {
/* Entries that redundant() reports are presumably skipped (statement not visible). */
105 if (redundant(liveentry))
107 if (s.hasSpace(liveentry))
108 s.addEntry(liveentry);
110 if (s.canFit(liveentry))
111 s.addEntry(liveentry);
112 else if (!forcedresize) {
/* Entry does not fit: retry the whole put once with forcedresize=true. */
113 return tryput(key, value, true);
119 KeyValue kv=new KeyValue(s, key, value);
120 boolean insertedkv=false;
121 if (s.hasSpace(kv)) {
126 long newestseqnum=buffer.getNewestSeqNum();
/* Continue from seqn up to the newest slot, copying live entries while the new slot has space. */
128 for(; seqn<=newestseqnum; seqn++) {
129 Slot prevslot=buffer.getSlot(seqn);
130 if (!prevslot.isLive())
132 Vector<Entry> liveentries = prevslot.getLiveEntries();
133 for(Entry liveentry:liveentries) {
134 if (redundant(liveentry))
136 if (s.hasSpace(liveentry))
137 s.addEntry(liveentry);
/* Enlarged slot count sent to the cloud; the declaration of max and the guarding condition are not visible. */
145 max = numslots + FORCED_RESIZE_INCREMENT;
146 Slot[] array=cloud.putSlot(s, max);
148 array = new Slot[] {s};
152 /* update data structure */
153 validateandupdate(array, true);
/**
 * Tells tryput whether a live entry should not be copied into the new slot.
 * For a LastMessage entry the result is true exactly when it belongs to the
 * local machine.
 *
 * NOTE(review): the result for other entry types is not visible in this
 * excerpt; presumably false -- confirm.
 *
 * @param liveentry entry taken from an older slot
 */
158 boolean redundant(Entry liveentry) {
159 if (liveentry.getType()==Entry.TypeLastMessage) {
160 LastMessage lastmsg=(LastMessage) liveentry;
161 return lastmsg.getMachineID() == localmachineid;
/**
 * Validates a batch of slots and commits it to the local buffer.
 *
 * Visible steps: reject batches whose first sequence number is not newer
 * than sequencenumber; verify the HMAC chain; process every slot (updating
 * the expected size before each one); if the batch does not start directly
 * after sequencenumber, check that the number of slots received equals
 * expectedsize; store all slots in the buffer; advance sequencenumber to the
 * last slot of the batch.
 *
 * NOTE(review): the action taken for an empty batch is not visible in this
 * excerpt (presumably an early return), and the end of the first block
 * comment in the body is also missing from view -- confirm against the full
 * file.
 *
 * @param newslots slots to apply
 * @param acceptupdatestolocal forwarded to processSlot for each slot
 */
167 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
168 /* The cloud communication layer has checked slot HMACs already
170 if (newslots.length==0)
173 long firstseqnum=newslots[0].getSequenceNumber();
174 if (firstseqnum <= sequencenumber)
175 throw new Error("Server Error: Sent older slots!");
177 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
178 checkHMACChain(indexer, newslots);
181 for(Slot slot: newslots) {
182 updateExpectedSize();
183 processSlot(indexer, slot, acceptupdatestolocal);
186 /* If there is a gap, check to see if the server sent us everything. */
187 if (firstseqnum != (sequencenumber+1))
188 checkNumSlots(newslots.length);
192 /* Commit the new slots to the buffer. */
193 for(Slot slot:newslots) {
194 buffer.putSlot(slot);
// The last slot of the batch becomes the newest known sequence number.
196 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
/* expectedsize: number of slots checkNumSlots requires the server to have sent.
 * currmaxsize: slot count pending from processed TableStatus entries; it
 * becomes numslots in commitNewMaxSize. */
199 private int expectedsize, currmaxsize;
/**
 * Raises an Error unless the number of slots received equals expectedsize.
 *
 * @param numslots number of slots received; note that this parameter shadows
 *        the numslots field inside this method
 */
201 private void checkNumSlots(int numslots) {
202 if (numslots != expectedsize)
203 throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
/**
 * Resets the size bookkeeping: expectedsize becomes the smaller of
 * sequencenumber and numslots, and currmaxsize becomes numslots.
 */
206 private void initExpectedSize() {
207 long prevslots = sequencenumber;
/* The int cast is safe on this branch because prevslots is below numslots. */
208 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
209 currmaxsize = numslots;
/**
 * Called once per slot by validateandupdate; keeps expectedsize from
 * exceeding currmaxsize.
 *
 * NOTE(review): a statement preceding the clamp is not visible in this
 * excerpt; presumably it increments expectedsize -- confirm.
 */
212 private void updateExpectedSize() {
214 if (expectedsize > currmaxsize)
215 expectedsize = currmaxsize;
/**
 * Records a new pending maximum slot count.
 *
 * @param newmaxsize value stored in currmaxsize
 */
218 private void updateCurrMaxSize(int newmaxsize) {
219 currmaxsize=newmaxsize;
/**
 * Makes the pending maximum size effective: resizes the buffer when
 * currmaxsize differs from numslots, then sets numslots to currmaxsize.
 */
222 private void commitNewMaxSize() {
223 if (numslots != currmaxsize)
224 buffer.resize(currmaxsize);
226 numslots=currmaxsize;
/**
 * Applies a key/value entry: the entry becomes the current mapping for its
 * key in the local table.
 *
 * NOTE(review): the action taken on a previously stored entry is not visible
 * in this excerpt; presumably it is marked dead -- confirm.
 *
 * @param entry key/value entry read from a slot
 * @param indexer not used in the visible lines
 */
229 private void processEntry(KeyValue entry, SlotIndexer indexer) {
230 IoTString key=entry.getKey();
231 KeyValue oldvalue=table.get(key);
232 if (oldvalue != null) {
235 table.put(key, entry);
/**
 * Applies a LastMessage entry by recording its machine id and sequence
 * number through updateLastMessage, always with acceptupdatestolocal=false.
 *
 * @param entry last-message entry read from a slot
 * @param indexer not used in the visible lines
 */
238 private void processEntry(LastMessage entry, SlotIndexer indexer) {
239 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
/**
 * Checks a RejectedMessage entry against the slots known to the indexer.
 * For every sequence number from the entry's old to new sequence number
 * (inclusive), an Error is raised when "the slot's machine id equals the
 * entry's machine id" disagrees with the entry's equal flag.
 *
 * NOTE(review): a line between the slot lookup and its use is not visible in
 * this excerpt; presumably slots the indexer cannot supply are skipped --
 * confirm.
 *
 * @param entry rejected-message entry read from a slot
 * @param indexer source of the slots covered by the entry's range
 */
242 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
243 long oldseqnum=entry.getOldSeqNum();
244 long newseqnum=entry.getNewSeqNum();
245 boolean isequal=entry.getEqual();
246 long machineid=entry.getMachineID();
247 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
248 Slot slot=indexer.getSlot(seqnum);
250 long slotmachineid=slot.getMachineID();
251 if (isequal!=(slotmachineid==machineid)) {
252 throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
/**
 * Applies a TableStatus entry: its max slot count becomes the pending
 * maximum size, the previously recorded TableStatus (if any) is marked dead,
 * and this entry is remembered as the latest one.
 *
 * @param entry table-status entry read from a slot
 * @param indexer not used in the visible lines
 */
258 private void processEntry(TableStatus entry, SlotIndexer indexer) {
259 int newnumslots=entry.getMaxSlots();
260 updateCurrMaxSize(newnumslots);
261 if (lastTableStatus != null)
262 lastTableStatus.setDead();
263 lastTableStatus = entry;
/**
 * Records (seqnum, liveness) as the latest message for machineid, marks the
 * previously recorded Slot or LastMessage for that machine dead, and checks
 * the sequence number against the previous one.
 *
 * NOTE(review): several lines are not visible in this excerpt: the action
 * when no previous record exists (presumably an early return) and the else
 * keywords around the "Unrecognized type" and "Rollback" errors -- confirm.
 *
 * @param machineid machine the message belongs to
 * @param seqnum sequence number being recorded
 * @param liveness the Slot or LastMessage entry carrying this information
 * @param acceptupdatestolocal when false, a sequence number for the local
 *        machine that differs from the recorded one raises an Error
 */
266 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal) {
/* HashMap.put returns the previous mapping, i.e. the record being replaced. */
267 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
268 if (lastmsgentry == null)
271 long lastmsgseqnum = lastmsgentry.getFirst();
272 Liveness lastentry = lastmsgentry.getSecond();
/* The replaced record no longer needs to be kept alive. */
273 if (lastentry instanceof LastMessage) {
274 ((LastMessage)lastentry).setDead();
275 } else if (lastentry instanceof Slot) {
276 ((Slot)lastentry).setDead();
278 throw new Error("Unrecognized type");
281 if (machineid == localmachineid) {
282 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
283 throw new Error("Server Error: Mismatch on local machine sequence number");
/* Per the message text this check is for other machines; the else line is not visible. */
285 if (lastmsgseqnum > seqnum)
286 throw new Error("Server Error: Rollback on remote machine sequence number");
/**
 * Applies one slot: records the slot itself as the latest message of its
 * machine, then dispatches each of its entries to the processEntry overload
 * matching the entry's type.
 *
 * NOTE(review): the break statements and the default label before the final
 * Error are not visible in this excerpt -- confirm there is no fall-through.
 *
 * @param indexer forwarded to every processEntry call
 * @param slot slot to apply
 * @param acceptupdatestolocal forwarded to updateLastMessage for the slot
 */
290 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal) {
291 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal);
293 for(Entry entry : slot.getEntries()) {
294 switch(entry.getType()) {
295 case Entry.TypeKeyValue:
296 processEntry((KeyValue)entry, indexer);
299 case Entry.TypeLastMessage:
300 processEntry((LastMessage)entry, indexer);
303 case Entry.TypeRejectedMessage:
304 processEntry((RejectedMessage)entry, indexer);
307 case Entry.TypeTableStatus:
308 processEntry((TableStatus)entry, indexer);
312 throw new Error("Unrecognized type: "+entry.getType());
/**
 * Verifies that each new slot links to its predecessor: when the indexer can
 * supply the slot with the preceding sequence number, that slot's HMAC must
 * equal the new slot's previous-HMAC field, otherwise an Error is raised.
 * Slots whose predecessor is unavailable (null) are not checked.
 *
 * @param indexer lookup of slots by sequence number
 * @param newslots slots to verify
 */
317 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
318 for(int i=0; i < newslots.length; i++) {
319 Slot currslot=newslots[i];
320 Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
321 if (prevslot != null &&
322 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
323 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);