2 import java.util.HashMap;
3 import java.util.Arrays;
4 import java.util.Vector;
5 import javax.crypto.spec.*;
/**
 * Table state built from Slot objects that are exchanged through a CloudComm instance.
 * NOTE(review): this listing carries embedded line numbers with gaps, so some members used
 * below (for example numslots and hmac) are declared on lines that are not visible here.
 */
8 final public class Table {
// Entry currently stored for each key; written by processEntry(KeyValue).
10 private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
// Per machine id: the last recorded sequence number paired with the Slot or LastMessage that reported it.
11 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
// Holds the slots committed at the end of validateandupdate.
12 private SlotBuffer buffer;
// Used to fetch slots (getSlots) and to submit new ones (putSlot).
13 private CloudComm cloud;
// Reference sequence number for fetching, validating and creating slots;
// where it is advanced is not visible in this listing.
15 private long sequencenumber;
// Machine id supplied to the constructor; stamped on slots created locally.
16 private long localmachineid;
// Most recent TableStatus entry seen by processEntry(TableStatus).
17 private TableStatus lastTableStatus;
// Threshold of unused slots below which tryput starts copying live entries forward.
18 static final int FREE_SLOTS = 10;
// Amount added to numslots when tryput asks for a larger table.
19 static final int FORCED_RESIZE_INCREMENT = 20;
/**
 * Creates a table that builds its own CloudComm from a URL and a password.
 *
 * @param baseurl forwarded to initCloud and from there to the CloudComm constructor
 * @param password forwarded to initCloud, where the encryption key is derived from it
 * @param _localmachineid stored as this table's machine id
 */
21 public Table(String baseurl, String password, long _localmachineid) {
22 localmachineid=_localmachineid;
23 buffer = new SlotBuffer();
// The slot count starts at whatever capacity a fresh SlotBuffer reports.
24 numslots = buffer.capacity();
26 initCloud(baseurl, password);
/**
 * Creates a table for the given machine id with a fresh SlotBuffer.
 * NOTE(review): _cloud is not used in the visible lines; presumably it is assigned to the
 * cloud field on a line missing from this listing -- confirm.
 *
 * @param _cloud an existing CloudComm instance
 * @param _localmachineid stored as this table's machine id
 */
29 public Table(CloudComm _cloud, long _localmachineid) {
30 localmachineid=_localmachineid;
31 buffer = new SlotBuffer();
/**
 * Builds the CloudComm instance from a key derived from the password.
 * Creates one AES/CBC/PKCS5Padding cipher for encryption, one for decryption and an
 * HmacSHA256 Mac, then hands them to the CloudComm constructor together with baseurl.
 *
 * @param baseurl passed through to CloudComm
 * @param password passed to getKey to obtain the secret key
 * @throws Error with the message "Failed To Initialize Ciphers" when any step throws
 */
36 private void initCloud(String baseurl, String password) {
38 SecretKeySpec secret=getKey(password);
39 Cipher encryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
40 encryptCipher.init(Cipher.ENCRYPT_MODE, secret);
41 Cipher decryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
// NOTE(review): no IV parameters are supplied for CBC decryption here -- confirm this
// init call succeeds and that both ciphers agree on the IV.
42 decryptCipher.init(Cipher.DECRYPT_MODE, secret);
// NOTE(review): no hmac.init(...) call is visible in this listing -- confirm the Mac is keyed.
43 hmac = Mac.getInstance("HmacSHA256");
45 cloud=new CloudComm(baseurl, encryptCipher, decryptCipher, hmac);
46 } catch (Exception e) {
// NOTE(review): the caught exception is not attached as the cause, so the original
// failure reason is lost.
47 throw new Error("Failed To Initialize Ciphers");
/**
 * Derives an AES key specification from a password using PBKDF2WithHmacSHA256.
 *
 * @param password the password whose characters feed the PBEKeySpec
 * @throws Error with the message "Failed generating key." when derivation throws
 */
51 private SecretKeySpec getKey(String password) {
// NOTE(review): only the password is given -- no salt, iteration count or key length.
// Confirm the PBKDF2 factory accepts such a spec.
53 PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray());
54 SecretKey key = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
// Re-wrap the derived key bytes with the "AES" algorithm label.
55 SecretKeySpec secret = new SecretKeySpec(key.getEncoded(), "AES");
57 } catch (Exception e) {
// NOTE(review): the caught exception is not attached as the cause.
58 throw new Error("Failed generating key.");
/**
 * Requests slots from the cloud, passing the current sequencenumber, and runs the
 * returned array through validateandupdate.
 */
62 public void update() {
63 Slot[] newslots=cloud.getSlots(sequencenumber);
64 validateandupdate(newslots);
/**
 * Looks up the KeyValue stored in the local table under the given key.
 * NOTE(review): the lines that turn the lookup into the returned IoTString (including any
 * handling of a missing key) are not visible in this listing.
 *
 * @param key the key to look up
 */
67 public IoTString get(IoTString key) {
68 KeyValue kv=table.get(key);
/**
 * Creates a first slot carrying a TableStatus built with the current numslots and submits
 * it through cloud.putSlot.
 * NOTE(review): the conditions that choose between the validateandupdate path and the
 * error path are on lines missing from this listing.
 *
 * @throws Error with the message "Error on initialization" on the failure path
 */
75 public void initTable() {
// presumably the first argument (1) is the slot's sequence number -- confirm against Slot
76 Slot s=new Slot(1, localmachineid);
77 TableStatus status=new TableStatus(s, numslots);
79 Slot[] array=cloud.putSlot(s, numslots);
// The array returned by putSlot is replaced with one that holds only our own slot.
81 array = new Slot[] {s};
82 validateandupdate(array); // update data structure
84 throw new Error("Error on initialization");
/**
 * Stores value under key by calling tryput without a forced resize and, when that
 * succeeds, returns the value that was stored under the key beforehand.
 * NOTE(review): what happens when tryput returns false is on lines missing from this listing.
 *
 * @param key the key to write
 * @param value the value to associate with the key
 * @return the value held by the KeyValue that was in the table before the call
 */
88 public IoTString put(IoTString key, IoTString value) {
// Capture the previous entry before tryput has a chance to replace it.
90 KeyValue oldvalue=table.get(key);
91 if (tryput(key, value, false)) {
// NOTE(review): table.get returns null for a key that was never stored, in which case
// this dereference would throw NullPointerException -- confirm and guard if needed.
92 return oldvalue.getValue();
/**
 * Builds a new slot containing the key/value pair, copies live entries from older slots
 * into it where they fit, and submits the slot through cloud.putSlot.
 * NOTE(review): many lines of this method (branch conditions, continue/break/return
 * statements and closing braces) are missing from this listing, so the comments below
 * describe only the visible statements.
 *
 * @param key the key to write
 * @param value the value to write
 * @param forcedresize false on the first attempt; the method calls itself with true when a
 *        live entry cannot be fitted into the new slot
 * @return result of the attempt; the return statements themselves are not visible here
 */
97 private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
// The new slot gets the next sequence number and the HMAC of the slot at sequencenumber.
98 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
99 long seqn = buffer.getOldestSeqNum();
// Table status announcing an enlarged slot count; presumably guarded by forcedresize on a
// line that is not visible -- confirm.
102 TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
// Fewer than FREE_SLOTS unused slots remain in the buffer.
106 if ((numslots - buffer.size()) < FREE_SLOTS) {
107 //have to check whether we have enough free slots
// Start numslots positions behind the slot that follows the newest one.
108 seqn = buffer.getNewestSeqNum() + 1 - numslots;
109 for(int i=0; i < FREE_SLOTS; i++, seqn++) {
110 Slot prevslot=buffer.getSlot(seqn);
// Slots that are no longer live are tested for here; the statement taken is not visible.
111 if (!prevslot.isLive())
113 Vector<Entry> liveentries = prevslot.getLiveEntries();
114 for(Entry liveentry:liveentries) {
115 if (s.hasSpace(liveentry))
116 s.addEntry(liveentry);
118 if (s.canFit(liveentry))
119 s.addEntry(liveentry);
// The entry could not be placed and no resize has been forced yet: retry with a resize.
120 else if (!forcedresize) {
121 return tryput(key, value, true);
// The entry the caller actually asked to store.
127 KeyValue kv=new KeyValue(s, key, value);
128 boolean insertedkv=false;
129 if (s.hasSpace(kv)) {
134 long newestseqnum=buffer.getNewestSeqNum();
// Continue from wherever seqn was left and copy further live entries while there is space.
136 for(;seqn<=newestseqnum;seqn++) {
137 Slot prevslot=buffer.getSlot(seqn);
138 if (!prevslot.isLive())
140 Vector<Entry> liveentries = prevslot.getLiveEntries();
141 for(Entry liveentry:liveentries) {
142 if (s.hasSpace(liveentry))
143 s.addEntry(liveentry);
// Slot count passed to putSlot when enlarged; the declaration of max and the condition
// around this assignment are not visible.
151 max = numslots + FORCED_RESIZE_INCREMENT;
152 Slot[] array=cloud.putSlot(s, max);
// The array returned by putSlot is replaced with one that holds only our own slot.
154 array = new Slot[] {s};
158 validateandupdate(array); // update data structure
/**
 * Validates a batch of slots and, once the checks pass, stores them in the buffer.
 * Visible steps: reject a batch whose first sequence number is older than sequencenumber,
 * verify the HMAC chain, process every slot, compare the batch length with the expected
 * size, then put each slot into the buffer.
 * NOTE(review): some statements (for example what happens for an empty batch) are on
 * lines missing from this listing.
 *
 * @param newslots the slots to validate
 * @throws Error with the message "Server Error: Sent older slots!" when the batch starts
 *         before sequencenumber; the called helpers throw Error for their own checks
 */
163 private void validateandupdate(Slot[] newslots) {
164 //The cloud communication layer has checked slot HMACs already
// Empty batch: the statement taken is not visible (presumably an early return).
166 if (newslots.length==0)
169 long firstseqnum=newslots[0].getSequenceNumber();
170 if (firstseqnum < sequencenumber)
171 throw new Error("Server Error: Sent older slots!");
// The indexer is built from both the new slots and the buffer and is used for slot lookups
// by sequence number below.
173 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
174 checkHMACChain(indexer, newslots);
177 for(Slot slot: newslots) {
178 updateExpectedSize();
179 processSlot(indexer, slot);
// The number of slots received must equal the expected size tracked while processing.
181 checkNumSlots(newslots.length);
184 //commit new to slots
185 for(Slot slot:newslots) {
186 buffer.putSlot(slot);
// expectedsize: number of slots a batch is expected to contain (compared in checkNumSlots).
// currmaxsize: slot limit in effect while a batch is processed; set by initExpectedSize and
// updateCurrMaxSize, and applied to numslots by commitNewMaxSize.
190 private int expectedsize, currmaxsize;
/**
 * Verifies that the number of slots received matches expectedsize.
 *
 * @param numslots the number of slots received; note this parameter shadows the numslots field
 * @throws Error with the message "Server Error: Server did not send all slots" on mismatch
 */
192 private void checkNumSlots(int numslots) {
193 if (numslots != expectedsize)
194 throw new Error("Server Error: Server did not send all slots");
/**
 * Resets the size tracking: expectedsize becomes sequencenumber when that is smaller than
 * numslots and numslots otherwise, and currmaxsize becomes numslots.
 */
197 private void initExpectedSize() {
// The int cast is only applied on the branch where sequencenumber < numslots.
198 expectedsize = (sequencenumber < ((long) numslots)) ? (int) sequencenumber : numslots;
199 currmaxsize = numslots;
/**
 * Called once per slot from validateandupdate; the visible part caps expectedsize at currmaxsize.
 * NOTE(review): one statement before the cap is missing from this listing (presumably the
 * increment of expectedsize) -- confirm.
 */
202 private void updateExpectedSize() {
204 if (expectedsize > currmaxsize)
205 expectedsize = currmaxsize;
/**
 * Records a new slot limit in currmaxsize; numslots itself is left untouched here.
 *
 * @param newmaxsize the new value for currmaxsize
 */
208 private void updateCurrMaxSize(int newmaxsize) {
209 currmaxsize=newmaxsize;
/**
 * Applies currmaxsize: resizes the buffer when the value differs from numslots, then
 * stores it in numslots.
 */
212 private void commitNewMaxSize() {
213 if (numslots != currmaxsize)
214 buffer.resize(currmaxsize);
216 numslots=currmaxsize;
/**
 * Installs a KeyValue entry in the table under its key, replacing any earlier entry.
 * NOTE(review): what is done with a non-null previous entry is on a line missing from
 * this listing.
 *
 * @param entry the key/value entry to install
 * @param indexer not used in the visible lines
 */
219 private void processEntry(KeyValue entry, SlotIndexer indexer) {
220 IoTString key=entry.getKey();
221 KeyValue oldvalue=table.get(key);
222 if (oldvalue != null) {
225 table.put(key, entry);
/**
 * Forwards a LastMessage entry to updateLastMessage, using the machine id and sequence
 * number it carries and the entry itself as the Liveness object.
 *
 * @param entry the last-message entry
 * @param indexer not used in the visible lines
 */
228 private void processEntry(LastMessage entry, SlotIndexer indexer) {
229 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
/**
 * Checks a RejectedMessage entry against the slots in its sequence-number range.
 * For every sequence number from getOldSeqNum() to getNewSeqNum() inclusive, the slot is
 * looked up through the indexer and its machine id is compared with the entry's machine id.
 * NOTE(review): a line between the lookup and the use of the slot is missing from this
 * listing (possibly a null check) -- confirm.
 *
 * @param entry the rejected-message entry
 * @param indexer used to find slots by sequence number
 * @throws Error when the comparison result disagrees with the entry's equal flag
 */
232 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
233 long oldseqnum=entry.getOldSeqNum();
234 long newseqnum=entry.getNewSeqNum();
235 boolean isequal=entry.getEqual();
236 long machineid=entry.getMachineID();
237 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
238 Slot slot=indexer.getSlot(seqnum);
240 long slotmachineid=slot.getMachineID();
// isequal states whether the slot is expected to come from machineid; any disagreement is an error.
241 if (isequal!=(slotmachineid==machineid)) {
242 throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
/**
 * Applies a TableStatus entry: its max-slot value becomes currmaxsize, the previously
 * remembered TableStatus (if any) is marked dead, and this entry is remembered instead.
 *
 * @param entry the table-status entry
 * @param indexer not used in the visible lines
 */
248 private void processEntry(TableStatus entry, SlotIndexer indexer) {
249 int newnumslots=entry.getMaxSlots();
250 updateCurrMaxSize(newnumslots);
251 if (lastTableStatus != null)
252 lastTableStatus.setDead();
253 lastTableStatus = entry;
/**
 * Records the latest sequence number seen for a machine and retires the previous record.
 * The new (seqnum, liveness) pair replaces the old one in lastmessagetable. When an old
 * record exists, its Slot or LastMessage is marked dead and the sequence numbers are
 * checked for consistency.
 * NOTE(review): the statement taken when there is no previous record, and the else
 * keywords between the checks, are on lines missing from this listing.
 *
 * @param machineid the machine the message belongs to
 * @param seqnum the sequence number being recorded
 * @param liveness the Slot or LastMessage that carries this information
 * @throws Error when the previous Liveness object is neither a LastMessage nor a Slot, or
 *         when a sequence-number check fails
 */
256 private void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
// HashMap.put returns the value previously stored for machineid, or null when there was none.
257 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
258 if (lastmsgentry == null)
261 long lastmsgseqnum = lastmsgentry.getFirst();
262 Liveness lastentry = lastmsgentry.getSecond();
263 if (lastentry instanceof LastMessage) {
264 ((LastMessage)lastentry).setDead();
265 } else if (lastentry instanceof Slot) {
266 ((Slot)lastentry).setDead();
268 throw new Error("Unrecognized type");
// For the local machine the previously recorded sequence number has to match exactly.
271 if (machineid == localmachineid) {
272 if (lastmsgseqnum != seqnum)
273 throw new Error("Server Error: Mismatch on local machine sequence number");
// A recorded sequence number must not be larger than the new one
// (presumably this is the branch for other machines -- confirm).
275 if (lastmsgseqnum > seqnum)
276 throw new Error("Server Error: Rollback on remote machine sequence number");
/**
 * Processes one slot: records it as the latest message of its machine, then dispatches
 * each of its entries to the processEntry overload matching the entry type.
 * NOTE(review): the statements ending each case and the label guarding the final throw
 * are on lines missing from this listing.
 *
 * @param indexer passed through to the processEntry overloads
 * @param slot the slot to process
 * @throws Error with the message "Unrecognized type: " plus the type for an unknown entry type
 */
280 private void processSlot(SlotIndexer indexer, Slot slot) {
// The slot itself serves as the Liveness object for its machine's last message.
281 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
283 for(Entry entry : slot.getEntries()) {
284 switch(entry.getType()) {
285 case Entry.TypeKeyValue:
286 processEntry((KeyValue)entry, indexer);
289 case Entry.TypeLastMessage:
290 processEntry((LastMessage)entry, indexer);
293 case Entry.TypeRejectedMessage:
294 processEntry((RejectedMessage)entry, indexer);
297 case Entry.TypeTableStatus:
298 processEntry((TableStatus)entry, indexer);
302 throw new Error("Unrecognized type: "+entry.getType());
/**
 * Verifies that each new slot links to its predecessor: the previous HMAC stored in a slot
 * must equal the HMAC of the slot with the preceding sequence number, whenever the
 * indexer can supply that slot.
 *
 * @param indexer used to find the predecessor slot by sequence number
 * @param newslots the slots whose links are checked
 * @throws Error with the message "Server Error: Invalid HMAC Chain" on a mismatch
 */
307 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
308 for(int i=0; i < newslots.length; i++) {
309 Slot currslot=newslots[i];
310 Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
// A missing predecessor (null) is not treated as an error; only a mismatching HMAC is.
311 if (prevslot != null &&
312 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
313 throw new Error("Server Error: Invalid HMAC Chain");