more bugs
[iotcloud.git] / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Arrays;
4 import java.util.Vector;
5 import javax.crypto.spec.*;
6 import javax.crypto.*;
7
8 final public class Table {
9         private int numslots;
10         private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
11         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
12         private SlotBuffer buffer;
13         private CloudComm cloud;
14         private Mac hmac;
15         private long sequencenumber;
16         private long localmachineid;
17   private TableStatus lastTableStatus;
18   static final int FREE_SLOTS = 10;
19   static final int FORCED_RESIZE_INCREMENT = 20;
20   
21         public Table(String baseurl, String password, long _localmachineid) {
22                 localmachineid=_localmachineid;
23                 buffer = new SlotBuffer();
24                 numslots = buffer.capacity();
25                 sequencenumber = 0;
26                 initCloud(baseurl, password);
27         }
28
29         public Table(CloudComm _cloud, long _localmachineid) {
30                 localmachineid=_localmachineid;
31                 buffer = new SlotBuffer();
32                 numslots = buffer.capacity();
33                 sequencenumber = 0;
34                 cloud=_cloud;
35         }
36         
37         private void initCloud(String baseurl, String password) {
38                 try {
39                         SecretKeySpec secret=getKey(password);
40                         Cipher encryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
41                         encryptCipher.init(Cipher.ENCRYPT_MODE, secret);
42                         Cipher decryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
43                         decryptCipher.init(Cipher.DECRYPT_MODE, secret);
44                         hmac = Mac.getInstance("HmacSHA256");
45                         hmac.init(secret);
46                         cloud=new CloudComm(baseurl, encryptCipher, decryptCipher, hmac);
47                 } catch (Exception e) {
48                         throw new Error("Failed To Initialize Ciphers");
49                 }
50         }
51
52         private SecretKeySpec getKey(String password) {
53                 try {
54                         PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray());
55                         SecretKey key = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
56                         SecretKeySpec secret = new SecretKeySpec(key.getEncoded(), "AES");
57                         return secret;
58                 } catch (Exception e) {
59                         throw new Error("Failed generating key.");
60                 }
61         }
62
63         public void update() {
64                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
65                 validateandupdate(newslots, false);
66         }
67
68         public IoTString get(IoTString key) {
69                 KeyValue kv=table.get(key);
70                 if (kv != null)
71                         return kv.getValue();
72                 else
73                         return null;
74         }
75
76         public void initTable() {
77                 Slot s=new Slot(1, localmachineid);
78                 TableStatus status=new TableStatus(s, numslots);
79                 s.addEntry(status);
80     Slot[] array=cloud.putSlot(s, numslots);
81     if (array == null) {
82       array = new Slot[] {s};
83                         validateandupdate(array, true); // update data structure
84                 } else {
85                         throw new Error("Error on initialization");
86                 }
87         }
88         
89         public IoTString put(IoTString key, IoTString value) {
90     while(true) {
91       KeyValue oldvalue=table.get(key);
92       if (tryput(key, value, false)) {
93                                 if (oldvalue==null)
94                                         return null;
95                                 else
96                                         return oldvalue.getValue();
97       }
98     }
99   }
100
101   private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
102                 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
103     long seqn = buffer.getOldestSeqNum();
104
105                 if (forcedresize) {
106                         TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
107                         s.addEntry(status);
108                 }
109                 
110     if ((numslots - buffer.size()) < FREE_SLOTS) {
111       //have to check whether we have enough free slots
112       long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
113                         seqn = fullfirstseqn < 1 ? 1: fullfirstseqn;
114       for(int i=0; i < FREE_SLOTS; i++, seqn++) {
115         Slot prevslot=buffer.getSlot(seqn);
116         if (!prevslot.isLive())
117           continue;
118         Vector<Entry> liveentries = prevslot.getLiveEntries();
119         for(Entry liveentry:liveentries) {
120                                         if (redundant(liveentry))
121                                                 continue;
122           if (s.hasSpace(liveentry))
123             s.addEntry(liveentry);
124           else if (i==0) {
125             if (s.canFit(liveentry))
126               s.addEntry(liveentry);
127             else if (!forcedresize) {
128               return tryput(key, value, true);
129                                                 }
130                                         }
131         }
132       }
133     }
134     KeyValue kv=new KeyValue(s, key, value);
135     boolean insertedkv=false;
136     if (s.hasSpace(kv)) {
137       s.addEntry(kv);
138       insertedkv=true;
139     }
140
141     long newestseqnum=buffer.getNewestSeqNum();
142     search:
143     for(;seqn<=newestseqnum;seqn++) {
144       Slot prevslot=buffer.getSlot(seqn);
145       if (!prevslot.isLive())
146         continue;
147       Vector<Entry> liveentries = prevslot.getLiveEntries();
148       for(Entry liveentry:liveentries) {
149                                 if (redundant(liveentry))
150                                         continue;
151         if (s.hasSpace(liveentry))
152           s.addEntry(liveentry);
153         else
154           break search;
155       }
156     }
157     
158     int max=0;
159     if (forcedresize)
160       max = numslots + FORCED_RESIZE_INCREMENT;
161     Slot[] array=cloud.putSlot(s, max);
162     if (array == null)
163       array = new Slot[] {s};
164     else
165       insertedkv=false;
166     
167                 validateandupdate(array, true); // update data structure
168     
169     return insertedkv;
170         }
171
172         boolean redundant(Entry liveentry) {
173                 if (liveentry.getType()==Entry.TypeLastMessage) {
174                         LastMessage lastmsg=(LastMessage) liveentry;
175                         return lastmsg.getMachineID() == localmachineid;
176                 }
177                 return false;
178         }
179
180         
181         private void validateandupdate(Slot[] newslots, boolean isput) {
182                 //The cloud communication layer has checked slot HMACs already
183                 //before decoding
184                 if (newslots.length==0)
185                         return;
186
187                 long firstseqnum=newslots[0].getSequenceNumber();
188                 if (firstseqnum <= sequencenumber)
189                         throw new Error("Server Error: Sent older slots!");
190
191                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
192                 checkHMACChain(indexer, newslots);
193
194     initExpectedSize();
195     for(Slot slot: newslots) {
196       updateExpectedSize();
197                         processSlot(indexer, slot, isput);
198                 }
199
200                 //If there is a gap, check to see if the server sent us everything
201                 if (firstseqnum != (sequencenumber+1))
202                         checkNumSlots(newslots.length);
203                 
204     commitNewMaxSize();
205
206     //commit new to slots
207     for(Slot slot:newslots) {
208       buffer.putSlot(slot);
209     }
210                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
211         }
212
213   private int expectedsize, currmaxsize;
214
215   private void checkNumSlots(int numslots) {
216     if (numslots != expectedsize)
217       throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
218   }
219   
220   private void initExpectedSize() {
221                 long prevslots = sequencenumber;
222     expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
223     currmaxsize = numslots;
224   }
225
226   private void updateExpectedSize() {
227     expectedsize++;
228     if (expectedsize > currmaxsize)
229       expectedsize = currmaxsize;
230   }
231
232   private void updateCurrMaxSize(int newmaxsize) {
233     currmaxsize=newmaxsize;
234   }
235
236   private void commitNewMaxSize() {
237     if (numslots != currmaxsize)
238       buffer.resize(currmaxsize);
239
240     numslots=currmaxsize;
241   }
242   
243         private void processEntry(KeyValue entry, SlotIndexer indexer) {
244                 IoTString key=entry.getKey();
245                 KeyValue oldvalue=table.get(key);
246                 if (oldvalue != null) {
247                         oldvalue.setDead();
248                 }
249                 table.put(key, entry);
250         }
251
252         private void processEntry(LastMessage entry, SlotIndexer indexer) {
253                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
254         }
255
256         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
257                 long oldseqnum=entry.getOldSeqNum();
258                 long newseqnum=entry.getNewSeqNum();
259                 boolean isequal=entry.getEqual();
260                 long machineid=entry.getMachineID();
261                 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
262                         Slot slot=indexer.getSlot(seqnum);
263                         if (slot != null) {
264                                 long slotmachineid=slot.getMachineID();
265                                 if (isequal!=(slotmachineid==machineid)) {
266                                         throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
267                                 }
268                         }
269                 }
270         }
271
272         private void processEntry(TableStatus entry, SlotIndexer indexer) {
273     int newnumslots=entry.getMaxSlots();
274     updateCurrMaxSize(newnumslots);
275     if (lastTableStatus != null)
276       lastTableStatus.setDead();
277     lastTableStatus = entry;
278         }
279
280         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean isput) {
281                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
282                 if (lastmsgentry == null)
283                         return;
284
285                 long lastmsgseqnum = lastmsgentry.getFirst();
286                 Liveness lastentry = lastmsgentry.getSecond();
287                 if (lastentry instanceof LastMessage) {
288                         ((LastMessage)lastentry).setDead();
289                 } else if (lastentry instanceof Slot) {
290                         ((Slot)lastentry).setDead();
291                 } else {
292                         throw new Error("Unrecognized type");
293                 }
294
295                 if (machineid == localmachineid) {
296                         if (lastmsgseqnum != seqnum && !isput)
297                                 throw new Error("Server Error: Mismatch on local machine sequence number");
298                 } else {
299                         if (lastmsgseqnum > seqnum)
300                                 throw new Error("Server Error: Rollback on remote machine sequence number");
301                 }
302         }
303
304         private void processSlot(SlotIndexer indexer, Slot slot, boolean isput) {
305                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, isput);
306     
307                 for(Entry entry : slot.getEntries()) {
308                         switch(entry.getType()) {
309                         case Entry.TypeKeyValue:
310                                 processEntry((KeyValue)entry, indexer);
311                                 break;
312
313                         case Entry.TypeLastMessage:
314                                 processEntry((LastMessage)entry, indexer);
315                                 break;
316
317                         case Entry.TypeRejectedMessage:
318                                 processEntry((RejectedMessage)entry, indexer);
319                                 break;
320
321                         case Entry.TypeTableStatus:
322                                 processEntry((TableStatus)entry, indexer);
323                                 break;
324
325                         default:
326                                 throw new Error("Unrecognized type: "+entry.getType());
327                         }
328                 }
329         }
330
331         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
332                 for(int i=0; i < newslots.length; i++) {
333                         Slot currslot=newslots[i];
334                         Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
335                         if (prevslot != null &&
336                                         !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
337                                 throw new Error("Server Error: Invalid HMAC Chain");
338                 }
339         }
340 }