add some comments
[iotcloud.git] / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Arrays;
4 import java.util.Vector;
5
6 /**
7  * IoTTable data structure.  Provides client interface.
8  * @author Brian Demsky
9  * @version 1.0
10  */
11
12
13 final public class Table {
14         private int numslots;
15         private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
16         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
17         private SlotBuffer buffer;
18         private CloudComm cloud;
19         private long sequencenumber;
20         private long localmachineid;
21         private TableStatus lastTableStatus;
22         static final int FREE_SLOTS = 10;
23         static final int FORCED_RESIZE_INCREMENT = 20;
24
25         public Table(String baseurl, String password, long _localmachineid) {
26                 localmachineid=_localmachineid;
27                 buffer = new SlotBuffer();
28                 numslots = buffer.capacity();
29                 sequencenumber = 0;
30                 cloud=new CloudComm(baseurl, password);
31         }
32
33         public Table(CloudComm _cloud, long _localmachineid) {
34                 localmachineid=_localmachineid;
35                 buffer = new SlotBuffer();
36                 numslots = buffer.capacity();
37                 sequencenumber = 0;
38                 cloud=_cloud;
39         }
40
41         public void rebuild() {
42                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
43                 validateandupdate(newslots, true);
44         }
45
46         public void update() {
47                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
48
49                 validateandupdate(newslots, false);
50         }
51
52         public IoTString get(IoTString key) {
53                 KeyValue kv=table.get(key);
54                 if (kv != null)
55                         return kv.getValue();
56                 else
57                         return null;
58         }
59
60         public void initTable() {
61                 Slot s=new Slot(1, localmachineid);
62                 TableStatus status=new TableStatus(s, numslots);
63                 s.addEntry(status);
64                 Slot[] array=cloud.putSlot(s, numslots);
65                 if (array == null) {
66                         array = new Slot[] {s};
67                         /* update data structure */
68                         validateandupdate(array, true);
69                 } else {
70                         throw new Error("Error on initialization");
71                 }
72         }
73
74         public IoTString put(IoTString key, IoTString value) {
75                 while(true) {
76                         KeyValue oldvalue=table.get(key);
77                         if (tryput(key, value, false)) {
78                                 if (oldvalue==null)
79                                         return null;
80                                 else
81                                         return oldvalue.getValue();
82                         }
83                 }
84         }
85
86         private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
87                 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
88                 long seqn = buffer.getOldestSeqNum();
89
90                 if (forcedresize) {
91                         TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
92                         s.addEntry(status);
93                 }
94
95                 if ((numslots - buffer.size()) < FREE_SLOTS) {
96                         /* have to check whether we have enough free slots */
97                         long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
98                         seqn = fullfirstseqn < 1?1:fullfirstseqn;
99                         for(int i=0; i < FREE_SLOTS; i++, seqn++) {
100                                 Slot prevslot=buffer.getSlot(seqn);
101                                 if (!prevslot.isLive())
102                                         continue;
103                                 Vector<Entry> liveentries = prevslot.getLiveEntries();
104                                 for(Entry liveentry:liveentries) {
105                                         if (redundant(liveentry))
106                                                 continue;
107                                         if (s.hasSpace(liveentry))
108                                                 s.addEntry(liveentry);
109                                         else if (i==0) {
110                                                 if (s.canFit(liveentry))
111                                                         s.addEntry(liveentry);
112                                                 else if (!forcedresize) {
113                                                         return tryput(key, value, true);
114                                                 }
115                                         }
116                                 }
117                         }
118                 }
119                 KeyValue kv=new KeyValue(s, key, value);
120                 boolean insertedkv=false;
121                 if (s.hasSpace(kv)) {
122                         s.addEntry(kv);
123                         insertedkv=true;
124                 }
125
126                 long newestseqnum=buffer.getNewestSeqNum();
127 search:
128                 for(; seqn<=newestseqnum; seqn++) {
129                         Slot prevslot=buffer.getSlot(seqn);
130                         if (!prevslot.isLive())
131                                 continue;
132                         Vector<Entry> liveentries = prevslot.getLiveEntries();
133                         for(Entry liveentry:liveentries) {
134                                 if (redundant(liveentry))
135                                         continue;
136                                 if (s.hasSpace(liveentry))
137                                         s.addEntry(liveentry);
138                                 else
139                                         break search;
140                         }
141                 }
142
143                 int max=0;
144                 if (forcedresize)
145                         max = numslots + FORCED_RESIZE_INCREMENT;
146                 Slot[] array=cloud.putSlot(s, max);
147                 if (array == null)
148                         array = new Slot[] {s};
149                 else
150                         insertedkv=false;
151
152                 /* update data structure */
153                 validateandupdate(array, true);
154
155                 return insertedkv;
156         }
157
158         boolean redundant(Entry liveentry) {
159                 if (liveentry.getType()==Entry.TypeLastMessage) {
160                         LastMessage lastmsg=(LastMessage) liveentry;
161                         return lastmsg.getMachineID() == localmachineid;
162                 }
163                 return false;
164         }
165
166
167         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
168                 /* The cloud communication layer has checked slot HMACs already
169                          before decoding */
170                 if (newslots.length==0)
171                         return;
172
173                 long firstseqnum=newslots[0].getSequenceNumber();
174                 if (firstseqnum <= sequencenumber)
175                         throw new Error("Server Error: Sent older slots!");
176
177                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
178                 checkHMACChain(indexer, newslots);
179
180                 initExpectedSize();
181                 for(Slot slot: newslots) {
182                         updateExpectedSize();
183                         processSlot(indexer, slot, acceptupdatestolocal);
184                 }
185
186                 /* If there is a gap, check to see if the server sent us everything. */
187                 if (firstseqnum != (sequencenumber+1))
188                         checkNumSlots(newslots.length);
189
190                 commitNewMaxSize();
191
192                 /* Commit new to slots. */
193                 for(Slot slot:newslots) {
194                         buffer.putSlot(slot);
195                 }
196                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
197         }
198
199         private int expectedsize, currmaxsize;
200
201         private void checkNumSlots(int numslots) {
202                 if (numslots != expectedsize)
203                         throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
204         }
205
206         private void initExpectedSize() {
207                 long prevslots = sequencenumber;
208                 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
209                 currmaxsize = numslots;
210         }
211
212         private void updateExpectedSize() {
213                 expectedsize++;
214                 if (expectedsize > currmaxsize)
215                         expectedsize = currmaxsize;
216         }
217
218         private void updateCurrMaxSize(int newmaxsize) {
219                 currmaxsize=newmaxsize;
220         }
221
222         private void commitNewMaxSize() {
223                 if (numslots != currmaxsize)
224                         buffer.resize(currmaxsize);
225
226                 numslots=currmaxsize;
227         }
228
229         private void processEntry(KeyValue entry, SlotIndexer indexer) {
230                 IoTString key=entry.getKey();
231                 KeyValue oldvalue=table.get(key);
232                 if (oldvalue != null) {
233                         oldvalue.setDead();
234                 }
235                 table.put(key, entry);
236         }
237
238         private void processEntry(LastMessage entry, SlotIndexer indexer) {
239                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
240         }
241
242         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
243                 long oldseqnum=entry.getOldSeqNum();
244                 long newseqnum=entry.getNewSeqNum();
245                 boolean isequal=entry.getEqual();
246                 long machineid=entry.getMachineID();
247                 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
248                         Slot slot=indexer.getSlot(seqnum);
249                         if (slot != null) {
250                                 long slotmachineid=slot.getMachineID();
251                                 if (isequal!=(slotmachineid==machineid)) {
252                                         throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
253                                 }
254                         }
255                 }
256         }
257
258         private void processEntry(TableStatus entry, SlotIndexer indexer) {
259                 int newnumslots=entry.getMaxSlots();
260                 updateCurrMaxSize(newnumslots);
261                 if (lastTableStatus != null)
262                         lastTableStatus.setDead();
263                 lastTableStatus = entry;
264         }
265
266         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal) {
267                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
268                 if (lastmsgentry == null)
269                         return;
270
271                 long lastmsgseqnum = lastmsgentry.getFirst();
272                 Liveness lastentry = lastmsgentry.getSecond();
273                 if (lastentry instanceof LastMessage) {
274                         ((LastMessage)lastentry).setDead();
275                 } else if (lastentry instanceof Slot) {
276                         ((Slot)lastentry).setDead();
277                 } else {
278                         throw new Error("Unrecognized type");
279                 }
280
281                 if (machineid == localmachineid) {
282                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
283                                 throw new Error("Server Error: Mismatch on local machine sequence number");
284                 } else {
285                         if (lastmsgseqnum > seqnum)
286                                 throw new Error("Server Error: Rollback on remote machine sequence number");
287                 }
288         }
289
290         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal) {
291                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal);
292
293                 for(Entry entry : slot.getEntries()) {
294                         switch(entry.getType()) {
295                         case Entry.TypeKeyValue:
296                                 processEntry((KeyValue)entry, indexer);
297                                 break;
298
299                         case Entry.TypeLastMessage:
300                                 processEntry((LastMessage)entry, indexer);
301                                 break;
302
303                         case Entry.TypeRejectedMessage:
304                                 processEntry((RejectedMessage)entry, indexer);
305                                 break;
306
307                         case Entry.TypeTableStatus:
308                                 processEntry((TableStatus)entry, indexer);
309                                 break;
310
311                         default:
312                                 throw new Error("Unrecognized type: "+entry.getType());
313                         }
314                 }
315         }
316
317         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
318                 for(int i=0; i < newslots.length; i++) {
319                         Slot currslot=newslots[i];
320                         Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
321                         if (prevslot != null &&
322                                         !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
323                                 throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
324                 }
325         }
326 }