Tex typo issues, Code
[iotcloud.git] / src2 / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.List;
12
13 /**
14  * IoTTable data structure.  Provides client inferface.
15  * @author Brian Demsky
16  * @version 1.0
17  */
18
19 final public class Table {
20         private int numslots;   //number of slots stored in buffer
21
22         //table of key-value pairs
23         private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
24
25         // machine id -> (sequence number, Slot or LastMessage); records last message by each client
26         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
27         // machine id -> ...
28         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
29         private Vector<Long> rejectedmessagelist = new Vector<Long>();
30         private SlotBuffer buffer;
31         private CloudComm cloud;
32         private long sequencenumber; //Largest sequence number a client has received
33         private long localmachineid;
34         private TableStatus lastTableStatus;
35         static final int FREE_SLOTS = 10; //number of slots that should be kept free
36         static final int SKIP_THRESHOLD = 10;
37         private long liveslotcount = 0;
38         private int chance;
39         static final double RESIZE_MULTIPLE = 1.2;
40         static final double RESIZE_THRESHOLD = 0.75;
41         static final int REJECTED_THRESHOLD = 5;
42         private int resizethreshold;
43         private long lastliveslotseqn;  //smallest sequence number with a live entry
44         private Random random = new Random();
45
46         private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
47         private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
48         private List<Commit> commitList = null; // List of all the most recent live commits
49         private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
50         private List<Transaction> uncommittedTransactionsList = null; //
51         private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
52         private Set<Abort> arbitratorTable = null; // Table of arbitrators
53
54
55         public Table(String baseurl, String password, long _localmachineid) {
56                 localmachineid = _localmachineid;
57                 buffer = new SlotBuffer();
58                 numslots = buffer.capacity();
59                 setResizeThreshold();
60                 sequencenumber = 0;
61                 cloud = new CloudComm(this, baseurl, password);
62                 lastliveslotseqn = 1;
63
64                 pendingTransQueue = new LinkedList<PendingTransaction>();
65                 commitList = new LinkedList<Commit>();
66                 commitedTable = new HashMap<IoTString, KeyValue>();
67                 uncommittedTransactionsList = new LinkedList<Transaction>();
68                 arbitratorTable = new HashMap<IoTString, Long>();
69         }
70
71         public Table(CloudComm _cloud, long _localmachineid) {
72                 localmachineid = _localmachineid;
73                 buffer = new SlotBuffer();
74                 numslots = buffer.capacity();
75                 setResizeThreshold();
76                 sequencenumber = 0;
77                 cloud = _cloud;
78
79                 pendingTransQueue = new LinkedList<PendingTransaction>();
80                 commitList = new LinkedList<Commit>();
81                 commitedTable = new HashMap<IoTString, KeyValue>();
82                 uncommittedTransactionsList = new LinkedList<Transaction>();
83                 arbitratorTable = new HashMap<IoTString, Long>();
84         }
85
86         public void rebuild() {
87                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
88                 validateandupdate(newslots, true);
89         }
90
91         public void update() {
92                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
93
94                 validateandupdate(newslots, false);
95         }
96
97         public IoTString get(IoTString key) {
98                 KeyValue kv = table.get(key);
99                 if (kv != null)
100                         return kv.getValue();
101                 else
102                         return null;
103         }
104
105         public void initTable() {
106                 cloud.setSalt();//Set the salt
107                 Slot s = new Slot(this, 1, localmachineid);
108                 TableStatus status = new TableStatus(s, numslots);
109                 s.addEntry(status);
110                 Slot[] array = cloud.putSlot(s, numslots);
111                 if (array == null) {
112                         array = new Slot[] {s};
113                         /* update data structure */
114                         validateandupdate(array, true);
115                 } else {
116                         throw new Error("Error on initialization");
117                 }
118         }
119
120         public String toString() {
121                 return table.toString();
122         }
123
124         public void startTransaction() {
125                 // Create a new transaction, invalidates any old pending transactions.
126                 pendingTransBuild = new PendingTransaction();
127         }
128
129         public void commitTransaction() {
130
131                 // Add the pending transaction to the queue
132                 pendingTransQueue.add(pendingTransBuild);
133
134                 while (!pendingTransQueue.isEmpty()) {
135                         if (tryput( pendingTransQueue.peek(), false)) {
136                                 pendingTransQueue.poll();
137                         }
138                 }
139         }
140
141         public void addKV(IoTString key, IoTString value) {
142                 KeyValue kv = new KeyValue(key, value);
143                 pendingTransBuild.addKV(kv);
144         }
145
146         public void addGuard(IoTString key, IoTString value) {
147                 KeyValue kv = new KeyValue(key, value);
148                 pendingTransBuild.addKV(kv);
149         }
150
151         void decrementLiveCount() {
152                 liveslotcount--;
153         }
154
155
156         private void setResizeThreshold() {
157                 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
158                 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
159         }
160
161         private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
162                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
163                 int newsize = 0;
164                 if (liveslotcount > resizethreshold) {
165                         resize = true; //Resize is forced
166                 }
167
168                 if (resize) {
169                         newsize = (int) (numslots * RESIZE_MULTIPLE);
170                         TableStatus status = new TableStatus(s, newsize);
171                         s.addEntry(status);
172                 }
173
174                 if (! rejectedmessagelist.isEmpty()) {
175                         /* TODO: We should avoid generating a rejected message entry if
176                          * there is already a sufficient entry in the queue (e.g.,
177                          * equalsto value of true and same sequence number).  */
178
179                         long old_seqn = rejectedmessagelist.firstElement();
180                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
181                                 long new_seqn = rejectedmessagelist.lastElement();
182                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
183                                 s.addEntry(rm);
184                         } else {
185                                 long prev_seqn = -1;
186                                 int i = 0;
187                                 /* Go through list of missing messages */
188                                 for (; i < rejectedmessagelist.size(); i++) {
189                                         long curr_seqn = rejectedmessagelist.get(i);
190                                         Slot s_msg = buffer.getSlot(curr_seqn);
191                                         if (s_msg != null)
192                                                 break;
193                                         prev_seqn = curr_seqn;
194                                 }
195                                 /* Generate rejected message entry for missing messages */
196                                 if (prev_seqn != -1) {
197                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
198                                         s.addEntry(rm);
199                                 }
200                                 /* Generate rejected message entries for present messages */
201                                 for (; i < rejectedmessagelist.size(); i++) {
202                                         long curr_seqn = rejectedmessagelist.get(i);
203                                         Slot s_msg = buffer.getSlot(curr_seqn);
204                                         long machineid = s_msg.getMachineID();
205                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
206                                         s.addEntry(rm);
207                                 }
208                         }
209                 }
210
211                 long newestseqnum = buffer.getNewestSeqNum();
212                 long oldestseqnum = buffer.getOldestSeqNum();
213                 if (lastliveslotseqn < oldestseqnum)
214                         lastliveslotseqn = oldestseqnum;
215
216                 long seqn = lastliveslotseqn;
217                 boolean seenliveslot = false;
218                 long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
219                 long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
220
221                 for (; seqn < threshold; seqn++) {
222                         Slot prevslot = buffer.getSlot(seqn);
223                         //Push slot number forward
224                         if (! seenliveslot)
225                                 lastliveslotseqn = seqn;
226
227                         if (! prevslot.isLive())
228                                 continue;
229                         seenliveslot = true;
230                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
231                         for (Entry liveentry : liveentries) {
232                                 if (s.hasSpace(liveentry)) {
233                                         s.addEntry(liveentry);
234                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
235                                         if (!resize) {
236                                                 System.out.print("B"); //?
237                                                 return tryput(pendingTrans, true);
238                                         }
239                                 }
240                         }
241                 }
242
243
244                 Transaction trans = new Transaction(s,
245                                                     s.getSequenceNumber(),
246                                                     localmachineid,
247                                                     pendingTrans.getKVUpdates(),
248                                                     pendingTrans.getGuard());
249                 boolean insertedTrans = false;
250                 if (s.hasSpace(trans)) {
251                         s.addEntry(trans);
252                         insertedTrans = true;
253                 }
254
255                 /* now go through live entries from least to greatest sequence number until
256                  * either all live slots added, or the slot doesn't have enough room
257                  * for SKIP_THRESHOLD consecutive entries*/
258                 int skipcount = 0;
259                 search:
260                 for (; seqn <= newestseqnum; seqn++) {
261                         Slot prevslot = buffer.getSlot(seqn);
262                         //Push slot number forward
263                         if (!seenliveslot)
264                                 lastliveslotseqn = seqn;
265
266                         if (!prevslot.isLive())
267                                 continue;
268                         seenliveslot = true;
269                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
270                         for (Entry liveentry : liveentries) {
271                                 if (s.hasSpace(liveentry))
272                                         s.addEntry(liveentry);
273                                 else {
274                                         skipcount++;
275                                         if (skipcount > SKIP_THRESHOLD)
276                                                 break search;
277                                 }
278                         }
279                 }
280
281                 int max = 0;
282                 if (resize)
283                         max = newsize;
284                 Slot[] array = cloud.putSlot(s, max);
285                 if (array == null) {
286                         array = new Slot[] {s};
287                         rejectedmessagelist.clear();
288                 }       else {
289                         if (array.length == 0)
290                                 throw new Error("Server Error: Did not send any slots");
291                         rejectedmessagelist.add(s.getSequenceNumber());
292                         insertedTrans = false;
293                 }
294
295                 /* update data structure */
296                 validateandupdate(array, true);
297
298                 return insertedTrans;
299         }
300
301         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
302                 /* The cloud communication layer has checked slot HMACs already
303                          before decoding */
304                 if (newslots.length == 0) return;
305
306                 long firstseqnum = newslots[0].getSequenceNumber();
307                 if (firstseqnum <= sequencenumber)
308                         throw new Error("Server Error: Sent older slots!");
309
310                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
311                 checkHMACChain(indexer, newslots);
312
313                 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
314
315                 initExpectedSize(firstseqnum);
316                 for (Slot slot : newslots) {
317                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
318                         updateExpectedSize();
319                 }
320
321                 /* If there is a gap, check to see if the server sent us everything. */
322                 if (firstseqnum != (sequencenumber + 1)) {
323                         checkNumSlots(newslots.length);
324                         if (!machineSet.isEmpty())
325                                 throw new Error("Missing record for machines: " + machineSet);
326                 }
327
328                 commitNewMaxSize();
329
330                 /* Commit new to slots. */
331                 for (Slot slot : newslots) {
332                         buffer.putSlot(slot);
333                         liveslotcount++;
334                 }
335                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
336         }
337
338         private int expectedsize, currmaxsize;
339
340         private void checkNumSlots(int numslots) {
341                 if (numslots != expectedsize)
342                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
343         }
344
345         private void initExpectedSize(long firstsequencenumber) {
346                 long prevslots = firstsequencenumber;
347                 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
348                 currmaxsize = numslots;
349         }
350
351         private void updateExpectedSize() {
352                 expectedsize++;
353                 if (expectedsize > currmaxsize)
354                         expectedsize = currmaxsize;
355         }
356
357         private void updateCurrMaxSize(int newmaxsize) {
358                 currmaxsize = newmaxsize;
359         }
360
361         private void commitNewMaxSize() {
362                 if (numslots != currmaxsize)
363                         buffer.resize(currmaxsize);
364
365                 numslots = currmaxsize;
366                 setResizeThreshold();
367         }
368
369         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
370                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
371         }
372
373         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
374                 long oldseqnum = entry.getOldSeqNum();
375                 long newseqnum = entry.getNewSeqNum();
376                 boolean isequal = entry.getEqual();
377                 long machineid = entry.getMachineID();
378                 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
379                         Slot slot = indexer.getSlot(seqnum);
380                         if (slot != null) {
381                                 long slotmachineid = slot.getMachineID();
382                                 if (isequal != (slotmachineid == machineid)) {
383                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
384                                 }
385                         }
386                 }
387
388                 HashSet<Long> watchset = new HashSet<Long>();
389                 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
390                         long entry_mid = lastmsg_entry.getKey();
391                         /* We've seen it, don't need to continue to watch.  Our next
392                          * message will implicitly acknowledge it. */
393                         if (entry_mid == localmachineid)
394                                 continue;
395                         Pair<Long, Liveness> v = lastmsg_entry.getValue();
396                         long entry_seqn = v.getFirst();
397                         if (entry_seqn < newseqnum) {
398                                 addWatchList(entry_mid, entry);
399                                 watchset.add(entry_mid);
400                         }
401                 }
402                 if (watchset.isEmpty())
403                         entry.setDead();
404                 else
405                         entry.setWatchSet(watchset);
406         }
407
408         private void processEntry(NewKey entry) {
409                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
410         }
411
412         private void processEntry(Transaction entry) {
413                 uncommittedTransactionsList.add(entry);
414         }
415
416         private void processEntry(Abort entry) {
417                 for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
418                         Transaction prevtrans = i.next();
419                         if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
420                                 uncommittedTransactionsList.remove(prevtrans);
421                                 prevtrans.setDead();
422                                 return;
423                         }
424                 }
425         }
426
427         private void processEntry(Commit entry) {
428
429                 for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
430                         Commit prevcommit = i.next();
431                         prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
432
433                         if (!prevcommit.isLive()) {
434                                 commitList.remove(prevcommit);
435                         }
436                 }
437
438                 commitList.add(entry);
439
440                 // Update the committed table list
441                 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
442                         IoTString key = kv.getKey();
443                         commitedTable.put(key, kv);
444                 }
445
446                 long committedTransSeq = entry.getTransSequenceNumber();
447
448                 // Make dead the transactions
449                 for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
450                         Transaction prevtrans = i.next();
451
452                         if (prevtrans.getSequenceNumber() <= committedTransSeq) {
453                                 uncommittedTransactionsList.remove(prevtrans);
454                                 prevtrans.setDead();
455                         }
456                 }
457         }
458
459         private void addWatchList(long machineid, RejectedMessage entry) {
460                 HashSet<RejectedMessage> entries = watchlist.get(machineid);
461                 if (entries == null)
462                         watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
463                 entries.add(entry);
464         }
465
466         private void processEntry(TableStatus entry) {
467                 int newnumslots = entry.getMaxSlots();
468                 updateCurrMaxSize(newnumslots);
469                 if (lastTableStatus != null)
470                         lastTableStatus.setDead();
471                 lastTableStatus = entry;
472         }
473
474         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
475                 machineSet.remove(machineid);
476
477                 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
478                 if (watchset != null) {
479                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
480                                 RejectedMessage rm = rmit.next();
481                                 if (rm.getNewSeqNum() <= seqnum) {
482                                         /* Remove it from our watchlist */
483                                         rmit.remove();
484                                         /* Decrement machines that need to see this notification */
485                                         rm.removeWatcher(machineid);
486                                 }
487                         }
488                 }
489
490                 if (machineid == localmachineid) {
491                         /* Our own messages are immediately dead. */
492                         if (liveness instanceof LastMessage) {
493                                 ((LastMessage)liveness).setDead();
494                         } else if (liveness instanceof Slot) {
495                                 ((Slot)liveness).setDead();
496                         } else {
497                                 throw new Error("Unrecognized type");
498                         }
499                 }
500
501
502                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
503                 if (lastmsgentry == null)
504                         return;
505
506                 long lastmsgseqnum = lastmsgentry.getFirst();
507                 Liveness lastentry = lastmsgentry.getSecond();
508                 if (machineid != localmachineid) {
509                         if (lastentry instanceof LastMessage) {
510                                 ((LastMessage)lastentry).setDead();
511                         } else if (lastentry instanceof Slot) {
512                                 ((Slot)lastentry).setDead();
513                         } else {
514                                 throw new Error("Unrecognized type");
515                         }
516                 }
517
518                 if (machineid == localmachineid) {
519                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
520                                 throw new Error("Server Error: Mismatch on local machine sequence number");
521                 } else {
522                         if (lastmsgseqnum > seqnum)
523                                 throw new Error("Server Error: Rollback on remote machine sequence number");
524                 }
525         }
526
527         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
528                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
529                 for (Entry entry : slot.getEntries()) {
530                         switch (entry.getType()) {
531
532                         case Entry.TypeNewKey:
533                                 processEntry((NewKey)entry);
534                                 break;
535
536                         case Entry.TypeCommit:
537                                 processEntry((Commit)entry);
538                                 break;
539
540                         case Entry.TypeAbort:
541                                 processEntry((Abort)entry);
542                                 break;
543
544                         case Entry.TypeTransaction:
545                                 processEntry((Transaction)entry);
546                                 break;
547
548                         case Entry.TypeLastMessage:
549                                 processEntry((LastMessage)entry, machineSet);
550                                 break;
551
552                         case Entry.TypeRejectedMessage:
553                                 processEntry((RejectedMessage)entry, indexer);
554                                 break;
555
556                         case Entry.TypeTableStatus:
557                                 processEntry((TableStatus)entry);
558                                 break;
559
560                         default:
561                                 throw new Error("Unrecognized type: " + entry.getType());
562                         }
563                 }
564         }
565
566         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
567                 for (int i = 0; i < newslots.length; i++) {
568                         Slot currslot = newslots[i];
569                         Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
570                         if (prevslot != null &&
571                                 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
572                                 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
573                 }
574         }
575 }