Initial commit of code for new version of block chain, does not compile (had to go...
[iotcloud.git] / src2 / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 /**
12  * IoTTable data structure.  Provides client inferface.
13  * @author Brian Demsky
14  * @version 1.0
15  */
16
17 final public class Table {
18         private int numslots;   //number of slots stored in buffer
19
20         //table of key-value pairs
21         private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
22
23         // machine id -> (sequence number, Slot or LastMessage); records last message by each client
24         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
25         // machine id -> ...
26         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
27         private Vector<Long> rejectedmessagelist = new Vector<Long>();
28         private SlotBuffer buffer;
29         private CloudComm cloud;
30         private long sequencenumber; //Largest sequence number a client has received
31         private long localmachineid;
32         private TableStatus lastTableStatus;
33         static final int FREE_SLOTS = 10; //number of slots that should be kept free
34         static final int SKIP_THRESHOLD = 10;
35         private long liveslotcount = 0;
36         private int chance;
37         static final double RESIZE_MULTIPLE = 1.2;
38         static final double RESIZE_THRESHOLD = 0.75;
39         static final int REJECTED_THRESHOLD = 5;
40         private int resizethreshold;
41         private long lastliveslotseqn;  //smallest sequence number with a live entry
42         private Random random = new Random();
43
44         private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
45         private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
46
47
48         public Table(String baseurl, String password, long _localmachineid) {
49                 localmachineid = _localmachineid;
50                 buffer = new SlotBuffer();
51                 numslots = buffer.capacity();
52                 setResizeThreshold();
53                 sequencenumber = 0;
54                 cloud = new CloudComm(this, baseurl, password);
55                 lastliveslotseqn = 1;
56
57                 pendingTransQueue = new LinkedList<PendingTransaction>();
58         }
59
60         public Table(CloudComm _cloud, long _localmachineid) {
61                 localmachineid = _localmachineid;
62                 buffer = new SlotBuffer();
63                 numslots = buffer.capacity();
64                 setResizeThreshold();
65                 sequencenumber = 0;
66                 cloud = _cloud;
67
68                 pendingTransQueue = new LinkedList<PendingTransaction>();
69         }
70
71         public void rebuild() {
72                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
73                 validateandupdate(newslots, true);
74         }
75
76         public void update() {
77                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
78
79                 validateandupdate(newslots, false);
80         }
81
82         public IoTString get(IoTString key) {
83                 KeyValue kv = table.get(key);
84                 if (kv != null)
85                         return kv.getValue();
86                 else
87                         return null;
88         }
89
90         public void initTable() {
91                 cloud.setSalt();//Set the salt
92                 Slot s = new Slot(this, 1, localmachineid);
93                 TableStatus status = new TableStatus(s, numslots);
94                 s.addEntry(status);
95                 Slot[] array = cloud.putSlot(s, numslots);
96                 if (array == null) {
97                         array = new Slot[] {s};
98                         /* update data structure */
99                         validateandupdate(array, true);
100                 } else {
101                         throw new Error("Error on initialization");
102                 }
103         }
104
105         public String toString() {
106                 return table.toString();
107         }
108
109         public void startTransaction() {
110                 // Create a new transaction, invalidates any old pending transactions.
111                 pendingTransBuild = new PendingTransaction();
112         }
113
114         public void commitTransaction() {
115
116                 // Add the pending transaction to the queue
117                 pendingTransQueue.add(pendingTransBuild);
118
119                 while (!pendingTransQueue.isEmpty()) {
120                         if (tryput( pendingTransQueue.peek(), false)) {
121                                 pendingTransQueue.poll();
122                         }
123                 }
124         }
125
126         public void addKV(IoTString key, IoTString value) {
127                 KeyValue kv = new KeyValue(key, value);
128                 pendingTransBuild.addKV(kv);
129         }
130
131         public void addGuard(IoTString key, IoTString value) {
132                 KeyValue kv = new KeyValue(key, value);
133                 pendingTransBuild.addKV(kv);
134         }
135
136
137
138
139
140
141         void decrementLiveCount() {
142                 liveslotcount--;
143         }
144
145
146         private void setResizeThreshold() {
147                 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
148                 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
149         }
150
151         private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
152                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
153                 int newsize = 0;
154                 if (liveslotcount > resizethreshold) {
155                         resize = true; //Resize is forced
156                 }
157
158                 if (resize) {
159                         newsize = (int) (numslots * RESIZE_MULTIPLE);
160                         TableStatus status = new TableStatus(s, newsize);
161                         s.addEntry(status);
162                 }
163
164                 if (! rejectedmessagelist.isEmpty()) {
165                         /* TODO: We should avoid generating a rejected message entry if
166                          * there is already a sufficient entry in the queue (e.g.,
167                          * equalsto value of true and same sequence number).  */
168
169                         long old_seqn = rejectedmessagelist.firstElement();
170                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
171                                 long new_seqn = rejectedmessagelist.lastElement();
172                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
173                                 s.addEntry(rm);
174                         } else {
175                                 long prev_seqn = -1;
176                                 int i = 0;
177                                 /* Go through list of missing messages */
178                                 for (; i < rejectedmessagelist.size(); i++) {
179                                         long curr_seqn = rejectedmessagelist.get(i);
180                                         Slot s_msg = buffer.getSlot(curr_seqn);
181                                         if (s_msg != null)
182                                                 break;
183                                         prev_seqn = curr_seqn;
184                                 }
185                                 /* Generate rejected message entry for missing messages */
186                                 if (prev_seqn != -1) {
187                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
188                                         s.addEntry(rm);
189                                 }
190                                 /* Generate rejected message entries for present messages */
191                                 for (; i < rejectedmessagelist.size(); i++) {
192                                         long curr_seqn = rejectedmessagelist.get(i);
193                                         Slot s_msg = buffer.getSlot(curr_seqn);
194                                         long machineid = s_msg.getMachineID();
195                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
196                                         s.addEntry(rm);
197                                 }
198                         }
199                 }
200
201                 long newestseqnum = buffer.getNewestSeqNum();
202                 long oldestseqnum = buffer.getOldestSeqNum();
203                 if (lastliveslotseqn < oldestseqnum)
204                         lastliveslotseqn = oldestseqnum;
205
206                 long seqn = lastliveslotseqn;
207                 boolean seenliveslot = false;
208                 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
209                 long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
210
211                 for (; seqn < threshold; seqn++) {
212                         Slot prevslot = buffer.getSlot(seqn);
213                         //Push slot number forward
214                         if (! seenliveslot)
215                                 lastliveslotseqn = seqn;
216
217                         if (! prevslot.isLive())
218                                 continue;
219                         seenliveslot = true;
220                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
221                         for (Entry liveentry : liveentries) {
222                                 if (s.hasSpace(liveentry)) {
223                                         s.addEntry(liveentry);
224                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
225                                         if (!resize) {
226                                                 System.out.print("B"); //?
227                                                 return tryput(pendingTrans, true);
228                                         }
229                                 }
230                         }
231                 }
232
233
234                 Transaction trans = new Transaction(s,
235                                                     s.getSequenceNumber(),
236                                                     localmachineid,
237                                                     pendingTrans.getKVUpdates(),
238                                                     pendingTrans.getGuard());
239                 boolean insertedTrans = false;
240                 if (s.hasSpace(trans)) {
241                         s.addEntry(trans);
242                         insertedTrans=true;
243                 }
244
245                 /* now go through live entries from least to greatest sequence number until
246                  * either all live slots added, or the slot doesn't have enough room
247                  * for SKIP_THRESHOLD consecutive entries*/
248                 int skipcount = 0;
249                 search:
250                 for (; seqn <= newestseqnum; seqn++) {
251                         Slot prevslot = buffer.getSlot(seqn);
252                         //Push slot number forward
253                         if (!seenliveslot)
254                                 lastliveslotseqn = seqn;
255
256                         if (!prevslot.isLive())
257                                 continue;
258                         seenliveslot = true;
259                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
260                         for (Entry liveentry : liveentries) {
261                                 if (s.hasSpace(liveentry))
262                                         s.addEntry(liveentry);
263                                 else {
264                                         skipcount++;
265                                         if (skipcount > SKIP_THRESHOLD)
266                                                 break search;
267                                 }
268                         }
269                 }
270
271                 int max = 0;
272                 if (resize)
273                         max = newsize;
274                 Slot[] array = cloud.putSlot(s, max);
275                 if (array == null) {
276                         array = new Slot[] {s};
277                         rejectedmessagelist.clear();
278                 }       else {
279                         if (array.length == 0)
280                                 throw new Error("Server Error: Did not send any slots");
281                         rejectedmessagelist.add(s.getSequenceNumber());
282                         insertedTrans = false;
283                 }
284
285                 /* update data structure */
286                 validateandupdate(array, true);
287
288                 return insertedTrans;
289         }
290
291         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
292                 /* The cloud communication layer has checked slot HMACs already
293                          before decoding */
294                 if (newslots.length == 0) return;
295
296                 long firstseqnum = newslots[0].getSequenceNumber();
297                 if (firstseqnum <= sequencenumber)
298                         throw new Error("Server Error: Sent older slots!");
299
300                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
301                 checkHMACChain(indexer, newslots);
302
303                 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
304
305                 initExpectedSize(firstseqnum);
306                 for (Slot slot : newslots) {
307                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
308                         updateExpectedSize();
309                 }
310
311                 /* If there is a gap, check to see if the server sent us everything. */
312                 if (firstseqnum != (sequencenumber + 1)) {
313                         checkNumSlots(newslots.length);
314                         if (!machineSet.isEmpty())
315                                 throw new Error("Missing record for machines: " + machineSet);
316                 }
317
318                 commitNewMaxSize();
319
320                 /* Commit new to slots. */
321                 for (Slot slot : newslots) {
322                         buffer.putSlot(slot);
323                         liveslotcount++;
324                 }
325                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
326         }
327
328         private int expectedsize, currmaxsize;
329
330         private void checkNumSlots(int numslots) {
331                 if (numslots != expectedsize)
332                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
333         }
334
335         private void initExpectedSize(long firstsequencenumber) {
336                 long prevslots = firstsequencenumber;
337                 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
338                 currmaxsize = numslots;
339         }
340
341         private void updateExpectedSize() {
342                 expectedsize++;
343                 if (expectedsize > currmaxsize)
344                         expectedsize = currmaxsize;
345         }
346
347         private void updateCurrMaxSize(int newmaxsize) {
348                 currmaxsize = newmaxsize;
349         }
350
351         private void commitNewMaxSize() {
352                 if (numslots != currmaxsize)
353                         buffer.resize(currmaxsize);
354
355                 numslots = currmaxsize;
356                 setResizeThreshold();
357         }
358
359         // private void processEntry(KeyValue entry, SlotIndexer indexer) {
360         //      IoTString key=entry.getKey();
361         //      KeyValue oldvalue=table.get(key);
362         //      if (oldvalue != null) {
363         //              oldvalue.setDead();
364         //      }
365         //      table.put(key, entry);
366         // }
367
368         private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
369                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
370         }
371
372         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
373                 long oldseqnum = entry.getOldSeqNum();
374                 long newseqnum = entry.getNewSeqNum();
375                 boolean isequal = entry.getEqual();
376                 long machineid = entry.getMachineID();
377                 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
378                         Slot slot = indexer.getSlot(seqnum);
379                         if (slot != null) {
380                                 long slotmachineid = slot.getMachineID();
381                                 if (isequal != (slotmachineid == machineid)) {
382                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
383                                 }
384                         }
385                 }
386
387                 HashSet<Long> watchset = new HashSet<Long>();
388                 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
389                         long entry_mid = lastmsg_entry.getKey();
390                         /* We've seen it, don't need to continue to watch.  Our next
391                          * message will implicitly acknowledge it. */
392                         if (entry_mid == localmachineid)
393                                 continue;
394                         Pair<Long, Liveness> v = lastmsg_entry.getValue();
395                         long entry_seqn = v.getFirst();
396                         if (entry_seqn < newseqnum) {
397                                 addWatchList(entry_mid, entry);
398                                 watchset.add(entry_mid);
399                         }
400                 }
401                 if (watchset.isEmpty())
402                         entry.setDead();
403                 else
404                         entry.setWatchSet(watchset);
405         }
406
407         private void addWatchList(long machineid, RejectedMessage entry) {
408                 HashSet<RejectedMessage> entries = watchlist.get(machineid);
409                 if (entries == null)
410                         watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
411                 entries.add(entry);
412         }
413
414         private void processEntry(TableStatus entry, SlotIndexer indexer) {
415                 int newnumslots = entry.getMaxSlots();
416                 updateCurrMaxSize(newnumslots);
417                 if (lastTableStatus != null)
418                         lastTableStatus.setDead();
419                 lastTableStatus = entry;
420         }
421
422         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
423                 machineSet.remove(machineid);
424
425                 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
426                 if (watchset != null) {
427                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
428                                 RejectedMessage rm = rmit.next();
429                                 if (rm.getNewSeqNum() <= seqnum) {
430                                         /* Remove it from our watchlist */
431                                         rmit.remove();
432                                         /* Decrement machines that need to see this notification */
433                                         rm.removeWatcher(machineid);
434                                 }
435                         }
436                 }
437
438                 if (machineid == localmachineid) {
439                         /* Our own messages are immediately dead. */
440                         if (liveness instanceof LastMessage) {
441                                 ((LastMessage)liveness).setDead();
442                         } else if (liveness instanceof Slot) {
443                                 ((Slot)liveness).setDead();
444                         } else {
445                                 throw new Error("Unrecognized type");
446                         }
447                 }
448
449
450                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
451                 if (lastmsgentry == null)
452                         return;
453
454                 long lastmsgseqnum = lastmsgentry.getFirst();
455                 Liveness lastentry = lastmsgentry.getSecond();
456                 if (machineid != localmachineid) {
457                         if (lastentry instanceof LastMessage) {
458                                 ((LastMessage)lastentry).setDead();
459                         } else if (lastentry instanceof Slot) {
460                                 ((Slot)lastentry).setDead();
461                         } else {
462                                 throw new Error("Unrecognized type");
463                         }
464                 }
465
466                 if (machineid == localmachineid) {
467                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
468                                 throw new Error("Server Error: Mismatch on local machine sequence number");
469                 } else {
470                         if (lastmsgseqnum > seqnum)
471                                 throw new Error("Server Error: Rollback on remote machine sequence number");
472                 }
473         }
474
475         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
476                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
477                 for (Entry entry : slot.getEntries()) {
478                         switch (entry.getType()) {
479                         // case Entry.TypeKeyValue:
480                         //      processEntry((KeyValue)entry, indexer);
481                         //      break;
482
483                         case Entry.TypeLastMessage:
484                                 processEntry((LastMessage)entry, indexer, machineSet);
485                                 break;
486
487                         case Entry.TypeRejectedMessage:
488                                 processEntry((RejectedMessage)entry, indexer);
489                                 break;
490
491                         case Entry.TypeTableStatus:
492                                 processEntry((TableStatus)entry, indexer);
493                                 break;
494
495                         default:
496                                 throw new Error("Unrecognized type: " + entry.getType());
497                         }
498                 }
499         }
500
501         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
502                 for (int i = 0; i < newslots.length; i++) {
503                         Slot currslot = newslots[i];
504                         Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
505                         if (prevslot != null &&
506                                 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
507                                 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
508                 }
509         }
510 }