2 import java.util.HashMap;
3 import java.util.Arrays;
4 import javax.crypto.spec.*;
7 final public class Table {
9 private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
10 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
11 private SlotBuffer buffer;
12 private CloudComm cloud;
14 private long sequencenumber;
15 private long localmachineid;
16 private TableStatus lastTableStatus;
17 static final int FREE_SLOTS = 10;
19 public Table(String baseurl, String password, long _localmachineid) {
20 localmachineid=_localmachineid;
21 buffer = new SlotBuffer();
23 initCloud(baseurl, password);
26 private void initCloud(String baseurl, String password) {
28 SecretKeySpec secret=getKey(password);
29 Cipher encryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
30 encryptCipher.init(Cipher.ENCRYPT_MODE, secret);
31 Cipher decryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
32 decryptCipher.init(Cipher.DECRYPT_MODE, secret);
33 hmac = Mac.getInstance("HmacSHA256");
35 cloud=new CloudComm(baseurl, encryptCipher, decryptCipher, hmac);
36 } catch (Exception e) {
37 throw new Error("Failed To Initialize Ciphers");
41 private SecretKeySpec getKey(String password) {
43 PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray());
44 SecretKey key = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
45 SecretKeySpec secret = new SecretKeySpec(key.getEncoded(), "AES");
47 } catch (Exception e) {
48 throw new Error("Failed generating key.");
52 public void update() {
53 Slot[] newslots=cloud.getSlots(sequencenumber);
54 validateandupdate(newslots);
57 public IoTString get(IoTString key) {
58 KeyValue kv=table.get(key);
65 public IoTString put(IoTString key, IoTString value) {
66 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
68 if ((numslots - buffer.size()) < FREE_SLOTS) {
69 //have to check whether we have enough free slots
70 long seqn = buffer.getNewestSeqNum() + 1 - numslots;
71 for(int i=0; i < FREE_SLOTS; i++, seqn--) {
72 Slot prevslot=buffer.getSlot(seqn);
73 if (!prevslot.isLive())
83 private void validateandupdate(Slot[] newslots) {
84 //The cloud communication layer has checked slot HMACs already
86 if (newslots.length==0)
89 long firstseqnum=newslots[0].getSequenceNumber();
90 if (firstseqnum < sequencenumber)
91 throw new Error("Server Error: Sent older slots!");
93 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
94 checkHMACChain(indexer, newslots);
97 for(Slot slot: newslots) {
99 processSlot(indexer, slot);
101 checkNumSlots(newslots.length);
104 //commit new to slots
105 for(Slot slot:newslots) {
106 buffer.putSlot(slot);
110 private int expectedsize, currmaxsize;
112 private void checkNumSlots(int numslots) {
113 if (numslots != expectedsize)
114 throw new Error("Server Error: Server did not send all slots");
117 private void initExpectedSize() {
118 expectedsize = (sequencenumber < ((long) numslots)) ? (int) sequencenumber : numslots;
119 currmaxsize = numslots;
122 private void updateExpectedSize() {
124 if (expectedsize > currmaxsize)
125 expectedsize = currmaxsize;
128 private void updateCurrMaxSize(int newmaxsize) {
129 currmaxsize=newmaxsize;
132 private void commitNewMaxSize() {
133 if (numslots != currmaxsize)
134 buffer.resize(currmaxsize);
136 numslots=currmaxsize;
139 private void processEntry(KeyValue entry, SlotIndexer indexer) {
140 IoTString key=entry.getKey();
141 KeyValue oldvalue=table.get(key);
142 if (oldvalue != null) {
145 table.put(key, entry);
148 private void processEntry(LastMessage entry, SlotIndexer indexer) {
149 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
152 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
153 long oldseqnum=entry.getOldSeqNum();
154 long newseqnum=entry.getNewSeqNum();
155 boolean isequal=entry.getEqual();
156 long machineid=entry.getMachineID();
157 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
158 Slot slot=indexer.getSlot(seqnum);
160 long slotmachineid=slot.getMachineID();
161 if (isequal!=(slotmachineid==machineid)) {
162 throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
168 private void processEntry(TableStatus entry, SlotIndexer indexer) {
169 int newnumslots=entry.getMaxSlots();
170 updateCurrMaxSize(newnumslots);
171 if (lastTableStatus != null)
172 lastTableStatus.setDead();
173 lastTableStatus = entry;
176 private void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
177 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
178 if (lastmsgentry == null)
181 long lastmsgseqnum = lastmsgentry.getFirst();
182 Liveness lastentry = lastmsgentry.getSecond();
183 if (lastentry instanceof LastMessage) {
184 ((LastMessage)lastentry).setDead();
185 } else if (lastentry instanceof Slot) {
186 ((Slot)lastentry).setDead();
188 throw new Error("Unrecognized type");
191 if (machineid == localmachineid) {
192 if (lastmsgseqnum != seqnum)
193 throw new Error("Server Error: Mismatch on local machine sequence number");
195 if (lastmsgseqnum > seqnum)
196 throw new Error("Server Error: Rollback on remote machine sequence number");
200 private void processSlot(SlotIndexer indexer, Slot slot) {
201 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
203 for(Entry entry : slot.getEntries()) {
204 switch(entry.getType()) {
205 case Entry.TypeKeyValue:
206 processEntry((KeyValue)entry, indexer);
209 case Entry.TypeLastMessage:
210 processEntry((LastMessage)entry, indexer);
213 case Entry.TypeRejectedMessage:
214 processEntry((RejectedMessage)entry, indexer);
217 case Entry.TypeTableStatus:
218 processEntry((TableStatus)entry, indexer);
222 throw new Error("Unrecognized type: "+entry.getType());
227 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
228 for(int i=0; i < newslots.length; i++) {
229 Slot currslot=newslots[i];
230 Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
231 if (prevslot != null &&
232 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
233 throw new Error("Server Error: Invalid HMAC Chain");