Commits working, Transactions Working, Arbitrations working, Still needs a lot of...
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
1 package iotcloud;
2 import java.util.HashMap;
3 import java.util.Map;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Set;
14 import java.util.Collection;
15 import java.util.Collections;
16
17
18 /**
19  * IoTTable data structure.  Provides client interface.
20  * @author Brian Demsky
21  * @version 1.0
22  */
23
24 final public class Table {
        private int numslots;   // number of slots stored in buffer

        // table of key-value pairs
        //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();

        // machine id -> (sequence number, Slot or LastMessage); records last message by each client
        private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
        // machine id -> set of rejected messages (presumably the ones that machine is being watched for -- confirm)
        private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
        // sequence numbers of local slots for which cloud.putSlot returned a non-null array;
        // cleared again as soon as a putSlot call returns null
        private Vector<Long> rejectedmessagelist = new Vector<Long>();
        private SlotBuffer buffer;
        private CloudComm cloud;
        private long sequencenumber; // Largest sequence number a client has received
        private long localmachineid;
        private TableStatus lastTableStatus;
        static final int FREE_SLOTS = 10; // number of slots that should be kept free
        // doOptionalRescue gives up once more than this many live entries did not fit in the new slot
        static final int SKIP_THRESHOLD = 10;
        public long liveslotcount = 0;  // TODO:  MAKE PRIVATE
        private int chance;
        // a resize requests (int) (numslots * RESIZE_MULTIPLE) slots
        static final double RESIZE_MULTIPLE = 1.2;
        // fraction of numslots used as the base of the randomized resize threshold
        static final double RESIZE_THRESHOLD = 0.75;
        // above this many rejected slots a single range RejectedMessage is generated instead of per-slot entries
        static final int REJECTED_THRESHOLD = 5;
        // a resize is forced once liveslotcount exceeds this value
        public int resizethreshold;     // TODO:  MAKE PRIVATE
        private long lastliveslotseqn;  // smallest sequence number with a live entry
        private Random random = new Random();
        private long lastCommitSeenSeqNum = 0; // sequence number of the last commit that was seen

        private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
        private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
        private List<Commit> commitList = null; // List of all the most recent live commits
        private List<Long> commitListSeqNum = null; // List of all the most recent live commits trans sequence numbers

        private Set<Abort> abortSet = null; // Set of the live aborts
        public  Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV       TODO: Make Private
        private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
        // transaction sequence number -> transaction that has not been committed yet
        public Map<Long, Transaction> uncommittedTransactionsMap = null; // TODO: make private
        private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators (key -> arbitrating machine id)
        // key name -> NewKey entry (NOTE(review): presumably the live NewKey entries -- confirm)
        private Map<IoTString, NewKey> newKeyTable = null;
        // private Set<Abort> arbitratorTable = null; // Table of arbitrators
        private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
65
66
67
68         public Table(String baseurl, String password, long _localmachineid) {
69                 localmachineid = _localmachineid;
70                 buffer = new SlotBuffer();
71                 numslots = buffer.capacity();
72                 setResizeThreshold();
73                 sequencenumber = 0;
74                 cloud = new CloudComm(this, baseurl, password);
75                 lastliveslotseqn = 1;
76
77                 setupDataStructs();
78         }
79
80         public Table(CloudComm _cloud, long _localmachineid) {
81                 localmachineid = _localmachineid;
82                 buffer = new SlotBuffer();
83                 numslots = buffer.capacity();
84                 setResizeThreshold();
85                 sequencenumber = 0;
86                 cloud = _cloud;
87
88                 setupDataStructs();
89         }
90
91         private void setupDataStructs() {
92                 pendingTransQueue = new LinkedList<PendingTransaction>();
93                 commitList = new LinkedList<Commit>();
94                 abortSet = new HashSet<Abort>();
95                 commitedTable = new HashMap<IoTString, KeyValue>();
96                 speculativeTable = new HashMap<IoTString, KeyValue>();
97                 uncommittedTransactionsMap = new HashMap<Long, Transaction>();
98                 arbitratorTable = new HashMap<IoTString, Long>();
99                 newKeyTable = new HashMap<IoTString, NewKey>();
100                 newCommitMap = new HashMap<Long, Commit> ();
101         }
102
103         public void rebuild() {
104                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
105                 validateandupdate(newslots, true);
106         }
107
108         // TODO: delete method
109         public void printSlots() {
110                 long o = buffer.getOldestSeqNum();
111                 long n = buffer.getNewestSeqNum();
112
113                 int[] types = new int[10];
114
115                 int num = 0;
116
117                 int livec = 0;
118                 int deadc = 0;
119                 for (long i = o; i < (n + 1); i++) {
120                         Slot s = buffer.getSlot(i);
121
122                         Vector<Entry> entries = s.getEntries();
123
124                         for (Entry e : entries) {
125                                 if (e.isLive()) {
126                                         int type = e.getType();
127                                         types[type] = types[type] + 1;
128                                         num++;
129                                         livec++;
130                                 } else {
131                                         deadc++;
132                                 }
133                         }
134                 }
135
136                 for (int i = 0; i < 10; i++) {
137                         System.out.println(i + "    " + types[i]);
138                 }
139                 System.out.println("Live count:   " + livec);
140                 System.out.println("Dead count:   " + deadc);
141                 System.out.println("Old:   " + o);
142                 System.out.println("New:   " + n);
143                 System.out.println("Size:   " + buffer.size());
144                 System.out.println("Commits Map:   " + commitedTable.size());
145                 System.out.println("Commits List:   " + commitList.size());
146         }
147
148         public IoTString getCommitted(IoTString key) {
149                 KeyValue kv = commitedTable.get(key);
150                 if (kv != null) {
151                         return kv.getValue();
152                 } else {
153                         return null;
154                 }
155         }
156
157         public IoTString getSpeculative(IoTString key) {
158                 KeyValue kv = speculativeTable.get(key);
159                 if (kv != null) {
160                         return kv.getValue();
161                 } else {
162                         return null;
163                 }
164         }
165
166         public void initTable() {
167                 cloud.setSalt();//Set the salt
168                 Slot s = new Slot(this, 1, localmachineid);
169                 TableStatus status = new TableStatus(s, numslots);
170                 s.addEntry(status);
171                 Slot[] array = cloud.putSlot(s, numslots);
172                 if (array == null) {
173                         array = new Slot[] {s};
174                         /* update data structure */
175                         validateandupdate(array, true);
176                 } else {
177                         throw new Error("Error on initialization");
178                 }
179         }
180
181         public String toString() {
182                 String retString = " Committed Table: \n";
183                 retString += "---------------------------\n";
184                 retString += commitedTable.toString();
185
186                 retString += "\n\n";
187
188                 retString += " Speculative Table: \n";
189                 retString += "---------------------------\n";
190                 retString += speculativeTable.toString();
191
192                 return retString;
193         }
194
195         public void startTransaction() {
196                 // Create a new transaction, invalidates any old pending transactions.
197                 pendingTransBuild = new PendingTransaction();
198         }
199
200         public void commitTransaction() {
201
202                 if (pendingTransBuild.getKVUpdates().size() == 0) {
203                         // If no updates are made then there is no point inserting into the chain
204                         return;
205                 }
206
207                 // Add the pending transaction to the queue
208                 pendingTransQueue.add(pendingTransBuild);
209
210                 while (!pendingTransQueue.isEmpty()) {
211                         if (tryput( pendingTransQueue.peek(), false)) {
212                                 pendingTransQueue.poll();
213                         }
214                 }
215         }
216
217         public void addKV(IoTString key, IoTString value) {
218
219                 if (arbitratorTable.get(key) == null) {
220                         throw new Error("Key not Found.");
221                 }
222
223                 // Make sure new key value pair matches the current arbitrator
224                 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
225                         // TODO: Maybe not throw en error
226                         throw new Error("Not all Key Values Match.");
227                 }
228
229                 KeyValue kv = new KeyValue(key, value);
230                 pendingTransBuild.addKV(kv);
231         }
232
233         public void addGuard(Guard guard) {
234                 pendingTransBuild.addGuard(guard);
235         }
236
237         public void update() {
238
239                 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
240
241                 validateandupdate(newslots, false);
242
243                 if (uncommittedTransactionsMap.keySet().size() > 0) {
244
245                         boolean doEnd = false;
246                         boolean needResize = false;
247                         while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
248                                 boolean resize = needResize;
249                                 needResize = false;
250
251                                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
252                                 int newsize = 0;
253                                 if (liveslotcount > resizethreshold) {
254                                         resize = true; //Resize is forced
255                                 }
256
257                                 if (resize) {
258                                         newsize = (int) (numslots * RESIZE_MULTIPLE);
259                                         TableStatus status = new TableStatus(s, newsize);
260                                         s.addEntry(status);
261                                 }
262
263                                 doRejectedMessages(s);
264
265                                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
266
267                                 // Resize was needed so redo call
268                                 if (retTup.getFirst()) {
269                                         needResize = true;
270                                         continue;
271                                 }
272
273                                 // Extract working variables
274                                 boolean seenliveslot = retTup.getSecond();
275                                 long seqn = retTup.getThird();
276
277                                 // Did need to arbitrate
278                                 doEnd = !doArbitration(s);
279
280                                 doOptionalRescue(s, seenliveslot, seqn, resize);
281
282                                 int max = 0;
283                                 if (resize) {
284                                         max = newsize;
285                                 }
286
287                                 Slot[] array = cloud.putSlot(s, max);
288                                 if (array == null) {
289                                         array = new Slot[] {s};
290                                         rejectedmessagelist.clear();
291                                 }       else {
292                                         if (array.length == 0)
293                                                 throw new Error("Server Error: Did not send any slots");
294                                         rejectedmessagelist.add(s.getSequenceNumber());
295                                         doEnd = false;
296                                 }
297
298                                 /* update data structure */
299                                 validateandupdate(array, true);
300                         }
301                 }
302         }
303
304         public boolean createNewKey(IoTString keyName, long machineId) {
305
306                 while (true) {
307                         if (arbitratorTable.get(keyName) != null) {
308                                 // There is already an arbitrator
309                                 return false;
310                         }
311
312                         if (tryput(keyName, machineId, false)) {
313
314                                 // If successfully inserted
315                                 return true;
316                         }
317                 }
318         }
319
320         public void decrementLiveCount() {
321                 liveslotcount--;
322                 // System.out.println("Decrement Live Count");
323         }
324
325         private void setResizeThreshold() {
326                 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
327                 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
328         }
329
330         private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
331                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
332
333                 int newsize = 0;
334                 if (liveslotcount > resizethreshold) {
335                         resize = true; //Resize is forced
336                         System.out.println("Live count resize: " + liveslotcount + "   " + resizethreshold);
337
338                 }
339
340                 if (resize) {
341                         newsize = (int) (numslots * RESIZE_MULTIPLE);
342
343                         System.out.println("New Size:  " + newsize + "  old: " + buffer.oldestseqn); // TODO remove
344
345                         TableStatus status = new TableStatus(s, newsize);
346                         s.addEntry(status);
347                 }
348
349                 doRejectedMessages(s);
350
351
352
353                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
354
355                 // Resize was needed so redo call
356                 if (retTup.getFirst()) {
357                         return tryput(pendingTrans, true);
358                 }
359
360                 // Extract working variables
361                 boolean seenliveslot = retTup.getSecond();
362                 long seqn = retTup.getThird();
363
364                 doArbitration(s);
365
366                 Transaction trans = new Transaction(s,
367                                                     s.getSequenceNumber(),
368                                                     localmachineid,
369                                                     pendingTrans.getKVUpdates(),
370                                                     pendingTrans.getGuard());
371                 boolean insertedTrans = false;
372                 if (s.hasSpace(trans)) {
373                         s.addEntry(trans);
374                         insertedTrans = true;
375                 }
376
377                 doOptionalRescue(s, seenliveslot, seqn, resize);
378                 insertedTrans = doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
379
380                 if (insertedTrans) {
381                         // System.out.println("Inserted: " + trans.getSequenceNumber());
382                 }
383
384                 return insertedTrans;
385         }
386
387         private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
388                 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
389                 int newsize = 0;
390                 if (liveslotcount > resizethreshold) {
391                         resize = true; //Resize is forced
392                 }
393
394                 if (resize) {
395                         newsize = (int) (numslots * RESIZE_MULTIPLE);
396                         TableStatus status = new TableStatus(s, newsize);
397                         s.addEntry(status);
398                 }
399
400                 doRejectedMessages(s);
401                 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
402
403                 // Resize was needed so redo call
404                 if (retTup.getFirst()) {
405                         return tryput(keyName, arbMachineid, true);
406                 }
407
408                 // Extract working variables
409                 boolean seenliveslot = retTup.getSecond();
410                 long seqn = retTup.getThird();
411
412
413                 doArbitration(s);
414
415                 NewKey newKey = new NewKey(s, keyName, arbMachineid);
416
417                 boolean insertedNewKey = false;
418                 if (s.hasSpace(newKey)) {
419                         s.addEntry(newKey);
420                         insertedNewKey = true;
421                 }
422
423                 doOptionalRescue(s, seenliveslot, seqn, resize);
424                 return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize);
425         }
426
427         private void doRejectedMessages(Slot s) {
428                 if (! rejectedmessagelist.isEmpty()) {
429                         /* TODO: We should avoid generating a rejected message entry if
430                          * there is already a sufficient entry in the queue (e.g.,
431                          * equalsto value of true and same sequence number).  */
432
433                         long old_seqn = rejectedmessagelist.firstElement();
434                         if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
435                                 long new_seqn = rejectedmessagelist.lastElement();
436                                 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
437                                 s.addEntry(rm);
438                         } else {
439                                 long prev_seqn = -1;
440                                 int i = 0;
441                                 /* Go through list of missing messages */
442                                 for (; i < rejectedmessagelist.size(); i++) {
443                                         long curr_seqn = rejectedmessagelist.get(i);
444                                         Slot s_msg = buffer.getSlot(curr_seqn);
445                                         if (s_msg != null)
446                                                 break;
447                                         prev_seqn = curr_seqn;
448                                 }
449                                 /* Generate rejected message entry for missing messages */
450                                 if (prev_seqn != -1) {
451                                         RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
452                                         s.addEntry(rm);
453                                 }
454                                 /* Generate rejected message entries for present messages */
455                                 for (; i < rejectedmessagelist.size(); i++) {
456                                         long curr_seqn = rejectedmessagelist.get(i);
457                                         Slot s_msg = buffer.getSlot(curr_seqn);
458                                         long machineid = s_msg.getMachineID();
459                                         RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
460                                         s.addEntry(rm);
461                                 }
462                         }
463                 }
464         }
465
466         private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
467                 long newestseqnum = buffer.getNewestSeqNum();
468                 long oldestseqnum = buffer.getOldestSeqNum();
469                 if (lastliveslotseqn < oldestseqnum)
470                         lastliveslotseqn = oldestseqnum;
471
472                 long seqn = lastliveslotseqn;
473                 boolean seenliveslot = false;
474                 long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
475                 long threshold = firstiffull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
476
477
478                 // Mandatory Rescue
479                 for (; seqn < threshold; seqn++) {
480                         Slot prevslot = buffer.getSlot(seqn);
481                         // Push slot number forward
482                         if (! seenliveslot)
483                                 lastliveslotseqn = seqn;
484
485                         if (! prevslot.isLive())
486                                 continue;
487                         seenliveslot = true;
488                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
489                         for (Entry liveentry : liveentries) {
490                                 if (s.hasSpace(liveentry)) {
491                                         s.addEntry(liveentry);
492                                 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
493                                         if (!resize) {
494                                                 System.out.println("B"); //?
495
496                                                 System.out.println("==============================NEEEEDDDD RESIZING");
497                                                 return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
498                                         }
499                                 }
500                         }
501                 }
502
503                 // Did not resize
504                 return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
505         }
506
507         private boolean doArbitration(Slot s) {
508                 // Arbitrate
509                 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
510
511                 List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
512
513                 // Sort from oldest to newest
514                 Collections.sort(transSeqNums);
515
516
517                 boolean didNeedArbitration = false;
518                 for (Long transNum : transSeqNums) {
519                         Transaction ut = uncommittedTransactionsMap.get(transNum);
520
521                         KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
522                         // Check if this machine arbitrates for this transaction
523                         if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
524                                 continue;
525                         }
526
527                         // we did have something to arbitrate on
528                         didNeedArbitration = true;
529
530                         Entry newEntry = null;
531
532                         try {
533                                 if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
534                                         // Guard evaluated as true
535
536                                         // update the local tmp current key set
537                                         for (KeyValue kv : ut.getkeyValueUpdateSet()) {
538                                                 speculativeTableTmp.put(kv.getKey(), kv);
539                                         }
540
541                                         // create the commit
542                                         newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
543                                 } else {
544                                         // Guard was false
545
546                                         // create the abort
547                                         newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
548                                 }
549                         } catch (Exception e) {
550                                 e.printStackTrace();
551                         }
552
553                         if ((newEntry != null) && s.hasSpace(newEntry)) {
554                                 s.addEntry(newEntry);
555                         } else {
556                                 break;
557                         }
558                 }
559
560                 return didNeedArbitration;
561         }
562
563         private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
564                 /* now go through live entries from least to greatest sequence number until
565                  * either all live slots added, or the slot doesn't have enough room
566                  * for SKIP_THRESHOLD consecutive entries*/
567                 int skipcount = 0;
568                 long newestseqnum = buffer.getNewestSeqNum();
569                 search:
570                 for (; seqn <= newestseqnum; seqn++) {
571                         Slot prevslot = buffer.getSlot(seqn);
572                         //Push slot number forward
573                         if (!seenliveslot)
574                                 lastliveslotseqn = seqn;
575
576                         if (!prevslot.isLive())
577                                 continue;
578                         seenliveslot = true;
579                         Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
580                         for (Entry liveentry : liveentries) {
581                                 if (s.hasSpace(liveentry))
582                                         s.addEntry(liveentry);
583                                 else {
584                                         skipcount++;
585                                         if (skipcount > SKIP_THRESHOLD)
586                                                 break search;
587                                 }
588                         }
589                 }
590         }
591
592         private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) {
593                 int max = 0;
594                 if (resize)
595                         max = newsize;
596                 Slot[] array = cloud.putSlot(s, max);
597                 if (array == null) {
598                         array = new Slot[] {s};
599                         rejectedmessagelist.clear();
600                 }       else {
601                         if (array.length == 0)
602                                 throw new Error("Server Error: Did not send any slots");
603                         rejectedmessagelist.add(s.getSequenceNumber());
604                         inserted = false;
605                 }
606
607                 /* update data structure */
608                 validateandupdate(array, true);
609
610                 return inserted;
611         }
612
613         private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
614                 /* The cloud communication layer has checked slot HMACs already
615                          before decoding */
616                 if (newslots.length == 0) return;
617
618                 long firstseqnum = newslots[0].getSequenceNumber();
619                 if (firstseqnum <= sequencenumber) {
620                         throw new Error("Server Error: Sent older slots!");
621                 }
622
623                 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
624                 checkHMACChain(indexer, newslots);
625
626                 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
627
628                 initExpectedSize(firstseqnum);
629                 for (Slot slot : newslots) {
630                         processSlot(indexer, slot, acceptupdatestolocal, machineSet);
631                         updateExpectedSize();
632                 }
633
634                 proccessAllNewCommits();
635
636                 /* If there is a gap, check to see if the server sent us everything. */
637                 if (firstseqnum != (sequencenumber + 1)) {
638
639                         // TODO: Check size
640                         checkNumSlots(newslots.length);
641                         if (!machineSet.isEmpty()) {
642                                 throw new Error("Missing record for machines: " + machineSet);
643                         }
644                 }
645
646                 commitNewMaxSize();
647
648                 /* Commit new to slots. */
649                 for (Slot slot : newslots) {
650                         buffer.putSlot(slot);
651                         liveslotcount++;
652                 }
653                 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
654
655                 // Speculate on key value pairs
656                 createSpeculativeTable();
657         }
658
659         public void proccessAllNewCommits() {
660
661                 // Process only if there are commit
662                 if (newCommitMap.keySet().size() == 0) {
663                         return;
664                 }
665
666                 List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.keySet());
667
668                 // Sort from oldest to newest commit
669                 Collections.sort(commitSeqNums);
670
671                 // Go through each new commit one by one
672                 for (Long entrySeqNum : commitSeqNums) {
673                         Commit entry = newCommitMap.get(entrySeqNum);
674
675                         if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
676
677                                 // Remove any old commits
678                                 for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
679                                         Commit prevcommit = i.next();
680
681                                         if (entry.getTransSequenceNumber() == prevcommit.getTransSequenceNumber()) {
682                                                 prevcommit.setDead();
683                                                 i.remove();
684                                         }
685                                 }
686                                 commitList.add(entry);
687                                 continue;
688                         }
689
690                         // Remove any old commits
691                         for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
692                                 Commit prevcommit = i.next();
693                                 prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
694
695                                 if (!prevcommit.isLive()) {
696                                         i.remove();
697                                 }
698                         }
699
700                         // Add the new commit
701                         commitList.add(entry);
702                         lastCommitSeenSeqNum = entry.getTransSequenceNumber();
703                         // System.out.println("Last Seq Num: " + lastCommitSeenSeqNum);
704
705
706                         // Update the committed table list
707                         for (KeyValue kv : entry.getkeyValueUpdateSet()) {
708                                 IoTString key = kv.getKey();
709                                 commitedTable.put(key, kv);
710                         }
711
712                         long committedTransSeq = entry.getTransSequenceNumber();
713
714                         // Make dead the transactions
715                         for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
716                                 Transaction prevtrans = i.next().getValue();
717
718                                 if (prevtrans.getSequenceNumber() <= committedTransSeq) {
719                                         i.remove();
720                                         prevtrans.setDead();
721                                 }
722                         }
723                 }
724
725
726                 // Clear the new commits storage so we can use it later
727                 newCommitMap.clear();
728         }
729
730         private void createSpeculativeTable() {
731                 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
732                 List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
733
734                 // Sort from oldest to newest commit
735                 Collections.sort(utSeqNums);
736
737
738                 for (Long key : utSeqNums) {
739                         Transaction trans = uncommittedTransactionsMap.get(key);
740
741                         try {
742                                 if (trans.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
743                                         for (KeyValue kv : trans.getkeyValueUpdateSet()) {
744                                                 speculativeTableTmp.put(kv.getKey(), kv);
745                                         }
746                                 }
747
748                         } catch (Exception e) {
749                                 e.printStackTrace();
750                         }
751                 }
752
753                 speculativeTable = speculativeTableTmp;
754         }
755
756         private int expectedsize, currmaxsize;
757
758         private void checkNumSlots(int numslots) {
759                 if (numslots != expectedsize) {
760                         throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
761                 }
762         }
763
764         private void initExpectedSize(long firstsequencenumber) {
765                 long prevslots = firstsequencenumber;
766                 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
767                 currmaxsize = numslots;
768         }
769
770         private void updateExpectedSize() {
771                 expectedsize++;
772                 if (expectedsize > currmaxsize) {
773                         expectedsize = currmaxsize;
774                 }
775         }
776
777         private void updateCurrMaxSize(int newmaxsize) {
778                 currmaxsize = newmaxsize;
779         }
780
781         private void commitNewMaxSize() {
782                 if (numslots != currmaxsize) {
783                         System.out.println("Resizing the buffer"); // TODO: Remove
784                         buffer.resize(currmaxsize);
785                 }
786
787                 numslots = currmaxsize;
788                 setResizeThreshold();
789         }
790
791         private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
792                 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
793         }
794
795         private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
796                 long oldseqnum = entry.getOldSeqNum();
797                 long newseqnum = entry.getNewSeqNum();
798                 boolean isequal = entry.getEqual();
799                 long machineid = entry.getMachineID();
800                 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
801                         Slot slot = indexer.getSlot(seqnum);
802                         if (slot != null) {
803                                 long slotmachineid = slot.getMachineID();
804                                 if (isequal != (slotmachineid == machineid)) {
805                                         throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
806                                 }
807                         }
808                 }
809
810                 HashSet<Long> watchset = new HashSet<Long>();
811                 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
812                         long entry_mid = lastmsg_entry.getKey();
813                         /* We've seen it, don't need to continue to watch.  Our next
814                          * message will implicitly acknowledge it. */
815                         if (entry_mid == localmachineid)
816                                 continue;
817                         Pair<Long, Liveness> v = lastmsg_entry.getValue();
818                         long entry_seqn = v.getFirst();
819                         if (entry_seqn < newseqnum) {
820                                 addWatchList(entry_mid, entry);
821                                 watchset.add(entry_mid);
822                         }
823                 }
824                 if (watchset.isEmpty())
825                         entry.setDead();
826                 else
827                         entry.setWatchSet(watchset);
828         }
829
830         private void processEntry(NewKey entry) {
831                 arbitratorTable.put(entry.getKey(), entry.getMachineID());
832
833                 NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
834
835                 if (oldNewKey != null) {
836                         oldNewKey.setDead();
837                 }
838         }
839
840         private void processEntry(Transaction entry) {
841                 Transaction prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
842
843                 // Duplicate so delete old copy
844                 if (prevTrans != null) {
845                         prevTrans.setDead();
846                 }
847         }
848
849         private void processEntry(Abort entry) {
850
851                 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
852                         // Abort has not been seen yet so we need to keep track of it
853                         abortSet.add(entry);
854                 } else {
855                         // The machine already saw this so it is dead
856                         entry.setDead();
857                 }
858
859                 // Make dead the transactions
860                 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
861                         Transaction prevtrans = i.next().getValue();
862
863                         if (prevtrans.getSequenceNumber() <= entry.getTransSequenceNumber()) {
864                                 i.remove();
865                                 prevtrans.setDead();
866                         }
867                 }
868         }
869
870         private void processEntry(Commit entry, Slot s) {
871                 Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry);
872                 if (prevCommit != null) {
873                         prevCommit.setDead();
874                 }
875         }
876
877         private void processEntry(TableStatus entry) {
878                 int newnumslots = entry.getMaxSlots();
879                 updateCurrMaxSize(newnumslots);
880                 if (lastTableStatus != null)
881                         lastTableStatus.setDead();
882                 lastTableStatus = entry;
883         }
884
885
886         private void addWatchList(long machineid, RejectedMessage entry) {
887                 HashSet<RejectedMessage> entries = watchlist.get(machineid);
888                 if (entries == null)
889                         watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
890                 entries.add(entry);
891         }
892
893         private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
894                 machineSet.remove(machineid);
895
896                 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
897                 if (watchset != null) {
898                         for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
899                                 RejectedMessage rm = rmit.next();
900                                 if (rm.getNewSeqNum() <= seqnum) {
901                                         /* Remove it from our watchlist */
902                                         rmit.remove();
903                                         /* Decrement machines that need to see this notification */
904                                         rm.removeWatcher(machineid);
905                                 }
906                         }
907                 }
908
909                 if (machineid == localmachineid) {
910                         /* Our own messages are immediately dead. */
911                         if (liveness instanceof LastMessage) {
912                                 ((LastMessage)liveness).setDead();
913                         } else if (liveness instanceof Slot) {
914                                 ((Slot)liveness).setDead();
915                         } else {
916                                 throw new Error("Unrecognized type");
917                         }
918                 }
919
920                 // Set dead the abort
921                 for (Iterator<Abort> ait = abortSet.iterator(); ait.hasNext(); ) {
922                         Abort abort = ait.next();
923
924                         if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
925                                 abort.setDead();
926                                 ait.remove();
927                         }
928                 }
929
930
931                 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
932                 if (lastmsgentry == null)
933                         return;
934
935                 long lastmsgseqnum = lastmsgentry.getFirst();
936                 Liveness lastentry = lastmsgentry.getSecond();
937                 if (machineid != localmachineid) {
938                         if (lastentry instanceof LastMessage) {
939                                 ((LastMessage)lastentry).setDead();
940                         } else if (lastentry instanceof Slot) {
941                                 ((Slot)lastentry).setDead();
942                         } else {
943                                 throw new Error("Unrecognized type");
944                         }
945                 }
946
947                 if (machineid == localmachineid) {
948                         if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
949                                 throw new Error("Server Error: Mismatch on local machine sequence number");
950                 } else {
951                         if (lastmsgseqnum > seqnum)
952                                 throw new Error("Server Error: Rollback on remote machine sequence number");
953                 }
954         }
955
956         private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
957                 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
958                 for (Entry entry : slot.getEntries()) {
959                         switch (entry.getType()) {
960
961                         case Entry.TypeNewKey:
962                                 processEntry((NewKey)entry);
963                                 break;
964
965                         case Entry.TypeCommit:
966                                 processEntry((Commit)entry, slot);
967                                 break;
968
969                         case Entry.TypeAbort:
970                                 processEntry((Abort)entry);
971                                 break;
972
973                         case Entry.TypeTransaction:
974                                 processEntry((Transaction)entry);
975                                 break;
976
977                         case Entry.TypeLastMessage:
978                                 processEntry((LastMessage)entry, machineSet);
979                                 break;
980
981                         case Entry.TypeRejectedMessage:
982                                 processEntry((RejectedMessage)entry, indexer);
983                                 break;
984
985                         case Entry.TypeTableStatus:
986                                 processEntry((TableStatus)entry);
987                                 break;
988
989                         default:
990                                 throw new Error("Unrecognized type: " + entry.getType());
991                         }
992                 }
993         }
994
995         private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
996                 for (int i = 0; i < newslots.length; i++) {
997                         Slot currslot = newslots[i];
998                         Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
999                         if (prevslot != null &&
1000                                 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1001                                 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
1002                 }
1003         }
1004 }