c2adbc93fe9a67e84b6f669e52177760ff9d760a
[iotcloud.git] / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Arrays;
4 import java.util.Vector;
5 import javax.crypto.spec.*;
6 import javax.crypto.*;
7
8 final public class Table {
9         private int numslots;
10         private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
11         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
12         private SlotBuffer buffer;
13         private CloudComm cloud;
14         private Mac hmac;
15         private long sequencenumber;
16         private long localmachineid;
17   private TableStatus lastTableStatus;
18   static final int FREE_SLOTS = 10;
19   static final int FORCED_RESIZE_INCREMENT = 20;
20   
21         public Table(String baseurl, String password, long _localmachineid) {
22                 localmachineid=_localmachineid;
23                 buffer = new SlotBuffer();
24                 sequencenumber = 1;
25                 initCloud(baseurl, password);
26         }
27
28         private void initCloud(String baseurl, String password) {
29                 try {
30                         SecretKeySpec secret=getKey(password);
31                         Cipher encryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
32                         encryptCipher.init(Cipher.ENCRYPT_MODE, secret);
33                         Cipher decryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
34                         decryptCipher.init(Cipher.DECRYPT_MODE, secret);
35                         hmac = Mac.getInstance("HmacSHA256");
36                         hmac.init(secret);
37                         cloud=new CloudComm(baseurl, encryptCipher, decryptCipher, hmac);
38                 } catch (Exception e) {
39                         throw new Error("Failed To Initialize Ciphers");
40                 }
41         }
42
43         private SecretKeySpec getKey(String password) {
44                 try {
45                         PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray());
46                         SecretKey key = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
47                         SecretKeySpec secret = new SecretKeySpec(key.getEncoded(), "AES");
48                         return secret;
49                 } catch (Exception e) {
50                         throw new Error("Failed generating key.");
51                 }
52         }
53
54         public void update() {
55                 Slot[] newslots=cloud.getSlots(sequencenumber);
56                 validateandupdate(newslots);
57         }
58
59         public IoTString get(IoTString key) {
60                 KeyValue kv=table.get(key);
61                 if (kv != null)
62                         return kv.getValue();
63                 else
64                         return null;
65         }
66
67         public IoTString put(IoTString key, IoTString value) {
68     while(true) {
69       KeyValue oldvalue=table.get(key);
70       if (tryput(key, value)) {
71         return oldvalue.getValue();
72       }
73     }
74   }
75
76   private boolean tryput(IoTString key, IoTString value) {
77     Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
78     boolean forcedresize = false;
79
80     long seqn = buffer.getOldestSeqNum();
81     
82     if ((numslots - buffer.size()) < FREE_SLOTS) {
83       //have to check whether we have enough free slots
84       seqn = buffer.getNewestSeqNum() + 1 - numslots;
85       for(int i=0; i < FREE_SLOTS; i++, seqn++) {
86         Slot prevslot=buffer.getSlot(seqn);
87         if (!prevslot.isLive())
88           continue;
89         Vector<Entry> liveentries = prevslot.getLiveEntries();
90         for(Entry liveentry:liveentries) {
91           if (s.hasSpace(liveentry))
92             s.addEntry(liveentry);
93           else if (i==0) {
94             if (s.canFit(liveentry))
95               s.addEntry(liveentry);
96             else
97               forcedresize = true;
98           }
99         }
100       }
101     }
102     KeyValue kv=new KeyValue(s, key, value);
103     boolean insertedkv=false;
104     if (s.hasSpace(kv)) {
105       s.addEntry(kv);
106       insertedkv=true;
107     }
108
109     long newestseqnum=buffer.getNewestSeqNum();
110     search:
111     for(;seqn<=newestseqnum;seqn++) {
112       Slot prevslot=buffer.getSlot(seqn);
113       if (!prevslot.isLive())
114         continue;
115       Vector<Entry> liveentries = prevslot.getLiveEntries();
116       for(Entry liveentry:liveentries) {
117         if (s.hasSpace(liveentry))
118           s.addEntry(liveentry);
119         else
120           break search;
121       }
122     }
123     
124     int max=0;
125     if (forcedresize)
126       max = numslots + FORCED_RESIZE_INCREMENT;
127     Slot[] array=cloud.putSlot(s, max);
128     if (array == null)
129       array = new Slot[] {s};
130     else
131       insertedkv=false;
132     
133                 validateandupdate(array); // update data structure
134     
135     return insertedkv;
136         }
137
138         private void validateandupdate(Slot[] newslots) {
139                 //The cloud communication layer has checked slot HMACs already
140                 //before decoding
141                 if (newslots.length==0)
142                         return;
143
144                 long firstseqnum=newslots[0].getSequenceNumber();
145                 if (firstseqnum < sequencenumber)
146                         throw new Error("Server Error: Sent older slots!");
147
148                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
149                 checkHMACChain(indexer, newslots);
150
151     initExpectedSize();
152     for(Slot slot: newslots) {
153       updateExpectedSize();
154                         processSlot(indexer, slot);
155                 }
156     checkNumSlots(newslots.length);
157     commitNewMaxSize();
158
159     //commit new to slots
160     for(Slot slot:newslots) {
161       buffer.putSlot(slot);
162     }
163         }
164
165   private int expectedsize, currmaxsize;
166
167   private void checkNumSlots(int numslots) {
168     if (numslots != expectedsize)
169       throw new Error("Server Error: Server did not send all slots");
170   }
171   
172   private void initExpectedSize() {
173     expectedsize = (sequencenumber < ((long) numslots)) ? (int) sequencenumber : numslots;
174     currmaxsize = numslots;
175   }
176
177   private void updateExpectedSize() {
178     expectedsize++;
179     if (expectedsize > currmaxsize)
180       expectedsize = currmaxsize;
181   }
182
183   private void updateCurrMaxSize(int newmaxsize) {
184     currmaxsize=newmaxsize;
185   }
186
187   private void commitNewMaxSize() {
188     if (numslots != currmaxsize)
189       buffer.resize(currmaxsize);
190
191     numslots=currmaxsize;
192   }
193   
194         private void processEntry(KeyValue entry, SlotIndexer indexer) {
195                 IoTString key=entry.getKey();
196                 KeyValue oldvalue=table.get(key);
197                 if (oldvalue != null) {
198                         oldvalue.setDead();
199                 }
200                 table.put(key, entry);
201         }
202
203         private void processEntry(LastMessage entry, SlotIndexer indexer) {
204                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
205         }
206
207         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
208                 long oldseqnum=entry.getOldSeqNum();
209                 long newseqnum=entry.getNewSeqNum();
210                 boolean isequal=entry.getEqual();
211                 long machineid=entry.getMachineID();
212                 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
213                         Slot slot=indexer.getSlot(seqnum);
214                         if (slot != null) {
215                                 long slotmachineid=slot.getMachineID();
216                                 if (isequal!=(slotmachineid==machineid)) {
217                                         throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
218                                 }
219                         }
220                 }
221         }
222
223         private void processEntry(TableStatus entry, SlotIndexer indexer) {
224     int newnumslots=entry.getMaxSlots();
225     updateCurrMaxSize(newnumslots);
226     if (lastTableStatus != null)
227       lastTableStatus.setDead();
228     lastTableStatus = entry;
229         }
230
231         private void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
232                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
233                 if (lastmsgentry == null)
234                         return;
235
236                 long lastmsgseqnum = lastmsgentry.getFirst();
237                 Liveness lastentry = lastmsgentry.getSecond();
238                 if (lastentry instanceof LastMessage) {
239                         ((LastMessage)lastentry).setDead();
240                 } else if (lastentry instanceof Slot) {
241                         ((Slot)lastentry).setDead();
242                 } else {
243                         throw new Error("Unrecognized type");
244                 }
245
246                 if (machineid == localmachineid) {
247                         if (lastmsgseqnum != seqnum)
248                                 throw new Error("Server Error: Mismatch on local machine sequence number");
249                 } else {
250                         if (lastmsgseqnum > seqnum)
251                                 throw new Error("Server Error: Rollback on remote machine sequence number");
252                 }
253         }
254
255         private void processSlot(SlotIndexer indexer, Slot slot) {
256                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
257     
258                 for(Entry entry : slot.getEntries()) {
259                         switch(entry.getType()) {
260                         case Entry.TypeKeyValue:
261                                 processEntry((KeyValue)entry, indexer);
262                                 break;
263
264                         case Entry.TypeLastMessage:
265                                 processEntry((LastMessage)entry, indexer);
266                                 break;
267
268                         case Entry.TypeRejectedMessage:
269                                 processEntry((RejectedMessage)entry, indexer);
270                                 break;
271
272                         case Entry.TypeTableStatus:
273                                 processEntry((TableStatus)entry, indexer);
274                                 break;
275
276                         default:
277                                 throw new Error("Unrecognized type: "+entry.getType());
278                         }
279                 }
280         }
281
282         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
283                 for(int i=0; i < newslots.length; i++) {
284                         Slot currslot=newslots[i];
285                         Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
286                         if (prevslot != null &&
287                                         !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
288                                 throw new Error("Server Error: Invalid HMAC Chain");
289                 }
290         }
291 }