Changed way Guard works, Sped up code
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Set;
14 import java.util.Collection;
15 import java.util.Collections;
16
17
18 /**
19  * IoTTable data structure.  Provides client inferface.
20  * @author Brian Demsky
21  * @version 1.0
22  */
23
24 final public class Table {
25         private int numslots;   //number of slots stored in buffer
26
27         //table of key-value pairs
28         //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
29
30         // machine id -> (sequence number, Slot or LastMessage); records last message by each client
31         private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
32         // machine id -> ...
33         private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
34         private Vector<Long> rejectedmessagelist = new Vector<Long>();
35         private SlotBuffer buffer;
36         private CloudComm cloud;
37         private long sequencenumber; //Largest sequence number a client has received
38         private long localmachineid;
39         private TableStatus lastTableStatus;
40         static final int FREE_SLOTS = 10; //number of slots that should be kept free
41         static final int SKIP_THRESHOLD = 10;
42         private long liveslotcount = 0;
43         private int chance;
44         static final double RESIZE_MULTIPLE = 1.2;
45         static final double RESIZE_THRESHOLD = 0.75;
46         static final int REJECTED_THRESHOLD = 5;
47         private int resizethreshold;
48         private long lastliveslotseqn;  //smallest sequence number with a live entry
49         private Random random = new Random();
50         private long lastUncommittedTransaction = 0;
51
52         private int smallestTableStatusSeen = -1;
53         private int largestTableStatusSeen = -1;
54
55         private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
56         private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
57         private Map<Long, Commit> commitMap = null; // List of all the most recent live commits
58         private Map<Long, Abort> abortMap = null; // Set of the live aborts
59         private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
60         private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
61         private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
62         private Map<Long, Transaction> uncommittedTransactionsMap = null;
63         private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
64         private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
65         private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
66         private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
67         private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
68
69
70
71         public Table(String baseurl, String password, long _localmachineid) {
72                 localmachineid = _localmachineid;
73                 buffer = new SlotBuffer();
74                 numslots = buffer.capacity();
75                 setResizeThreshold();
76                 sequencenumber = 0;
77                 cloud = new CloudComm(this, baseurl, password);
78                 lastliveslotseqn = 1;
79
80                 setupDataStructs();
81         }
82
83         public Table(CloudComm _cloud, long _localmachineid) {
84                 localmachineid = _localmachineid;
85                 buffer = new SlotBuffer();
86                 numslots = buffer.capacity();
87                 setResizeThreshold();
88                 sequencenumber = 0;
89                 cloud = _cloud;
90
91                 setupDataStructs();
92         }
93
94         private void setupDataStructs() {
95                 pendingTransQueue = new LinkedList<PendingTransaction>();
96                 commitMap = new HashMap<Long, Commit>();
97                 abortMap = new HashMap<Long, Abort>();
98                 committedMapByKey = new HashMap<IoTString, Commit>();
99                 commitedTable = new HashMap<IoTString, KeyValue>();
100                 speculativeTable = new HashMap<IoTString, KeyValue>();
101                 uncommittedTransactionsMap = new HashMap<Long, Transaction>();
102                 arbitratorTable = new HashMap<IoTString, Long>();
103                 newKeyTable = new HashMap<IoTString, NewKey>();
104                 newCommitMap = new HashMap<Long, Commit>();
105                 lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
106                 lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
107         }
108
109         public void rebuild() throws ServerException {
110                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
111                 validateandupdate(newslots, true);
112         }
113
114         // // TODO: delete method
115         // public void printSlots() {
116         //      long o = buffer.getOldestSeqNum();
117         //      long n = buffer.getNewestSeqNum();
118
119         //      int[] types = new int[10];
120
121         //      int num = 0;
122
123         //      int livec = 0;
124         //      int deadc = 0;
125         //      for (long i = o; i < (n + 1); i++) {
126         //              Slot s = buffer.getSlot(i);
127
128         //              Vector<Entry> entries = s.getEntries();
129
130         //              for (Entry e : entries) {
131         //                      if (e.isLive()) {
132         //                              int type = e.getType();
133         //                              types[type] = types[type] + 1;
134         //                              num++;
135         //                              livec++;
136         //                      } else {
137         //                              deadc++;
138         //                      }
139         //              }
140         //      }
141
142         //      for (int i = 0; i < 10; i++) {
143         //              System.out.println(i + "    " + types[i]);
144         //      }
145         //      System.out.println("Live count:   " + livec);
146         //      System.out.println("Dead count:   " + deadc);
147         //      System.out.println("Old:   " + o);
148         //      System.out.println("New:   " + n);
149         //      System.out.println("Size:   " + buffer.size());
150         //      System.out.println("Commits Map:   " + commitedTable.size());
151         //      System.out.println("Commits List:   " + commitMap.size());
152         // }
153
154         public IoTString getCommitted(IoTString key) {
155                 KeyValue kv = commitedTable.get(key);
156                 if (kv != null) {
157                         return kv.getValue();
158                 } else {
159                         return null;
160                 }
161         }
162
163         public IoTString getSpeculative(IoTString key) {
164                 KeyValue kv = speculativeTable.get(key);
165                 if (kv != null) {
166                         return kv.getValue();
167                 } else {
168                         return null;
169                 }
170         }
171
172         public IoTString getCommittedAtomic(IoTString key) {
173                 KeyValue kv = commitedTable.get(key);
174
175                 if (arbitratorTable.get(key) == null) {
176                         throw new Error("Key not Found.");
177                 }
178
179                 // Make sure new key value pair matches the current arbitrator
180                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
181                         // TODO: Maybe not throw en error
182                         throw new Error("Not all Key Values Match Arbitrator.");
183                 }
184
185                 if (kv != null) {
186                         pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
187                         return kv.getValue();
188                 } else {
189                         pendingTransBuild.addKVGuard(new KeyValue(key, null));
190                         return null;
191                 }
192         }
193
194         public IoTString getSpeculativeAtomic(IoTString key) {
195
196                 if (arbitratorTable.get(key) == null) {
197                         throw new Error("Key not Found.");
198                 }
199
200                 // Make sure new key value pair matches the current arbitrator
201                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
202                         // TODO: Maybe not throw en error
203                         throw new Error("Not all Key Values Match Arbitrator.");
204                 }
205
206                 KeyValue kv = speculativeTable.get(key);
207                 if (kv == null) {
208                         kv = commitedTable.get(key);
209                 }
210
211                 if (kv != null) {
212                         pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
213                         return kv.getValue();
214                 } else {
215                         pendingTransBuild.addKVGuard(new KeyValue(key, null));
216                         return null;
217                 }
218         }
219
220         public Long getArbitrator(IoTString key) {
221                 return arbitratorTable.get(key);
222         }
223
224         public void initTable() throws ServerException {
225                 cloud.setSalt();//Set the salt
226                 Slot s = new Slot(this, 1, localmachineid);
227                 TableStatus status = new TableStatus(s, numslots);
228                 s.addEntry(status);
229                 Slot[] array = cloud.putSlot(s, numslots);
230                 if (array == null) {
231                         array = new Slot[] {s};
232                         /* update data structure */
233                         validateandupdate(array, true);
234                 } else {
235                         throw new Error("Error on initialization");
236                 }
237         }
238
239         public String toString() {
240                 String retString = " Committed Table: \n";
241                 retString += "---------------------------\n";
242                 retString += commitedTable.toString();
243
244                 retString += "\n\n";
245
246                 retString += " Speculative Table: \n";
247                 retString += "---------------------------\n";
248                 retString += speculativeTable.toString();
249
250                 return retString;
251         }
252
253         public void startTransaction() {
254                 // Create a new transaction, invalidates any old pending transactions.
255                 pendingTransBuild = new PendingTransaction();
256         }
257
258         public void commitTransaction() throws ServerException {
259
260                 if (pendingTransBuild.getKVUpdates().size() == 0) {
261                         // If no updates are made then there is no point inserting into the chain
262                         return;
263                 }
264
265                 // Add the pending transaction to the queue
266                 pendingTransQueue.add(pendingTransBuild);
267
268                 // Delete since already inserted
269                 pendingTransBuild = new PendingTransaction();
270
271                 while (!pendingTransQueue.isEmpty()) {
272                         if (tryput( pendingTransQueue.peek(), false)) {
273                                 pendingTransQueue.poll();
274                         }
275                 }
276         }
277
278         public void addKV(IoTString key, IoTString value) {
279
280                 if (arbitratorTable.get(key) == null) {
281                         throw new Error("Key not Found.");
282                 }
283
284                 // Make sure new key value pair matches the current arbitrator
285                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
286                         // TODO: Maybe not throw en error
287                         throw new Error("Not all Key Values Match Arbitrator.");
288                 }
289
290                 KeyValue kv = new KeyValue(key, value);
291                 pendingTransBuild.addKV(kv);
292         }
293
294         public void update()  throws ServerException {
295
296                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
297
298                 validateandupdate(newslots, false);
299
300
301                 if (!pendingTransQueue.isEmpty()) {
302                         // We have a pending transaction so do full insertion
303
304                         while (!pendingTransQueue.isEmpty()) {
305                                 if (tryput( pendingTransQueue.peek(), false)) {
306                                         pendingTransQueue.poll();
307                                 }
308                         }
309                 } else {
310                         // We dont have a pending transaction so do minimal effort
311                         if (uncommittedTransactionsMap.keySet().size() > 0) {
312
313                                 boolean doEnd = false;
314                                 boolean needResize = false;
315                                 while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
316                                         boolean resize = needResize;
317                                         needResize = false;
318
319                                         Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
320                                         int newsize = 0;
321                                         if (liveslotcount > resizethreshold) {
322                                                 resize = true; //Resize is forced
323                                         }
324
325                                         if (resize) {
326                                                 newsize = (int) (numslots * RESIZE_MULTIPLE);
327                                                 TableStatus status = new TableStatus(s, newsize);
328                                                 s.addEntry(status);
329                                         }
330
331                                         doRejectedMessages(s);
332
333                                         ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
334
335                                         // Resize was needed so redo call
336                                         if (retTup.getFirst()) {
337                                                 needResize = true;
338                                                 continue;
339                                         }
340
341                                         // Extract working variables
342                                         boolean seenliveslot = retTup.getSecond();
343                                         long seqn = retTup.getThird();
344
345                                         // Did need to arbitrate
346                                         doEnd = !doArbitration(s);
347
348                                         doOptionalRescue(s, seenliveslot, seqn, resize);
349
350                                         int max = 0;
351                                         if (resize) {
352                                                 max = newsize;
353                                         }
354
355                                         Slot[] array = cloud.putSlot(s, max);
356                                         if (array == null) {
357                                                 array = new Slot[] {s};
358                                                 rejectedmessagelist.clear();
359                                         }       else {
360                                                 if (array.length == 0)
361                                                         throw new Error("Server Error: Did not send any slots");
362                                                 rejectedmessagelist.add(s.getSequenceNumber());
363                                                 doEnd = false;
364                                         }
365
366                                         /* update data structure */
367                                         validateandupdate(array, true);
368                                 }
369                         }
370                 }
371         }
372
373         public boolean createNewKey(IoTString keyName, long machineId)  throws ServerException {
374
375                 while (true) {
376                         if (arbitratorTable.get(keyName) != null) {
377                                 // There is already an arbitrator
378                                 return false;
379                         }
380
381                         if (tryput(keyName, machineId, false)) {
382                                 // If successfully inserted
383                                 return true;
384                         }
385                 }
386         }
387
388         public void decrementLiveCount() {
389                 liveslotcount--;
390         }
391
392         private void setResizeThreshold() {
393                 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
394                 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
395         }
396
397         private boolean tryput(PendingTransaction pendingTrans, boolean resize)  throws ServerException {
398                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
399
400                 int newsize = 0;
401                 if (liveslotcount > resizethreshold) {
402                         resize = true; //Resize is forced
403                 }
404
405                 if (resize) {
406                         newsize = (int) (numslots * RESIZE_MULTIPLE);
407                         TableStatus status = new TableStatus(s, newsize);
408                         s.addEntry(status);
409                 }
410
411                 doRejectedMessages(s);
412
413                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
414
415                 // Resize was needed so redo call
416                 if (retTup.getFirst()) {
417                         return tryput(pendingTrans, true);
418                 }
419
420                 // Extract working variables
421                 boolean seenliveslot = retTup.getSecond();
422                 long seqn = retTup.getThird();
423
424                 doArbitration(s);
425
426                 Transaction trans = new Transaction(s,
427                                                     s.getSequenceNumber(),
428                                                     localmachineid,
429                                                     pendingTrans.getArbitrator(),
430                                                     pendingTrans.getKVUpdates(),
431                                                     pendingTrans.getKVGuard());
432                 boolean insertedTrans = false;
433                 if (s.hasSpace(trans)) {
434                         s.addEntry(trans);
435                         insertedTrans = true;
436                 }
437
438                 doOptionalRescue(s, seenliveslot, seqn, resize);
439                 return doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
440         }
441
442         private boolean tryput(IoTString keyName, long arbMachineid, boolean resize)  throws ServerException {
443                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
444                 int newsize = 0;
445                 if (liveslotcount > resizethreshold) {
446                         resize = true; //Resize is forced
447                 }
448
449                 if (resize) {
450                         newsize = (int) (numslots * RESIZE_MULTIPLE);
451                         TableStatus status = new TableStatus(s, newsize);
452                         s.addEntry(status);
453                 }
454
455                 doRejectedMessages(s);
456                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
457
458                 // Resize was needed so redo call
459                 if (retTup.getFirst()) {
460                         return tryput(keyName, arbMachineid, true);
461                 }
462
463                 // Extract working variables
464                 boolean seenliveslot = retTup.getSecond();
465                 long seqn = retTup.getThird();
466
467                 doArbitration(s);
468
469                 NewKey newKey = new NewKey(s, keyName, arbMachineid);
470
471                 boolean insertedNewKey = false;
472                 if (s.hasSpace(newKey)) {
473                         s.addEntry(newKey);
474                         insertedNewKey = true;
475                 }
476
477                 doOptionalRescue(s, seenliveslot, seqn, resize);
478                 return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize);
479         }
480
481         private void doRejectedMessages(Slot s) {
482                 if (! rejectedmessagelist.isEmpty()) {
483                         /* TODO: We should avoid generating a rejected message entry if
484                          * there is already a sufficient entry in the queue (e.g.,
485                          * equalsto value of true and same sequence number).  */
486
487                         long old_seqn = rejectedmessagelist.firstElement();
488                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
489                                 long new_seqn = rejectedmessagelist.lastElement();
490                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
491                                 s.addEntry(rm);
492                         } else {
493                                 long prev_seqn = -1;
494                                 int i = 0;
495                                 /* Go through list of missing messages */
496                                 for (; i < rejectedmessagelist.size(); i++) {
497                                         long curr_seqn = rejectedmessagelist.get(i);
498                                         Slot s_msg = buffer.getSlot(curr_seqn);
499                                         if (s_msg != null)
500                                                 break;
501                                         prev_seqn = curr_seqn;
502                                 }
503                                 /* Generate rejected message entry for missing messages */
504                                 if (prev_seqn != -1) {
505                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
506                                         s.addEntry(rm);
507                                 }
508                                 /* Generate rejected message entries for present messages */
509                                 for (; i < rejectedmessagelist.size(); i++) {
510                                         long curr_seqn = rejectedmessagelist.get(i);
511                                         Slot s_msg = buffer.getSlot(curr_seqn);
512                                         long machineid = s_msg.getMachineID();
513                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
514                                         s.addEntry(rm);
515                                 }
516                         }
517                 }
518         }
519
520         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
521                 long newestseqnum = buffer.getNewestSeqNum();
522                 long oldestseqnum = buffer.getOldestSeqNum();
523                 if (lastliveslotseqn < oldestseqnum)
524                         lastliveslotseqn = oldestseqnum;
525
526                 long seqn = lastliveslotseqn;
527                 boolean seenliveslot = false;
528                 long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
529                 long threshold = firstiffull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
530
531
532                 // Mandatory Rescue
533                 for (; seqn < threshold; seqn++) {
534                         Slot prevslot = buffer.getSlot(seqn);
535                         // Push slot number forward
536                         if (! seenliveslot)
537                                 lastliveslotseqn = seqn;
538
539                         if (! prevslot.isLive())
540                                 continue;
541                         seenliveslot = true;
542                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
543                         for (Entry liveentry : liveentries) {
544                                 if (s.hasSpace(liveentry)) {
545                                         s.addEntry(liveentry);
546                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
547                                         if (!resize) {
548                                                 System.out.println("B"); //?
549                                                 return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
550                                         }
551                                 }
552                         }
553                 }
554
555                 // Did not resize
556                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
557         }
558
559         private boolean doArbitration(Slot s) {
560                 // Arbitrate
561                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
562                 List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
563
564                 // Sort from oldest to newest
565                 Collections.sort(transSeqNums);
566
567                 boolean didNeedArbitration = false;
568                 for (Long transNum : transSeqNums) {
569                         Transaction ut = uncommittedTransactionsMap.get(transNum);
570
571                         // Check if this machine arbitrates for this transaction
572                         if (ut.getArbitrator() != localmachineid ) {
573                                 continue;
574                         }
575
576                         // we did have something to arbitrate on
577                         didNeedArbitration = true;
578
579                         Entry newEntry = null;
580
581                         if (ut.evaluateGuard(commitedTable, speculativeTableTmp)) {
582                                 // Guard evaluated as true
583
584                                 // update the local tmp current key set
585                                 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
586                                         speculativeTableTmp.put(kv.getKey(), kv);
587                                 }
588
589                                 // create the commit
590                                 newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet());
591                         } else {
592                                 // Guard was false
593
594                                 // create the abort
595                                 newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator());
596                         }
597
598                         if ((newEntry != null) && s.hasSpace(newEntry)) {
599                                 s.addEntry(newEntry);
600                         } else {
601                                 break;
602                         }
603                 }
604
605                 return didNeedArbitration;
606         }
607
608         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
609                 /* now go through live entries from least to greatest sequence number until
610                  * either all live slots added, or the slot doesn't have enough room
611                  * for SKIP_THRESHOLD consecutive entries*/
612                 int skipcount = 0;
613                 long newestseqnum = buffer.getNewestSeqNum();
614                 search:
615                 for (; seqn <= newestseqnum; seqn++) {
616                         Slot prevslot = buffer.getSlot(seqn);
617                         //Push slot number forward
618                         if (!seenliveslot)
619                                 lastliveslotseqn = seqn;
620
621                         if (!prevslot.isLive())
622                                 continue;
623                         seenliveslot = true;
624                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
625                         for (Entry liveentry : liveentries) {
626                                 if (s.hasSpace(liveentry))
627                                         s.addEntry(liveentry);
628                                 else {
629                                         skipcount++;
630                                         if (skipcount > SKIP_THRESHOLD)
631                                                 break search;
632                                 }
633                         }
634                 }
635         }
636
637         private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize)  throws ServerException {
638                 int max = 0;
639                 if (resize)
640                         max = newsize;
641                 Slot[] array = cloud.putSlot(s, max);
642                 if (array == null) {
643                         array = new Slot[] {s};
644                         rejectedmessagelist.clear();
645                 }       else {
646                         if (array.length == 0)
647                                 throw new Error("Server Error: Did not send any slots");
648                         rejectedmessagelist.add(s.getSequenceNumber());
649                         inserted = false;
650                 }
651
652                 validateandupdate(array, true);
653                 return inserted;
654         }
655
656         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
657                 /* The cloud communication layer has checked slot HMACs already
658                          before decoding */
659                 if (newslots.length == 0) return;
660
661                 // Reset the table status declared sizes
662                 smallestTableStatusSeen = -1;
663                 largestTableStatusSeen = -1;
664
665                 long firstseqnum = newslots[0].getSequenceNumber();
666                 if (firstseqnum <= sequencenumber) {
667                         throw new Error("Server Error: Sent older slots!");
668                 }
669
670                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
671                 checkHMACChain(indexer, newslots);
672
673                 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
674
675                 // initExpectedSize(firstseqnum);
676                 for (Slot slot : newslots) {
677                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
678                         // updateExpectedSize();
679                 }
680
681                 /* If there is a gap, check to see if the server sent us everything. */
682                 if (firstseqnum != (sequencenumber + 1)) {
683
684                         // TODO: Check size
685                         checkNumSlots(newslots.length);
686                         if (!machineSet.isEmpty()) {
687                                 throw new Error("Missing record for machines: " + machineSet);
688                         }
689                 }
690
691
692                 commitNewMaxSize();
693
694                 /* Commit new to slots. */
695                 for (Slot slot : newslots) {
696                         buffer.putSlot(slot);
697                         liveslotcount++;
698                 }
699                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
700
701                 // Process all on key value pairs
702                 proccessAllNewCommits();
703
704                 // Go through all uncommitted transactions and kill the ones that are dead
705                 deleteDeadUncommittedTransactions();
706
707                 // Speculate on key value pairs
708                 createSpeculativeTable();
709         }
710
711         public void proccessAllNewCommits() {
712
713                 // Process only if there are commit
714                 if (newCommitMap.keySet().size() == 0) {
715                         return;
716                 }
717
718                 List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.keySet());
719
720                 // Sort from oldest to newest commit
721                 Collections.sort(commitSeqNums);
722
723                 // Go through each new commit one by one
724                 for (Long entrySeqNum : commitSeqNums) {
725                         Commit entry = newCommitMap.get(entrySeqNum);
726
727                         long lastCommitSeenSeqNum = 0;
728
729                         if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
730                                 lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
731                         }
732
733                         if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
734
735                                 Commit prevCommit = commitMap.put(entry.getTransSequenceNumber(), entry);
736
737                                 if (prevCommit != null) {
738                                         prevCommit.setDead();
739
740                                         for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
741                                                 committedMapByKey.put(kv.getKey(), entry);
742                                         }
743                                 }
744
745                                 continue;
746                         }
747
748                         Set<Commit> commitsToEditSet = new HashSet<Commit>();
749
750                         for (KeyValue kv : entry.getkeyValueUpdateSet()) {
751                                 commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
752                         }
753
754                         commitsToEditSet.remove(null);
755
756                         for (Commit prevCommit : commitsToEditSet) {
757
758                                 Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
759
760                                 if (!prevCommit.isLive()) {
761                                         commitMap.remove(prevCommit.getTransSequenceNumber());
762                                 }
763                         }
764
765                         // // Remove any old commits
766                         // for (Iterator<Map.Entry<Long, Commit>> i = commitMap.entrySet().iterator(); i.hasNext();) {
767                         //      Commit prevCommit = i.next().getValue();
768                         //      prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
769
770                         //      if (!prevCommit.isLive()) {
771                         //              i.remove();
772                         //      }
773                         // }
774
775                         // Remove any old commits
776                         // for (Iterator<Map.Entry<Long, Commit>> i = commitMap.entrySet().iterator(); i.hasNext();) {
777                         //      Commit prevCommit = i.next().getValue();
778
779                         //      if (prevCommit.getTransArbitrator() != entry.getTransArbitrator()) {
780                         //              continue;
781                         //      }
782
783                         //      prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
784
785                         //      if (!prevCommit.isLive()) {
786                         //              i.remove();
787                         //      }
788                         // }
789
790
791                         // Add the new commit
792                         commitMap.put(entry.getTransSequenceNumber(), entry);
793                         lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
794
795                         // Update the committed table list
796                         for (KeyValue kv : entry.getkeyValueUpdateSet()) {
797                                 IoTString key = kv.getKey();
798                                 commitedTable.put(key, kv);
799
800                                 committedMapByKey.put(key, entry);
801                         }
802                 }
803
804                 // Clear the new commits storage so we can use it later
805                 newCommitMap.clear();
806         }
807
808         private void deleteDeadUncommittedTransactions() {
809                 // Make dead the transactions
810                 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
811                         Transaction prevtrans = i.next().getValue();
812                         long transArb = prevtrans.getArbitrator();
813
814                         if ((lastCommitSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastCommitSeenSeqNumMap.get(transArb)) ||
815                                 (lastAbortSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastAbortSeenSeqNumMap.get(transArb))) {
816                                 i.remove();
817                                 prevtrans.setDead();
818                         }
819                 }
820         }
821
822         private void createSpeculativeTable() {
823
824                 if (uncommittedTransactionsMap.keySet().size() == 0) {
825                         //      speculativeTable = commitedTable; // Ok that they are the same object
826                         return;
827                 }
828
829                 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
830                 List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
831
832                 // Sort from oldest to newest commit
833                 Collections.sort(utSeqNums);
834
835                 if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
836
837                         speculativeTable.clear();
838                         lastUncommittedTransaction = -1;
839
840                         for (Long key : utSeqNums) {
841                                 Transaction trans = uncommittedTransactionsMap.get(key);
842
843                                 lastUncommittedTransaction = key;
844
845                                 if (trans.evaluateGuard(commitedTable, speculativeTableTmp)) {
846                                         for (KeyValue kv : trans.getkeyValueUpdateSet()) {
847                                                 speculativeTableTmp.put(kv.getKey(), kv);
848                                         }
849                                 }
850
851                         }
852                 } else {
853                         for (Long key : utSeqNums) {
854
855                                 if (key <= lastUncommittedTransaction) {
856                                         continue;
857                                 }
858
859                                 lastUncommittedTransaction = key;
860
861                                 Transaction trans = uncommittedTransactionsMap.get(key);
862
863                                 if (trans.evaluateGuard(speculativeTable, speculativeTableTmp)) {
864                                         for (KeyValue kv : trans.getkeyValueUpdateSet()) {
865                                                 speculativeTableTmp.put(kv.getKey(), kv);
866                                         }
867                                 }
868                         }
869                 }
870
871                 for (IoTString key : speculativeTableTmp.keySet()) {
872                         speculativeTable.put(key, speculativeTableTmp.get(key));
873                 }
874
875                 // speculativeTable = speculativeTableTmp;
876         }
877
878         private int expectedsize, currmaxsize;
879
880         private void checkNumSlots(int numslots) {
881
882
883                 // We only have 1 size so we must have this many slots
884                 if (largestTableStatusSeen == smallestTableStatusSeen) {
885                         if (numslots != smallestTableStatusSeen) {
886                                 throw new Error("Server Error: Server did not send all slots.  Expected: " + smallestTableStatusSeen + " Received:" + numslots);
887                         }
888                 } else {
889                         // We have more than 1
890                         if (numslots < smallestTableStatusSeen) {
891                                 throw new Error("Server Error: Server did not send all slots.  Expected at least: " + smallestTableStatusSeen + " Received:" + numslots);
892                         }
893                 }
894
895                 // if (numslots != expectedsize) {
896                 // throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
897                 // }
898         }
899
900         private void initExpectedSize(long firstsequencenumber) {
901                 long prevslots = firstsequencenumber;
902                 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
903                 currmaxsize = numslots;
904         }
905
906         private void updateExpectedSize() {
907                 expectedsize++;
908                 if (expectedsize > currmaxsize) {
909                         System.out.println("Maxing Out: " + expectedsize + "   " + currmaxsize);
910                         expectedsize = currmaxsize;
911                 }
912         }
913
914         private void updateCurrMaxSize(int newmaxsize) {
915                 currmaxsize = newmaxsize;
916         }
917
918         private void commitNewMaxSize() {
919
920                 if (largestTableStatusSeen == -1) {
921                         currmaxsize = numslots;
922                 } else {
923                         currmaxsize = largestTableStatusSeen;
924                 }
925
926                 if (numslots != currmaxsize) {
927                         buffer.resize(currmaxsize);
928                 }
929
930                 numslots = currmaxsize;
931                 setResizeThreshold();
932         }
933
934         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
935                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
936         }
937
938         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
939                 long oldseqnum = entry.getOldSeqNum();
940                 long newseqnum = entry.getNewSeqNum();
941                 boolean isequal = entry.getEqual();
942                 long machineid = entry.getMachineID();
943                 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
944                         Slot slot = indexer.getSlot(seqnum);
945                         if (slot != null) {
946                                 long slotmachineid = slot.getMachineID();
947                                 if (isequal != (slotmachineid == machineid)) {
948                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
949                                 }
950                         }
951                 }
952
953                 HashSet<Long> watchset = new HashSet<Long>();
954                 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
955                         long entry_mid = lastmsg_entry.getKey();
956                         /* We've seen it, don't need to continue to watch.  Our next
957                          * message will implicitly acknowledge it. */
958                         if (entry_mid == localmachineid)
959                                 continue;
960                         Pair<Long, Liveness> v = lastmsg_entry.getValue();
961                         long entry_seqn = v.getFirst();
962                         if (entry_seqn < newseqnum) {
963                                 addWatchList(entry_mid, entry);
964                                 watchset.add(entry_mid);
965                         }
966                 }
967                 if (watchset.isEmpty())
968                         entry.setDead();
969                 else
970                         entry.setWatchSet(watchset);
971         }
972
973         private void processEntry(NewKey entry) {
974                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
975
976                 NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
977
978                 if (oldNewKey != null) {
979                         oldNewKey.setDead();
980                 }
981         }
982
983         private void processEntry(Transaction entry) {
984
985                 long arb = entry.getArbitrator();
986                 Long comLast = lastCommitSeenSeqNumMap.get(arb);
987                 Long abLast = lastAbortSeenSeqNumMap.get(arb);
988
989                 Transaction prevTrans = null;
990
991                 if ((comLast != null) && (comLast >= entry.getSequenceNumber())) {
992                         prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
993                 } else if ((abLast != null) && (abLast >= entry.getSequenceNumber())) {
994                         prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
995                 } else {
996                         prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
997                 }
998
999                 // Duplicate so delete old copy
1000                 if (prevTrans != null) {
1001                         prevTrans.setDead();
1002                 }
1003         }
1004
1005         private void processEntry(Abort entry) {
1006
1007                 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
1008                         // Abort has not been seen yet so we need to keep track of it
1009
1010                         Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
1011                         if (prevAbort != null) {
1012                                 prevAbort.setDead(); // delete old version of the duplicate
1013                         }
1014                 } else {
1015                         // The machine already saw this so it is dead
1016                         entry.setDead();
1017                 }
1018
1019                 if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) &&   (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
1020                         lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
1021                 }
1022         }
1023
1024         private void processEntry(Commit entry, Slot s) {
1025                 Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry);
1026                 if (prevCommit != null) {
1027                         prevCommit.setDead();
1028                 }
1029         }
1030
1031         private void processEntry(TableStatus entry) {
1032                 int newnumslots = entry.getMaxSlots();
1033                 // updateCurrMaxSize(newnumslots);
1034                 if (lastTableStatus != null)
1035                         lastTableStatus.setDead();
1036                 lastTableStatus = entry;
1037
1038                 if ((smallestTableStatusSeen == -1) || (newnumslots < smallestTableStatusSeen)) {
1039                         smallestTableStatusSeen = newnumslots;
1040                 }
1041
1042                 if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
1043                         largestTableStatusSeen = newnumslots;
1044                 }
1045
1046                 // System.out.println("Table Stat: " + newnumslots + "   large: " + largestTableStatusSeen + "   small: " + smallestTableStatusSeen);
1047         }
1048
1049         private void addWatchList(long machineid, RejectedMessage entry) {
1050                 HashSet<RejectedMessage> entries = watchlist.get(machineid);
1051                 if (entries == null)
1052                         watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
1053                 entries.add(entry);
1054         }
1055
1056         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1057                 machineSet.remove(machineid);
1058
1059                 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
1060                 if (watchset != null) {
1061                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
1062                                 RejectedMessage rm = rmit.next();
1063                                 if (rm.getNewSeqNum() <= seqnum) {
1064                                         /* Remove it from our watchlist */
1065                                         rmit.remove();
1066                                         /* Decrement machines that need to see this notification */
1067                                         rm.removeWatcher(machineid);
1068                                 }
1069                         }
1070                 }
1071
1072                 if (machineid == localmachineid) {
1073                         /* Our own messages are immediately dead. */
1074                         if (liveness instanceof LastMessage) {
1075                                 ((LastMessage)liveness).setDead();
1076                         } else if (liveness instanceof Slot) {
1077                                 ((Slot)liveness).setDead();
1078                         } else {
1079                                 throw new Error("Unrecognized type");
1080                         }
1081                 }
1082
1083                 // Set dead the abort
1084                 for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
1085                         Abort abort = i.next().getValue();
1086
1087                         if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
1088                                 abort.setDead();
1089                                 i.remove();
1090                         }
1091                 }
1092
1093                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
1094                 if (lastmsgentry == null)
1095                         return;
1096
1097                 long lastmsgseqnum = lastmsgentry.getFirst();
1098                 Liveness lastentry = lastmsgentry.getSecond();
1099                 if (machineid != localmachineid) {
1100                         if (lastentry instanceof LastMessage) {
1101                                 ((LastMessage)lastentry).setDead();
1102                         } else if (lastentry instanceof Slot) {
1103                                 ((Slot)lastentry).setDead();
1104                         } else {
1105                                 throw new Error("Unrecognized type");
1106                         }
1107                 }
1108
1109                 if (machineid == localmachineid) {
1110                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1111                                 throw new Error("Server Error: Mismatch on local machine sequence number");
1112                 } else {
1113                         if (lastmsgseqnum > seqnum)
1114                                 throw new Error("Server Error: Rollback on remote machine sequence number");
1115                 }
1116         }
1117
1118         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1119                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1120                 for (Entry entry : slot.getEntries()) {
1121                         switch (entry.getType()) {
1122
1123                         case Entry.TypeNewKey:
1124                                 processEntry((NewKey)entry);
1125                                 break;
1126
1127                         case Entry.TypeCommit:
1128                                 processEntry((Commit)entry, slot);
1129                                 break;
1130
1131                         case Entry.TypeAbort:
1132                                 processEntry((Abort)entry);
1133                                 break;
1134
1135                         case Entry.TypeTransaction:
1136                                 processEntry((Transaction)entry);
1137                                 break;
1138
1139                         case Entry.TypeLastMessage:
1140                                 processEntry((LastMessage)entry, machineSet);
1141                                 break;
1142
1143                         case Entry.TypeRejectedMessage:
1144                                 processEntry((RejectedMessage)entry, indexer);
1145                                 break;
1146
1147                         case Entry.TypeTableStatus:
1148                                 processEntry((TableStatus)entry);
1149                                 break;
1150
1151                         default:
1152                                 throw new Error("Unrecognized type: " + entry.getType());
1153                         }
1154                 }
1155         }
1156
1157         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1158                 for (int i = 0; i < newslots.length; i++) {
1159                         Slot currslot = newslots[i];
1160                         Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
1161                         if (prevslot != null &&
1162                                 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1163                                 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
1164                 }
1165         }
1166 }