more updates
[iotcloud.git] / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Arrays;
4 import java.util.Vector;
5 import javax.crypto.spec.*;
6 import javax.crypto.*;
7
8 final public class Table {
9         private int numslots;
10         private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
11         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
12         private SlotBuffer buffer;
13         private CloudComm cloud;
14         private Mac hmac;
15         private long sequencenumber;
16         private long localmachineid;
17   private TableStatus lastTableStatus;
18   static final int FREE_SLOTS = 10;
19   static final int FORCED_RESIZE_INCREMENT = 20;
20   
21         public Table(String baseurl, String password, long _localmachineid) {
22                 localmachineid=_localmachineid;
23                 buffer = new SlotBuffer();
24                 numslots = buffer.capacity();
25                 sequencenumber = 1;
26                 initCloud(baseurl, password);
27         }
28
29         public Table(CloudComm _cloud, long _localmachineid) {
30                 localmachineid=_localmachineid;
31                 buffer = new SlotBuffer();
32                 sequencenumber = 1;
33                 cloud=_cloud;
34         }
35         
36         private void initCloud(String baseurl, String password) {
37                 try {
38                         SecretKeySpec secret=getKey(password);
39                         Cipher encryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
40                         encryptCipher.init(Cipher.ENCRYPT_MODE, secret);
41                         Cipher decryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
42                         decryptCipher.init(Cipher.DECRYPT_MODE, secret);
43                         hmac = Mac.getInstance("HmacSHA256");
44                         hmac.init(secret);
45                         cloud=new CloudComm(baseurl, encryptCipher, decryptCipher, hmac);
46                 } catch (Exception e) {
47                         throw new Error("Failed To Initialize Ciphers");
48                 }
49         }
50
51         private SecretKeySpec getKey(String password) {
52                 try {
53                         PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray());
54                         SecretKey key = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
55                         SecretKeySpec secret = new SecretKeySpec(key.getEncoded(), "AES");
56                         return secret;
57                 } catch (Exception e) {
58                         throw new Error("Failed generating key.");
59                 }
60         }
61
62         public void update() {
63                 Slot[] newslots=cloud.getSlots(sequencenumber);
64                 validateandupdate(newslots);
65         }
66
67         public IoTString get(IoTString key) {
68                 KeyValue kv=table.get(key);
69                 if (kv != null)
70                         return kv.getValue();
71                 else
72                         return null;
73         }
74
75         public void initTable() {
76                 Slot s=new Slot(1, localmachineid);
77                 TableStatus status=new TableStatus(s, numslots);
78                 s.addEntry(status);
79     Slot[] array=cloud.putSlot(s, numslots);
80     if (array == null) {
81       array = new Slot[] {s};
82                         validateandupdate(array); // update data structure
83                 } else {
84                         throw new Error("Error on initialization");
85                 }
86         }
87         
88         public IoTString put(IoTString key, IoTString value) {
89     while(true) {
90       KeyValue oldvalue=table.get(key);
91       if (tryput(key, value, false)) {
92         return oldvalue.getValue();
93       }
94     }
95   }
96
97   private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
98                 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
99     long seqn = buffer.getOldestSeqNum();
100
101                 if (forcedresize) {
102                         TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
103                         s.addEntry(status);
104                 }
105                 
106     if ((numslots - buffer.size()) < FREE_SLOTS) {
107       //have to check whether we have enough free slots
108       seqn = buffer.getNewestSeqNum() + 1 - numslots;
109       for(int i=0; i < FREE_SLOTS; i++, seqn++) {
110         Slot prevslot=buffer.getSlot(seqn);
111         if (!prevslot.isLive())
112           continue;
113         Vector<Entry> liveentries = prevslot.getLiveEntries();
114         for(Entry liveentry:liveentries) {
115           if (s.hasSpace(liveentry))
116             s.addEntry(liveentry);
117           else if (i==0) {
118             if (s.canFit(liveentry))
119               s.addEntry(liveentry);
120             else if (!forcedresize) {
121               return tryput(key, value, true);
122                                                 }
123                                         }
124         }
125       }
126     }
127     KeyValue kv=new KeyValue(s, key, value);
128     boolean insertedkv=false;
129     if (s.hasSpace(kv)) {
130       s.addEntry(kv);
131       insertedkv=true;
132     }
133
134     long newestseqnum=buffer.getNewestSeqNum();
135     search:
136     for(;seqn<=newestseqnum;seqn++) {
137       Slot prevslot=buffer.getSlot(seqn);
138       if (!prevslot.isLive())
139         continue;
140       Vector<Entry> liveentries = prevslot.getLiveEntries();
141       for(Entry liveentry:liveentries) {
142         if (s.hasSpace(liveentry))
143           s.addEntry(liveentry);
144         else
145           break search;
146       }
147     }
148     
149     int max=0;
150     if (forcedresize)
151       max = numslots + FORCED_RESIZE_INCREMENT;
152     Slot[] array=cloud.putSlot(s, max);
153     if (array == null)
154       array = new Slot[] {s};
155     else
156       insertedkv=false;
157     
158                 validateandupdate(array); // update data structure
159     
160     return insertedkv;
161         }
162
163         private void validateandupdate(Slot[] newslots) {
164                 //The cloud communication layer has checked slot HMACs already
165                 //before decoding
166                 if (newslots.length==0)
167                         return;
168
169                 long firstseqnum=newslots[0].getSequenceNumber();
170                 if (firstseqnum < sequencenumber)
171                         throw new Error("Server Error: Sent older slots!");
172
173                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
174                 checkHMACChain(indexer, newslots);
175
176     initExpectedSize();
177     for(Slot slot: newslots) {
178       updateExpectedSize();
179                         processSlot(indexer, slot);
180                 }
181     checkNumSlots(newslots.length);
182     commitNewMaxSize();
183
184     //commit new to slots
185     for(Slot slot:newslots) {
186       buffer.putSlot(slot);
187     }
188         }
189
190   private int expectedsize, currmaxsize;
191
192   private void checkNumSlots(int numslots) {
193     if (numslots != expectedsize)
194       throw new Error("Server Error: Server did not send all slots");
195   }
196   
197   private void initExpectedSize() {
198     expectedsize = (sequencenumber < ((long) numslots)) ? (int) sequencenumber : numslots;
199     currmaxsize = numslots;
200   }
201
202   private void updateExpectedSize() {
203     expectedsize++;
204     if (expectedsize > currmaxsize)
205       expectedsize = currmaxsize;
206   }
207
208   private void updateCurrMaxSize(int newmaxsize) {
209     currmaxsize=newmaxsize;
210   }
211
212   private void commitNewMaxSize() {
213     if (numslots != currmaxsize)
214       buffer.resize(currmaxsize);
215
216     numslots=currmaxsize;
217   }
218   
219         private void processEntry(KeyValue entry, SlotIndexer indexer) {
220                 IoTString key=entry.getKey();
221                 KeyValue oldvalue=table.get(key);
222                 if (oldvalue != null) {
223                         oldvalue.setDead();
224                 }
225                 table.put(key, entry);
226         }
227
228         private void processEntry(LastMessage entry, SlotIndexer indexer) {
229                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
230         }
231
232         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
233                 long oldseqnum=entry.getOldSeqNum();
234                 long newseqnum=entry.getNewSeqNum();
235                 boolean isequal=entry.getEqual();
236                 long machineid=entry.getMachineID();
237                 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
238                         Slot slot=indexer.getSlot(seqnum);
239                         if (slot != null) {
240                                 long slotmachineid=slot.getMachineID();
241                                 if (isequal!=(slotmachineid==machineid)) {
242                                         throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
243                                 }
244                         }
245                 }
246         }
247
248         private void processEntry(TableStatus entry, SlotIndexer indexer) {
249     int newnumslots=entry.getMaxSlots();
250     updateCurrMaxSize(newnumslots);
251     if (lastTableStatus != null)
252       lastTableStatus.setDead();
253     lastTableStatus = entry;
254         }
255
256         private void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
257                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
258                 if (lastmsgentry == null)
259                         return;
260
261                 long lastmsgseqnum = lastmsgentry.getFirst();
262                 Liveness lastentry = lastmsgentry.getSecond();
263                 if (lastentry instanceof LastMessage) {
264                         ((LastMessage)lastentry).setDead();
265                 } else if (lastentry instanceof Slot) {
266                         ((Slot)lastentry).setDead();
267                 } else {
268                         throw new Error("Unrecognized type");
269                 }
270
271                 if (machineid == localmachineid) {
272                         if (lastmsgseqnum != seqnum)
273                                 throw new Error("Server Error: Mismatch on local machine sequence number");
274                 } else {
275                         if (lastmsgseqnum > seqnum)
276                                 throw new Error("Server Error: Rollback on remote machine sequence number");
277                 }
278         }
279
280         private void processSlot(SlotIndexer indexer, Slot slot) {
281                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
282     
283                 for(Entry entry : slot.getEntries()) {
284                         switch(entry.getType()) {
285                         case Entry.TypeKeyValue:
286                                 processEntry((KeyValue)entry, indexer);
287                                 break;
288
289                         case Entry.TypeLastMessage:
290                                 processEntry((LastMessage)entry, indexer);
291                                 break;
292
293                         case Entry.TypeRejectedMessage:
294                                 processEntry((RejectedMessage)entry, indexer);
295                                 break;
296
297                         case Entry.TypeTableStatus:
298                                 processEntry((TableStatus)entry, indexer);
299                                 break;
300
301                         default:
302                                 throw new Error("Unrecognized type: "+entry.getType());
303                         }
304                 }
305         }
306
307         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
308                 for(int i=0; i < newslots.length; i++) {
309                         Slot currslot=newslots[i];
310                         Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
311                         if (prevslot != null &&
312                                         !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
313                                 throw new Error("Server Error: Invalid HMAC Chain");
314                 }
315         }
316 }