more bug fixes
[iotcloud.git] / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Arrays;
4 import java.util.Vector;
5
6 final public class Table {
7         private int numslots;
8         private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
9         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
10         private SlotBuffer buffer;
11         private CloudComm cloud;
12         private long sequencenumber;
13         private long localmachineid;
14         private TableStatus lastTableStatus;
15         static final int FREE_SLOTS = 10;
16         static final int FORCED_RESIZE_INCREMENT = 20;
17
18         public Table(String baseurl, String password, long _localmachineid) {
19                 localmachineid=_localmachineid;
20                 buffer = new SlotBuffer();
21                 numslots = buffer.capacity();
22                 sequencenumber = 0;
23                 cloud=new CloudComm(baseurl, password);
24         }
25
26         public Table(CloudComm _cloud, long _localmachineid) {
27                 localmachineid=_localmachineid;
28                 buffer = new SlotBuffer();
29                 numslots = buffer.capacity();
30                 sequencenumber = 0;
31                 cloud=_cloud;
32         }
33
34         public void rebuild() {
35                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
36                 validateandupdate(newslots, true);
37         }
38         
39         public void update() {
40                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
41
42                 validateandupdate(newslots, false);
43         }
44
45         public IoTString get(IoTString key) {
46                 KeyValue kv=table.get(key);
47                 if (kv != null)
48                         return kv.getValue();
49                 else
50                         return null;
51         }
52
53         public void initTable() {
54                 Slot s=new Slot(1, localmachineid);
55                 TableStatus status=new TableStatus(s, numslots);
56                 s.addEntry(status);
57                 Slot[] array=cloud.putSlot(s, numslots);
58                 if (array == null) {
59                         array = new Slot[] {s};
60                         validateandupdate(array, true);                                                                         // update data structure
61                 } else {
62                         throw new Error("Error on initialization");
63                 }
64         }
65
66         public IoTString put(IoTString key, IoTString value) {
67                 while(true) {
68                         KeyValue oldvalue=table.get(key);
69                         if (tryput(key, value, false)) {
70                                 if (oldvalue==null)
71                                         return null;
72                                 else
73                                         return oldvalue.getValue();
74                         }
75                 }
76         }
77
78         private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
79                 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
80                 long seqn = buffer.getOldestSeqNum();
81
82                 if (forcedresize) {
83                         TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
84                         s.addEntry(status);
85                 }
86
87                 if ((numslots - buffer.size()) < FREE_SLOTS) {
88                         //have to check whether we have enough free slots
89                         long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
90                         seqn = fullfirstseqn < 1?1:fullfirstseqn;
91                         for(int i=0; i < FREE_SLOTS; i++, seqn++) {
92                                 Slot prevslot=buffer.getSlot(seqn);
93                                 if (!prevslot.isLive())
94                                         continue;
95                                 Vector<Entry> liveentries = prevslot.getLiveEntries();
96                                 for(Entry liveentry:liveentries) {
97                                         if (redundant(liveentry))
98                                                 continue;
99                                         if (s.hasSpace(liveentry))
100                                                 s.addEntry(liveentry);
101                                         else if (i==0) {
102                                                 if (s.canFit(liveentry))
103                                                         s.addEntry(liveentry);
104                                                 else if (!forcedresize) {
105                                                         return tryput(key, value, true);
106                                                 }
107                                         }
108                                 }
109                         }
110                 }
111                 KeyValue kv=new KeyValue(s, key, value);
112                 boolean insertedkv=false;
113                 if (s.hasSpace(kv)) {
114                         s.addEntry(kv);
115                         insertedkv=true;
116                 }
117
118                 long newestseqnum=buffer.getNewestSeqNum();
119 search:
120                 for(; seqn<=newestseqnum; seqn++) {
121                         Slot prevslot=buffer.getSlot(seqn);
122                         if (!prevslot.isLive())
123                                 continue;
124                         Vector<Entry> liveentries = prevslot.getLiveEntries();
125                         for(Entry liveentry:liveentries) {
126                                 if (redundant(liveentry))
127                                         continue;
128                                 if (s.hasSpace(liveentry))
129                                         s.addEntry(liveentry);
130                                 else
131                                         break search;
132                         }
133                 }
134
135                 int max=0;
136                 if (forcedresize)
137                         max = numslots + FORCED_RESIZE_INCREMENT;
138                 Slot[] array=cloud.putSlot(s, max);
139                 if (array == null)
140                         array = new Slot[] {s};
141                 else
142                         insertedkv=false;
143
144                 validateandupdate(array, true);                                                 // update data structure
145
146                 return insertedkv;
147         }
148
149         boolean redundant(Entry liveentry) {
150                 if (liveentry.getType()==Entry.TypeLastMessage) {
151                         LastMessage lastmsg=(LastMessage) liveentry;
152                         return lastmsg.getMachineID() == localmachineid;
153                 }
154                 return false;
155         }
156
157
158         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
159                 //The cloud communication layer has checked slot HMACs already
160                 //before decoding
161                 if (newslots.length==0)
162                         return;
163
164                 long firstseqnum=newslots[0].getSequenceNumber();
165                 if (firstseqnum <= sequencenumber)
166                         throw new Error("Server Error: Sent older slots!");
167
168                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
169                 checkHMACChain(indexer, newslots);
170
171                 initExpectedSize();
172                 for(Slot slot: newslots) {
173                         updateExpectedSize();
174                         processSlot(indexer, slot, acceptupdatestolocal);
175                 }
176
177                 //If there is a gap, check to see if the server sent us everything
178                 if (firstseqnum != (sequencenumber+1))
179                         checkNumSlots(newslots.length);
180
181                 commitNewMaxSize();
182
183                 //commit new to slots
184                 for(Slot slot:newslots) {
185                         buffer.putSlot(slot);
186                 }
187                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
188         }
189
190         private int expectedsize, currmaxsize;
191
192         private void checkNumSlots(int numslots) {
193                 if (numslots != expectedsize)
194                         throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
195         }
196
197         private void initExpectedSize() {
198                 long prevslots = sequencenumber;
199                 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
200                 currmaxsize = numslots;
201         }
202
203         private void updateExpectedSize() {
204                 expectedsize++;
205                 if (expectedsize > currmaxsize)
206                         expectedsize = currmaxsize;
207         }
208
209         private void updateCurrMaxSize(int newmaxsize) {
210                 currmaxsize=newmaxsize;
211         }
212
213         private void commitNewMaxSize() {
214                 if (numslots != currmaxsize)
215                         buffer.resize(currmaxsize);
216
217                 numslots=currmaxsize;
218         }
219
220         private void processEntry(KeyValue entry, SlotIndexer indexer) {
221                 IoTString key=entry.getKey();
222                 KeyValue oldvalue=table.get(key);
223                 if (oldvalue != null) {
224                         oldvalue.setDead();
225                 }
226                 table.put(key, entry);
227         }
228
229         private void processEntry(LastMessage entry, SlotIndexer indexer) {
230                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
231         }
232
233         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
234                 long oldseqnum=entry.getOldSeqNum();
235                 long newseqnum=entry.getNewSeqNum();
236                 boolean isequal=entry.getEqual();
237                 long machineid=entry.getMachineID();
238                 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
239                         Slot slot=indexer.getSlot(seqnum);
240                         if (slot != null) {
241                                 long slotmachineid=slot.getMachineID();
242                                 if (isequal!=(slotmachineid==machineid)) {
243                                         throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
244                                 }
245                         }
246                 }
247         }
248
249         private void processEntry(TableStatus entry, SlotIndexer indexer) {
250                 int newnumslots=entry.getMaxSlots();
251                 updateCurrMaxSize(newnumslots);
252                 if (lastTableStatus != null)
253                         lastTableStatus.setDead();
254                 lastTableStatus = entry;
255         }
256
257         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal) {
258                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
259                 if (lastmsgentry == null)
260                         return;
261
262                 long lastmsgseqnum = lastmsgentry.getFirst();
263                 Liveness lastentry = lastmsgentry.getSecond();
264                 if (lastentry instanceof LastMessage) {
265                         ((LastMessage)lastentry).setDead();
266                 } else if (lastentry instanceof Slot) {
267                         ((Slot)lastentry).setDead();
268                 } else {
269                         throw new Error("Unrecognized type");
270                 }
271
272                 if (machineid == localmachineid) {
273                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
274                                 throw new Error("Server Error: Mismatch on local machine sequence number");
275                 } else {
276                         if (lastmsgseqnum > seqnum)
277                                 throw new Error("Server Error: Rollback on remote machine sequence number");
278                 }
279         }
280
281         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal) {
282                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal);
283
284                 for(Entry entry : slot.getEntries()) {
285                         switch(entry.getType()) {
286                         case Entry.TypeKeyValue:
287                                 processEntry((KeyValue)entry, indexer);
288                                 break;
289
290                         case Entry.TypeLastMessage:
291                                 processEntry((LastMessage)entry, indexer);
292                                 break;
293
294                         case Entry.TypeRejectedMessage:
295                                 processEntry((RejectedMessage)entry, indexer);
296                                 break;
297
298                         case Entry.TypeTableStatus:
299                                 processEntry((TableStatus)entry, indexer);
300                                 break;
301
302                         default:
303                                 throw new Error("Unrecognized type: "+entry.getType());
304                         }
305                 }
306         }
307
308         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
309                 for(int i=0; i < newslots.length; i++) {
310                         Slot currslot=newslots[i];
311                         Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
312                         if (prevslot != null &&
313                                         !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
314                                 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
315                 }
316         }
317 }