2 import java.util.HashMap;
3 import java.util.Arrays;
4 import java.util.Vector;
/**
 * Key/value table whose contents are built from Slot objects exchanged with
 * a CloudComm instance and mirrored locally in a SlotBuffer.
 */
6 final public class Table {
// Current KeyValue entry for each key; written by processEntry(KeyValue), read by get/put.
8 private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
// Machine id -> (last sequence number recorded, the Liveness object that recorded it).
9 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
// Local store of slots, looked up by sequence number.
10 private SlotBuffer buffer;
// Used for the getSlots/putSlot calls.
11 private CloudComm cloud;
// Sequence number of the last slot committed by validateandupdate.
12 private long sequencenumber;
// Id of this machine; passed to every Slot constructed in this class.
13 private long localmachineid;
// Most recently processed TableStatus entry; the previous one is marked dead when replaced.
14 private TableStatus lastTableStatus;
// tryput begins copying live entries out of old slots when fewer than this many slots are unused.
15 static final int FREE_SLOTS = 10;
// Added to numslots by tryput when it computes an enlarged slot count
// (presumably only on a forced resize -- the guarding condition is not visible here).
16 static final int FORCED_RESIZE_INCREMENT = 20;
/**
 * Creates a table backed by a newly constructed CloudComm.
 *
 * @param baseurl forwarded to the CloudComm constructor
 * @param password forwarded to the CloudComm constructor
 * @param _localmachineid stored as this table's local machine id
 */
18 public Table(String baseurl, String password, long _localmachineid) {
19 localmachineid=_localmachineid;
20 buffer = new SlotBuffer();
// The initial slot count is whatever capacity the fresh buffer reports.
21 numslots = buffer.capacity();
23 cloud=new CloudComm(baseurl, password);
/**
 * Creates a table for use with a caller-supplied CloudComm.
 *
 * @param _cloud communication object supplied by the caller
 *        NOTE(review): the assignment of _cloud to the cloud field is not
 *        visible in this excerpt -- confirm it happens.
 * @param _localmachineid stored as this table's local machine id
 */
26 public Table(CloudComm _cloud, long _localmachineid) {
27 localmachineid=_localmachineid;
28 buffer = new SlotBuffer();
// The initial slot count is whatever capacity the fresh buffer reports.
29 numslots = buffer.capacity();
/**
 * Requests slots from the cloud starting at sequencenumber+1 and hands them
 * to validateandupdate as a non-put update.
 */
34 public void update() {
35 Slot[] newslots=cloud.getSlots(sequencenumber+1);
// isput=false: these slots were fetched, not written by this call.
37 validateandupdate(newslots, false);
/**
 * Looks up the entry stored for a key in the local table.
 *
 * @param key key to look up
 * @return NOTE(review): the return statement is not visible in this excerpt;
 *         presumably the value of the stored KeyValue, or null when the key
 *         is absent -- confirm.
 */
40 public IoTString get(IoTString key) {
41 KeyValue kv=table.get(key);
/**
 * Creates the slot with sequence number 1 for this machine, builds a
 * TableStatus for it with the current numslots, and sends the slot to the
 * cloud.
 *
 * NOTE(review): the conditions separating the success path from the throw
 * are not visible in this excerpt; presumably the result of putSlot decides
 * which one runs -- confirm.
 *
 * @throws Error with message "Error on initialization" on the failure path
 */
48 public void initTable() {
49 Slot s=new Slot(1, localmachineid);
50 TableStatus status=new TableStatus(s, numslots);
52 Slot[] array=cloud.putSlot(s, numslots);
// Replace the returned array with one holding only the slot just created.
54 array = new Slot[] {s};
55 validateandupdate(array, true); // update data structure
57 throw new Error("Error on initialization");
/**
 * Stores a value for a key by calling tryput without a forced resize.
 *
 * NOTE(review): lines around the visible statements are elided in this
 * excerpt (presumably a retry loop and a null check on oldvalue) -- confirm
 * before relying on the null behaviour.
 *
 * @param key key to write
 * @param value value to associate with key
 * @return the value of the entry that was in the table before the tryput call
 */
61 public IoTString put(IoTString key, IoTString value) {
// Capture the previous entry before tryput can replace it.
63 KeyValue oldvalue=table.get(key);
64 if (tryput(key, value, false)) {
68 return oldvalue.getValue();
/**
 * Builds the slot with sequence number sequencenumber+1, fills it with live
 * entries taken from older slots plus a KeyValue for the given key and value,
 * and sends it to the cloud.
 *
 * NOTE(review): many lines of this method (branch conditions, else/continue
 * statements, the return values) are elided in this excerpt; the comments
 * below describe only the statements that are visible.
 *
 * @param key key to write
 * @param value value to associate with key
 * @param forcedresize when false, the method may call itself once more with
 *        true if a live entry cannot fit in the new slot
 * @return NOTE(review): return statements other than the recursive one are
 *         not visible -- presumably whether the KeyValue was inserted; confirm.
 */
73 private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
// The new slot is constructed with the HMAC of the slot currently at sequencenumber.
74 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
75 long seqn = buffer.getOldestSeqNum();
// TableStatus carrying the enlarged slot count (guarding condition not visible here).
78 TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
82 if ((numslots - buffer.size()) < FREE_SLOTS) {
83 //have to check whether we have enough free slots
// First sequence number that would be held if the buffer were full, clamped to at least 1.
84 long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
85 seqn = fullfirstseqn < 1?1:fullfirstseqn;
// Walk FREE_SLOTS consecutive slots from there, copying their live entries into s.
86 for(int i=0; i < FREE_SLOTS; i++, seqn++) {
87 Slot prevslot=buffer.getSlot(seqn);
88 if (!prevslot.isLive())
90 Vector<Entry> liveentries = prevslot.getLiveEntries();
91 for(Entry liveentry:liveentries) {
92 if (redundant(liveentry))
93 if (s.hasSpace(liveentry))
95 s.addEntry(liveentry);
97 if (s.canFit(liveentry))
98 s.addEntry(liveentry);
99 else if (!forcedresize) {
// The entry could not be added: retry the whole put once with forcedresize=true.
100 return tryput(key, value, true);
106 KeyValue kv=new KeyValue(s, key, value);
107 boolean insertedkv=false;
108 if (s.hasSpace(kv)) {
113 long newestseqnum=buffer.getNewestSeqNum();
// Continue from seqn up to the newest slot, adding live entries while s has space.
115 for(; seqn<=newestseqnum; seqn++) {
116 Slot prevslot=buffer.getSlot(seqn);
117 if (!prevslot.isLive())
119 Vector<Entry> liveentries = prevslot.getLiveEntries();
120 for(Entry liveentry:liveentries) {
121 if (redundant(liveentry))
123 if (s.hasSpace(liveentry))
124 s.addEntry(liveentry);
// Enlarged maximum passed to putSlot (declaration of max and its guard are not visible here).
132 max = numslots + FORCED_RESIZE_INCREMENT;
133 Slot[] array=cloud.putSlot(s, max);
// Replace the returned array with one holding only the slot just built.
135 array = new Slot[] {s};
139 validateandupdate(array, true); // update data structure
/**
 * Tells tryput whether a live entry is redundant. For an entry of type
 * Entry.TypeLastMessage the result is whether its machine id equals this
 * table's local machine id.
 *
 * @param liveentry entry taken from an older slot
 * @return for LastMessage entries, true when the entry belongs to the local
 *         machine; NOTE(review): the result for other entry types is not
 *         visible in this excerpt (presumably false) -- confirm.
 */
144 boolean redundant(Entry liveentry) {
145 if (liveentry.getType()==Entry.TypeLastMessage) {
146 LastMessage lastmsg=(LastMessage) liveentry;
147 return lastmsg.getMachineID() == localmachineid;
/**
 * Validates a batch of slots, processes every entry they contain, stores the
 * slots in the local buffer and advances sequencenumber to the last slot of
 * the batch.
 *
 * NOTE(review): some lines of this method are elided in this excerpt
 * (for example what happens when newslots is empty) -- confirm.
 *
 * @param newslots slots to apply; element 0 must have a sequence number
 *        greater than the current sequencenumber
 * @param isput forwarded to processSlot for every slot
 * @throws Error with "Server Error: Sent older slots!" when the first slot
 *         is not newer than sequencenumber
 */
153 private void validateandupdate(Slot[] newslots, boolean isput) {
154 //The cloud communication layer has checked slot HMACs already
156 if (newslots.length==0)
159 long firstseqnum=newslots[0].getSequenceNumber();
160 if (firstseqnum <= sequencenumber)
161 throw new Error("Server Error: Sent older slots!");
// The indexer resolves sequence numbers across both the new slots and the buffer.
163 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
164 checkHMACChain(indexer, newslots);
// Update the expected-size bookkeeping once per slot, then process its entries.
167 for(Slot slot: newslots) {
168 updateExpectedSize();
169 processSlot(indexer, slot, isput);
172 //If there is a gap, check to see if the server sent us everything
173 if (firstseqnum != (sequencenumber+1))
174 checkNumSlots(newslots.length);
178 //commit new slots to the buffer
179 for(Slot slot:newslots) {
180 buffer.putSlot(slot);
182 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
// expectedsize: slot count that checkNumSlots compares against the number received.
// currmaxsize: maximum slot count, set from numslots or from a processed TableStatus;
// it caps expectedsize and is applied to the buffer by commitNewMaxSize.
185 private int expectedsize, currmaxsize;
/**
 * Verifies that the number of slots received equals expectedsize.
 *
 * @param numslots number of slots received; note that this parameter shadows
 *        the numslots field used elsewhere in this class
 * @throws Error when the count differs from expectedsize
 */
187 private void checkNumSlots(int numslots) {
188 if (numslots != expectedsize)
189 throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
/**
 * Resets the size bookkeeping: expectedsize becomes the smaller of
 * sequencenumber and numslots, and currmaxsize becomes numslots.
 */
192 private void initExpectedSize() {
193 long prevslots = sequencenumber;
// Take the smaller of the two; the cast to int only happens when prevslots < numslots.
194 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
195 currmaxsize = numslots;
/**
 * Keeps expectedsize from exceeding currmaxsize. Called once per slot by
 * validateandupdate.
 *
 * NOTE(review): the first statement of this method is elided in this
 * excerpt (presumably an increment of expectedsize) -- confirm.
 */
198 private void updateExpectedSize() {
200 if (expectedsize > currmaxsize)
201 expectedsize = currmaxsize;
/**
 * Records a new maximum slot count; it is not applied to the buffer here.
 *
 * @param newmaxsize value to store in currmaxsize
 */
204 private void updateCurrMaxSize(int newmaxsize) {
205 currmaxsize=newmaxsize;
/**
 * Applies the recorded maximum slot count: resizes the buffer when
 * currmaxsize differs from numslots, then sets numslots to currmaxsize.
 */
208 private void commitNewMaxSize() {
209 if (numslots != currmaxsize)
210 buffer.resize(currmaxsize);
212 numslots=currmaxsize;
/**
 * Makes a KeyValue entry the current entry for its key in the local table.
 *
 * NOTE(review): the body of the oldvalue != null branch is elided in this
 * excerpt (presumably it marks the replaced entry dead) -- confirm.
 *
 * @param entry key/value entry read from a slot
 * @param indexer not used by the visible statements
 */
215 private void processEntry(KeyValue entry, SlotIndexer indexer) {
216 IoTString key=entry.getKey();
217 KeyValue oldvalue=table.get(key);
218 if (oldvalue != null) {
221 table.put(key, entry);
/**
 * Records the machine id and sequence number carried by a LastMessage entry
 * through updateLastMessage, with the entry as the liveness object and
 * isput=false.
 *
 * @param entry last-message entry read from a slot
 * @param indexer not used by this overload
 */
224 private void processEntry(LastMessage entry, SlotIndexer indexer) {
225 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
/**
 * Checks a RejectedMessage entry against the slots known to the indexer.
 * For every sequence number from getOldSeqNum() to getNewSeqNum() inclusive,
 * the slot's machine id is compared with the entry's machine id, and the
 * result of that comparison must match the entry's getEqual() flag.
 *
 * NOTE(review): a line between the slot lookup and its use is elided in this
 * excerpt (presumably a null check on the slot) -- confirm.
 *
 * @param entry rejected-message entry read from a slot
 * @param indexer used to look up slots by sequence number
 * @throws Error when a slot's machine id contradicts the entry
 */
228 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
229 long oldseqnum=entry.getOldSeqNum();
230 long newseqnum=entry.getNewSeqNum();
231 boolean isequal=entry.getEqual();
232 long machineid=entry.getMachineID();
233 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
234 Slot slot=indexer.getSlot(seqnum);
236 long slotmachineid=slot.getMachineID();
// Fail when the flag and the actual machine-id comparison disagree.
237 if (isequal!=(slotmachineid==machineid)) {
238 throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
/**
 * Records the maximum slot count carried by a TableStatus entry and makes
 * that entry the current table status, marking the previous one dead.
 *
 * @param entry table-status entry read from a slot
 * @param indexer not used by this overload
 */
244 private void processEntry(TableStatus entry, SlotIndexer indexer) {
245 int newnumslots=entry.getMaxSlots();
246 updateCurrMaxSize(newnumslots);
// Mark the previous TableStatus dead before replacing it with this one.
247 if (lastTableStatus != null)
248 lastTableStatus.setDead();
249 lastTableStatus = entry;
/**
 * Stores (seqnum, liveness) as the latest record for a machine, marks the
 * previously recorded LastMessage or Slot dead, and checks the new sequence
 * number against the previous one.
 *
 * NOTE(review): several lines are elided in this excerpt, including what
 * happens when there was no previous record and the else keywords that
 * separate the branches below -- confirm the exact branch structure.
 *
 * @param machineid machine the record belongs to
 * @param seqnum sequence number being recorded
 * @param liveness object (a LastMessage or a Slot) that carried the record
 * @param isput when true, a sequence-number mismatch for the local machine
 *        does not raise an error
 * @throws Error on an unrecognized liveness type, a local-machine mismatch,
 *         or a lower sequence number than previously recorded
 */
252 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean isput) {
// put() returns the previous record for this machine, or null if there was none.
253 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
254 if (lastmsgentry == null)
257 long lastmsgseqnum = lastmsgentry.getFirst();
258 Liveness lastentry = lastmsgentry.getSecond();
// The previous carrier is superseded, so mark it dead.
259 if (lastentry instanceof LastMessage) {
260 ((LastMessage)lastentry).setDead();
261 } else if (lastentry instanceof Slot) {
262 ((Slot)lastentry).setDead();
264 throw new Error("Unrecognized type");
267 if (machineid == localmachineid) {
// For the local machine the sequence numbers must match unless this update is a put.
268 if (lastmsgseqnum != seqnum && !isput)
269 throw new Error("Server Error: Mismatch on local machine sequence number");
// A recorded sequence number must never decrease (per the message, the remote-machine case).
271 if (lastmsgseqnum > seqnum)
272 throw new Error("Server Error: Rollback on remote machine sequence number");
/**
 * Records a slot as the latest message from its machine, then dispatches
 * each of its entries to the processEntry overload matching the entry type.
 *
 * NOTE(review): the break statements and the default label are elided in
 * this excerpt -- confirm.
 *
 * @param indexer passed through to every processEntry call
 * @param slot slot whose entries are processed
 * @param isput forwarded to updateLastMessage
 * @throws Error when an entry reports a type with no matching case
 */
276 private void processSlot(SlotIndexer indexer, Slot slot, boolean isput) {
// The slot itself is the liveness object recorded for its machine.
277 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, isput);
279 for(Entry entry : slot.getEntries()) {
280 switch(entry.getType()) {
281 case Entry.TypeKeyValue:
282 processEntry((KeyValue)entry, indexer);
285 case Entry.TypeLastMessage:
286 processEntry((LastMessage)entry, indexer);
289 case Entry.TypeRejectedMessage:
290 processEntry((RejectedMessage)entry, indexer);
293 case Entry.TypeTableStatus:
294 processEntry((TableStatus)entry, indexer);
298 throw new Error("Unrecognized type: "+entry.getType());
/**
 * Verifies that each new slot's previous-HMAC field equals the HMAC of the
 * slot with the preceding sequence number, whenever the indexer can supply
 * that preceding slot.
 *
 * @param indexer used to look up the preceding slot by sequence number
 * @param newslots slots to verify
 * @throws Error when a slot's previous-HMAC does not match its predecessor
 */
303 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
304 for(int i=0; i < newslots.length; i++) {
305 Slot currslot=newslots[i];
306 Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
// A missing predecessor (null) is skipped rather than treated as a failure.
307 if (prevslot != null &&
308 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
309 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);