Fixing Bugs
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Set;
14 import java.util.Collection;
15 import java.util.Collections;
16
17
18 /**
19  * IoTTable data structure.  Provides client interface.
20  * @author Brian Demsky
21  * @version 1.0
22  */
23
24 final public class Table {
25         private int numslots;   //number of slots stored in buffer
26
27         //table of key-value pairs
28         //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
29
30         // machine id -> (sequence number, Slot or LastMessage); records last message by each client
31         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
32         // machine id -> ...
33         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
34         private Vector<Long> rejectedmessagelist = new Vector<Long>();
35         private SlotBuffer buffer;
36         private CloudComm cloud;
37         private long sequencenumber; //Largest sequence number a client has received
38         private long localmachineid;
39         private TableStatus lastTableStatus;
40         static final int FREE_SLOTS = 10; //number of slots that should be kept free
41         static final int SKIP_THRESHOLD = 10;
42         public long liveslotcount = 0;  // TODO:  MAKE PRIVATE
43         private int chance;
44         static final double RESIZE_MULTIPLE = 1.2;
45         static final double RESIZE_THRESHOLD = 0.75;
46         static final int REJECTED_THRESHOLD = 5;
47         public int resizethreshold;     // TODO:  MAKE PRIVATE
48         private long lastliveslotseqn;  //smallest sequence number with a live entry
49         private Random random = new Random();
50         private long lastUncommittedTransaction = 0;
51
52         private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
53         private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
54         private Map<Long, Commit> commitMap = null; // List of all the most recent live commits
55         private Map<Long, Abort> abortMap = null; // Set of the live aborts
56         private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV       TODO: Make Private
57         public  Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV       TODO: Make Private
58         private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
59         public Map<Long, Transaction> uncommittedTransactionsMap = null; // TODO: make private
60         private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
61         private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
62         // private Set<Abort> arbitratorTable = null; // Table of arbitrators
63         private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
64         private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
65         private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
66
67
68
69         public Table(String baseurl, String password, long _localmachineid) {
70                 localmachineid = _localmachineid;
71                 buffer = new SlotBuffer();
72                 numslots = buffer.capacity();
73                 setResizeThreshold();
74                 sequencenumber = 0;
75                 cloud = new CloudComm(this, baseurl, password);
76                 lastliveslotseqn = 1;
77
78                 setupDataStructs();
79         }
80
81         public Table(CloudComm _cloud, long _localmachineid) {
82                 localmachineid = _localmachineid;
83                 buffer = new SlotBuffer();
84                 numslots = buffer.capacity();
85                 setResizeThreshold();
86                 sequencenumber = 0;
87                 cloud = _cloud;
88
89                 setupDataStructs();
90         }
91
92         private void setupDataStructs() {
93                 pendingTransQueue = new LinkedList<PendingTransaction>();
94                 commitMap = new HashMap<Long, Commit>();
95                 abortMap = new HashMap<Long, Abort>();
96                 committedMapByKey = new HashMap<IoTString, Commit>();
97                 commitedTable = new HashMap<IoTString, KeyValue>();
98                 speculativeTable = new HashMap<IoTString, KeyValue>();
99                 uncommittedTransactionsMap = new HashMap<Long, Transaction>();
100                 arbitratorTable = new HashMap<IoTString, Long>();
101                 newKeyTable = new HashMap<IoTString, NewKey>();
102                 newCommitMap = new HashMap<Long, Commit>();
103                 lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
104                 lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
105         }
106
107         public void rebuild() {
108                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
109                 validateandupdate(newslots, true);
110         }
111
112         // TODO: delete method
113         public void printSlots() {
114                 long o = buffer.getOldestSeqNum();
115                 long n = buffer.getNewestSeqNum();
116
117                 int[] types = new int[10];
118
119                 int num = 0;
120
121                 int livec = 0;
122                 int deadc = 0;
123                 for (long i = o; i < (n + 1); i++) {
124                         Slot s = buffer.getSlot(i);
125
126                         Vector<Entry> entries = s.getEntries();
127
128                         for (Entry e : entries) {
129                                 if (e.isLive()) {
130                                         int type = e.getType();
131                                         types[type] = types[type] + 1;
132                                         num++;
133                                         livec++;
134                                 } else {
135                                         deadc++;
136                                 }
137                         }
138                 }
139
140                 for (int i = 0; i < 10; i++) {
141                         System.out.println(i + "    " + types[i]);
142                 }
143                 System.out.println("Live count:   " + livec);
144                 System.out.println("Dead count:   " + deadc);
145                 System.out.println("Old:   " + o);
146                 System.out.println("New:   " + n);
147                 System.out.println("Size:   " + buffer.size());
148                 System.out.println("Commits Map:   " + commitedTable.size());
149                 System.out.println("Commits List:   " + commitMap.size());
150         }
151
152         public IoTString getCommitted(IoTString key) {
153                 KeyValue kv = commitedTable.get(key);
154                 if (kv != null) {
155                         return kv.getValue();
156                 } else {
157                         return null;
158                 }
159         }
160
161         public IoTString getSpeculative(IoTString key) {
162                 KeyValue kv = speculativeTable.get(key);
163                 if (kv != null) {
164                         return kv.getValue();
165                 } else {
166                         return null;
167                 }
168         }
169
170         public void initTable() {
171                 cloud.setSalt();//Set the salt
172                 Slot s = new Slot(this, 1, localmachineid);
173                 TableStatus status = new TableStatus(s, numslots);
174                 s.addEntry(status);
175                 Slot[] array = cloud.putSlot(s, numslots);
176                 if (array == null) {
177                         array = new Slot[] {s};
178                         /* update data structure */
179                         validateandupdate(array, true);
180                 } else {
181                         throw new Error("Error on initialization");
182                 }
183         }
184
185         public String toString() {
186                 String retString = " Committed Table: \n";
187                 retString += "---------------------------\n";
188                 retString += commitedTable.toString();
189
190                 retString += "\n\n";
191
192                 retString += " Speculative Table: \n";
193                 retString += "---------------------------\n";
194                 retString += speculativeTable.toString();
195
196                 return retString;
197         }
198
199         public void startTransaction() {
200                 // Create a new transaction, invalidates any old pending transactions.
201                 pendingTransBuild = new PendingTransaction();
202         }
203
204         public void commitTransaction() {
205
206                 if (pendingTransBuild.getKVUpdates().size() == 0) {
207                         // If no updates are made then there is no point inserting into the chain
208                         return;
209                 }
210
211                 // Add the pending transaction to the queue
212                 pendingTransQueue.add(pendingTransBuild);
213
214                 while (!pendingTransQueue.isEmpty()) {
215                         if (tryput( pendingTransQueue.peek(), false)) {
216                                 pendingTransQueue.poll();
217                         }
218                 }
219         }
220
221         public void addKV(IoTString key, IoTString value) {
222
223                 if (arbitratorTable.get(key) == null) {
224                         throw new Error("Key not Found.");
225                 }
226
227                 // Make sure new key value pair matches the current arbitrator
228                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
229                         // TODO: Maybe not throw en error
230                         throw new Error("Not all Key Values Match.");
231                 }
232
233                 KeyValue kv = new KeyValue(key, value);
234                 pendingTransBuild.addKV(kv);
235         }
236
237         public void addGuard(Guard guard) {
238                 pendingTransBuild.addGuard(guard);
239         }
240
241         public void update() {
242
243                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
244
245                 validateandupdate(newslots, false);
246
247                 if (uncommittedTransactionsMap.keySet().size() > 0) {
248
249                         boolean doEnd = false;
250                         boolean needResize = false;
251                         while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
252                                 boolean resize = needResize;
253                                 needResize = false;
254
255                                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
256                                 int newsize = 0;
257                                 if (liveslotcount > resizethreshold) {
258                                         resize = true; //Resize is forced
259                                 }
260
261                                 if (resize) {
262                                         newsize = (int) (numslots * RESIZE_MULTIPLE);
263                                         TableStatus status = new TableStatus(s, newsize);
264                                         s.addEntry(status);
265                                 }
266
267                                 doRejectedMessages(s);
268
269                                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
270
271                                 // Resize was needed so redo call
272                                 if (retTup.getFirst()) {
273                                         needResize = true;
274                                         continue;
275                                 }
276
277                                 // Extract working variables
278                                 boolean seenliveslot = retTup.getSecond();
279                                 long seqn = retTup.getThird();
280
281                                 // Did need to arbitrate
282                                 doEnd = !doArbitration(s);
283
284                                 doOptionalRescue(s, seenliveslot, seqn, resize);
285
286                                 int max = 0;
287                                 if (resize) {
288                                         max = newsize;
289                                 }
290
291                                 Slot[] array = cloud.putSlot(s, max);
292                                 if (array == null) {
293                                         array = new Slot[] {s};
294                                         rejectedmessagelist.clear();
295                                 }       else {
296                                         if (array.length == 0)
297                                                 throw new Error("Server Error: Did not send any slots");
298                                         rejectedmessagelist.add(s.getSequenceNumber());
299                                         doEnd = false;
300                                 }
301
302                                 /* update data structure */
303                                 validateandupdate(array, true);
304                         }
305                 }
306         }
307
308         public boolean createNewKey(IoTString keyName, long machineId) {
309
310                 while (true) {
311                         if (arbitratorTable.get(keyName) != null) {
312                                 // There is already an arbitrator
313                                 return false;
314                         }
315
316                         if (tryput(keyName, machineId, false)) {
317
318                                 // If successfully inserted
319                                 return true;
320                         }
321                 }
322         }
323
324         public void decrementLiveCount() {
325                 liveslotcount--;
326                 // System.out.println("Decrement Live Count");
327         }
328
329         private void setResizeThreshold() {
330                 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
331                 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
332         }
333
334         private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
335                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
336
337                 int newsize = 0;
338                 if (liveslotcount > resizethreshold) {
339                         resize = true; //Resize is forced
340                 }
341
342                 if (resize) {
343                         newsize = (int) (numslots * RESIZE_MULTIPLE);
344                         TableStatus status = new TableStatus(s, newsize);
345                         s.addEntry(status);
346                 }
347
348                 doRejectedMessages(s);
349
350                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
351
352                 // Resize was needed so redo call
353                 if (retTup.getFirst()) {
354                         return tryput(pendingTrans, true);
355                 }
356
357                 // Extract working variables
358                 boolean seenliveslot = retTup.getSecond();
359                 long seqn = retTup.getThird();
360
361
362                 doArbitration(s);
363
364                 Transaction trans = new Transaction(s,
365                                                     s.getSequenceNumber(),
366                                                     localmachineid,
367                                                     pendingTrans.getArbitrator(),
368                                                     pendingTrans.getKVUpdates(),
369                                                     pendingTrans.getGuard());
370                 boolean insertedTrans = false;
371                 if (s.hasSpace(trans)) {
372                         s.addEntry(trans);
373                         insertedTrans = true;
374                 }
375
376                 doOptionalRescue(s, seenliveslot, seqn, resize);
377                 return doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
378         }
379
380         private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
381                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
382                 int newsize = 0;
383                 if (liveslotcount > resizethreshold) {
384                         resize = true; //Resize is forced
385                 }
386
387                 if (resize) {
388                         newsize = (int) (numslots * RESIZE_MULTIPLE);
389                         TableStatus status = new TableStatus(s, newsize);
390                         s.addEntry(status);
391                 }
392
393                 doRejectedMessages(s);
394                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
395
396                 // Resize was needed so redo call
397                 if (retTup.getFirst()) {
398                         return tryput(keyName, arbMachineid, true);
399                 }
400
401                 // Extract working variables
402                 boolean seenliveslot = retTup.getSecond();
403                 long seqn = retTup.getThird();
404
405
406                 doArbitration(s);
407
408                 NewKey newKey = new NewKey(s, keyName, arbMachineid);
409
410                 boolean insertedNewKey = false;
411                 if (s.hasSpace(newKey)) {
412                         s.addEntry(newKey);
413                         insertedNewKey = true;
414                 }
415
416                 doOptionalRescue(s, seenliveslot, seqn, resize);
417                 return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize);
418         }
419
420         private void doRejectedMessages(Slot s) {
421                 if (! rejectedmessagelist.isEmpty()) {
422                         /* TODO: We should avoid generating a rejected message entry if
423                          * there is already a sufficient entry in the queue (e.g.,
424                          * equalsto value of true and same sequence number).  */
425
426                         long old_seqn = rejectedmessagelist.firstElement();
427                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
428                                 long new_seqn = rejectedmessagelist.lastElement();
429                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
430                                 s.addEntry(rm);
431                         } else {
432                                 long prev_seqn = -1;
433                                 int i = 0;
434                                 /* Go through list of missing messages */
435                                 for (; i < rejectedmessagelist.size(); i++) {
436                                         long curr_seqn = rejectedmessagelist.get(i);
437                                         Slot s_msg = buffer.getSlot(curr_seqn);
438                                         if (s_msg != null)
439                                                 break;
440                                         prev_seqn = curr_seqn;
441                                 }
442                                 /* Generate rejected message entry for missing messages */
443                                 if (prev_seqn != -1) {
444                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
445                                         s.addEntry(rm);
446                                 }
447                                 /* Generate rejected message entries for present messages */
448                                 for (; i < rejectedmessagelist.size(); i++) {
449                                         long curr_seqn = rejectedmessagelist.get(i);
450                                         Slot s_msg = buffer.getSlot(curr_seqn);
451                                         long machineid = s_msg.getMachineID();
452                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
453                                         s.addEntry(rm);
454                                 }
455                         }
456                 }
457         }
458
459         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
460                 long newestseqnum = buffer.getNewestSeqNum();
461                 long oldestseqnum = buffer.getOldestSeqNum();
462                 if (lastliveslotseqn < oldestseqnum)
463                         lastliveslotseqn = oldestseqnum;
464
465                 long seqn = lastliveslotseqn;
466                 boolean seenliveslot = false;
467                 long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
468                 long threshold = firstiffull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
469
470
471                 // Mandatory Rescue
472                 for (; seqn < threshold; seqn++) {
473                         Slot prevslot = buffer.getSlot(seqn);
474                         // Push slot number forward
475                         if (! seenliveslot)
476                                 lastliveslotseqn = seqn;
477
478                         if (! prevslot.isLive())
479                                 continue;
480                         seenliveslot = true;
481                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
482                         for (Entry liveentry : liveentries) {
483                                 if (s.hasSpace(liveentry)) {
484                                         s.addEntry(liveentry);
485                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
486                                         if (!resize) {
487                                                 System.out.println("B"); //?
488
489                                                 // TODO delete
490                                                 System.out.println("==============================NEEEEDDDD RESIZING");
491                                                 return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
492                                         }
493                                 }
494                         }
495                 }
496
497                 // Did not resize
498                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
499         }
500
501         private boolean doArbitration(Slot s) {
502                 // Arbitrate
503                 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
504
505                 List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
506
507                 // Sort from oldest to newest
508                 Collections.sort(transSeqNums);
509
510
511                 boolean didNeedArbitration = false;
512                 for (Long transNum : transSeqNums) {
513                         Transaction ut = uncommittedTransactionsMap.get(transNum);
514
515                         KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
516                         // Check if this machine arbitrates for this transaction
517                         if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
518
519                                 // TODO delete
520                                 // if (localmachineid == 351) {
521                                 //      System.out.println("Mis match Machine: " + localmachineid + "   Key: " + keyVal.getKey().toString());
522                                 // }
523                                 continue;
524                         }
525                         // else {
526                         //      // TODO delete
527                         //      if (localmachineid == 351) {
528                         //              System.out.println("Full Match Machine: " + localmachineid + "   Key: " + keyVal.getKey().toString());
529                         //      }
530                         // }
531
532                         // we did have something to arbitrate on
533                         didNeedArbitration = true;
534
535                         Entry newEntry = null;
536
537                         try {
538                                 if ( ut.getGuard().evaluate(speculativeTableTmp.values())) {
539                                         // Guard evaluated as true
540
541                                         // update the local tmp current key set
542                                         for (KeyValue kv : ut.getkeyValueUpdateSet()) {
543                                                 speculativeTableTmp.put(kv.getKey(), kv);
544                                         }
545
546                                         // create the commit
547                                         newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet());
548                                 } else {
549                                         // Guard was false
550
551                                         // create the abort
552                                         newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator());
553                                 }
554                         } catch (Exception e) {
555                                 e.printStackTrace();
556                         }
557
558                         if ((newEntry != null) && s.hasSpace(newEntry)) {
559                                 s.addEntry(newEntry);
560                         } else {
561                                 break;
562                         }
563                 }
564
565                 return didNeedArbitration;
566         }
567
568         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
569                 /* now go through live entries from least to greatest sequence number until
570                  * either all live slots added, or the slot doesn't have enough room
571                  * for SKIP_THRESHOLD consecutive entries*/
572                 int skipcount = 0;
573                 long newestseqnum = buffer.getNewestSeqNum();
574                 search:
575                 for (; seqn <= newestseqnum; seqn++) {
576                         Slot prevslot = buffer.getSlot(seqn);
577                         //Push slot number forward
578                         if (!seenliveslot)
579                                 lastliveslotseqn = seqn;
580
581                         if (!prevslot.isLive())
582                                 continue;
583                         seenliveslot = true;
584                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
585                         for (Entry liveentry : liveentries) {
586                                 if (s.hasSpace(liveentry))
587                                         s.addEntry(liveentry);
588                                 else {
589                                         skipcount++;
590                                         if (skipcount > SKIP_THRESHOLD)
591                                                 break search;
592                                 }
593                         }
594                 }
595         }
596
597         private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) {
598                 int max = 0;
599                 if (resize)
600                         max = newsize;
601                 Slot[] array = cloud.putSlot(s, max);
602                 if (array == null) {
603                         array = new Slot[] {s};
604                         rejectedmessagelist.clear();
605                 }       else {
606                         if (array.length == 0)
607                                 throw new Error("Server Error: Did not send any slots");
608                         rejectedmessagelist.add(s.getSequenceNumber());
609                         inserted = false;
610                 }
611
612
613                 // TODO remove Timers
614                 // long startTime = System.currentTimeMillis();
615                 /* update data structure */
616                 validateandupdate(array, true);
617                 // long endTime = System.currentTimeMillis();
618
619                 // long diff = endTime - startTime;
620                 // if (diff >= 1) {
621                 //      System.out.println("Time Taken: " + diff);
622                 // }
623
624                 return inserted;
625         }
626
627         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
628                 /* The cloud communication layer has checked slot HMACs already
629                          before decoding */
630                 if (newslots.length == 0) return;
631
632                 long firstseqnum = newslots[0].getSequenceNumber();
633                 if (firstseqnum <= sequencenumber) {
634                         throw new Error("Server Error: Sent older slots!");
635                 }
636
637                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
638                 checkHMACChain(indexer, newslots);
639
640                 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
641
642                 initExpectedSize(firstseqnum);
643                 for (Slot slot : newslots) {
644                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
645                         updateExpectedSize();
646                 }
647
648
649                 boolean hasGap = false;
650                 /* If there is a gap, check to see if the server sent us everything. */
651                 if (firstseqnum != (sequencenumber + 1)) {
652
653                         // TODO: Check size
654                         checkNumSlots(newslots.length);
655                         if (!machineSet.isEmpty()) {
656                                 throw new Error("Missing record for machines: " + machineSet);
657                         }
658                 }
659
660                 commitNewMaxSize();
661
662                 /* Commit new to slots. */
663                 for (Slot slot : newslots) {
664                         buffer.putSlot(slot);
665                         liveslotcount++;
666                 }
667                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
668
669                 // Process all on key value pairs
670                 proccessAllNewCommits();
671
672                 // Go through all uncommitted transactions and kill the ones that are dead
673                 deleteDeadUncommittedTransactions();
674
675                 // Speculate on key value pairs
676                 createSpeculativeTable();
677         }
678
679         public void proccessAllNewCommits() {
680
681                 // Process only if there are commit
682                 if (newCommitMap.keySet().size() == 0) {
683                         return;
684                 }
685
686                 List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.keySet());
687
688                 // Sort from oldest to newest commit
689                 Collections.sort(commitSeqNums);
690
691                 // Go through each new commit one by one
692                 for (Long entrySeqNum : commitSeqNums) {
693                         Commit entry = newCommitMap.get(entrySeqNum);
694
695                         long lastCommitSeenSeqNum = 0;
696
697                         if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
698                                 lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
699                         }
700
701                         if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
702
703                                 Commit prevCommit = commitMap.put(entry.getTransSequenceNumber(), entry);
704
705                                 if (prevCommit != null) {
706                                         prevCommit.setDead();
707
708                                         for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
709                                                 committedMapByKey.put(kv.getKey(), entry);
710                                         }
711                                 }
712
713                                 continue;
714                         }
715
716                         Set<Commit> commitsToEditSet = new HashSet<Commit>();
717
718                         for (KeyValue kv : entry.getkeyValueUpdateSet()) {
719                                 commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
720                         }
721
722                         commitsToEditSet.remove(null);
723
724                         for (Commit prevCommit : commitsToEditSet) {
725
726                                 Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
727
728                                 if (!prevCommit.isLive()) {
729                                         commitMap.remove(prevCommit.getTransSequenceNumber());
730                                 }
731                         }
732
733                         // // Remove any old commits
734                         // for (Iterator<Map.Entry<Long, Commit>> i = commitMap.entrySet().iterator(); i.hasNext();) {
735                         //      Commit prevCommit = i.next().getValue();
736                         //      prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
737
738                         //      if (!prevCommit.isLive()) {
739                         //              i.remove();
740                         //      }
741                         // }
742
743                         // Remove any old commits
744                         // for (Iterator<Map.Entry<Long, Commit>> i = commitMap.entrySet().iterator(); i.hasNext();) {
745                         //      Commit prevCommit = i.next().getValue();
746
747                         //      if (prevCommit.getTransArbitrator() != entry.getTransArbitrator()) {
748                         //              continue;
749                         //      }
750
751                         //      prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
752
753                         //      if (!prevCommit.isLive()) {
754                         //              i.remove();
755                         //      }
756                         // }
757
758
759                         // Add the new commit
760                         commitMap.put(entry.getTransSequenceNumber(), entry);
761                         lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
762
763                         // Update the committed table list
764                         for (KeyValue kv : entry.getkeyValueUpdateSet()) {
765                                 IoTString key = kv.getKey();
766                                 commitedTable.put(key, kv);
767
768                                 committedMapByKey.put(key, entry);
769                         }
770                 }
771
772                 // Clear the new commits storage so we can use it later
773                 newCommitMap.clear();
774         }
775
776         private void deleteDeadUncommittedTransactions() {
777                 // Make dead the transactions
778                 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
779                         Transaction prevtrans = i.next().getValue();
780                         long transArb = prevtrans.getArbitrator();
781
782                         if ((lastCommitSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastCommitSeenSeqNumMap.get(transArb)) ||
783                                 (lastAbortSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastAbortSeenSeqNumMap.get(transArb))) {
784                                 i.remove();
785                                 prevtrans.setDead();
786                         }
787                 }
788         }
789
790         private void createSpeculativeTable() {
791
792                 if (uncommittedTransactionsMap.keySet().size() == 0) {
793                         speculativeTable = commitedTable; // Ok that they are the same object
794                         return;
795                 }
796
797                 Map speculativeTableTmp = null;
798                 List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
799
800                 // Sort from oldest to newest commit
801                 Collections.sort(utSeqNums);
802
803                 if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
804                         speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
805
806                         for (Long key : utSeqNums) {
807                                 Transaction trans = uncommittedTransactionsMap.get(key);
808
809                                 lastUncommittedTransaction = key;
810
811                                 try {
812                                         if (trans.getGuard().evaluate(speculativeTableTmp.values())) {
813                                                 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
814                                                         speculativeTableTmp.put(kv.getKey(), kv);
815                                                 }
816                                         }
817
818                                 } catch (Exception e) {
819                                         e.printStackTrace();
820                                 }
821                         }
822                 } else {
823                         speculativeTableTmp = new HashMap<IoTString, KeyValue>(speculativeTable);
824
825                         for (Long key : utSeqNums) {
826
827                                 if (key <= lastUncommittedTransaction) {
828                                         continue;
829                                 }
830
831                                 lastUncommittedTransaction = key;
832
833                                 Transaction trans = uncommittedTransactionsMap.get(key);
834
835                                 try {
836                                         if (trans.getGuard().evaluate(speculativeTableTmp.values())) {
837                                                 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
838                                                         speculativeTableTmp.put(kv.getKey(), kv);
839                                                 }
840                                         }
841
842                                 } catch (Exception e) {
843                                         e.printStackTrace();
844                                 }
845                         }
846                 }
847
848                 speculativeTable = speculativeTableTmp;
849         }
850
851         private int expectedsize, currmaxsize;
852
853         private void checkNumSlots(int numslots) {
854                 if (numslots != expectedsize) {
855                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
856                 }
857         }
858
859         private void initExpectedSize(long firstsequencenumber) {
860                 long prevslots = firstsequencenumber;
861                 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
862                 currmaxsize = numslots;
863         }
864
865         private void updateExpectedSize() {
866                 expectedsize++;
867                 if (expectedsize > currmaxsize) {
868                         expectedsize = currmaxsize;
869                 }
870         }
871
872         private void updateCurrMaxSize(int newmaxsize) {
873                 currmaxsize = newmaxsize;
874         }
875
876         private void commitNewMaxSize() {
877                 if (numslots != currmaxsize) {
878                         buffer.resize(currmaxsize);
879                 }
880
881                 numslots = currmaxsize;
882                 setResizeThreshold();
883         }
884
885         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
886                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
887         }
888
889         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
890                 long oldseqnum = entry.getOldSeqNum();
891                 long newseqnum = entry.getNewSeqNum();
892                 boolean isequal = entry.getEqual();
893                 long machineid = entry.getMachineID();
894                 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
895                         Slot slot = indexer.getSlot(seqnum);
896                         if (slot != null) {
897                                 long slotmachineid = slot.getMachineID();
898                                 if (isequal != (slotmachineid == machineid)) {
899                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
900                                 }
901                         }
902                 }
903
904                 HashSet<Long> watchset = new HashSet<Long>();
905                 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
906                         long entry_mid = lastmsg_entry.getKey();
907                         /* We've seen it, don't need to continue to watch.  Our next
908                          * message will implicitly acknowledge it. */
909                         if (entry_mid == localmachineid)
910                                 continue;
911                         Pair<Long, Liveness> v = lastmsg_entry.getValue();
912                         long entry_seqn = v.getFirst();
913                         if (entry_seqn < newseqnum) {
914                                 addWatchList(entry_mid, entry);
915                                 watchset.add(entry_mid);
916                         }
917                 }
918                 if (watchset.isEmpty())
919                         entry.setDead();
920                 else
921                         entry.setWatchSet(watchset);
922         }
923
924         private void processEntry(NewKey entry) {
925                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
926
927                 NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
928
929                 if (oldNewKey != null) {
930                         oldNewKey.setDead();
931                 }
932         }
933
934         private void processEntry(Transaction entry) {
935                 Transaction prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
936
937                 // Duplicate so delete old copy
938                 if (prevTrans != null) {
939                         prevTrans.setDead();
940                 }
941         }
942
943         private void processEntry(Abort entry) {
944
945                 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
946                         // Abort has not been seen yet so we need to keep track of it
947
948                         Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
949                         if (prevAbort != null) {
950                                 prevAbort.setDead(); // delete old version of the duplicate
951                         }
952                 } else {
953                         // The machine already saw this so it is dead
954                         entry.setDead();
955                 }
956
957                 lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
958         }
959
960         private void processEntry(Commit entry, Slot s) {
961                 Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry);
962                 if (prevCommit != null) {
963                         prevCommit.setDead();
964                 }
965         }
966
967         private void processEntry(TableStatus entry) {
968                 int newnumslots = entry.getMaxSlots();
969                 updateCurrMaxSize(newnumslots);
970                 if (lastTableStatus != null)
971                         lastTableStatus.setDead();
972                 lastTableStatus = entry;
973         }
974
975         private void addWatchList(long machineid, RejectedMessage entry) {
976                 HashSet<RejectedMessage> entries = watchlist.get(machineid);
977                 if (entries == null)
978                         watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
979                 entries.add(entry);
980         }
981
982         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
983                 machineSet.remove(machineid);
984
985                 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
986                 if (watchset != null) {
987                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
988                                 RejectedMessage rm = rmit.next();
989                                 if (rm.getNewSeqNum() <= seqnum) {
990                                         /* Remove it from our watchlist */
991                                         rmit.remove();
992                                         /* Decrement machines that need to see this notification */
993                                         rm.removeWatcher(machineid);
994                                 }
995                         }
996                 }
997
998                 if (machineid == localmachineid) {
999                         /* Our own messages are immediately dead. */
1000                         if (liveness instanceof LastMessage) {
1001                                 ((LastMessage)liveness).setDead();
1002                         } else if (liveness instanceof Slot) {
1003                                 ((Slot)liveness).setDead();
1004                         } else {
1005                                 throw new Error("Unrecognized type");
1006                         }
1007                 }
1008
1009                 // Set dead the abort
1010                 for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
1011                         Abort abort = i.next().getValue();
1012
1013                         if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
1014                                 abort.setDead();
1015                                 i.remove();
1016                         }
1017                 }
1018
1019                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
1020                 if (lastmsgentry == null)
1021                         return;
1022
1023                 long lastmsgseqnum = lastmsgentry.getFirst();
1024                 Liveness lastentry = lastmsgentry.getSecond();
1025                 if (machineid != localmachineid) {
1026                         if (lastentry instanceof LastMessage) {
1027                                 ((LastMessage)lastentry).setDead();
1028                         } else if (lastentry instanceof Slot) {
1029                                 ((Slot)lastentry).setDead();
1030                         } else {
1031                                 throw new Error("Unrecognized type");
1032                         }
1033                 }
1034
1035                 if (machineid == localmachineid) {
1036                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1037                                 throw new Error("Server Error: Mismatch on local machine sequence number");
1038                 } else {
1039                         if (lastmsgseqnum > seqnum)
1040                                 throw new Error("Server Error: Rollback on remote machine sequence number");
1041                 }
1042         }
1043
1044         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1045                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1046                 for (Entry entry : slot.getEntries()) {
1047                         switch (entry.getType()) {
1048
1049                         case Entry.TypeNewKey:
1050                                 processEntry((NewKey)entry);
1051                                 break;
1052
1053                         case Entry.TypeCommit:
1054                                 processEntry((Commit)entry, slot);
1055                                 break;
1056
1057                         case Entry.TypeAbort:
1058                                 processEntry((Abort)entry);
1059                                 break;
1060
1061                         case Entry.TypeTransaction:
1062                                 processEntry((Transaction)entry);
1063                                 break;
1064
1065                         case Entry.TypeLastMessage:
1066                                 processEntry((LastMessage)entry, machineSet);
1067                                 break;
1068
1069                         case Entry.TypeRejectedMessage:
1070                                 processEntry((RejectedMessage)entry, indexer);
1071                                 break;
1072
1073                         case Entry.TypeTableStatus:
1074                                 processEntry((TableStatus)entry);
1075                                 break;
1076
1077                         default:
1078                                 throw new Error("Unrecognized type: " + entry.getType());
1079                         }
1080                 }
1081         }
1082
1083         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1084                 for (int i = 0; i < newslots.length; i++) {
1085                         Slot currslot = newslots[i];
1086                         Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
1087                         if (prevslot != null &&
1088                                 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1089                                 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
1090                 }
1091         }
1092 }