2 import java.util.HashMap;
3 import java.util.Arrays;
4 import java.util.Vector;
6 final public class Table {
// Latest KeyValue entry recorded per key; written by processEntry(KeyValue), read by get and put.
8 private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
// Per machine id: the last sequence number seen and the Slot or LastMessage that reported it (see updateLastMessage).
9 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
// Local store of slots, looked up by sequence number and resized in commitNewMaxSize.
10 private SlotBuffer buffer;
// Communication layer used to fetch slots (getSlots) and submit them (putSlot).
11 private CloudComm cloud;
// Sequence number of the newest slot committed by validateandupdate.
12 private long sequencenumber;
// Id of this machine; passed to every Slot constructed by this class.
13 private long localmachineid;
// Most recently processed TableStatus entry; the previous one is marked dead when it is replaced.
14 private TableStatus lastTableStatus;
// Free-slot threshold: tryput starts copying live entries forward when fewer than this many slots are free.
15 static final int FREE_SLOTS = 10;
// Amount added to numslots when tryput asks for a larger table.
16 static final int FORCED_RESIZE_INCREMENT = 20;
/**
 * Creates a table for the given local machine id, backed by a new SlotBuffer
 * and a CloudComm constructed from the base URL and password.
 *
 * @param baseurl passed through to the CloudComm constructor
 * @param password passed through to the CloudComm constructor
 * @param _localmachineid stored as this table's local machine id
 */
18 public Table(String baseurl, String password, long _localmachineid) {
19 localmachineid=_localmachineid;
20 buffer = new SlotBuffer();
// Initial slot count is whatever capacity the fresh buffer reports.
21 numslots = buffer.capacity();
23 cloud=new CloudComm(baseurl, password);
/**
 * Creates a table for the given local machine id, backed by a new SlotBuffer.
 * NOTE(review): the statement that stores _cloud into the cloud field is not
 * visible in this excerpt — confirm it exists.
 *
 * @param _cloud communication layer supplied by the caller
 * @param _localmachineid stored as this table's local machine id
 */
26 public Table(CloudComm _cloud, long _localmachineid) {
27 localmachineid=_localmachineid;
28 buffer = new SlotBuffer();
// Initial slot count is whatever capacity the fresh buffer reports.
29 numslots = buffer.capacity();
/**
 * Requests slots from the cloud using sequencenumber+1 and applies them with
 * acceptupdatestolocal set to true, so a differing sequence number recorded
 * for the local machine is tolerated (see updateLastMessage).
 */
34 public void rebuild() {
35 Slot[] newslots=cloud.getSlots(sequencenumber+1);
36 validateandupdate(newslots, true);
/**
 * Requests slots from the cloud using sequencenumber+1 and applies them with
 * acceptupdatestolocal set to false, so a differing sequence number recorded
 * for the local machine raises an Error (see updateLastMessage).
 */
39 public void update() {
40 Slot[] newslots=cloud.getSlots(sequencenumber+1);
42 validateandupdate(newslots, false);
/**
 * Looks up the KeyValue entry stored for the given key in the local table.
 * NOTE(review): the return statement(s) are not visible in this excerpt;
 * presumably the entry's value, or null when absent — confirm.
 *
 * @param key key to look up
 */
45 public IoTString get(IoTString key) {
46 KeyValue kv=table.get(key);
/**
 * Creates the first slot (sequence number 1, local machine id) together with
 * a TableStatus built from numslots, and submits the slot to the cloud.
 * NOTE(review): the branch conditions, and the statement adding status to the
 * slot, are not visible in this excerpt. The visible statements show one path
 * that applies the slot locally and another that throws an Error.
 *
 * @throws Error with message "Error on initialization" on the failing path
 */
53 public void initTable() {
54 Slot s=new Slot(1, localmachineid);
55 TableStatus status=new TableStatus(s, numslots);
57 Slot[] array=cloud.putSlot(s, numslots);
// Replace the returned array with just our own slot before applying it locally.
59 array = new Slot[] {s};
60 validateandupdate(array, true); // update data structure
62 throw new Error("Error on initialization");
/**
 * Stores value under key by calling tryput, and returns the value that was
 * recorded for the key before the call.
 * NOTE(review): lines around the return are not visible in this excerpt.
 * If oldvalue can be null here (key not yet present), oldvalue.getValue()
 * would throw a NullPointerException — confirm a null guard exists.
 *
 * @param key key to store
 * @param value value to associate with the key
 * @return the previous value for the key
 */
66 public IoTString put(IoTString key, IoTString value) {
// Capture the prior entry before tryput can replace it.
68 KeyValue oldvalue=table.get(key);
69 if (tryput(key, value, false)) {
73 return oldvalue.getValue();
/**
 * Builds a new slot for the given key/value, carries forward live entries
 * from older slots, submits the slot to the cloud and applies the result
 * locally.
 * NOTE(review): several lines of this method are not visible in this excerpt
 * (branch headers, continue/return statements). The comments below describe
 * only the visible statements.
 *
 * @param key key to store
 * @param value value to store
 * @param forcedresize false on the initial call; the recursive retry below passes true
 * @return NOTE(review): only the recursive return is visible; put() treats
 *         true as success — confirm the remaining return paths
 */
78 private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
// The new slot takes the next sequence number and carries the HMAC of the slot at the current sequence number.
79 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
80 long seqn = buffer.getOldestSeqNum();
// Table status announcing numslots enlarged by FORCED_RESIZE_INCREMENT.
// NOTE(review): presumably guarded by forcedresize — the guard is not visible here.
83 TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
// Fewer than FREE_SLOTS slots remain unused in the buffer.
87 if ((numslots - buffer.size()) < FREE_SLOTS) {
88 //have to check whether we have enough free slots
// Start of a window of numslots slots ending at the newest slot, clamped to at least 1.
89 long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
90 seqn = fullfirstseqn < 1?1:fullfirstseqn;
// Visit FREE_SLOTS consecutive slots from there and copy their live entries into the new slot.
91 for(int i=0; i < FREE_SLOTS; i++, seqn++) {
92 Slot prevslot=buffer.getSlot(seqn);
// NOTE(review): the statement controlled by this check is not visible (presumably skips dead slots).
93 if (!prevslot.isLive())
95 Vector<Entry> liveentries = prevslot.getLiveEntries();
96 for(Entry liveentry:liveentries) {
// NOTE(review): the statement controlled by this check is not visible (presumably skips redundant entries).
97 if (redundant(liveentry))
99 if (s.hasSpace(liveentry))
100 s.addEntry(liveentry);
// NOTE(review): the branch header leading to this canFit check is not visible in this excerpt.
102 if (s.canFit(liveentry))
103 s.addEntry(liveentry);
104 else if (!forcedresize) {
// The entry does not fit and no resize was requested yet: retry with forcedresize set.
105 return tryput(key, value, true);
111 KeyValue kv=new KeyValue(s, key, value);
112 boolean insertedkv=false;
// NOTE(review): the body of this check is not visible (presumably adds kv and sets insertedkv).
113 if (s.hasSpace(kv)) {
118 long newestseqnum=buffer.getNewestSeqNum();
// Continue from seqn up to the newest slot, copying live entries while the new slot has space.
120 for(; seqn<=newestseqnum; seqn++) {
121 Slot prevslot=buffer.getSlot(seqn);
122 if (!prevslot.isLive())
124 Vector<Entry> liveentries = prevslot.getLiveEntries();
125 for(Entry liveentry:liveentries) {
126 if (redundant(liveentry))
128 if (s.hasSpace(liveentry))
129 s.addEntry(liveentry);
// NOTE(review): the declaration of max and the condition guarding this assignment are not visible here.
137 max = numslots + FORCED_RESIZE_INCREMENT;
138 Slot[] array=cloud.putSlot(s, max);
// Replace the returned array with just our own slot before applying it locally.
140 array = new Slot[] {s};
144 validateandupdate(array, true); // update data structure
/**
 * Reports whether a live entry need not be carried forward. For a
 * LastMessage entry the result is true exactly when its machine id equals
 * the local machine id.
 * NOTE(review): the return for other entry types is not visible in this
 * excerpt.
 *
 * @param liveentry entry to classify
 */
149 boolean redundant(Entry liveentry) {
150 if (liveentry.getType()==Entry.TypeLastMessage) {
151 LastMessage lastmsg=(LastMessage) liveentry;
152 return lastmsg.getMachineID() == localmachineid;
/**
 * Validates a batch of slots received from the cloud, processes their
 * entries, stores them in the buffer and advances sequencenumber to the last
 * slot in the batch.
 * NOTE(review): some lines are not visible in this excerpt (for example the
 * statement controlled by the empty-array check, and any setup preceding the
 * processing loop).
 *
 * @param newslots slots to apply; the first element's sequence number must be greater than sequencenumber
 * @param acceptupdatestolocal forwarded to processSlot; when false, a mismatching local-machine sequence number is an error
 * @throws Error if the first slot is not newer than sequencenumber, or if any nested check fails
 */
158 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
159 //The cloud communication layer has checked slot HMACs already
// NOTE(review): the statement controlled by this check is not visible (presumably an early return).
161 if (newslots.length==0)
164 long firstseqnum=newslots[0].getSequenceNumber();
165 if (firstseqnum <= sequencenumber)
166 throw new Error("Server Error: Sent older slots!");
// The indexer is built from both the new slots and the local buffer, so lookups can reach either.
168 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
169 checkHMACChain(indexer, newslots);
172 for(Slot slot: newslots) {
173 updateExpectedSize();
174 processSlot(indexer, slot, acceptupdatestolocal);
177 //If there is a gap, check to see if the server sent us everything
178 if (firstseqnum != (sequencenumber+1))
179 checkNumSlots(newslots.length);
183 //commit the new slots to the buffer
184 for(Slot slot:newslots) {
185 buffer.putSlot(slot);
187 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
// expectedsize: number of slots checkNumSlots requires the server to have sent.
// currmaxsize: upper bound applied to expectedsize; set by updateCurrMaxSize and copied into numslots by commitNewMaxSize.
190 private int expectedsize, currmaxsize;
/**
 * Verifies that the number of slots received equals expectedsize.
 *
 * @param numslots number of slots received (this parameter shadows the numslots field)
 * @throws Error if the count differs from expectedsize
 */
192 private void checkNumSlots(int numslots) {
193 if (numslots != expectedsize)
194 throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
/**
 * Resets the size bookkeeping: expectedsize becomes the smaller of
 * sequencenumber and numslots, and currmaxsize becomes numslots.
 */
197 private void initExpectedSize() {
198 long prevslots = sequencenumber;
// Compare as long so the narrowing cast to int only happens when prevslots is below numslots.
199 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
200 currmaxsize = numslots;
/**
 * Caps expectedsize at currmaxsize.
 * NOTE(review): a statement before the cap is not visible in this excerpt
 * (presumably it increments expectedsize) — confirm.
 */
203 private void updateExpectedSize() {
205 if (expectedsize > currmaxsize)
206 expectedsize = currmaxsize;
/**
 * Records a new value for currmaxsize.
 *
 * @param newmaxsize value to store in currmaxsize
 */
209 private void updateCurrMaxSize(int newmaxsize) {
210 currmaxsize=newmaxsize;
/**
 * Applies currmaxsize: resizes the buffer when it differs from numslots,
 * then sets numslots to currmaxsize.
 */
213 private void commitNewMaxSize() {
214 if (numslots != currmaxsize)
215 buffer.resize(currmaxsize);
217 numslots=currmaxsize;
/**
 * Records a KeyValue entry as the current mapping for its key in the table.
 * NOTE(review): the body of the oldvalue != null check is not visible in this
 * excerpt (presumably it retires the previous entry) — confirm.
 *
 * @param entry key/value entry to record
 * @param indexer not used by the visible statements
 */
220 private void processEntry(KeyValue entry, SlotIndexer indexer) {
221 IoTString key=entry.getKey();
222 KeyValue oldvalue=table.get(key);
223 if (oldvalue != null) {
226 table.put(key, entry);
/**
 * Records the machine id and sequence number carried by a LastMessage entry
 * via updateLastMessage, always with acceptupdatestolocal set to false.
 *
 * @param entry last-message entry to record
 * @param indexer not used by this method
 */
229 private void processEntry(LastMessage entry, SlotIndexer indexer) {
230 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
/**
 * Checks a RejectedMessage entry against the slots in its sequence-number
 * range, from getOldSeqNum() to getNewSeqNum() inclusive. For each slot
 * found, whether the slot's machine id equals the entry's machine id must
 * agree with the entry's getEqual() flag.
 * NOTE(review): a line between the slot lookup and its use is not visible in
 * this excerpt (presumably a null check for slots the indexer lacks).
 *
 * @param entry rejected-message entry to verify
 * @param indexer used to look up slots by sequence number
 * @throws Error if a slot in the range contradicts the entry
 */
233 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
234 long oldseqnum=entry.getOldSeqNum();
235 long newseqnum=entry.getNewSeqNum();
236 boolean isequal=entry.getEqual();
237 long machineid=entry.getMachineID();
238 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
239 Slot slot=indexer.getSlot(seqnum);
241 long slotmachineid=slot.getMachineID();
// The flag must match whether this slot was written by the entry's machine.
242 if (isequal!=(slotmachineid==machineid)) {
243 throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
/**
 * Applies a TableStatus entry: stores its max-slot count in currmaxsize,
 * marks the previously recorded TableStatus (if any) dead, and remembers
 * this entry as the latest one.
 *
 * @param entry table-status entry to apply
 * @param indexer not used by this method
 */
249 private void processEntry(TableStatus entry, SlotIndexer indexer) {
250 int newnumslots=entry.getMaxSlots();
251 updateCurrMaxSize(newnumslots);
252 if (lastTableStatus != null)
253 lastTableStatus.setDead();
254 lastTableStatus = entry;
/**
 * Records (seqnum, liveness) as the latest message information for a
 * machine, marks the previously recorded Slot or LastMessage dead, and
 * checks the new sequence number against the previous one.
 * NOTE(review): some lines are not visible in this excerpt, including the
 * statement controlled by the null check and the else keywords between the
 * branches below.
 *
 * @param machineid machine the information refers to
 * @param seqnum sequence number being recorded
 * @param liveness the Slot or LastMessage that supplied this information
 * @param acceptupdatestolocal when false, a differing sequence number for the local machine is an error
 * @throws Error on an unrecognized previous entry type, or when a sequence-number check fails
 */
257 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal) {
// put returns the previous mapping for machineid, or null when there was none.
258 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
// NOTE(review): the statement controlled by this check is not visible (presumably an early return).
259 if (lastmsgentry == null)
262 long lastmsgseqnum = lastmsgentry.getFirst();
263 Liveness lastentry = lastmsgentry.getSecond();
// The superseded record no longer needs to be kept alive.
264 if (lastentry instanceof LastMessage) {
265 ((LastMessage)lastentry).setDead();
266 } else if (lastentry instanceof Slot) {
267 ((Slot)lastentry).setDead();
269 throw new Error("Unrecognized type");
272 if (machineid == localmachineid) {
273 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
274 throw new Error("Server Error: Mismatch on local machine sequence number");
// NOTE(review): going by the message, this check presumably sits in the else branch for remote machines; the else line is not visible.
276 if (lastmsgseqnum > seqnum)
277 throw new Error("Server Error: Rollback on remote machine sequence number");
/**
 * Processes one slot: records the slot itself as the latest message from its
 * machine, then dispatches each entry to the processEntry overload matching
 * its type.
 * NOTE(review): the break statements and the default label are not visible
 * in this excerpt.
 *
 * @param indexer forwarded to each processEntry overload
 * @param slot slot whose entries are processed
 * @param acceptupdatestolocal forwarded to updateLastMessage for the slot itself
 * @throws Error if an entry reports a type that has no case here
 */
281 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal) {
282 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal);
284 for(Entry entry : slot.getEntries()) {
285 switch(entry.getType()) {
286 case Entry.TypeKeyValue:
287 processEntry((KeyValue)entry, indexer);
290 case Entry.TypeLastMessage:
291 processEntry((LastMessage)entry, indexer);
294 case Entry.TypeRejectedMessage:
295 processEntry((RejectedMessage)entry, indexer);
298 case Entry.TypeTableStatus:
299 processEntry((TableStatus)entry, indexer);
303 throw new Error("Unrecognized type: "+entry.getType());
/**
 * Verifies that each new slot's previous-HMAC field equals the HMAC of the
 * slot with the preceding sequence number, whenever the indexer can supply
 * that predecessor.
 *
 * @param indexer used to look up the predecessor of each new slot
 * @param newslots slots to verify
 * @throws Error if a predecessor is found and its HMAC does not match
 */
308 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
309 for(int i=0; i < newslots.length; i++) {
310 Slot currslot=newslots[i];
311 Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
// A missing predecessor (null) is not treated as an error; only a mismatch is.
312 if (prevslot != null &&
313 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
314 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);