edits
[iotcloud.git] / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Arrays;
4 import java.util.Vector;
5
6 final public class Table {
7         private int numslots;
8         private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
9         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
10         private SlotBuffer buffer;
11         private CloudComm cloud;
12         private long sequencenumber;
13         private long localmachineid;
14         private TableStatus lastTableStatus;
15         static final int FREE_SLOTS = 10;
16         static final int FORCED_RESIZE_INCREMENT = 20;
17
18         public Table(String baseurl, String password, long _localmachineid) {
19                 localmachineid=_localmachineid;
20                 buffer = new SlotBuffer();
21                 numslots = buffer.capacity();
22                 sequencenumber = 0;
23                 cloud=new CloudComm(baseurl, password);
24         }
25
26         public Table(CloudComm _cloud, long _localmachineid) {
27                 localmachineid=_localmachineid;
28                 buffer = new SlotBuffer();
29                 numslots = buffer.capacity();
30                 sequencenumber = 0;
31                 cloud=_cloud;
32         }
33
34         public void update() {
35                 Slot[] newslots=cloud.getSlots(sequencenumber+1);
36
37                 validateandupdate(newslots, false);
38         }
39
40         public IoTString get(IoTString key) {
41                 KeyValue kv=table.get(key);
42                 if (kv != null)
43                         return kv.getValue();
44                 else
45                         return null;
46         }
47
48         public void initTable() {
49                 Slot s=new Slot(1, localmachineid);
50                 TableStatus status=new TableStatus(s, numslots);
51                 s.addEntry(status);
52                 Slot[] array=cloud.putSlot(s, numslots);
53                 if (array == null) {
54                         array = new Slot[] {s};
55                         validateandupdate(array, true);                                                                         // update data structure
56                 } else {
57                         throw new Error("Error on initialization");
58                 }
59         }
60
61         public IoTString put(IoTString key, IoTString value) {
62                 while(true) {
63                         KeyValue oldvalue=table.get(key);
64                         if (tryput(key, value, false)) {
65                                 if (oldvalue==null)
66                                         return null;
67                                 else
68                                         return oldvalue.getValue();
69                         }
70                 }
71         }
72
73         private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
74                 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
75                 long seqn = buffer.getOldestSeqNum();
76
77                 if (forcedresize) {
78                         TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
79                         s.addEntry(status);
80                 }
81
82                 if ((numslots - buffer.size()) < FREE_SLOTS) {
83                         //have to check whether we have enough free slots
84                         long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
85                         seqn = fullfirstseqn < 1?1:fullfirstseqn;
86                         for(int i=0; i < FREE_SLOTS; i++, seqn++) {
87                                 Slot prevslot=buffer.getSlot(seqn);
88                                 if (!prevslot.isLive())
89                                         continue;
90                                 Vector<Entry> liveentries = prevslot.getLiveEntries();
91                                 for(Entry liveentry:liveentries) {
92                                         if (redundant(liveentry))
93                                                 continue;
94                                         if (s.hasSpace(liveentry))
95                                                 s.addEntry(liveentry);
96                                         else if (i==0) {
97                                                 if (s.canFit(liveentry))
98                                                         s.addEntry(liveentry);
99                                                 else if (!forcedresize) {
100                                                         return tryput(key, value, true);
101                                                 }
102                                         }
103                                 }
104                         }
105                 }
106                 KeyValue kv=new KeyValue(s, key, value);
107                 boolean insertedkv=false;
108                 if (s.hasSpace(kv)) {
109                         s.addEntry(kv);
110                         insertedkv=true;
111                 }
112
113                 long newestseqnum=buffer.getNewestSeqNum();
114 search:
115                 for(; seqn<=newestseqnum; seqn++) {
116                         Slot prevslot=buffer.getSlot(seqn);
117                         if (!prevslot.isLive())
118                                 continue;
119                         Vector<Entry> liveentries = prevslot.getLiveEntries();
120                         for(Entry liveentry:liveentries) {
121                                 if (redundant(liveentry))
122                                         continue;
123                                 if (s.hasSpace(liveentry))
124                                         s.addEntry(liveentry);
125                                 else
126                                         break search;
127                         }
128                 }
129
130                 int max=0;
131                 if (forcedresize)
132                         max = numslots + FORCED_RESIZE_INCREMENT;
133                 Slot[] array=cloud.putSlot(s, max);
134                 if (array == null)
135                         array = new Slot[] {s};
136                 else
137                         insertedkv=false;
138
139                 validateandupdate(array, true);                                                 // update data structure
140
141                 return insertedkv;
142         }
143
144         boolean redundant(Entry liveentry) {
145                 if (liveentry.getType()==Entry.TypeLastMessage) {
146                         LastMessage lastmsg=(LastMessage) liveentry;
147                         return lastmsg.getMachineID() == localmachineid;
148                 }
149                 return false;
150         }
151
152
153         private void validateandupdate(Slot[] newslots, boolean isput) {
154                 //The cloud communication layer has checked slot HMACs already
155                 //before decoding
156                 if (newslots.length==0)
157                         return;
158
159                 long firstseqnum=newslots[0].getSequenceNumber();
160                 if (firstseqnum <= sequencenumber)
161                         throw new Error("Server Error: Sent older slots!");
162
163                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
164                 checkHMACChain(indexer, newslots);
165
166                 initExpectedSize();
167                 for(Slot slot: newslots) {
168                         updateExpectedSize();
169                         processSlot(indexer, slot, isput);
170                 }
171
172                 //If there is a gap, check to see if the server sent us everything
173                 if (firstseqnum != (sequencenumber+1))
174                         checkNumSlots(newslots.length);
175
176                 commitNewMaxSize();
177
178                 //commit new to slots
179                 for(Slot slot:newslots) {
180                         buffer.putSlot(slot);
181                 }
182                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
183         }
184
185         private int expectedsize, currmaxsize;
186
187         private void checkNumSlots(int numslots) {
188                 if (numslots != expectedsize)
189                         throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
190         }
191
192         private void initExpectedSize() {
193                 long prevslots = sequencenumber;
194                 expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
195                 currmaxsize = numslots;
196         }
197
198         private void updateExpectedSize() {
199                 expectedsize++;
200                 if (expectedsize > currmaxsize)
201                         expectedsize = currmaxsize;
202         }
203
204         private void updateCurrMaxSize(int newmaxsize) {
205                 currmaxsize=newmaxsize;
206         }
207
208         private void commitNewMaxSize() {
209                 if (numslots != currmaxsize)
210                         buffer.resize(currmaxsize);
211
212                 numslots=currmaxsize;
213         }
214
215         private void processEntry(KeyValue entry, SlotIndexer indexer) {
216                 IoTString key=entry.getKey();
217                 KeyValue oldvalue=table.get(key);
218                 if (oldvalue != null) {
219                         oldvalue.setDead();
220                 }
221                 table.put(key, entry);
222         }
223
224         private void processEntry(LastMessage entry, SlotIndexer indexer) {
225                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
226         }
227
228         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
229                 long oldseqnum=entry.getOldSeqNum();
230                 long newseqnum=entry.getNewSeqNum();
231                 boolean isequal=entry.getEqual();
232                 long machineid=entry.getMachineID();
233                 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
234                         Slot slot=indexer.getSlot(seqnum);
235                         if (slot != null) {
236                                 long slotmachineid=slot.getMachineID();
237                                 if (isequal!=(slotmachineid==machineid)) {
238                                         throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
239                                 }
240                         }
241                 }
242         }
243
244         private void processEntry(TableStatus entry, SlotIndexer indexer) {
245                 int newnumslots=entry.getMaxSlots();
246                 updateCurrMaxSize(newnumslots);
247                 if (lastTableStatus != null)
248                         lastTableStatus.setDead();
249                 lastTableStatus = entry;
250         }
251
252         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean isput) {
253                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
254                 if (lastmsgentry == null)
255                         return;
256
257                 long lastmsgseqnum = lastmsgentry.getFirst();
258                 Liveness lastentry = lastmsgentry.getSecond();
259                 if (lastentry instanceof LastMessage) {
260                         ((LastMessage)lastentry).setDead();
261                 } else if (lastentry instanceof Slot) {
262                         ((Slot)lastentry).setDead();
263                 } else {
264                         throw new Error("Unrecognized type");
265                 }
266
267                 if (machineid == localmachineid) {
268                         if (lastmsgseqnum != seqnum && !isput)
269                                 throw new Error("Server Error: Mismatch on local machine sequence number");
270                 } else {
271                         if (lastmsgseqnum > seqnum)
272                                 throw new Error("Server Error: Rollback on remote machine sequence number");
273                 }
274         }
275
276         private void processSlot(SlotIndexer indexer, Slot slot, boolean isput) {
277                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, isput);
278
279                 for(Entry entry : slot.getEntries()) {
280                         switch(entry.getType()) {
281                         case Entry.TypeKeyValue:
282                                 processEntry((KeyValue)entry, indexer);
283                                 break;
284
285                         case Entry.TypeLastMessage:
286                                 processEntry((LastMessage)entry, indexer);
287                                 break;
288
289                         case Entry.TypeRejectedMessage:
290                                 processEntry((RejectedMessage)entry, indexer);
291                                 break;
292
293                         case Entry.TypeTableStatus:
294                                 processEntry((TableStatus)entry, indexer);
295                                 break;
296
297                         default:
298                                 throw new Error("Unrecognized type: "+entry.getType());
299                         }
300                 }
301         }
302
303         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
304                 for(int i=0; i < newslots.length; i++) {
305                         Slot currslot=newslots[i];
306                         Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
307                         if (prevslot != null &&
308                                         !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
309                                 throw new Error("Server Error: Invalid HMAC Chain");
310                 }
311         }
312 }