2 import java.util.HashMap;
3 import java.util.Arrays;
4 import java.util.Vector;
5 import javax.crypto.spec.*;
8 final public class Table {
10 private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
11 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
12 private SlotBuffer buffer;
13 private CloudComm cloud;
15 private long sequencenumber;
16 private long localmachineid;
17 private TableStatus lastTableStatus;
18 static final int FREE_SLOTS = 10;
19 static final int FORCED_RESIZE_INCREMENT = 20;
21 public Table(String baseurl, String password, long _localmachineid) {
22 localmachineid=_localmachineid;
23 buffer = new SlotBuffer();
25 initCloud(baseurl, password);
28 private void initCloud(String baseurl, String password) {
30 SecretKeySpec secret=getKey(password);
31 Cipher encryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
32 encryptCipher.init(Cipher.ENCRYPT_MODE, secret);
33 Cipher decryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
34 decryptCipher.init(Cipher.DECRYPT_MODE, secret);
35 hmac = Mac.getInstance("HmacSHA256");
37 cloud=new CloudComm(baseurl, encryptCipher, decryptCipher, hmac);
38 } catch (Exception e) {
39 throw new Error("Failed To Initialize Ciphers");
43 private SecretKeySpec getKey(String password) {
45 PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray());
46 SecretKey key = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
47 SecretKeySpec secret = new SecretKeySpec(key.getEncoded(), "AES");
49 } catch (Exception e) {
50 throw new Error("Failed generating key.");
54 public void update() {
55 Slot[] newslots=cloud.getSlots(sequencenumber);
56 validateandupdate(newslots);
59 public IoTString get(IoTString key) {
60 KeyValue kv=table.get(key);
67 public IoTString put(IoTString key, IoTString value) {
69 KeyValue oldvalue=table.get(key);
70 if (tryput(key, value)) {
71 return oldvalue.getValue();
76 private boolean tryput(IoTString key, IoTString value) {
77 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
78 boolean forcedresize = false;
80 long seqn = buffer.getOldestSeqNum();
82 if ((numslots - buffer.size()) < FREE_SLOTS) {
83 //have to check whether we have enough free slots
84 seqn = buffer.getNewestSeqNum() + 1 - numslots;
85 for(int i=0; i < FREE_SLOTS; i++, seqn++) {
86 Slot prevslot=buffer.getSlot(seqn);
87 if (!prevslot.isLive())
89 Vector<Entry> liveentries = prevslot.getLiveEntries();
90 for(Entry liveentry:liveentries) {
91 if (s.hasSpace(liveentry))
92 s.addEntry(liveentry);
94 if (s.canFit(liveentry))
95 s.addEntry(liveentry);
102 KeyValue kv=new KeyValue(s, key, value);
103 boolean insertedkv=false;
104 if (s.hasSpace(kv)) {
109 long newestseqnum=buffer.getNewestSeqNum();
111 for(;seqn<=newestseqnum;seqn++) {
112 Slot prevslot=buffer.getSlot(seqn);
113 if (!prevslot.isLive())
115 Vector<Entry> liveentries = prevslot.getLiveEntries();
116 for(Entry liveentry:liveentries) {
117 if (s.hasSpace(liveentry))
118 s.addEntry(liveentry);
126 max = numslots + FORCED_RESIZE_INCREMENT;
127 Slot[] array=cloud.putSlot(s, max);
129 array = new Slot[] {s};
133 validateandupdate(array); // update data structure
138 private void validateandupdate(Slot[] newslots) {
139 //The cloud communication layer has checked slot HMACs already
141 if (newslots.length==0)
144 long firstseqnum=newslots[0].getSequenceNumber();
145 if (firstseqnum < sequencenumber)
146 throw new Error("Server Error: Sent older slots!");
148 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
149 checkHMACChain(indexer, newslots);
152 for(Slot slot: newslots) {
153 updateExpectedSize();
154 processSlot(indexer, slot);
156 checkNumSlots(newslots.length);
159 //commit new to slots
160 for(Slot slot:newslots) {
161 buffer.putSlot(slot);
165 private int expectedsize, currmaxsize;
167 private void checkNumSlots(int numslots) {
168 if (numslots != expectedsize)
169 throw new Error("Server Error: Server did not send all slots");
172 private void initExpectedSize() {
173 expectedsize = (sequencenumber < ((long) numslots)) ? (int) sequencenumber : numslots;
174 currmaxsize = numslots;
177 private void updateExpectedSize() {
179 if (expectedsize > currmaxsize)
180 expectedsize = currmaxsize;
183 private void updateCurrMaxSize(int newmaxsize) {
184 currmaxsize=newmaxsize;
187 private void commitNewMaxSize() {
188 if (numslots != currmaxsize)
189 buffer.resize(currmaxsize);
191 numslots=currmaxsize;
194 private void processEntry(KeyValue entry, SlotIndexer indexer) {
195 IoTString key=entry.getKey();
196 KeyValue oldvalue=table.get(key);
197 if (oldvalue != null) {
200 table.put(key, entry);
203 private void processEntry(LastMessage entry, SlotIndexer indexer) {
204 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
207 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
208 long oldseqnum=entry.getOldSeqNum();
209 long newseqnum=entry.getNewSeqNum();
210 boolean isequal=entry.getEqual();
211 long machineid=entry.getMachineID();
212 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
213 Slot slot=indexer.getSlot(seqnum);
215 long slotmachineid=slot.getMachineID();
216 if (isequal!=(slotmachineid==machineid)) {
217 throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
223 private void processEntry(TableStatus entry, SlotIndexer indexer) {
224 int newnumslots=entry.getMaxSlots();
225 updateCurrMaxSize(newnumslots);
226 if (lastTableStatus != null)
227 lastTableStatus.setDead();
228 lastTableStatus = entry;
231 private void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
232 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
233 if (lastmsgentry == null)
236 long lastmsgseqnum = lastmsgentry.getFirst();
237 Liveness lastentry = lastmsgentry.getSecond();
238 if (lastentry instanceof LastMessage) {
239 ((LastMessage)lastentry).setDead();
240 } else if (lastentry instanceof Slot) {
241 ((Slot)lastentry).setDead();
243 throw new Error("Unrecognized type");
246 if (machineid == localmachineid) {
247 if (lastmsgseqnum != seqnum)
248 throw new Error("Server Error: Mismatch on local machine sequence number");
250 if (lastmsgseqnum > seqnum)
251 throw new Error("Server Error: Rollback on remote machine sequence number");
255 private void processSlot(SlotIndexer indexer, Slot slot) {
256 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
258 for(Entry entry : slot.getEntries()) {
259 switch(entry.getType()) {
260 case Entry.TypeKeyValue:
261 processEntry((KeyValue)entry, indexer);
264 case Entry.TypeLastMessage:
265 processEntry((LastMessage)entry, indexer);
268 case Entry.TypeRejectedMessage:
269 processEntry((RejectedMessage)entry, indexer);
272 case Entry.TypeTableStatus:
273 processEntry((TableStatus)entry, indexer);
277 throw new Error("Unrecognized type: "+entry.getType());
282 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
283 for(int i=0; i < newslots.length; i++) {
284 Slot currslot=newslots[i];
285 Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
286 if (prevslot != null &&
287 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
288 throw new Error("Server Error: Invalid HMAC Chain");