Squashed Last bugs
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Set;
14 import java.util.Collection;
15 import java.util.Collections;
16
17
18 /**
19  * IoTTable data structure.  Provides client interface.
20  * @author Brian Demsky
21  * @version 1.0
22  */
23
24 final public class Table {
25         private int numslots;   //number of slots stored in buffer
26
27         //table of key-value pairs
28         //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
29
30         // machine id -> (sequence number, Slot or LastMessage); records last message by each client
31         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
32         // machine id -> set of RejectedMessage entries (NOTE(review): exact purpose is not visible in this part of the file - confirm)
33         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
34         private Vector<Long> rejectedmessagelist = new Vector<Long>();
35         private SlotBuffer buffer;
36         private CloudComm cloud;
37         private long sequencenumber; //Largest sequence number a client has received
38         private long localmachineid;
39         private TableStatus lastTableStatus;
40         static final int FREE_SLOTS = 10; //number of slots that should be kept free
41         static final int SKIP_THRESHOLD = 10;
42         private long liveslotcount = 0; 
43         private int chance;
44         static final double RESIZE_MULTIPLE = 1.2;
45         static final double RESIZE_THRESHOLD = 0.75;
46         static final int REJECTED_THRESHOLD = 5;
47         private int resizethreshold;
48         private long lastliveslotseqn;  //smallest sequence number with a live entry
49         private Random random = new Random();
50         private long lastUncommittedTransaction = 0;
51
52         private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
53         private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
54         private Map<Long, Commit> commitMap = null; // List of all the most recent live commits
55         private Map<Long, Abort> abortMap = null; // Set of the live aborts
56         private Map<IoTString, Commit> committedMapByKey = null; // key -> Commit recorded for that key
57         private  Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV    
58         private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
59         private Map<Long, Transaction> uncommittedTransactionsMap = null; // sequence number -> uncommitted Transaction
60         private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
61         private Map<IoTString, NewKey> newKeyTable = null; // key name -> NewKey entry (NOTE(review): presumably pending key creations - confirm)
62         private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
63         private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
64         private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator
65
66
67
68         public Table(String baseurl, String password, long _localmachineid) {
69                 localmachineid = _localmachineid;
70                 buffer = new SlotBuffer();
71                 numslots = buffer.capacity();
72                 setResizeThreshold();
73                 sequencenumber = 0;
74                 cloud = new CloudComm(this, baseurl, password);
75                 lastliveslotseqn = 1;
76
77                 setupDataStructs();
78         }
79
80         public Table(CloudComm _cloud, long _localmachineid) {
81                 localmachineid = _localmachineid;
82                 buffer = new SlotBuffer();
83                 numslots = buffer.capacity();
84                 setResizeThreshold();
85                 sequencenumber = 0;
86                 cloud = _cloud;
87
88                 setupDataStructs();
89         }
90
91         private void setupDataStructs() {
92                 pendingTransQueue = new LinkedList<PendingTransaction>();
93                 commitMap = new HashMap<Long, Commit>();
94                 abortMap = new HashMap<Long, Abort>();
95                 committedMapByKey = new HashMap<IoTString, Commit>();
96                 commitedTable = new HashMap<IoTString, KeyValue>();
97                 speculativeTable = new HashMap<IoTString, KeyValue>();
98                 uncommittedTransactionsMap = new HashMap<Long, Transaction>();
99                 arbitratorTable = new HashMap<IoTString, Long>();
100                 newKeyTable = new HashMap<IoTString, NewKey>();
101                 newCommitMap = new HashMap<Long, Commit>();
102                 lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
103                 lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
104         }
105
106         public void rebuild() {
107                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
108                 validateandupdate(newslots, true);
109         }
110
111         // TODO: delete method
112         // public void printSlots() {
113         //      long o = buffer.getOldestSeqNum();
114         //      long n = buffer.getNewestSeqNum();
115
116         //      int[] types = new int[10];
117
118         //      int num = 0;
119
120         //      int livec = 0;
121         //      int deadc = 0;
122         //      for (long i = o; i < (n + 1); i++) {
123         //              Slot s = buffer.getSlot(i);
124
125         //              Vector<Entry> entries = s.getEntries();
126
127         //              for (Entry e : entries) {
128         //                      if (e.isLive()) {
129         //                              int type = e.getType();
130         //                              types[type] = types[type] + 1;
131         //                              num++;
132         //                              livec++;
133         //                      } else {
134         //                              deadc++;
135         //                      }
136         //              }
137         //      }
138
139         //      for (int i = 0; i < 10; i++) {
140         //              System.out.println(i + "    " + types[i]);
141         //      }
142         //      System.out.println("Live count:   " + livec);
143         //      System.out.println("Dead count:   " + deadc);
144         //      System.out.println("Old:   " + o);
145         //      System.out.println("New:   " + n);
146         //      System.out.println("Size:   " + buffer.size());
147         //      System.out.println("Commits Map:   " + commitedTable.size());
148         //      System.out.println("Commits List:   " + commitMap.size());
149         // }
150
151         public IoTString getCommitted(IoTString key) {
152                 KeyValue kv = commitedTable.get(key);
153                 if (kv != null) {
154                         return kv.getValue();
155                 } else {
156                         return null;
157                 }
158         }
159
160         public IoTString getSpeculative(IoTString key) {
161                 KeyValue kv = speculativeTable.get(key);
162                 if (kv != null) {
163                         return kv.getValue();
164                 } else {
165                         return null;
166                 }
167         }
168
169         public Long getArbitrator(IoTString key) {
170                 return arbitratorTable.get(key);
171         }
172
173         public void initTable() {
174                 cloud.setSalt();//Set the salt
175                 Slot s = new Slot(this, 1, localmachineid);
176                 TableStatus status = new TableStatus(s, numslots);
177                 s.addEntry(status);
178                 Slot[] array = cloud.putSlot(s, numslots);
179                 if (array == null) {
180                         array = new Slot[] {s};
181                         /* update data structure */
182                         validateandupdate(array, true);
183                 } else {
184                         throw new Error("Error on initialization");
185                 }
186         }
187
188         public String toString() {
189                 String retString = " Committed Table: \n";
190                 retString += "---------------------------\n";
191                 retString += commitedTable.toString();
192
193                 retString += "\n\n";
194
195                 retString += " Speculative Table: \n";
196                 retString += "---------------------------\n";
197                 retString += speculativeTable.toString();
198
199                 return retString;
200         }
201
202         public void startTransaction() {
203                 // Create a new transaction, invalidates any old pending transactions.
204                 pendingTransBuild = new PendingTransaction();
205         }
206
207         public void commitTransaction() {
208
209                 if (pendingTransBuild.getKVUpdates().size() == 0) {
210                         // If no updates are made then there is no point inserting into the chain
211                         return;
212                 }
213
214                 // Add the pending transaction to the queue
215                 pendingTransQueue.add(pendingTransBuild);
216
217                 while (!pendingTransQueue.isEmpty()) {
218                         if (tryput( pendingTransQueue.peek(), false)) {
219                                 pendingTransQueue.poll();
220                         }
221                 }
222         }
223
224         public void addKV(IoTString key, IoTString value) {
225
226                 if (arbitratorTable.get(key) == null) {
227                         throw new Error("Key not Found.");
228                 }
229
230                 // Make sure new key value pair matches the current arbitrator
231                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
232                         // TODO: Maybe not throw en error
233                         throw new Error("Not all Key Values Match.");
234                 }
235
236                 KeyValue kv = new KeyValue(key, value);
237                 pendingTransBuild.addKV(kv);
238         }
239
240         public void addGuard(Guard guard) {
241                 pendingTransBuild.addGuard(guard);
242         }
243
244         public void update() {
245
246                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
247
248                 validateandupdate(newslots, false);
249
250                 if (uncommittedTransactionsMap.keySet().size() > 0) {
251
252                         boolean doEnd = false;
253                         boolean needResize = false;
254                         while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
255                                 boolean resize = needResize;
256                                 needResize = false;
257
258                                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
259                                 int newsize = 0;
260                                 if (liveslotcount > resizethreshold) {
261                                         resize = true; //Resize is forced
262                                 }
263
264                                 if (resize) {
265                                         newsize = (int) (numslots * RESIZE_MULTIPLE);
266                                         TableStatus status = new TableStatus(s, newsize);
267                                         s.addEntry(status);
268                                 }
269
270                                 doRejectedMessages(s);
271
272                                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
273
274                                 // Resize was needed so redo call
275                                 if (retTup.getFirst()) {
276                                         needResize = true;
277                                         continue;
278                                 }
279
280                                 // Extract working variables
281                                 boolean seenliveslot = retTup.getSecond();
282                                 long seqn = retTup.getThird();
283
284                                 // Did need to arbitrate
285                                 doEnd = !doArbitration(s);
286
287                                 doOptionalRescue(s, seenliveslot, seqn, resize);
288
289                                 int max = 0;
290                                 if (resize) {
291                                         max = newsize;
292                                 }
293
294                                 Slot[] array = cloud.putSlot(s, max);
295                                 if (array == null) {
296                                         array = new Slot[] {s};
297                                         rejectedmessagelist.clear();
298                                 }       else {
299                                         if (array.length == 0)
300                                                 throw new Error("Server Error: Did not send any slots");
301                                         rejectedmessagelist.add(s.getSequenceNumber());
302                                         doEnd = false;
303                                 }
304
305                                 /* update data structure */
306                                 validateandupdate(array, true);
307                         }
308                 }
309         }
310
311         public boolean createNewKey(IoTString keyName, long machineId) {
312
313                 while (true) {
314                         if (arbitratorTable.get(keyName) != null) {
315                                 // There is already an arbitrator
316                                 return false;
317                         }
318
319                         if (tryput(keyName, machineId, false)) {
320                                 // If successfully inserted
321                                 return true;
322                         }
323                 }
324         }
325
326         public void decrementLiveCount() {
327                 liveslotcount--;
328         }
329
330         private void setResizeThreshold() {
331                 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
332                 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
333         }
334
335         private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
336                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
337
338                 int newsize = 0;
339                 if (liveslotcount > resizethreshold) {
340                         resize = true; //Resize is forced
341                 }
342
343                 if (resize) {
344                         newsize = (int) (numslots * RESIZE_MULTIPLE);
345                         TableStatus status = new TableStatus(s, newsize);
346                         s.addEntry(status);
347                 }
348
349                 doRejectedMessages(s);
350
351                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
352
353                 // Resize was needed so redo call
354                 if (retTup.getFirst()) {
355                         return tryput(pendingTrans, true);
356                 }
357
358                 // Extract working variables
359                 boolean seenliveslot = retTup.getSecond();
360                 long seqn = retTup.getThird();
361
362
363                 doArbitration(s);
364
365                 Transaction trans = new Transaction(s,
366                                                     s.getSequenceNumber(),
367                                                     localmachineid,
368                                                     pendingTrans.getArbitrator(),
369                                                     pendingTrans.getKVUpdates(),
370                                                     pendingTrans.getGuard());
371                 boolean insertedTrans = false;
372                 if (s.hasSpace(trans)) {
373                         s.addEntry(trans);
374                         insertedTrans = true;
375                 }
376
377                 doOptionalRescue(s, seenliveslot, seqn, resize);
378                 return doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
379         }
380
381         private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
382                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
383                 int newsize = 0;
384                 if (liveslotcount > resizethreshold) {
385                         resize = true; //Resize is forced
386                 }
387
388                 if (resize) {
389                         newsize = (int) (numslots * RESIZE_MULTIPLE);
390                         TableStatus status = new TableStatus(s, newsize);
391                         s.addEntry(status);
392                 }
393
394                 doRejectedMessages(s);
395                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
396
397                 // Resize was needed so redo call
398                 if (retTup.getFirst()) {
399                         return tryput(keyName, arbMachineid, true);
400                 }
401
402                 // Extract working variables
403                 boolean seenliveslot = retTup.getSecond();
404                 long seqn = retTup.getThird();
405
406
407                 doArbitration(s);
408
409                 NewKey newKey = new NewKey(s, keyName, arbMachineid);
410
411                 boolean insertedNewKey = false;
412                 if (s.hasSpace(newKey)) {
413                         s.addEntry(newKey);
414                         insertedNewKey = true;
415                 }
416
417                 doOptionalRescue(s, seenliveslot, seqn, resize);
418                 return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize);
419         }
420
421         private void doRejectedMessages(Slot s) {
422                 if (! rejectedmessagelist.isEmpty()) {
423                         /* TODO: We should avoid generating a rejected message entry if
424                          * there is already a sufficient entry in the queue (e.g.,
425                          * equalsto value of true and same sequence number).  */
426
427                         long old_seqn = rejectedmessagelist.firstElement();
428                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
429                                 long new_seqn = rejectedmessagelist.lastElement();
430                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
431                                 s.addEntry(rm);
432                         } else {
433                                 long prev_seqn = -1;
434                                 int i = 0;
435                                 /* Go through list of missing messages */
436                                 for (; i < rejectedmessagelist.size(); i++) {
437                                         long curr_seqn = rejectedmessagelist.get(i);
438                                         Slot s_msg = buffer.getSlot(curr_seqn);
439                                         if (s_msg != null)
440                                                 break;
441                                         prev_seqn = curr_seqn;
442                                 }
443                                 /* Generate rejected message entry for missing messages */
444                                 if (prev_seqn != -1) {
445                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
446                                         s.addEntry(rm);
447                                 }
448                                 /* Generate rejected message entries for present messages */
449                                 for (; i < rejectedmessagelist.size(); i++) {
450                                         long curr_seqn = rejectedmessagelist.get(i);
451                                         Slot s_msg = buffer.getSlot(curr_seqn);
452                                         long machineid = s_msg.getMachineID();
453                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
454                                         s.addEntry(rm);
455                                 }
456                         }
457                 }
458         }
459
460         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
461                 long newestseqnum = buffer.getNewestSeqNum();
462                 long oldestseqnum = buffer.getOldestSeqNum();
463                 if (lastliveslotseqn < oldestseqnum)
464                         lastliveslotseqn = oldestseqnum;
465
466                 long seqn = lastliveslotseqn;
467                 boolean seenliveslot = false;
468                 long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
469                 long threshold = firstiffull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
470
471
472                 // Mandatory Rescue
473                 for (; seqn < threshold; seqn++) {
474                         Slot prevslot = buffer.getSlot(seqn);
475                         // Push slot number forward
476                         if (! seenliveslot)
477                                 lastliveslotseqn = seqn;
478
479                         if (! prevslot.isLive())
480                                 continue;
481                         seenliveslot = true;
482                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
483                         for (Entry liveentry : liveentries) {
484                                 if (s.hasSpace(liveentry)) {
485                                         s.addEntry(liveentry);
486                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
487                                         if (!resize) {
488                                                 System.out.println("B"); //?
489                                                 return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
490                                         }
491                                 }
492                         }
493                 }
494
495                 // Did not resize
496                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
497         }
498
499         private boolean doArbitration(Slot s) {
500                 // Arbitrate
501                 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
502
503                 List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
504
505                 // Sort from oldest to newest
506                 Collections.sort(transSeqNums);
507
508
509                 boolean didNeedArbitration = false;
510                 for (Long transNum : transSeqNums) {
511                         Transaction ut = uncommittedTransactionsMap.get(transNum);
512
513                         KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
514                         // Check if this machine arbitrates for this transaction
515                         if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
516                                 continue;
517                         }
518
519                         // we did have something to arbitrate on
520                         didNeedArbitration = true;
521
522                         Entry newEntry = null;
523
524                         try {
525                                 if ( ut.getGuard().evaluate(speculativeTableTmp.values())) {
526                                         // Guard evaluated as true
527
528                                         // update the local tmp current key set
529                                         for (KeyValue kv : ut.getkeyValueUpdateSet()) {
530                                                 speculativeTableTmp.put(kv.getKey(), kv);
531                                         }
532
533                                         // create the commit
534                                         newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet());
535                                 } else {
536                                         // Guard was false
537
538                                         // create the abort
539                                         newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator());
540                                 }
541                         } catch (Exception e) {
542                                 e.printStackTrace();
543                         }
544
545                         if ((newEntry != null) && s.hasSpace(newEntry)) {
546                                 s.addEntry(newEntry);
547                         } else {
548                                 break;
549                         }
550                 }
551
552                 return didNeedArbitration;
553         }
554
555         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
556                 /* now go through live entries from least to greatest sequence number until
557                  * either all live slots added, or the slot doesn't have enough room
558                  * for SKIP_THRESHOLD consecutive entries*/
559                 int skipcount = 0;
560                 long newestseqnum = buffer.getNewestSeqNum();
561                 search:
562                 for (; seqn <= newestseqnum; seqn++) {
563                         Slot prevslot = buffer.getSlot(seqn);
564                         //Push slot number forward
565                         if (!seenliveslot)
566                                 lastliveslotseqn = seqn;
567
568                         if (!prevslot.isLive())
569                                 continue;
570                         seenliveslot = true;
571                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
572                         for (Entry liveentry : liveentries) {
573                                 if (s.hasSpace(liveentry))
574                                         s.addEntry(liveentry);
575                                 else {
576                                         skipcount++;
577                                         if (skipcount > SKIP_THRESHOLD)
578                                                 break search;
579                                 }
580                         }
581                 }
582         }
583
584         private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) {
585                 int max = 0;
586                 if (resize)
587                         max = newsize;
588                 Slot[] array = cloud.putSlot(s, max);
589                 if (array == null) {
590                         array = new Slot[] {s};
591                         rejectedmessagelist.clear();
592                 }       else {
593                         if (array.length == 0)
594                                 throw new Error("Server Error: Did not send any slots");
595                         rejectedmessagelist.add(s.getSequenceNumber());
596                         inserted = false;
597                 }
598
599                 validateandupdate(array, true);
600                 return inserted;
601         }
602
603         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
604                 /* The cloud communication layer has checked slot HMACs already
605                          before decoding */
606                 if (newslots.length == 0) return;
607
608                 long firstseqnum = newslots[0].getSequenceNumber();
609                 if (firstseqnum <= sequencenumber) {
610                         throw new Error("Server Error: Sent older slots!");
611                 }
612
613                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
614                 checkHMACChain(indexer, newslots);
615
616                 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
617
618                 initExpectedSize(firstseqnum);
619                 for (Slot slot : newslots) {
620                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
621                         updateExpectedSize();
622                 }
623
624
625                 boolean hasGap = false;
626                 /* If there is a gap, check to see if the server sent us everything. */
627                 if (firstseqnum != (sequencenumber + 1)) {
628
629                         // TODO: Check size
630                         checkNumSlots(newslots.length);
631                         if (!machineSet.isEmpty()) {
632                                 throw new Error("Missing record for machines: " + machineSet);
633                         }
634                 }
635
636                 commitNewMaxSize();
637
638                 /* Commit new to slots. */
639                 for (Slot slot : newslots) {
640                         buffer.putSlot(slot);
641                         liveslotcount++;
642                 }
643                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
644
645                 // Process all on key value pairs
646                 proccessAllNewCommits();
647
648                 // Go through all uncommitted transactions and kill the ones that are dead
649                 deleteDeadUncommittedTransactions();
650
651                 // Speculate on key value pairs
652                 createSpeculativeTable();
653         }
654
655         public void proccessAllNewCommits() {
656
657                 // Process only if there are commit
658                 if (newCommitMap.keySet().size() == 0) {
659                         return;
660                 }
661
662                 List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.keySet());
663
664                 // Sort from oldest to newest commit
665                 Collections.sort(commitSeqNums);
666
667                 // Go through each new commit one by one
668                 for (Long entrySeqNum : commitSeqNums) {
669                         Commit entry = newCommitMap.get(entrySeqNum);
670
671                         long lastCommitSeenSeqNum = 0;
672
673                         if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
674                                 lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
675                         }
676
677                         if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
678
679                                 Commit prevCommit = commitMap.put(entry.getTransSequenceNumber(), entry);
680
681                                 if (prevCommit != null) {
682                                         prevCommit.setDead();
683
684                                         for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
685                                                 committedMapByKey.put(kv.getKey(), entry);
686                                         }
687                                 }
688
689                                 continue;
690                         }
691
692                         Set<Commit> commitsToEditSet = new HashSet<Commit>();
693
694                         for (KeyValue kv : entry.getkeyValueUpdateSet()) {
695                                 commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
696                         }
697
698                         commitsToEditSet.remove(null);
699
700                         for (Commit prevCommit : commitsToEditSet) {
701
702                                 Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
703
704                                 if (!prevCommit.isLive()) {
705                                         commitMap.remove(prevCommit.getTransSequenceNumber());
706                                 }
707                         }
708
709                         // // Remove any old commits
710                         // for (Iterator<Map.Entry<Long, Commit>> i = commitMap.entrySet().iterator(); i.hasNext();) {
711                         //      Commit prevCommit = i.next().getValue();
712                         //      prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
713
714                         //      if (!prevCommit.isLive()) {
715                         //              i.remove();
716                         //      }
717                         // }
718
719                         // Remove any old commits
720                         // for (Iterator<Map.Entry<Long, Commit>> i = commitMap.entrySet().iterator(); i.hasNext();) {
721                         //      Commit prevCommit = i.next().getValue();
722
723                         //      if (prevCommit.getTransArbitrator() != entry.getTransArbitrator()) {
724                         //              continue;
725                         //      }
726
727                         //      prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
728
729                         //      if (!prevCommit.isLive()) {
730                         //              i.remove();
731                         //      }
732                         // }
733
734
735                         // Add the new commit
736                         commitMap.put(entry.getTransSequenceNumber(), entry);
737                         lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
738
739                         // Update the committed table list
740                         for (KeyValue kv : entry.getkeyValueUpdateSet()) {
741                                 IoTString key = kv.getKey();
742                                 commitedTable.put(key, kv);
743
744                                 committedMapByKey.put(key, entry);
745                         }
746                 }
747
748                 // Clear the new commits storage so we can use it later
749                 newCommitMap.clear();
750         }
751
752         private void deleteDeadUncommittedTransactions() {
753                 // Make dead the transactions
754                 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
755                         Transaction prevtrans = i.next().getValue();
756                         long transArb = prevtrans.getArbitrator();
757
758                         if ((lastCommitSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastCommitSeenSeqNumMap.get(transArb)) ||
759                                 (lastAbortSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastAbortSeenSeqNumMap.get(transArb))) {
760                                 i.remove();
761                                 prevtrans.setDead();
762                         }
763                 }
764         }
765
766         private void createSpeculativeTable() {
767
768                 if (uncommittedTransactionsMap.keySet().size() == 0) {
769                         speculativeTable = commitedTable; // Ok that they are the same object
770                         return;
771                 }
772
773                 Map speculativeTableTmp = null;
774                 List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
775
776                 // Sort from oldest to newest commit
777                 Collections.sort(utSeqNums);
778
779                 if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
780                         speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
781
782                         for (Long key : utSeqNums) {
783                                 Transaction trans = uncommittedTransactionsMap.get(key);
784
785                                 lastUncommittedTransaction = key;
786
787                                 try {
788                                         if (trans.getGuard().evaluate(speculativeTableTmp.values())) {
789                                                 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
790                                                         speculativeTableTmp.put(kv.getKey(), kv);
791                                                 }
792                                         }
793
794                                 } catch (Exception e) {
795                                         e.printStackTrace();
796                                 }
797                         }
798                 } else {
799                         speculativeTableTmp = new HashMap<IoTString, KeyValue>(speculativeTable);
800
801                         for (Long key : utSeqNums) {
802
803                                 if (key <= lastUncommittedTransaction) {
804                                         continue;
805                                 }
806
807                                 lastUncommittedTransaction = key;
808
809                                 Transaction trans = uncommittedTransactionsMap.get(key);
810
811                                 try {
812                                         if (trans.getGuard().evaluate(speculativeTableTmp.values())) {
813                                                 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
814                                                         speculativeTableTmp.put(kv.getKey(), kv);
815                                                 }
816                                         }
817
818                                 } catch (Exception e) {
819                                         e.printStackTrace();
820                                 }
821                         }
822                 }
823
824                 speculativeTable = speculativeTableTmp;
825         }
826
827         private int expectedsize, currmaxsize;
828
829         private void checkNumSlots(int numslots) {
830                 if (numslots != expectedsize) {
831                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
832                 }
833         }
834
835         private void initExpectedSize(long firstsequencenumber) {
836                 long prevslots = firstsequencenumber;
837                 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
838                 currmaxsize = numslots;
839         }
840
841         private void updateExpectedSize() {
842                 expectedsize++;
843                 if (expectedsize > currmaxsize) {
844                         expectedsize = currmaxsize;
845                 }
846         }
847
848         private void updateCurrMaxSize(int newmaxsize) {
849                 currmaxsize = newmaxsize;
850         }
851
852         private void commitNewMaxSize() {
853                 if (numslots != currmaxsize) {
854                         buffer.resize(currmaxsize);
855                 }
856
857                 numslots = currmaxsize;
858                 setResizeThreshold();
859         }
860
861         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
862                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
863         }
864
865         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
866                 long oldseqnum = entry.getOldSeqNum();
867                 long newseqnum = entry.getNewSeqNum();
868                 boolean isequal = entry.getEqual();
869                 long machineid = entry.getMachineID();
870                 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
871                         Slot slot = indexer.getSlot(seqnum);
872                         if (slot != null) {
873                                 long slotmachineid = slot.getMachineID();
874                                 if (isequal != (slotmachineid == machineid)) {
875                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
876                                 }
877                         }
878                 }
879
880                 HashSet<Long> watchset = new HashSet<Long>();
881                 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
882                         long entry_mid = lastmsg_entry.getKey();
883                         /* We've seen it, don't need to continue to watch.  Our next
884                          * message will implicitly acknowledge it. */
885                         if (entry_mid == localmachineid)
886                                 continue;
887                         Pair<Long, Liveness> v = lastmsg_entry.getValue();
888                         long entry_seqn = v.getFirst();
889                         if (entry_seqn < newseqnum) {
890                                 addWatchList(entry_mid, entry);
891                                 watchset.add(entry_mid);
892                         }
893                 }
894                 if (watchset.isEmpty())
895                         entry.setDead();
896                 else
897                         entry.setWatchSet(watchset);
898         }
899
900         private void processEntry(NewKey entry) {
901                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
902
903                 NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
904
905                 if (oldNewKey != null) {
906                         oldNewKey.setDead();
907                 }
908         }
909
910         private void processEntry(Transaction entry) {
911                 Transaction prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
912
913                 // Duplicate so delete old copy
914                 if (prevTrans != null) {
915                         prevTrans.setDead();
916                 }
917         }
918
919         private void processEntry(Abort entry) {
920
921                 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
922                         // Abort has not been seen yet so we need to keep track of it
923
924                         Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
925                         if (prevAbort != null) {
926                                 prevAbort.setDead(); // delete old version of the duplicate
927                         }
928                 } else {
929                         // The machine already saw this so it is dead
930                         entry.setDead();
931                 }
932
933                 lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
934         }
935
936         private void processEntry(Commit entry, Slot s) {
937                 Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry);
938                 if (prevCommit != null) {
939                         prevCommit.setDead();
940                 }
941         }
942
943         private void processEntry(TableStatus entry) {
944                 int newnumslots = entry.getMaxSlots();
945                 updateCurrMaxSize(newnumslots);
946                 if (lastTableStatus != null)
947                         lastTableStatus.setDead();
948                 lastTableStatus = entry;
949         }
950
951         private void addWatchList(long machineid, RejectedMessage entry) {
952                 HashSet<RejectedMessage> entries = watchlist.get(machineid);
953                 if (entries == null)
954                         watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
955                 entries.add(entry);
956         }
957
958         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
959                 machineSet.remove(machineid);
960
961                 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
962                 if (watchset != null) {
963                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
964                                 RejectedMessage rm = rmit.next();
965                                 if (rm.getNewSeqNum() <= seqnum) {
966                                         /* Remove it from our watchlist */
967                                         rmit.remove();
968                                         /* Decrement machines that need to see this notification */
969                                         rm.removeWatcher(machineid);
970                                 }
971                         }
972                 }
973
974                 if (machineid == localmachineid) {
975                         /* Our own messages are immediately dead. */
976                         if (liveness instanceof LastMessage) {
977                                 ((LastMessage)liveness).setDead();
978                         } else if (liveness instanceof Slot) {
979                                 ((Slot)liveness).setDead();
980                         } else {
981                                 throw new Error("Unrecognized type");
982                         }
983                 }
984
985                 // Set dead the abort
986                 for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
987                         Abort abort = i.next().getValue();
988
989                         if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
990                                 abort.setDead();
991                                 i.remove();
992                         }
993                 }
994
995                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
996                 if (lastmsgentry == null)
997                         return;
998
999                 long lastmsgseqnum = lastmsgentry.getFirst();
1000                 Liveness lastentry = lastmsgentry.getSecond();
1001                 if (machineid != localmachineid) {
1002                         if (lastentry instanceof LastMessage) {
1003                                 ((LastMessage)lastentry).setDead();
1004                         } else if (lastentry instanceof Slot) {
1005                                 ((Slot)lastentry).setDead();
1006                         } else {
1007                                 throw new Error("Unrecognized type");
1008                         }
1009                 }
1010
1011                 if (machineid == localmachineid) {
1012                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1013                                 throw new Error("Server Error: Mismatch on local machine sequence number");
1014                 } else {
1015                         if (lastmsgseqnum > seqnum)
1016                                 throw new Error("Server Error: Rollback on remote machine sequence number");
1017                 }
1018         }
1019
1020         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1021                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1022                 for (Entry entry : slot.getEntries()) {
1023                         switch (entry.getType()) {
1024
1025                         case Entry.TypeNewKey:
1026                                 processEntry((NewKey)entry);
1027                                 break;
1028
1029                         case Entry.TypeCommit:
1030                                 processEntry((Commit)entry, slot);
1031                                 break;
1032
1033                         case Entry.TypeAbort:
1034                                 processEntry((Abort)entry);
1035                                 break;
1036
1037                         case Entry.TypeTransaction:
1038                                 processEntry((Transaction)entry);
1039                                 break;
1040
1041                         case Entry.TypeLastMessage:
1042                                 processEntry((LastMessage)entry, machineSet);
1043                                 break;
1044
1045                         case Entry.TypeRejectedMessage:
1046                                 processEntry((RejectedMessage)entry, indexer);
1047                                 break;
1048
1049                         case Entry.TypeTableStatus:
1050                                 processEntry((TableStatus)entry);
1051                                 break;
1052
1053                         default:
1054                                 throw new Error("Unrecognized type: " + entry.getType());
1055                         }
1056                 }
1057         }
1058
1059         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1060                 for (int i = 0; i < newslots.length; i++) {
1061                         Slot currslot = newslots[i];
1062                         Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
1063                         if (prevslot != null &&
1064                                 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1065                                 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
1066                 }
1067         }
1068 }