2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.ArrayList;
12 import java.util.List;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.nio.ByteBuffer;
20 * IoTTable data structure. Provides client inferface.
21 * @author Brian Demsky
25 final public class Table {
26 private int numslots; //number of slots stored in buffer
28 // machine id -> (sequence number, Slot or LastMessage); records last message by each client
29 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
31 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
32 private Vector<Long> rejectedmessagelist = new Vector<Long>();
33 private SlotBuffer buffer;
34 private CloudComm cloud;
35 private long sequencenumber; //Largest sequence number a client has received
36 private long localmachineid;
37 private TableStatus lastTableStatus;
38 static final int FREE_SLOTS = 10; //number of slots that should be kept free
39 static final int SKIP_THRESHOLD = 10;
40 private long liveslotcount = 0;
42 static final double RESIZE_MULTIPLE = 1.2;
43 static final double RESIZE_THRESHOLD = 0.75;
44 static final int REJECTED_THRESHOLD = 5;
45 private int resizethreshold;
46 private long lastliveslotseqn; //smallest sequence number with a live entry
47 private Random random = new Random();
48 private long lastUncommittedTransaction = 0;
50 private int smallestTableStatusSeen = -1;
51 private int largestTableStatusSeen = -1;
52 private int lastSeenPendingTransactionSpeculateIndex = 0;
53 private int commitSequenceNumber = 0;
54 private long localTransactionSequenceNumber = 0;
56 private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
57 private LinkedList<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
58 private Map<Long, Map<Long, Commit>> commitMap = null; // List of all the most recent live commits
59 private Map<Long, Abort> abortMap = null; // Set of the live aborts
60 private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
61 private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
62 private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
63 private Map<Long, Transaction> uncommittedTransactionsMap = null;
64 private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
65 private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
66 private Map<Long, Map<Long, Commit>> newCommitMap = null; // Map of all the new commits
67 private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
68 private Map<Long, Long> lastCommitSeenTransSeqNumMap = null; // transaction sequence number of the last commit that was seen grouped by arbitrator
69 private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator
70 private Map<IoTString, KeyValue> pendingTransSpeculativeTable = null;
71 private List<Commit> pendingCommitsList = null;
72 private List<Commit> pendingCommitsToDelete = null;
73 private Map<Long, LocalComm> localCommunicationChannels;
74 private Map<Long, TransactionStatus> transactionStatusMap = null;
77 public Table(String hostname, String baseurl, String password, long _localmachineid) {
78 localmachineid = _localmachineid;
79 buffer = new SlotBuffer();
80 numslots = buffer.capacity();
83 cloud = new CloudComm(this, hostname, baseurl, password);
89 public Table(CloudComm _cloud, long _localmachineid) {
90 localmachineid = _localmachineid;
91 buffer = new SlotBuffer();
92 numslots = buffer.capacity();
100 private void setupDataStructs() {
101 pendingTransQueue = new LinkedList<PendingTransaction>();
102 commitMap = new HashMap<Long, Map<Long, Commit>>();
103 abortMap = new HashMap<Long, Abort>();
104 committedMapByKey = new HashMap<IoTString, Commit>();
105 commitedTable = new HashMap<IoTString, KeyValue>();
106 speculativeTable = new HashMap<IoTString, KeyValue>();
107 uncommittedTransactionsMap = new HashMap<Long, Transaction>();
108 arbitratorTable = new HashMap<IoTString, Long>();
109 newKeyTable = new HashMap<IoTString, NewKey>();
110 newCommitMap = new HashMap<Long, Map<Long, Commit>>();
111 lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
112 lastCommitSeenTransSeqNumMap = new HashMap<Long, Long>();
113 lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
114 pendingTransSpeculativeTable = new HashMap<IoTString, KeyValue>();
115 pendingCommitsList = new LinkedList<Commit>();
116 pendingCommitsToDelete = new LinkedList<Commit>();
117 localCommunicationChannels = new HashMap<Long, LocalComm>();
118 transactionStatusMap = new HashMap<Long, TransactionStatus>();
121 public void initTable() throws ServerException {
122 cloud.setSalt();//Set the salt
123 Slot s = new Slot(this, 1, localmachineid);
124 TableStatus status = new TableStatus(s, numslots);
126 Slot[] array = cloud.putSlot(s, numslots);
128 array = new Slot[] {s};
129 /* update data structure */
130 validateandupdate(array, true);
132 throw new Error("Error on initialization");
136 public void rebuild() throws ServerException {
137 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
138 validateandupdate(newslots, true);
141 // TODO: delete method
142 public void printSlots() {
143 long o = buffer.getOldestSeqNum();
144 long n = buffer.getNewestSeqNum();
146 int[] types = new int[10];
152 for (long i = o; i < (n + 1); i++) {
153 Slot s = buffer.getSlot(i);
155 Vector<Entry> entries = s.getEntries();
157 for (Entry e : entries) {
159 int type = e.getType();
160 types[type] = types[type] + 1;
169 for (int i = 0; i < 10; i++) {
170 System.out.println(i + " " + types[i]);
172 System.out.println("Live count: " + livec);
173 System.out.println("Dead count: " + deadc);
174 System.out.println("Old: " + o);
175 System.out.println("New: " + n);
176 System.out.println("Size: " + buffer.size());
177 System.out.println("Commits Key Map: " + commitedTable.size());
178 // System.out.println("Commits Live Map: " + commitMap.size());
179 System.out.println("Pending: " + pendingTransQueue.size());
181 // List<IoTString> strList = new ArrayList<IoTString>();
182 // for (int i = 0; i < 100; i++) {
183 // String keyA = "a" + i;
184 // String keyB = "b" + i;
185 // String keyC = "c" + i;
186 // String keyD = "d" + i;
188 // IoTString iKeyA = new IoTString(keyA);
189 // IoTString iKeyB = new IoTString(keyB);
190 // IoTString iKeyC = new IoTString(keyC);
191 // IoTString iKeyD = new IoTString(keyD);
193 // strList.add(iKeyA);
194 // strList.add(iKeyB);
195 // strList.add(iKeyC);
196 // strList.add(iKeyD);
200 // for (Long l : commitMap.keySet()) {
201 // for (Long l2 : commitMap.get(l).keySet()) {
202 // for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) {
203 // strList.remove(kv.getKey());
204 // System.out.print(kv.getKey() + " ");
209 // System.out.println();
210 // System.out.println();
212 // for (IoTString s : strList) {
213 // System.out.print(s + " ");
215 // System.out.println();
216 // System.out.println(strList.size());
219 public long getId() {
220 return localmachineid;
223 public boolean hasConnection() {
224 return cloud.hasConnection();
227 public String toString() {
228 String retString = " Committed Table: \n";
229 retString += "---------------------------\n";
230 retString += commitedTable.toString();
234 retString += " Speculative Table: \n";
235 retString += "---------------------------\n";
236 retString += speculativeTable.toString();
241 public void addLocalComm(long machineId, LocalComm lc) {
242 localCommunicationChannels.put(machineId, lc);
244 public Long getArbitrator(IoTString key) {
245 return arbitratorTable.get(key);
248 public IoTString getCommitted(IoTString key) {
249 KeyValue kv = commitedTable.get(key);
251 return kv.getValue();
257 public IoTString getSpeculative(IoTString key) {
258 KeyValue kv = pendingTransSpeculativeTable.get(key);
261 kv = speculativeTable.get(key);
265 kv = commitedTable.get(key);
269 return kv.getValue();
275 public IoTString getCommittedAtomic(IoTString key) {
276 KeyValue kv = commitedTable.get(key);
278 if (arbitratorTable.get(key) == null) {
279 throw new Error("Key not Found.");
282 // Make sure new key value pair matches the current arbitrator
283 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
284 // TODO: Maybe not throw en error
285 throw new Error("Not all Key Values Match Arbitrator.");
289 pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
290 return kv.getValue();
292 pendingTransBuild.addKVGuard(new KeyValue(key, null));
297 public IoTString getSpeculativeAtomic(IoTString key) {
299 if (arbitratorTable.get(key) == null) {
300 throw new Error("Key not Found.");
303 // Make sure new key value pair matches the current arbitrator
304 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
305 // TODO: Maybe not throw en error
306 throw new Error("Not all Key Values Match Arbitrator.");
309 KeyValue kv = pendingTransSpeculativeTable.get(key);
312 kv = speculativeTable.get(key);
316 kv = commitedTable.get(key);
320 pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
321 return kv.getValue();
323 pendingTransBuild.addKVGuard(new KeyValue(key, null));
328 public void update() {
330 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
331 validateandupdate(newslots, false);
333 if (!pendingTransQueue.isEmpty()) {
335 // We have a pending transaction so do full insertion
336 processPendingTrans();
339 // We dont have a pending transaction so do minimal effort
340 updateWithNotPendingTrans();
343 } catch (Exception e) {
344 // could not update so do nothing
348 public void startTransaction() {
349 // Create a new transaction, invalidates any old pending transactions.
350 pendingTransBuild = new PendingTransaction();
353 public void addKV(IoTString key, IoTString value) {
355 if (arbitratorTable.get(key) == null) {
356 throw new Error("Key not Found.");
359 // Make sure new key value pair matches the current arbitrator
360 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
361 // TODO: Maybe not throw en error
362 throw new Error("Not all Key Values Match Arbitrator.");
365 KeyValue kv = new KeyValue(key, value);
366 pendingTransBuild.addKV(kv);
369 public TransactionStatus commitTransaction() {
371 if (pendingTransBuild.getKVUpdates().size() == 0) {
373 // transaction with no updates will have no effect on the system
374 return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
377 TransactionStatus transStatus = null;
379 if (pendingTransBuild.getArbitrator() != localmachineid) {
381 // set the local sequence number so we can recognize this transaction later
382 pendingTransBuild.setMachineLocalTransSeqNum(localTransactionSequenceNumber);
383 localTransactionSequenceNumber++;
385 transStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransBuild.getArbitrator());
386 transactionStatusMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus);
388 // Add the pending transaction to the queue
389 pendingTransQueue.add(pendingTransBuild);
392 for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) {
393 PendingTransaction pt = pendingTransQueue.get(i);
395 if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
397 lastSeenPendingTransactionSpeculateIndex = i;
399 for (KeyValue kv : pt.getKVUpdates()) {
400 pendingTransSpeculativeTable.put(kv.getKey(), kv);
406 Transaction ut = new Transaction(null,
409 pendingTransBuild.getArbitrator(),
410 pendingTransBuild.getKVUpdates(),
411 pendingTransBuild.getKVGuard());
413 Pair<Boolean, List<Commit>> retData = doLocalUpdateAndArbitrate(ut, lastCommitSeenSeqNumMap.get(localmachineid));
415 if (retData.getFirst()) {
416 transStatus = new TransactionStatus(TransactionStatus.StatusCommitted, pendingTransBuild.getArbitrator());
418 transStatus = new TransactionStatus(TransactionStatus.StatusAborted, pendingTransBuild.getArbitrator());
422 // Try to insert transactions if possible
423 if (!pendingTransQueue.isEmpty()) {
424 // We have a pending transaction so do full insertion
425 processPendingTrans();
428 // We dont have a pending transaction so do minimal effort
429 updateWithNotPendingTrans();
430 } catch (Exception e) {
435 // reset it so next time is fresh
436 pendingTransBuild = new PendingTransaction();
441 public boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
444 if (arbitratorTable.get(keyName) != null) {
445 // There is already an arbitrator
449 if (tryput(keyName, machineId, false)) {
450 // If successfully inserted
456 private void processPendingTrans() {
458 boolean sentAllPending = false;
460 while (!pendingTransQueue.isEmpty()) {
461 if (tryput( pendingTransQueue.peek(), false)) {
462 pendingTransQueue.poll();
466 // if got here then all pending transactions were sent
467 sentAllPending = true;
468 } catch (Exception e) {
469 // There was a connection error
471 sentAllPending = false;
475 if (!sentAllPending) {
477 for (Iterator<PendingTransaction> i = pendingTransQueue.iterator(); i.hasNext(); ) {
478 PendingTransaction pt = i.next();
479 LocalComm lc = localCommunicationChannels.get(pt.getArbitrator());
481 // Cant talk directly to arbitrator so cant do anything
486 Transaction ut = new Transaction(null,
489 pendingTransBuild.getArbitrator(),
490 pendingTransBuild.getKVUpdates(),
491 pendingTransBuild.getKVGuard());
494 Pair<Boolean, List<Commit>> retData = sendTransactionToLocal(ut, lc);
496 for (Commit commit : retData.getSecond()) {
497 // Prepare to process the commit
498 processEntry(commit);
501 boolean didCommitOrSpeculate = proccessAllNewCommits();
503 // Go through all uncommitted transactions and kill the ones that are dead
504 deleteDeadUncommittedTransactions();
506 // Speculate on key value pairs
507 didCommitOrSpeculate |= createSpeculativeTable();
508 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
511 if (retData.getFirst()) {
512 TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
513 if (transStatus != null) {
514 transStatus.setStatus(TransactionStatus.StatusCommitted);
518 TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
519 if (transStatus != null) {
520 transStatus.setStatus(TransactionStatus.StatusAborted);
527 private void updateWithNotPendingTrans() throws ServerException {
529 boolean doEnd = false;
530 boolean needResize = false;
531 while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0) || (pendingCommitsList.size() > 0)) ) {
532 boolean resize = needResize;
535 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
537 if (liveslotcount > resizethreshold) {
538 resize = true; //Resize is forced
542 newsize = (int) (numslots * RESIZE_MULTIPLE);
543 TableStatus status = new TableStatus(s, newsize);
547 doRejectedMessages(s);
549 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
551 // Resize was needed so redo call
552 if (retTup.getFirst()) {
557 // Extract working variables
558 boolean seenliveslot = retTup.getSecond();
559 long seqn = retTup.getThird();
561 // Did need to arbitrate
562 doEnd = !doArbitration(s);
564 doOptionalRescue(s, seenliveslot, seqn, resize);
571 Slot[] array = cloud.putSlot(s, max);
573 array = new Slot[] {s};
574 rejectedmessagelist.clear();
576 // Delete pending commits that were sent to the cloud
577 deletePendingCommits();
580 if (array.length == 0)
581 throw new Error("Server Error: Did not send any slots");
582 rejectedmessagelist.add(s.getSequenceNumber());
586 /* update data structure */
587 validateandupdate(array, true);
591 private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) {
593 // encode the request
594 byte[] array = new byte[Long.BYTES + ut.getSize()];
595 ByteBuffer bbEncode = ByteBuffer.wrap(array);
596 Long lastSeenCommit = lastCommitSeenSeqNumMap.get(ut.getArbitrator());
597 if (lastSeenCommit != null) {
598 bbEncode.putLong(lastSeenCommit);
604 byte[] data = lc.sendDataToLocalDevice(ut.getArbitrator(), bbEncode.array());
607 ByteBuffer bbDecode = ByteBuffer.wrap(data);
608 boolean didCommit = bbDecode.get() == 1;
609 int numberOfCommites = bbDecode.getInt();
611 List<Commit> newCommits = new LinkedList<Commit>();
612 for (int i = 0; i < numberOfCommites; i++ ) {
614 Commit com = (Commit)Commit.decode(null, bbDecode);
618 return new Pair<Boolean, List<Commit>>(didCommit, newCommits);
621 public byte[] localCommInput(byte[] data) {
624 ByteBuffer bbDecode = ByteBuffer.wrap(data);
625 long lastSeenCommit = bbDecode.getLong();
627 Transaction ut = (Transaction)Transaction.decode(null, bbDecode);
629 // Do the local update and arbitrate
630 Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
632 // Calculate the size of the response
633 int size = Byte.BYTES + Integer.BYTES;
634 for (Commit com : returnData.getSecond()) {
635 size += com.getSize();
638 // encode the response
639 byte[] array = new byte[size];
640 ByteBuffer bbEncode = ByteBuffer.wrap(array);
641 if (returnData.getFirst()) {
642 bbEncode.put((byte)1);
644 bbEncode.put((byte)0);
646 bbEncode.putInt(returnData.getSecond().size());
648 for (Commit com : returnData.getSecond()) {
649 com.encode(bbEncode);
652 return bbEncode.array();
655 private Pair<Boolean, List<Commit>> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) {
657 if (ut.getArbitrator() != localmachineid) {
658 // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
662 List<Commit> returnCommits = new ArrayList<Commit>();
664 if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) {
665 // There is a commit that the other client has not seen yet
667 Map<Long, Commit> cm = commitMap.get(localmachineid);
670 List<Long> commitKeys = new ArrayList<Long>(cm.keySet());
671 Collections.sort(commitKeys);
674 for (int i = (commitKeys.size() - 1); i >= 0; i--) {
675 Commit com = cm.get(commitKeys.get(i));
677 if (com.getSequenceNumber() <= lastCommitSeen) {
680 returnCommits.add((Commit)com.getCopy(null));
685 if (!ut.evaluateGuard(commitedTable, null)) {
686 // Guard evaluated as false so return only the commits that the other device has not seen yet
687 return new Pair<Boolean, List<Commit>>(false, returnCommits);
691 Commit commit = new Commit(null,
693 commitSequenceNumber,
695 ut.getkeyValueUpdateSet());
696 commitSequenceNumber = commitSequenceNumber + 1;
698 // Add to the pending commits list
699 pendingCommitsList.add(commit);
701 // Add this commit so we can send it back
702 returnCommits.add(commit);
704 // Prepare to process the commit
705 processEntry(commit);
707 boolean didCommitOrSpeculate = proccessAllNewCommits();
709 // Go through all uncommitted transactions and kill the ones that are dead
710 deleteDeadUncommittedTransactions();
712 // Speculate on key value pairs
713 didCommitOrSpeculate |= createSpeculativeTable();
714 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
716 return new Pair<Boolean, List<Commit>>(true, returnCommits);
719 public void decrementLiveCount() {
723 private void setResizeThreshold() {
724 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
725 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
728 private boolean tryput(PendingTransaction pendingTrans, boolean resize) throws ServerException {
729 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
732 if (liveslotcount > resizethreshold) {
733 resize = true; //Resize is forced
737 newsize = (int) (numslots * RESIZE_MULTIPLE);
738 TableStatus status = new TableStatus(s, newsize);
742 doRejectedMessages(s);
744 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
746 // Resize was needed so redo call
747 if (retTup.getFirst()) {
748 return tryput(pendingTrans, true);
751 // Extract working variables
752 boolean seenliveslot = retTup.getSecond();
753 long seqn = retTup.getThird();
757 Transaction trans = new Transaction(s,
758 s.getSequenceNumber(),
760 pendingTrans.getArbitrator(),
761 pendingTrans.getKVUpdates(),
762 pendingTrans.getKVGuard());
763 boolean insertedTrans = false;
764 if (s.hasSpace(trans)) {
766 insertedTrans = true;
769 doOptionalRescue(s, seenliveslot, seqn, resize);
770 Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedTrans, resize, newsize);
772 if (sendRetData.getFirst()) {
773 // update the status and change what the sequence number is for the
774 TransactionStatus transStatus = transactionStatusMap.remove(pendingTrans.getMachineLocalTransSeqNum());
775 transStatus.setStatus(TransactionStatus.StatusSent);
776 transStatus.setSentTransaction();
777 transactionStatusMap.put(trans.getSequenceNumber(), transStatus);
781 if (sendRetData.getSecond().length != 0) {
782 // insert into the local block chain
783 validateandupdate(sendRetData.getSecond(), true);
786 return sendRetData.getFirst();
789 private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException {
790 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
792 if (liveslotcount > resizethreshold) {
793 resize = true; //Resize is forced
797 newsize = (int) (numslots * RESIZE_MULTIPLE);
798 TableStatus status = new TableStatus(s, newsize);
802 doRejectedMessages(s);
803 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
805 // Resize was needed so redo call
806 if (retTup.getFirst()) {
807 return tryput(keyName, arbMachineid, true);
810 // Extract working variables
811 boolean seenliveslot = retTup.getSecond();
812 long seqn = retTup.getThird();
816 NewKey newKey = new NewKey(s, keyName, arbMachineid);
818 boolean insertedNewKey = false;
819 if (s.hasSpace(newKey)) {
821 insertedNewKey = true;
824 doOptionalRescue(s, seenliveslot, seqn, resize);
825 Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedNewKey, resize, newsize);
827 if (sendRetData.getSecond().length != 0) {
828 // insert into the local block chain
829 validateandupdate(sendRetData.getSecond(), true);
832 return sendRetData.getFirst();
835 private void doRejectedMessages(Slot s) {
836 if (! rejectedmessagelist.isEmpty()) {
837 /* TODO: We should avoid generating a rejected message entry if
838 * there is already a sufficient entry in the queue (e.g.,
839 * equalsto value of true and same sequence number). */
841 long old_seqn = rejectedmessagelist.firstElement();
842 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
843 long new_seqn = rejectedmessagelist.lastElement();
844 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
849 /* Go through list of missing messages */
850 for (; i < rejectedmessagelist.size(); i++) {
851 long curr_seqn = rejectedmessagelist.get(i);
852 Slot s_msg = buffer.getSlot(curr_seqn);
855 prev_seqn = curr_seqn;
857 /* Generate rejected message entry for missing messages */
858 if (prev_seqn != -1) {
859 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
862 /* Generate rejected message entries for present messages */
863 for (; i < rejectedmessagelist.size(); i++) {
864 long curr_seqn = rejectedmessagelist.get(i);
865 Slot s_msg = buffer.getSlot(curr_seqn);
866 long machineid = s_msg.getMachineID();
867 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
874 private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
875 long newestseqnum = buffer.getNewestSeqNum();
876 long oldestseqnum = buffer.getOldestSeqNum();
877 if (lastliveslotseqn < oldestseqnum)
878 lastliveslotseqn = oldestseqnum;
880 long seqn = lastliveslotseqn;
881 boolean seenliveslot = false;
882 long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
883 long threshold = firstiffull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
887 for (; seqn < threshold; seqn++) {
888 Slot prevslot = buffer.getSlot(seqn);
889 // Push slot number forward
891 lastliveslotseqn = seqn;
893 if (! prevslot.isLive())
896 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
897 for (Entry liveentry : liveentries) {
898 if (s.hasSpace(liveentry)) {
899 s.addEntry(liveentry);
900 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
902 System.out.println("B"); //?
903 return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
910 return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
913 private boolean doArbitration(Slot s) {
915 // flag whether we have finished all arbitration
916 boolean stillHasArbitration = false;
918 pendingCommitsToDelete.clear();
920 // First add queue commits
921 for (Commit commit : pendingCommitsList) {
922 if (s.hasSpace(commit)) {
924 pendingCommitsToDelete.add(commit);
926 // Ran out of space so move on but still not done
927 stillHasArbitration = true;
928 return stillHasArbitration;
933 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
934 List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
936 // Sort from oldest to newest
937 Collections.sort(transSeqNums);
939 for (Long transNum : transSeqNums) {
940 Transaction ut = uncommittedTransactionsMap.get(transNum);
942 // Check if this machine arbitrates for this transaction
943 if (ut.getArbitrator() != localmachineid ) {
947 // we did have something to arbitrate on
948 stillHasArbitration = true;
950 Entry newEntry = null;
952 if (ut.evaluateGuard(commitedTable, speculativeTableTmp)) {
953 // Guard evaluated as true
955 // update the local tmp current key set
956 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
957 speculativeTableTmp.put(kv.getKey(), kv);
961 newEntry = new Commit(s,
962 ut.getSequenceNumber(),
963 commitSequenceNumber,
965 ut.getkeyValueUpdateSet());
966 commitSequenceNumber = commitSequenceNumber + 1;
971 newEntry = new Abort(s,
972 ut.getSequenceNumber(),
977 if ((newEntry != null) && s.hasSpace(newEntry)) {
978 s.addEntry(newEntry);
984 return stillHasArbitration;
987 private void deletePendingCommits() {
988 for (Commit com : pendingCommitsToDelete) {
989 pendingCommitsList.remove(com);
991 pendingCommitsToDelete.clear();
994 private void doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
995 /* now go through live entries from least to greatest sequence number until
996 * either all live slots added, or the slot doesn't have enough room
997 * for SKIP_THRESHOLD consecutive entries*/
999 long newestseqnum = buffer.getNewestSeqNum();
1001 for (; seqn <= newestseqnum; seqn++) {
1002 Slot prevslot = buffer.getSlot(seqn);
1003 //Push slot number forward
1005 lastliveslotseqn = seqn;
1007 if (!prevslot.isLive())
1009 seenliveslot = true;
1010 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
1011 for (Entry liveentry : liveentries) {
1012 if (s.hasSpace(liveentry))
1013 s.addEntry(liveentry);
1016 if (skipcount > SKIP_THRESHOLD)
1023 private Pair<Boolean, Slot[]> doSendSlots(Slot s, boolean inserted, boolean resize, int newsize) throws ServerException {
1028 Slot[] array = cloud.putSlot(s, max);
1029 if (array == null) {
1030 array = new Slot[] {s};
1031 rejectedmessagelist.clear();
1033 // Delete pending commits that were sent to the cloud
1034 deletePendingCommits();
1036 // if (array.length == 0)
1037 // throw new Error("Server Error: Did not send any slots");
1038 rejectedmessagelist.add(s.getSequenceNumber());
1042 return new Pair<Boolean, Slot[]>(inserted, array);
1045 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
1046 /* The cloud communication layer has checked slot HMACs already
1048 if (newslots.length == 0) return;
1050 // Reset the table status declared sizes
1051 smallestTableStatusSeen = -1;
1052 largestTableStatusSeen = -1;
1054 long firstseqnum = newslots[0].getSequenceNumber();
1055 if (firstseqnum <= sequencenumber) {
1056 throw new Error("Server Error: Sent older slots!");
1059 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
1060 checkHMACChain(indexer, newslots);
1062 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
1064 // initExpectedSize(firstseqnum);
1065 for (Slot slot : newslots) {
1066 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
1067 // updateExpectedSize();
1070 /* If there is a gap, check to see if the server sent us everything. */
1071 if (firstseqnum != (sequencenumber + 1)) {
1074 checkNumSlots(newslots.length);
1075 if (!machineSet.isEmpty()) {
1076 throw new Error("Missing record for machines: " + machineSet);
1083 /* Commit new to slots. */
1084 for (Slot slot : newslots) {
1085 buffer.putSlot(slot);
1088 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
1090 // Process all on key value pairs
1091 boolean didCommitOrSpeculate = proccessAllNewCommits();
1093 // Go through all uncommitted transactions and kill the ones that are dead
1094 deleteDeadUncommittedTransactions();
1096 // Speculate on key value pairs
1097 didCommitOrSpeculate |= createSpeculativeTable();
1099 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
1102 private boolean proccessAllNewCommits() {
1103 // Process only if there are commit
1104 if (newCommitMap.keySet().size() == 0) {
1107 boolean didProcessNewCommit = false;
1109 for (Long arb : newCommitMap.keySet()) {
1111 List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.get(arb).keySet());
1113 // Sort from oldest to newest commit
1114 Collections.sort(commitSeqNums);
1116 // Go through each new commit one by one
1117 for (Long entrySeqNum : commitSeqNums) {
1118 Commit entry = newCommitMap.get(arb).get(entrySeqNum);
1120 long lastCommitSeenSeqNum = -1;
1121 if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
1122 lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
1125 if (entry.getSequenceNumber() <= lastCommitSeenSeqNum) {
1126 Map<Long, Commit> cm = commitMap.get(arb);
1128 cm = new HashMap<Long, Commit>();
1131 Commit prevCommit = cm.put(entry.getSequenceNumber(), entry);
1132 commitMap.put(arb, cm);
1134 if (prevCommit != null) {
1135 prevCommit.setDead();
1137 for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
1138 committedMapByKey.put(kv.getKey(), entry);
1145 Set<Commit> commitsToEditSet = new HashSet<Commit>();
1147 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
1148 commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
1151 commitsToEditSet.remove(null);
1153 for (Commit prevCommit : commitsToEditSet) {
1155 Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
1157 if (!prevCommit.isLive()) {
1158 Map<Long, Commit> cm = commitMap.get(arb);
1160 // remove it from the map so that it can be set as dead
1162 cm.remove(prevCommit.getSequenceNumber());
1163 commitMap.put(arb, cm);
1168 // Add the new commit
1169 Map<Long, Commit> cm = commitMap.get(arb);
1171 cm = new HashMap<Long, Commit>();
1173 cm.put(entry.getSequenceNumber(), entry);
1174 commitMap.put(arb, cm);
1176 lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getSequenceNumber());
1178 // set the trans sequence number if we are able to
1179 if (entry.getTransSequenceNumber() != -1) {
1180 lastCommitSeenTransSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
1183 didProcessNewCommit = true;
1185 // Update the committed table list
1186 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
1187 IoTString key = kv.getKey();
1188 commitedTable.put(key, kv);
1189 committedMapByKey.put(key, entry);
1193 // Clear the new commits storage so we can use it later
1194 newCommitMap.clear();
1196 // go through all saved transactions and update the status of those that can be updated
1197 for (Iterator<Map.Entry<Long, TransactionStatus>> i = transactionStatusMap.entrySet().iterator(); i.hasNext();) {
1198 Map.Entry<Long, TransactionStatus> entry = i.next();
1199 long seqnum = entry.getKey();
1200 TransactionStatus status = entry.getValue();
1202 if ( status.getSentTransaction() && (lastCommitSeenTransSeqNumMap.get(status.getArbitrator()) != null) && (seqnum <= lastCommitSeenTransSeqNumMap.get(status.getArbitrator()))) {
1203 status.setStatus(TransactionStatus.StatusCommitted);
1208 return didProcessNewCommit;
1211 private void deleteDeadUncommittedTransactions() {
1212 // Make dead the transactions
1213 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
1214 Transaction prevtrans = i.next().getValue();
1215 long transArb = prevtrans.getArbitrator();
1217 Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(transArb);
1218 Long abortSeqNum = lastAbortSeenSeqNumMap.get(transArb);
1220 if (((commitSeqNum != null) && (prevtrans.getSequenceNumber() <= commitSeqNum)) ||
1221 ((abortSeqNum != null) && (prevtrans.getSequenceNumber() <= abortSeqNum))) {
1223 prevtrans.setDead();
1228 private boolean createSpeculativeTable() {
1229 if (uncommittedTransactionsMap.keySet().size() == 0) {
1233 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
1234 List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
1236 // Sort from oldest to newest commit
1237 Collections.sort(utSeqNums);
1239 if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
1241 speculativeTable.clear();
1242 lastUncommittedTransaction = -1;
1244 for (Long key : utSeqNums) {
1245 Transaction trans = uncommittedTransactionsMap.get(key);
1247 lastUncommittedTransaction = key;
1249 if (trans.evaluateGuard(commitedTable, speculativeTableTmp)) {
1250 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
1251 speculativeTableTmp.put(kv.getKey(), kv);
1257 for (Long key : utSeqNums) {
1259 if (key <= lastUncommittedTransaction) {
1263 lastUncommittedTransaction = key;
1265 Transaction trans = uncommittedTransactionsMap.get(key);
1267 if (trans.evaluateGuard(speculativeTable, speculativeTableTmp)) {
1268 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
1269 speculativeTableTmp.put(kv.getKey(), kv);
1275 for (IoTString key : speculativeTableTmp.keySet()) {
1276 speculativeTable.put(key, speculativeTableTmp.get(key));
1282 private void createPendingTransactionSpeculativeTable(boolean didCommitOrSpeculate) {
1284 if (didCommitOrSpeculate) {
1285 pendingTransSpeculativeTable.clear();
1286 lastSeenPendingTransactionSpeculateIndex = 0;
1289 for (PendingTransaction pt : pendingTransQueue) {
1290 if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
1292 lastSeenPendingTransactionSpeculateIndex = index;
1295 for (KeyValue kv : pt.getKVUpdates()) {
1296 pendingTransSpeculativeTable.put(kv.getKey(), kv);
1304 private int expectedsize, currmaxsize;
1306 private void checkNumSlots(int numslots) {
1309 // We only have 1 size so we must have this many slots
1310 if (largestTableStatusSeen == smallestTableStatusSeen) {
1311 if (numslots != smallestTableStatusSeen) {
1312 throw new Error("Server Error: Server did not send all slots. Expected: " + smallestTableStatusSeen + " Received:" + numslots);
1315 // We have more than 1
1316 if (numslots < smallestTableStatusSeen) {
1317 throw new Error("Server Error: Server did not send all slots. Expected at least: " + smallestTableStatusSeen + " Received:" + numslots);
1321 // if (numslots != expectedsize) {
1322 // throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
1326 private void initExpectedSize(long firstsequencenumber) {
1327 long prevslots = firstsequencenumber;
1328 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
1329 currmaxsize = numslots;
1332 private void updateExpectedSize() {
1334 if (expectedsize > currmaxsize) {
1335 expectedsize = currmaxsize;
1339 private void updateCurrMaxSize(int newmaxsize) {
1340 currmaxsize = newmaxsize;
1343 private void commitNewMaxSize() {
1345 if (largestTableStatusSeen == -1) {
1346 currmaxsize = numslots;
1348 currmaxsize = largestTableStatusSeen;
1351 if (numslots != currmaxsize) {
1352 buffer.resize(currmaxsize);
1355 numslots = currmaxsize;
1356 setResizeThreshold();
1359 private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
1360 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
1363 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
1364 long oldseqnum = entry.getOldSeqNum();
1365 long newseqnum = entry.getNewSeqNum();
1366 boolean isequal = entry.getEqual();
1367 long machineid = entry.getMachineID();
1368 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
1369 Slot slot = indexer.getSlot(seqnum);
1371 long slotmachineid = slot.getMachineID();
1372 if (isequal != (slotmachineid == machineid)) {
1373 throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
1378 HashSet<Long> watchset = new HashSet<Long>();
1379 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
1380 long entry_mid = lastmsg_entry.getKey();
1381 /* We've seen it, don't need to continue to watch. Our next
1382 * message will implicitly acknowledge it. */
1383 if (entry_mid == localmachineid)
1385 Pair<Long, Liveness> v = lastmsg_entry.getValue();
1386 long entry_seqn = v.getFirst();
1387 if (entry_seqn < newseqnum) {
1388 addWatchList(entry_mid, entry);
1389 watchset.add(entry_mid);
1392 if (watchset.isEmpty())
1395 entry.setWatchSet(watchset);
1398 private void processEntry(NewKey entry) {
1399 arbitratorTable.put(entry.getKey(), entry.getMachineID());
1401 NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
1403 if (oldNewKey != null) {
1404 oldNewKey.setDead();
1408 private void processEntry(Transaction entry) {
1410 long arb = entry.getArbitrator();
1411 Long comLast = lastCommitSeenTransSeqNumMap.get(arb);
1412 Long abLast = lastAbortSeenSeqNumMap.get(arb);
1414 Transaction prevTrans = null;
1416 if ((comLast != null) && (comLast >= entry.getSequenceNumber())) {
1417 prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
1418 } else if ((abLast != null) && (abLast >= entry.getSequenceNumber())) {
1419 prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
1421 prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
1424 // Duplicate so delete old copy
1425 if (prevTrans != null) {
1426 prevTrans.setDead();
1430 private void processEntry(Abort entry) {
1431 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
1432 // Abort has not been seen yet so we need to keep track of it
1434 Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
1435 if (prevAbort != null) {
1436 prevAbort.setDead(); // delete old version of the duplicate
1439 if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) && (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
1440 lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
1443 // The machine already saw this so it is dead
1447 // Update the status of the transaction and remove it since we are done with this transaction
1448 TransactionStatus status = transactionStatusMap.remove(entry.getTransSequenceNumber());
1449 if (status != null) {
1450 status.setStatus(TransactionStatus.StatusAborted);
1454 private void processEntry(Commit entry) {
1455 Map<Long, Commit> arbMap = newCommitMap.get(entry.getTransArbitrator());
1457 if (arbMap == null) {
1458 arbMap = new HashMap<Long, Commit>();
1461 Commit prevCommit = arbMap.put(entry.getSequenceNumber(), entry);
1462 newCommitMap.put(entry.getTransArbitrator(), arbMap);
1464 if (prevCommit != null) {
1465 prevCommit.setDead();
1469 private void processEntry(TableStatus entry) {
1470 int newnumslots = entry.getMaxSlots();
1471 // updateCurrMaxSize(newnumslots);
1472 if (lastTableStatus != null)
1473 lastTableStatus.setDead();
1474 lastTableStatus = entry;
1476 if ((smallestTableStatusSeen == -1) || (newnumslots < smallestTableStatusSeen)) {
1477 smallestTableStatusSeen = newnumslots;
1480 if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
1481 largestTableStatusSeen = newnumslots;
1485 private void addWatchList(long machineid, RejectedMessage entry) {
1486 HashSet<RejectedMessage> entries = watchlist.get(machineid);
1487 if (entries == null)
1488 watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
1492 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1493 machineSet.remove(machineid);
1495 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
1496 if (watchset != null) {
1497 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
1498 RejectedMessage rm = rmit.next();
1499 if (rm.getNewSeqNum() <= seqnum) {
1500 /* Remove it from our watchlist */
1502 /* Decrement machines that need to see this notification */
1503 rm.removeWatcher(machineid);
1508 if (machineid == localmachineid) {
1509 /* Our own messages are immediately dead. */
1510 if (liveness instanceof LastMessage) {
1511 ((LastMessage)liveness).setDead();
1512 } else if (liveness instanceof Slot) {
1513 ((Slot)liveness).setDead();
1515 throw new Error("Unrecognized type");
1519 // Set dead the abort
1520 for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
1521 Abort abort = i.next().getValue();
1523 if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
1529 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
1530 if (lastmsgentry == null)
1533 long lastmsgseqnum = lastmsgentry.getFirst();
1534 Liveness lastentry = lastmsgentry.getSecond();
1535 if (machineid != localmachineid) {
1536 if (lastentry instanceof LastMessage) {
1537 ((LastMessage)lastentry).setDead();
1538 } else if (lastentry instanceof Slot) {
1539 ((Slot)lastentry).setDead();
1541 throw new Error("Unrecognized type");
1545 if (machineid == localmachineid) {
1546 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1547 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + seqnum + " got: " + lastmsgseqnum);
1549 if (lastmsgseqnum > seqnum)
1550 throw new Error("Server Error: Rollback on remote machine sequence number");
1554 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1555 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1556 for (Entry entry : slot.getEntries()) {
1557 switch (entry.getType()) {
1559 case Entry.TypeNewKey:
1560 processEntry((NewKey)entry);
1563 case Entry.TypeCommit:
1564 processEntry((Commit)entry);
1567 case Entry.TypeAbort:
1568 processEntry((Abort)entry);
1571 case Entry.TypeTransaction:
1572 processEntry((Transaction)entry);
1575 case Entry.TypeLastMessage:
1576 processEntry((LastMessage)entry, machineSet);
1579 case Entry.TypeRejectedMessage:
1580 processEntry((RejectedMessage)entry, indexer);
1583 case Entry.TypeTableStatus:
1584 processEntry((TableStatus)entry);
1588 throw new Error("Unrecognized type: " + entry.getType());
1593 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1594 for (int i = 0; i < newslots.length; i++) {
1595 Slot currslot = newslots[i];
1596 Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
1597 if (prevslot != null &&
1598 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1599 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);