2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.ArrayList;
12 import java.util.List;
14 import java.util.Collection;
15 import java.util.Collections;
19 * IoTTable data structure. Provides client inferface.
20 * @author Brian Demsky
24 final public class Table {
25 private int numslots; //number of slots stored in buffer
27 //table of key-value pairs
28 //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
30 // machine id -> (sequence number, Slot or LastMessage); records last message by each client
31 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
33 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
34 private Vector<Long> rejectedmessagelist = new Vector<Long>();
35 private SlotBuffer buffer;
36 private CloudComm cloud;
37 private long sequencenumber; //Largest sequence number a client has received
38 private long localmachineid;
39 private TableStatus lastTableStatus;
40 static final int FREE_SLOTS = 10; //number of slots that should be kept free
41 static final int SKIP_THRESHOLD = 10;
42 private long liveslotcount = 0;
44 static final double RESIZE_MULTIPLE = 1.2;
45 static final double RESIZE_THRESHOLD = 0.75;
46 static final int REJECTED_THRESHOLD = 5;
47 private int resizethreshold;
48 private long lastliveslotseqn; //smallest sequence number with a live entry
49 private Random random = new Random();
50 private long lastUncommittedTransaction = 0;
52 private int smallestTableStatusSeen = -1;
53 private int largestTableStatusSeen = -1;
54 private int lastSeenPendingTransactionSpeculateIndex = 0;
56 private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
57 private LinkedList<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
58 private Map<Long, Commit> commitMap = null; // List of all the most recent live commits
59 private Map<Long, Abort> abortMap = null; // Set of the live aborts
60 private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
61 private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
62 private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
63 private Map<Long, Transaction> uncommittedTransactionsMap = null;
64 private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
65 private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
66 private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
67 private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
68 private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
69 private Map<IoTString, KeyValue> pendingTransSpeculativeTable = null;
73 public Table(String baseurl, String password, long _localmachineid) {
74 localmachineid = _localmachineid;
75 buffer = new SlotBuffer();
76 numslots = buffer.capacity();
79 cloud = new CloudComm(this, baseurl, password);
85 public Table(CloudComm _cloud, long _localmachineid) {
86 localmachineid = _localmachineid;
87 buffer = new SlotBuffer();
88 numslots = buffer.capacity();
96 private void setupDataStructs() {
97 pendingTransQueue = new LinkedList<PendingTransaction>();
98 commitMap = new HashMap<Long, Commit>();
99 abortMap = new HashMap<Long, Abort>();
100 committedMapByKey = new HashMap<IoTString, Commit>();
101 commitedTable = new HashMap<IoTString, KeyValue>();
102 speculativeTable = new HashMap<IoTString, KeyValue>();
103 uncommittedTransactionsMap = new HashMap<Long, Transaction>();
104 arbitratorTable = new HashMap<IoTString, Long>();
105 newKeyTable = new HashMap<IoTString, NewKey>();
106 newCommitMap = new HashMap<Long, Commit>();
107 lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
108 lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
109 pendingTransSpeculativeTable = new HashMap<IoTString, KeyValue>();
112 public void rebuild() throws ServerException {
113 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
114 validateandupdate(newslots, true);
117 // // TODO: delete method
118 // public void printSlots() {
119 // long o = buffer.getOldestSeqNum();
120 // long n = buffer.getNewestSeqNum();
122 // int[] types = new int[10];
128 // for (long i = o; i < (n + 1); i++) {
129 // Slot s = buffer.getSlot(i);
131 // Vector<Entry> entries = s.getEntries();
133 // for (Entry e : entries) {
135 // int type = e.getType();
136 // types[type] = types[type] + 1;
145 // for (int i = 0; i < 10; i++) {
146 // System.out.println(i + " " + types[i]);
148 // System.out.println("Live count: " + livec);
149 // System.out.println("Dead count: " + deadc);
150 // System.out.println("Old: " + o);
151 // System.out.println("New: " + n);
152 // System.out.println("Size: " + buffer.size());
153 // System.out.println("Commits Map: " + commitedTable.size());
154 // System.out.println("Commits List: " + commitMap.size());
157 public IoTString getCommitted(IoTString key) {
158 KeyValue kv = commitedTable.get(key);
160 return kv.getValue();
166 public IoTString getSpeculative(IoTString key) {
167 KeyValue kv = pendingTransSpeculativeTable.get(key);
170 kv = speculativeTable.get(key);
174 kv = commitedTable.get(key);
178 return kv.getValue();
184 public IoTString getCommittedAtomic(IoTString key) {
185 KeyValue kv = commitedTable.get(key);
187 if (arbitratorTable.get(key) == null) {
188 throw new Error("Key not Found.");
191 // Make sure new key value pair matches the current arbitrator
192 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
193 // TODO: Maybe not throw en error
194 throw new Error("Not all Key Values Match Arbitrator.");
198 pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
199 return kv.getValue();
201 pendingTransBuild.addKVGuard(new KeyValue(key, null));
206 public IoTString getSpeculativeAtomic(IoTString key) {
208 if (arbitratorTable.get(key) == null) {
209 throw new Error("Key not Found.");
212 // Make sure new key value pair matches the current arbitrator
213 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
214 // TODO: Maybe not throw en error
215 throw new Error("Not all Key Values Match Arbitrator.");
218 KeyValue kv = pendingTransSpeculativeTable.get(key);
221 kv = speculativeTable.get(key);
225 kv = commitedTable.get(key);
229 pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
230 return kv.getValue();
232 pendingTransBuild.addKVGuard(new KeyValue(key, null));
237 public Long getArbitrator(IoTString key) {
238 return arbitratorTable.get(key);
241 public void initTable() throws ServerException {
242 cloud.setSalt();//Set the salt
243 Slot s = new Slot(this, 1, localmachineid);
244 TableStatus status = new TableStatus(s, numslots);
246 Slot[] array = cloud.putSlot(s, numslots);
248 array = new Slot[] {s};
249 /* update data structure */
250 validateandupdate(array, true);
252 throw new Error("Error on initialization");
256 public String toString() {
257 String retString = " Committed Table: \n";
258 retString += "---------------------------\n";
259 retString += commitedTable.toString();
263 retString += " Speculative Table: \n";
264 retString += "---------------------------\n";
265 retString += speculativeTable.toString();
270 public void startTransaction() {
271 // Create a new transaction, invalidates any old pending transactions.
272 pendingTransBuild = new PendingTransaction();
275 public void commitTransaction() throws ServerException {
277 if (pendingTransBuild.getKVUpdates().size() == 0) {
278 // If no updates are made then there is no point inserting into the chain
282 // Add the pending transaction to the queue
283 pendingTransQueue.add(pendingTransBuild);
285 for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) {
286 PendingTransaction pt = pendingTransQueue.get(i);
288 if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
290 lastSeenPendingTransactionSpeculateIndex = i;
292 for (KeyValue kv : pt.getKVUpdates()) {
293 pendingTransSpeculativeTable.put(kv.getKey(), kv);
300 // Delete since already inserted
301 pendingTransBuild = new PendingTransaction();
303 while (!pendingTransQueue.isEmpty()) {
304 if (tryput( pendingTransQueue.peek(), false)) {
305 pendingTransQueue.poll();
310 public void addKV(IoTString key, IoTString value) {
312 if (arbitratorTable.get(key) == null) {
313 throw new Error("Key not Found.");
316 // Make sure new key value pair matches the current arbitrator
317 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
318 // TODO: Maybe not throw en error
319 throw new Error("Not all Key Values Match Arbitrator.");
322 KeyValue kv = new KeyValue(key, value);
323 pendingTransBuild.addKV(kv);
326 public void update() throws ServerException {
327 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
328 validateandupdate(newslots, false);
330 if (!pendingTransQueue.isEmpty()) {
331 System.out.println("Full Update");
333 // We have a pending transaction so do full insertion
335 while (!pendingTransQueue.isEmpty()) {
336 if (tryput( pendingTransQueue.peek(), false)) {
337 pendingTransQueue.poll();
341 // We dont have a pending transaction so do minimal effort
342 if (uncommittedTransactionsMap.keySet().size() > 0) {
344 boolean doEnd = false;
345 boolean needResize = false;
346 while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
347 boolean resize = needResize;
350 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
352 if (liveslotcount > resizethreshold) {
353 resize = true; //Resize is forced
357 newsize = (int) (numslots * RESIZE_MULTIPLE);
358 TableStatus status = new TableStatus(s, newsize);
362 doRejectedMessages(s);
364 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
366 // Resize was needed so redo call
367 if (retTup.getFirst()) {
372 // Extract working variables
373 boolean seenliveslot = retTup.getSecond();
374 long seqn = retTup.getThird();
376 // Did need to arbitrate
377 doEnd = !doArbitration(s);
379 doOptionalRescue(s, seenliveslot, seqn, resize);
386 Slot[] array = cloud.putSlot(s, max);
388 array = new Slot[] {s};
389 rejectedmessagelist.clear();
391 if (array.length == 0)
392 throw new Error("Server Error: Did not send any slots");
393 rejectedmessagelist.add(s.getSequenceNumber());
397 /* update data structure */
398 validateandupdate(array, true);
404 public boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
407 if (arbitratorTable.get(keyName) != null) {
408 // There is already an arbitrator
412 if (tryput(keyName, machineId, false)) {
413 // If successfully inserted
419 public void decrementLiveCount() {
423 private void setResizeThreshold() {
424 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
425 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
428 private boolean tryput(PendingTransaction pendingTrans, boolean resize) throws ServerException {
429 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
432 if (liveslotcount > resizethreshold) {
433 resize = true; //Resize is forced
437 newsize = (int) (numslots * RESIZE_MULTIPLE);
438 TableStatus status = new TableStatus(s, newsize);
442 doRejectedMessages(s);
444 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
446 // Resize was needed so redo call
447 if (retTup.getFirst()) {
448 return tryput(pendingTrans, true);
451 // Extract working variables
452 boolean seenliveslot = retTup.getSecond();
453 long seqn = retTup.getThird();
457 Transaction trans = new Transaction(s,
458 s.getSequenceNumber(),
460 pendingTrans.getArbitrator(),
461 pendingTrans.getKVUpdates(),
462 pendingTrans.getKVGuard());
463 boolean insertedTrans = false;
464 if (s.hasSpace(trans)) {
466 insertedTrans = true;
469 doOptionalRescue(s, seenliveslot, seqn, resize);
470 return doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
473 private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException {
474 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
476 if (liveslotcount > resizethreshold) {
477 resize = true; //Resize is forced
481 newsize = (int) (numslots * RESIZE_MULTIPLE);
482 TableStatus status = new TableStatus(s, newsize);
486 doRejectedMessages(s);
487 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
489 // Resize was needed so redo call
490 if (retTup.getFirst()) {
491 return tryput(keyName, arbMachineid, true);
494 // Extract working variables
495 boolean seenliveslot = retTup.getSecond();
496 long seqn = retTup.getThird();
500 NewKey newKey = new NewKey(s, keyName, arbMachineid);
502 boolean insertedNewKey = false;
503 if (s.hasSpace(newKey)) {
505 insertedNewKey = true;
508 doOptionalRescue(s, seenliveslot, seqn, resize);
509 return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize);
512 private void doRejectedMessages(Slot s) {
513 if (! rejectedmessagelist.isEmpty()) {
514 /* TODO: We should avoid generating a rejected message entry if
515 * there is already a sufficient entry in the queue (e.g.,
516 * equalsto value of true and same sequence number). */
518 long old_seqn = rejectedmessagelist.firstElement();
519 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
520 long new_seqn = rejectedmessagelist.lastElement();
521 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
526 /* Go through list of missing messages */
527 for (; i < rejectedmessagelist.size(); i++) {
528 long curr_seqn = rejectedmessagelist.get(i);
529 Slot s_msg = buffer.getSlot(curr_seqn);
532 prev_seqn = curr_seqn;
534 /* Generate rejected message entry for missing messages */
535 if (prev_seqn != -1) {
536 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
539 /* Generate rejected message entries for present messages */
540 for (; i < rejectedmessagelist.size(); i++) {
541 long curr_seqn = rejectedmessagelist.get(i);
542 Slot s_msg = buffer.getSlot(curr_seqn);
543 long machineid = s_msg.getMachineID();
544 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
551 private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
552 long newestseqnum = buffer.getNewestSeqNum();
553 long oldestseqnum = buffer.getOldestSeqNum();
554 if (lastliveslotseqn < oldestseqnum)
555 lastliveslotseqn = oldestseqnum;
557 long seqn = lastliveslotseqn;
558 boolean seenliveslot = false;
559 long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
560 long threshold = firstiffull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
564 for (; seqn < threshold; seqn++) {
565 Slot prevslot = buffer.getSlot(seqn);
566 // Push slot number forward
568 lastliveslotseqn = seqn;
570 if (! prevslot.isLive())
573 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
574 for (Entry liveentry : liveentries) {
575 if (s.hasSpace(liveentry)) {
576 s.addEntry(liveentry);
577 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
579 System.out.println("B"); //?
580 return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
587 return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
590 private boolean doArbitration(Slot s) {
592 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
593 List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
595 // Sort from oldest to newest
596 Collections.sort(transSeqNums);
598 boolean didNeedArbitration = false;
599 for (Long transNum : transSeqNums) {
600 Transaction ut = uncommittedTransactionsMap.get(transNum);
602 // Check if this machine arbitrates for this transaction
603 if (ut.getArbitrator() != localmachineid ) {
607 // we did have something to arbitrate on
608 didNeedArbitration = true;
610 Entry newEntry = null;
612 if (ut.evaluateGuard(commitedTable, speculativeTableTmp)) {
613 // Guard evaluated as true
615 // update the local tmp current key set
616 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
617 speculativeTableTmp.put(kv.getKey(), kv);
621 newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet());
626 newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator());
629 if ((newEntry != null) && s.hasSpace(newEntry)) {
630 s.addEntry(newEntry);
636 return didNeedArbitration;
639 private void doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
640 /* now go through live entries from least to greatest sequence number until
641 * either all live slots added, or the slot doesn't have enough room
642 * for SKIP_THRESHOLD consecutive entries*/
644 long newestseqnum = buffer.getNewestSeqNum();
646 for (; seqn <= newestseqnum; seqn++) {
647 Slot prevslot = buffer.getSlot(seqn);
648 //Push slot number forward
650 lastliveslotseqn = seqn;
652 if (!prevslot.isLive())
655 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
656 for (Entry liveentry : liveentries) {
657 if (s.hasSpace(liveentry))
658 s.addEntry(liveentry);
661 if (skipcount > SKIP_THRESHOLD)
668 private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) throws ServerException {
673 Slot[] array = cloud.putSlot(s, max);
675 array = new Slot[] {s};
676 rejectedmessagelist.clear();
678 // if (array.length == 0)
679 // throw new Error("Server Error: Did not send any slots");
680 rejectedmessagelist.add(s.getSequenceNumber());
684 if (array.length != 0) {
685 validateandupdate(array, true);
691 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
692 /* The cloud communication layer has checked slot HMACs already
694 if (newslots.length == 0) return;
696 // Reset the table status declared sizes
697 smallestTableStatusSeen = -1;
698 largestTableStatusSeen = -1;
700 long firstseqnum = newslots[0].getSequenceNumber();
701 if (firstseqnum <= sequencenumber) {
702 throw new Error("Server Error: Sent older slots!");
705 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
706 checkHMACChain(indexer, newslots);
708 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
710 // initExpectedSize(firstseqnum);
711 for (Slot slot : newslots) {
712 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
713 // updateExpectedSize();
716 /* If there is a gap, check to see if the server sent us everything. */
717 if (firstseqnum != (sequencenumber + 1)) {
720 checkNumSlots(newslots.length);
721 if (!machineSet.isEmpty()) {
722 throw new Error("Missing record for machines: " + machineSet);
729 /* Commit new to slots. */
730 for (Slot slot : newslots) {
731 buffer.putSlot(slot);
734 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
736 // Process all on key value pairs
737 boolean didCommitOrSpeculate = proccessAllNewCommits();
739 // Go through all uncommitted transactions and kill the ones that are dead
740 deleteDeadUncommittedTransactions();
742 // Speculate on key value pairs
743 didCommitOrSpeculate |= createSpeculativeTable();
745 createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
748 public boolean proccessAllNewCommits() {
750 // Process only if there are commit
751 if (newCommitMap.keySet().size() == 0) {
755 List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.keySet());
757 // Sort from oldest to newest commit
758 Collections.sort(commitSeqNums);
760 boolean didProcessNewCommit = false;
762 // Go through each new commit one by one
763 for (Long entrySeqNum : commitSeqNums) {
764 Commit entry = newCommitMap.get(entrySeqNum);
766 long lastCommitSeenSeqNum = -1;
768 if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
769 lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
772 if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
774 Commit prevCommit = commitMap.put(entry.getTransSequenceNumber(), entry);
776 if (prevCommit != null) {
777 prevCommit.setDead();
779 for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
780 committedMapByKey.put(kv.getKey(), entry);
787 Set<Commit> commitsToEditSet = new HashSet<Commit>();
789 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
790 commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
793 commitsToEditSet.remove(null);
795 for (Commit prevCommit : commitsToEditSet) {
797 Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
799 if (!prevCommit.isLive()) {
800 commitMap.remove(prevCommit.getTransSequenceNumber());
804 // Add the new commit
805 commitMap.put(entry.getTransSequenceNumber(), entry);
806 lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
807 didProcessNewCommit = true;
809 // Update the committed table list
810 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
811 IoTString key = kv.getKey();
812 commitedTable.put(key, kv);
814 committedMapByKey.put(key, entry);
818 // Clear the new commits storage so we can use it later
819 newCommitMap.clear();
821 return didProcessNewCommit;
824 private void deleteDeadUncommittedTransactions() {
825 // Make dead the transactions
826 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
827 Transaction prevtrans = i.next().getValue();
828 long transArb = prevtrans.getArbitrator();
830 if ((lastCommitSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastCommitSeenSeqNumMap.get(transArb)) ||
831 (lastAbortSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastAbortSeenSeqNumMap.get(transArb))) {
838 private boolean createSpeculativeTable() {
839 if (uncommittedTransactionsMap.keySet().size() == 0) {
843 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
844 List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
846 // Sort from oldest to newest commit
847 Collections.sort(utSeqNums);
849 if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
851 speculativeTable.clear();
852 lastUncommittedTransaction = -1;
854 for (Long key : utSeqNums) {
855 Transaction trans = uncommittedTransactionsMap.get(key);
857 lastUncommittedTransaction = key;
859 if (trans.evaluateGuard(commitedTable, speculativeTableTmp)) {
860 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
861 speculativeTableTmp.put(kv.getKey(), kv);
867 for (Long key : utSeqNums) {
869 if (key <= lastUncommittedTransaction) {
873 lastUncommittedTransaction = key;
875 Transaction trans = uncommittedTransactionsMap.get(key);
877 if (trans.evaluateGuard(speculativeTable, speculativeTableTmp)) {
878 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
879 speculativeTableTmp.put(kv.getKey(), kv);
885 for (IoTString key : speculativeTableTmp.keySet()) {
886 speculativeTable.put(key, speculativeTableTmp.get(key));
892 private void createPendingTransactionSpeculativeTable(boolean didCommitOrSpeculate) {
894 if (didCommitOrSpeculate) {
895 pendingTransSpeculativeTable.clear();
896 lastSeenPendingTransactionSpeculateIndex = 0;
899 for (PendingTransaction pt : pendingTransQueue) {
900 if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
902 lastSeenPendingTransactionSpeculateIndex = index;
905 for (KeyValue kv : pt.getKVUpdates()) {
906 pendingTransSpeculativeTable.put(kv.getKey(), kv);
914 private int expectedsize, currmaxsize;
916 private void checkNumSlots(int numslots) {
919 // We only have 1 size so we must have this many slots
920 if (largestTableStatusSeen == smallestTableStatusSeen) {
921 if (numslots != smallestTableStatusSeen) {
922 throw new Error("Server Error: Server did not send all slots. Expected: " + smallestTableStatusSeen + " Received:" + numslots);
925 // We have more than 1
926 if (numslots < smallestTableStatusSeen) {
927 throw new Error("Server Error: Server did not send all slots. Expected at least: " + smallestTableStatusSeen + " Received:" + numslots);
931 // if (numslots != expectedsize) {
932 // throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
936 private void initExpectedSize(long firstsequencenumber) {
937 long prevslots = firstsequencenumber;
938 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
939 currmaxsize = numslots;
942 private void updateExpectedSize() {
944 if (expectedsize > currmaxsize) {
945 System.out.println("Maxing Out: " + expectedsize + " " + currmaxsize);
946 expectedsize = currmaxsize;
950 private void updateCurrMaxSize(int newmaxsize) {
951 currmaxsize = newmaxsize;
954 private void commitNewMaxSize() {
956 if (largestTableStatusSeen == -1) {
957 currmaxsize = numslots;
959 currmaxsize = largestTableStatusSeen;
962 if (numslots != currmaxsize) {
963 buffer.resize(currmaxsize);
966 numslots = currmaxsize;
967 setResizeThreshold();
970 private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
971 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
974 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
975 long oldseqnum = entry.getOldSeqNum();
976 long newseqnum = entry.getNewSeqNum();
977 boolean isequal = entry.getEqual();
978 long machineid = entry.getMachineID();
979 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
980 Slot slot = indexer.getSlot(seqnum);
982 long slotmachineid = slot.getMachineID();
983 if (isequal != (slotmachineid == machineid)) {
984 throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
989 HashSet<Long> watchset = new HashSet<Long>();
990 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
991 long entry_mid = lastmsg_entry.getKey();
992 /* We've seen it, don't need to continue to watch. Our next
993 * message will implicitly acknowledge it. */
994 if (entry_mid == localmachineid)
996 Pair<Long, Liveness> v = lastmsg_entry.getValue();
997 long entry_seqn = v.getFirst();
998 if (entry_seqn < newseqnum) {
999 addWatchList(entry_mid, entry);
1000 watchset.add(entry_mid);
1003 if (watchset.isEmpty())
1006 entry.setWatchSet(watchset);
1009 private void processEntry(NewKey entry) {
1010 arbitratorTable.put(entry.getKey(), entry.getMachineID());
1012 NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
1014 if (oldNewKey != null) {
1015 oldNewKey.setDead();
1019 private void processEntry(Transaction entry) {
1021 long arb = entry.getArbitrator();
1022 Long comLast = lastCommitSeenSeqNumMap.get(arb);
1023 Long abLast = lastAbortSeenSeqNumMap.get(arb);
1025 Transaction prevTrans = null;
1027 if ((comLast != null) && (comLast >= entry.getSequenceNumber())) {
1028 prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
1029 } else if ((abLast != null) && (abLast >= entry.getSequenceNumber())) {
1030 prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
1032 prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
1035 // Duplicate so delete old copy
1036 if (prevTrans != null) {
1037 prevTrans.setDead();
1041 private void processEntry(Abort entry) {
1043 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
1044 // Abort has not been seen yet so we need to keep track of it
1046 Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
1047 if (prevAbort != null) {
1048 prevAbort.setDead(); // delete old version of the duplicate
1051 // The machine already saw this so it is dead
1055 if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) && (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
1056 lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
1060 private void processEntry(Commit entry, Slot s) {
1061 Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry);
1062 if (prevCommit != null) {
1063 prevCommit.setDead();
1067 private void processEntry(TableStatus entry) {
1068 int newnumslots = entry.getMaxSlots();
1069 // updateCurrMaxSize(newnumslots);
1070 if (lastTableStatus != null)
1071 lastTableStatus.setDead();
1072 lastTableStatus = entry;
1074 if ((smallestTableStatusSeen == -1) || (newnumslots < smallestTableStatusSeen)) {
1075 smallestTableStatusSeen = newnumslots;
1078 if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
1079 largestTableStatusSeen = newnumslots;
1082 // System.out.println("Table Stat: " + newnumslots + " large: " + largestTableStatusSeen + " small: " + smallestTableStatusSeen);
1085 private void addWatchList(long machineid, RejectedMessage entry) {
1086 HashSet<RejectedMessage> entries = watchlist.get(machineid);
1087 if (entries == null)
1088 watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
1092 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1093 machineSet.remove(machineid);
1095 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
1096 if (watchset != null) {
1097 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
1098 RejectedMessage rm = rmit.next();
1099 if (rm.getNewSeqNum() <= seqnum) {
1100 /* Remove it from our watchlist */
1102 /* Decrement machines that need to see this notification */
1103 rm.removeWatcher(machineid);
1108 if (machineid == localmachineid) {
1109 /* Our own messages are immediately dead. */
1110 if (liveness instanceof LastMessage) {
1111 ((LastMessage)liveness).setDead();
1112 } else if (liveness instanceof Slot) {
1113 ((Slot)liveness).setDead();
1115 throw new Error("Unrecognized type");
1119 // Set dead the abort
1120 for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
1121 Abort abort = i.next().getValue();
1123 if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
1129 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
1130 if (lastmsgentry == null)
1133 long lastmsgseqnum = lastmsgentry.getFirst();
1134 Liveness lastentry = lastmsgentry.getSecond();
1135 if (machineid != localmachineid) {
1136 if (lastentry instanceof LastMessage) {
1137 ((LastMessage)lastentry).setDead();
1138 } else if (lastentry instanceof Slot) {
1139 ((Slot)lastentry).setDead();
1141 throw new Error("Unrecognized type");
1145 if (machineid == localmachineid) {
1146 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1147 throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + seqnum + " got: " + lastmsgseqnum);
1149 if (lastmsgseqnum > seqnum)
1150 throw new Error("Server Error: Rollback on remote machine sequence number");
1154 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1155 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1156 for (Entry entry : slot.getEntries()) {
1157 switch (entry.getType()) {
1159 case Entry.TypeNewKey:
1160 processEntry((NewKey)entry);
1163 case Entry.TypeCommit:
1164 processEntry((Commit)entry, slot);
1167 case Entry.TypeAbort:
1168 processEntry((Abort)entry);
1171 case Entry.TypeTransaction:
1172 processEntry((Transaction)entry);
1175 case Entry.TypeLastMessage:
1176 processEntry((LastMessage)entry, machineSet);
1179 case Entry.TypeRejectedMessage:
1180 processEntry((RejectedMessage)entry, indexer);
1183 case Entry.TypeTableStatus:
1184 processEntry((TableStatus)entry);
1188 throw new Error("Unrecognized type: " + entry.getType());
1193 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1194 for (int i = 0; i < newslots.length; i++) {
1195 Slot currslot = newslots[i];
1196 Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
1197 if (prevslot != null &&
1198 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1199 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);