2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.ArrayList;
12 import java.util.List;
14 import java.util.Collection;
15 import java.util.Collections;
19 * IoTTable data structure. Provides client interface.
20 * @author Brian Demsky
24 final public class Table {
25 private int numslots; //number of slots stored in buffer
27 //table of key-value pairs
28 //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
30 // machine id -> (sequence number, Slot or LastMessage); records last message by each client
31 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
// machine id -> RejectedMessage entries that are still waiting for a newer message from that machine
33 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
// sequence numbers of locally built slots recorded after a putSlot attempt; read by doRejectedMessages()
// NOTE(review): presumably only slots the server rejected end up here -- confirm
34 private Vector<Long> rejectedmessagelist = new Vector<Long>();
35 private SlotBuffer buffer;
36 private CloudComm cloud;
37 private long sequencenumber; //Largest sequence number a client has received
38 private long localmachineid;
39 private TableStatus lastTableStatus;
40 static final int FREE_SLOTS = 10; //number of slots that should be kept free
41 static final int SKIP_THRESHOLD = 10;
42 private long liveslotcount = 0;
// growth factor applied to numslots when a resize is requested
44 static final double RESIZE_MULTIPLE = 1.2;
// fraction of numslots used as the lower bound of the randomized resize threshold
45 static final double RESIZE_THRESHOLD = 0.75;
// doRejectedMessages() collapses the list into one range entry once it holds more than this many items
46 static final int REJECTED_THRESHOLD = 5;
// a resize is forced once liveslotcount exceeds this value (see setResizeThreshold)
47 private int resizethreshold;
48 private long lastliveslotseqn; //smallest sequence number with a live entry
49 private Random random = new Random();
// sequence number of the newest uncommitted transaction already folded into speculativeTable
50 private long lastUncommittedTransaction = 0;
52 private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
53 private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
54 private Map<Long, Commit> commitMap = null; // List of all the most recent live commits
55 private Map<Long, Abort> abortMap = null; // Set of the live aborts
56 private Map<IoTString, Commit> committedMapByKey = null; // key -> commit that most recently wrote that key
57 private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
58 private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
59 private Map<Long, Transaction> uncommittedTransactionsMap = null; // transaction sequence number -> uncommitted transaction
60 private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
61 private Map<IoTString, NewKey> newKeyTable = null; // key -> most recent NewKey entry for that key
62 private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
63 private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
64 private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator
/**
 * Creates a table that talks to the cloud through a newly constructed CloudComm.
 *
 * @param baseurl passed through to the CloudComm constructor
 * @param password passed through to the CloudComm constructor
 * @param _localmachineid id of this client; stored in localmachineid
 */
68 public Table(String baseurl, String password, long _localmachineid) {
69 localmachineid = _localmachineid;
70 buffer = new SlotBuffer();
// the local slot count starts at the buffer's default capacity
71 numslots = buffer.capacity();
74 cloud = new CloudComm(this, baseurl, password);
/**
 * Creates a table around a caller-supplied CloudComm.
 *
 * @param _cloud communication layer to use
 *        NOTE(review): presumably assigned to the cloud field -- confirm
 * @param _localmachineid id of this client; stored in localmachineid
 */
80 public Table(CloudComm _cloud, long _localmachineid) {
81 localmachineid = _localmachineid;
82 buffer = new SlotBuffer();
// the local slot count starts at the buffer's default capacity
83 numslots = buffer.capacity();
/**
 * Allocates the transaction queue and every commit/abort/key bookkeeping map.
 * All of these fields are declared as null, so this must run before they are used.
 */
91 private void setupDataStructs() {
92 pendingTransQueue = new LinkedList<PendingTransaction>();
93 commitMap = new HashMap<Long, Commit>();
94 abortMap = new HashMap<Long, Abort>();
95 committedMapByKey = new HashMap<IoTString, Commit>();
96 commitedTable = new HashMap<IoTString, KeyValue>();
97 speculativeTable = new HashMap<IoTString, KeyValue>();
98 uncommittedTransactionsMap = new HashMap<Long, Transaction>();
99 arbitratorTable = new HashMap<IoTString, Long>();
100 newKeyTable = new HashMap<IoTString, NewKey>();
101 newCommitMap = new HashMap<Long, Commit>();
102 lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
103 lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
/**
 * Fetches every slot after the last sequence number seen locally and applies it,
 * allowing the server's view of this machine's own messages to replace the local one
 * (acceptupdatestolocal is true).
 */
106 public void rebuild() {
107 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
108 validateandupdate(newslots, true);
111 // TODO: delete method
112 // public void printSlots() {
113 // long o = buffer.getOldestSeqNum();
114 // long n = buffer.getNewestSeqNum();
116 // int[] types = new int[10];
122 // for (long i = o; i < (n + 1); i++) {
123 // Slot s = buffer.getSlot(i);
125 // Vector<Entry> entries = s.getEntries();
127 // for (Entry e : entries) {
129 // int type = e.getType();
130 // types[type] = types[type] + 1;
139 // for (int i = 0; i < 10; i++) {
140 // System.out.println(i + " " + types[i]);
142 // System.out.println("Live count: " + livec);
143 // System.out.println("Dead count: " + deadc);
144 // System.out.println("Old: " + o);
145 // System.out.println("New: " + n);
146 // System.out.println("Size: " + buffer.size());
147 // System.out.println("Commits Map: " + commitedTable.size());
148 // System.out.println("Commits List: " + commitMap.size());
/**
 * Looks up a key in the committed table.
 *
 * @param key key to look up
 * @return the value of the KeyValue stored for key in commitedTable
 *         NOTE(review): commitedTable.get returns null for an unknown key -- confirm how that case is handled
 */
151 public IoTString getCommitted(IoTString key) {
152 KeyValue kv = commitedTable.get(key);
154 return kv.getValue();
/**
 * Looks up a key in the speculative table (committed state plus uncommitted transactions
 * applied by createSpeculativeTable).
 *
 * @param key key to look up
 * @return the value of the KeyValue stored for key in speculativeTable
 *         NOTE(review): speculativeTable.get returns null for an unknown key -- confirm how that case is handled
 */
160 public IoTString getSpeculative(IoTString key) {
161 KeyValue kv = speculativeTable.get(key);
163 return kv.getValue();
/**
 * Returns the machine id registered as arbitrator for a key.
 *
 * @param key key to look up
 * @return the arbitrator's machine id, or null when arbitratorTable has no entry for key
 */
169 public Long getArbitrator(IoTString key) {
170 return arbitratorTable.get(key);
/**
 * Initializes a brand-new table on the server: sets the salt, builds the slot with
 * sequence number 1 carrying a TableStatus for the current numslots, and sends it.
 *
 * @throws Error with message "Error on initialization" on the failure path
 *         NOTE(review): presumably when the server does not accept the first slot -- confirm
 */
173 public void initTable() {
174 cloud.setSalt();//Set the salt
175 Slot s = new Slot(this, 1, localmachineid);
176 TableStatus status = new TableStatus(s, numslots);
178 Slot[] array = cloud.putSlot(s, numslots);
// apply our own slot locally as if it had been read back from the server
180 array = new Slot[] {s};
181 /* update data structure */
182 validateandupdate(array, true);
184 throw new Error("Error on initialization");
/**
 * Builds a human-readable dump containing the committed table followed by the
 * speculative table, each under its own banner.
 */
188 public String toString() {
189 String retString = " Committed Table: \n";
190 retString += "---------------------------\n";
191 retString += commitedTable.toString();
195 retString += " Speculative Table: \n";
196 retString += "---------------------------\n";
197 retString += speculativeTable.toString();
/**
 * Begins building a new transaction. Any transaction that was being built but not yet
 * handed to commitTransaction() is discarded.
 */
202 public void startTransaction() {
203 // Create a new transaction, invalidates any old pending transactions.
204 pendingTransBuild = new PendingTransaction();
/**
 * Queues the transaction under construction and tries to push every queued
 * transaction to the server, oldest first.
 */
207 public void commitTransaction() {
209 if (pendingTransBuild.getKVUpdates().size() == 0) {
210 // If no updates are made then there is no point inserting into the chain
214 // Add the pending transaction to the queue
215 pendingTransQueue.add(pendingTransBuild);
// Drain the queue; the head is only removed once tryput reports success
217 while (!pendingTransQueue.isEmpty()) {
218 if (tryput( pendingTransQueue.peek(), false)) {
219 pendingTransQueue.poll();
/**
 * Adds a key-value update to the transaction under construction.
 *
 * @param key key to update; must already have an arbitrator registered
 * @param value new value for the key
 * @throws Error "Key not Found." when arbitratorTable has no entry for key
 * @throws Error "Not all Key Values Match." when the pending transaction rejects the key's arbitrator
 */
224 public void addKV(IoTString key, IoTString value) {
226 if (arbitratorTable.get(key) == null) {
227 throw new Error("Key not Found.");
230 // Make sure new key value pair matches the current arbitrator
231 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
232 // TODO: Maybe not throw an error
233 throw new Error("Not all Key Values Match.");
236 KeyValue kv = new KeyValue(key, value);
237 pendingTransBuild.addKV(kv);
/**
 * Attaches a guard to the transaction under construction.
 *
 * @param guard guard handed to the pending transaction
 */
240 public void addGuard(Guard guard) {
241 pendingTransBuild.addGuard(guard);
/**
 * Pulls new slots from the server and applies them (without accepting changes to this
 * machine's own last message), then, while uncommitted transactions remain, builds and
 * sends slots carrying this machine's arbitration results.
 *
 * Each pass mirrors tryput(): build the next slot on top of the newest local HMAC,
 * force a resize when too many slots are live, add rejected-message entries, rescue
 * live entries that are about to fall off, arbitrate, rescue more entries if there is
 * room, and finally send the slot.
 *
 * @throws Error "Server Error: Did not send any slots" when the server returns an empty array
 */
244 public void update() {
246 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
248 validateandupdate(newslots, false);
250 if (uncommittedTransactionsMap.keySet().size() > 0) {
252 boolean doEnd = false;
253 boolean needResize = false;
254 while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
255 boolean resize = needResize;
258 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
260 if (liveslotcount > resizethreshold) {
261 resize = true; //Resize is forced
265 newsize = (int) (numslots * RESIZE_MULTIPLE);
266 TableStatus status = new TableStatus(s, newsize);
270 doRejectedMessages(s);
272 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
274 // Resize was needed so redo call
275 if (retTup.getFirst()) {
280 // Extract working variables
281 boolean seenliveslot = retTup.getSecond();
282 long seqn = retTup.getThird();
284 // Did need to arbitrate
// stop looping once doArbitration reports nothing for this machine to arbitrate
285 doEnd = !doArbitration(s);
287 doOptionalRescue(s, seenliveslot, seqn, resize);
294 Slot[] array = cloud.putSlot(s, max);
296 array = new Slot[] {s};
297 rejectedmessagelist.clear();
299 if (array.length == 0)
300 throw new Error("Server Error: Did not send any slots");
301 rejectedmessagelist.add(s.getSequenceNumber());
305 /* update data structure */
306 validateandupdate(array, true);
/**
 * Registers a new key with the given machine as its arbitrator by sending a NewKey entry.
 *
 * @param keyName key to create
 * @param machineId machine that will arbitrate the key
 * @return NOTE(review): presumably false when the key already has an arbitrator and true once the
 *         entry was inserted -- confirm
 */
311 public boolean createNewKey(IoTString keyName, long machineId) {
314 if (arbitratorTable.get(keyName) != null) {
315 // There is already an arbitrator
319 if (tryput(keyName, machineId, false)) {
320 // If successfully inserted
// NOTE(review): presumably decrements liveslotcount when a slot stops being live -- confirm
326 public void decrementLiveCount() {
/**
 * Picks a randomized resize threshold for the current numslots.
 * With lower = (int) (RESIZE_THRESHOLD * numslots), the result is lower - 1 plus a
 * random offset in [0, numslots - lower), i.e. a value in [lower - 1, numslots - 2].
 */
330 private void setResizeThreshold() {
331 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
332 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
/**
 * Builds the next slot, tries to place the pending transaction in it, and sends it.
 *
 * @param pendingTrans transaction to turn into a Transaction entry
 * @param resize true when the caller already knows the table must be resized
 * @return the result of doSendSlotsAndInsert for the built slot
 */
335 private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
// the new slot chains onto the HMAC of the newest slot we hold
336 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
339 if (liveslotcount > resizethreshold) {
340 resize = true; //Resize is forced
344 newsize = (int) (numslots * RESIZE_MULTIPLE);
345 TableStatus status = new TableStatus(s, newsize);
349 doRejectedMessages(s);
351 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
353 // Resize was needed so redo call
354 if (retTup.getFirst()) {
355 return tryput(pendingTrans, true);
358 // Extract working variables
359 boolean seenliveslot = retTup.getSecond();
360 long seqn = retTup.getThird();
365 Transaction trans = new Transaction(s,
366 s.getSequenceNumber(),
368 pendingTrans.getArbitrator(),
369 pendingTrans.getKVUpdates(),
370 pendingTrans.getGuard());
371 boolean insertedTrans = false;
372 if (s.hasSpace(trans)) {
374 insertedTrans = true;
// use any room that is left to carry forward older live entries
377 doOptionalRescue(s, seenliveslot, seqn, resize);
378 return doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
/**
 * Builds the next slot, tries to place a NewKey entry in it, and sends it.
 *
 * @param keyName key being created
 * @param arbMachineid machine that will arbitrate the key
 * @param resize true when the caller already knows the table must be resized
 * @return the result of doSendSlotsAndInsert for the built slot
 */
381 private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
// the new slot chains onto the HMAC of the newest slot we hold
382 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
384 if (liveslotcount > resizethreshold) {
385 resize = true; //Resize is forced
389 newsize = (int) (numslots * RESIZE_MULTIPLE);
390 TableStatus status = new TableStatus(s, newsize);
394 doRejectedMessages(s);
395 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
397 // Resize was needed so redo call
398 if (retTup.getFirst()) {
399 return tryput(keyName, arbMachineid, true);
402 // Extract working variables
403 boolean seenliveslot = retTup.getSecond();
404 long seqn = retTup.getThird();
409 NewKey newKey = new NewKey(s, keyName, arbMachineid);
411 boolean insertedNewKey = false;
412 if (s.hasSpace(newKey)) {
414 insertedNewKey = true;
// use any room that is left to carry forward older live entries
417 doOptionalRescue(s, seenliveslot, seqn, resize);
418 return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize);
/**
 * Builds RejectedMessage entries for slot s from rejectedmessagelist.
 *
 * When the list holds more than REJECTED_THRESHOLD items, a single entry spanning the
 * first to the last listed sequence number is created. Otherwise the list is walked:
 * one range entry (equal flag false, attributed to this machine) covers the leading
 * sequence numbers, and one entry per remaining sequence number (equal flag true,
 * attributed to the machine that wrote the buffered slot) covers the rest.
 *
 * @param s slot under construction that the entries are created for
 */
421 private void doRejectedMessages(Slot s) {
422 if (! rejectedmessagelist.isEmpty()) {
423 /* TODO: We should avoid generating a rejected message entry if
424 * there is already a sufficient entry in the queue (e.g.,
425 * equalsto value of true and same sequence number). */
427 long old_seqn = rejectedmessagelist.firstElement();
428 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
429 long new_seqn = rejectedmessagelist.lastElement();
430 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
435 /* Go through list of missing messages */
436 for (; i < rejectedmessagelist.size(); i++) {
437 long curr_seqn = rejectedmessagelist.get(i);
438 Slot s_msg = buffer.getSlot(curr_seqn);
441 prev_seqn = curr_seqn;
443 /* Generate rejected message entry for missing messages */
444 if (prev_seqn != -1) {
445 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
448 /* Generate rejected message entries for present messages */
449 for (; i < rejectedmessagelist.size(); i++) {
450 long curr_seqn = rejectedmessagelist.get(i);
451 Slot s_msg = buffer.getSlot(curr_seqn);
452 long machineid = s_msg.getMachineID();
453 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
/**
 * Copies live entries out of the oldest buffered slots into slot s so that the first
 * FREE_SLOTS positions of a full buffer hold nothing live.
 *
 * @param s slot under construction that receives the rescued entries
 * @param resize passed to Slot.getLiveEntries when collecting entries to rescue
 * @return (resizeNeeded, seenliveslot, seqn): the first element is true when an entry in the slot
 *         about to fall off the buffer did not fit in s, which callers treat as "retry with a
 *         resize"; seqn is the sequence number at which scanning stopped
 */
460 private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
461 long newestseqnum = buffer.getNewestSeqNum();
462 long oldestseqnum = buffer.getOldestSeqNum();
// never start scanning before the oldest slot still held in the buffer
463 if (lastliveslotseqn < oldestseqnum)
464 lastliveslotseqn = oldestseqnum;
466 long seqn = lastliveslotseqn;
467 boolean seenliveslot = false;
468 long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
469 long threshold = firstiffull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
473 for (; seqn < threshold; seqn++) {
474 Slot prevslot = buffer.getSlot(seqn);
475 // Push slot number forward
477 lastliveslotseqn = seqn;
479 if (! prevslot.isLive())
482 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
483 for (Entry liveentry : liveentries) {
484 if (s.hasSpace(liveentry)) {
485 s.addEntry(liveentry);
486 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
// NOTE(review): leftover debug output written to stdout
488 System.out.println("B"); //?
489 return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
496 return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
/**
 * Arbitrates the uncommitted transactions this machine is responsible for, oldest first,
 * adding a Commit or Abort entry to slot s for each one that fits.
 *
 * Guards are evaluated against a scratch copy of the committed table that is updated as
 * transactions are accepted, so later transactions see the effects of earlier ones.
 *
 * @param s slot under construction that receives the Commit/Abort entries
 * @return true when at least one transaction was arbitrated by this machine
 */
499 private boolean doArbitration(Slot s) {
// NOTE(review): raw Map type; the copy holds IoTString -> KeyValue
501 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
503 List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
505 // Sort from oldest to newest
506 Collections.sort(transSeqNums);
509 boolean didNeedArbitration = false;
510 for (Long transNum : transSeqNums) {
511 Transaction ut = uncommittedTransactionsMap.get(transNum);
// the first key of the update set is used to find the transaction's arbitrator
513 KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
514 // Check if this machine arbitrates for this transaction
// NOTE(review): the Long from arbitratorTable is unboxed here; a key with no arbitrator would throw NullPointerException
515 if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
519 // we did have something to arbitrate on
520 didNeedArbitration = true;
522 Entry newEntry = null;
525 if ( ut.getGuard().evaluate(speculativeTableTmp.values())) {
526 // Guard evaluated as true
528 // update the local tmp current key set
529 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
530 speculativeTableTmp.put(kv.getKey(), kv);
534 newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet());
539 newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator());
541 } catch (Exception e) {
545 if ((newEntry != null) && s.hasSpace(newEntry)) {
546 s.addEntry(newEntry);
552 return didNeedArbitration;
/**
 * Opportunistically copies further live entries into slot s, continuing from seqn up to
 * the newest buffered slot.
 *
 * @param s slot under construction that receives the rescued entries
 * @param seenliveslot carried over from doMandatoryResuce
 * @param seqn sequence number to resume scanning from
 * @param resize passed to Slot.getLiveEntries when collecting entries to rescue
 */
555 private void doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
556 /* now go through live entries from least to greatest sequence number until
557 * either all live slots added, or the slot doesn't have enough room
558 * for SKIP_THRESHOLD consecutive entries*/
560 long newestseqnum = buffer.getNewestSeqNum();
562 for (; seqn <= newestseqnum; seqn++) {
563 Slot prevslot = buffer.getSlot(seqn);
564 //Push slot number forward
566 lastliveslotseqn = seqn;
568 if (!prevslot.isLive())
571 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
572 for (Entry liveentry : liveentries) {
573 if (s.hasSpace(liveentry))
574 s.addEntry(liveentry);
577 if (skipcount > SKIP_THRESHOLD)
/**
 * Sends slot s to the server and applies whatever slots the exchange yields.
 *
 * @param s slot to send
 * @param inserted whether the caller's entry made it into s
 * @param resize whether this send carries a resize
 * @param newsize slot count to use when resizing
 * @return NOTE(review): presumably true only when the slot was accepted and the caller's entry was
 *         inserted -- confirm
 * @throws Error "Server Error: Did not send any slots" when the server returns an empty array
 */
584 private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) {
588 Slot[] array = cloud.putSlot(s, max);
590 array = new Slot[] {s};
591 rejectedmessagelist.clear();
593 if (array.length == 0)
594 throw new Error("Server Error: Did not send any slots");
595 rejectedmessagelist.add(s.getSequenceNumber());
599 validateandupdate(array, true);
/**
 * Validates a batch of slots received from the server and folds them into local state.
 *
 * Steps: reject batches that start at or before the last seen sequence number, verify the
 * HMAC chain, process every slot's entries while tracking which known machines were
 * heard from, check the batch size and machine coverage when the batch does not start
 * right after the last seen slot, store the slots in the buffer, advance sequencenumber,
 * then apply new commits, drop dead uncommitted transactions and rebuild the speculative
 * table.
 *
 * @param newslots slots to apply, in sequence order; an empty array is a no-op
 * @param acceptupdatestolocal forwarded to updateLastMessage, where it permits a changed
 *        sequence number for this machine's own last message
 * @throws Error "Server Error: Sent older slots!" when the first slot is not newer than sequencenumber
 * @throws Error "Missing record for machines: ..." when a known machine is absent from a gapped batch
 */
603 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
604 /* The cloud communication layer has checked slot HMACs already
606 if (newslots.length == 0) return;
608 long firstseqnum = newslots[0].getSequenceNumber();
609 if (firstseqnum <= sequencenumber) {
610 throw new Error("Server Error: Sent older slots!");
613 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
614 checkHMACChain(indexer, newslots);
616 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
618 initExpectedSize(firstseqnum);
619 for (Slot slot : newslots) {
620 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
621 updateExpectedSize();
625 boolean hasGap = false;
626 /* If there is a gap, check to see if the server sent us everything. */
627 if (firstseqnum != (sequencenumber + 1)) {
630 checkNumSlots(newslots.length);
631 if (!machineSet.isEmpty()) {
632 throw new Error("Missing record for machines: " + machineSet);
638 /* Commit new to slots. */
639 for (Slot slot : newslots) {
640 buffer.putSlot(slot);
643 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
645 // Process all on key value pairs
646 proccessAllNewCommits();
648 // Go through all uncommitted transactions and kill the ones that are dead
649 deleteDeadUncommittedTransactions();
651 // Speculate on key value pairs
652 createSpeculativeTable();
/**
 * Applies every commit collected in newCommitMap, oldest first, to the committed state,
 * then empties newCommitMap.
 *
 * A commit whose transaction sequence number is not newer than the last one seen for its
 * arbitrator is treated as a duplicate: it replaces the stored Commit object and the
 * replaced object is marked dead. A genuinely new commit first shrinks the live key sets
 * of the earlier commits that own any of its keys (dropping commits left with nothing
 * live), then is recorded in commitMap, lastCommitSeenSeqNumMap, commitedTable and
 * committedMapByKey.
 */
655 public void proccessAllNewCommits() {
657 // Process only if there are new commits
658 if (newCommitMap.keySet().size() == 0) {
662 List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.keySet());
664 // Sort from oldest to newest commit
665 Collections.sort(commitSeqNums);
667 // Go through each new commit one by one
668 for (Long entrySeqNum : commitSeqNums) {
669 Commit entry = newCommitMap.get(entrySeqNum);
// 0 stands for "no commit seen yet from this arbitrator"
671 long lastCommitSeenSeqNum = 0;
673 if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
674 lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
// Not newer than what we already applied: a re-sent copy of a known commit
677 if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
679 Commit prevCommit = commitMap.put(entry.getTransSequenceNumber(), entry);
681 if (prevCommit != null) {
682 prevCommit.setDead();
// re-point the keys of the replaced copy at the new Commit object
684 for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
685 committedMapByKey.put(kv.getKey(), entry);
// Collect the earlier commits that currently own any key this commit writes
692 Set<Commit> commitsToEditSet = new HashSet<Commit>();
694 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
695 commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
// keys with no previous owner contributed a null
698 commitsToEditSet.remove(null);
700 for (Commit prevCommit : commitsToEditSet) {
702 Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
704 if (!prevCommit.isLive()) {
705 commitMap.remove(prevCommit.getTransSequenceNumber());
709 // // Remove any old commits
710 // for (Iterator<Map.Entry<Long, Commit>> i = commitMap.entrySet().iterator(); i.hasNext();) {
711 // Commit prevCommit = i.next().getValue();
712 // prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
714 // if (!prevCommit.isLive()) {
719 // Remove any old commits
720 // for (Iterator<Map.Entry<Long, Commit>> i = commitMap.entrySet().iterator(); i.hasNext();) {
721 // Commit prevCommit = i.next().getValue();
723 // if (prevCommit.getTransArbitrator() != entry.getTransArbitrator()) {
727 // prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
729 // if (!prevCommit.isLive()) {
735 // Add the new commit
736 commitMap.put(entry.getTransSequenceNumber(), entry);
737 lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
739 // Update the committed table list
740 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
741 IoTString key = kv.getKey();
742 commitedTable.put(key, kv);
744 committedMapByKey.put(key, entry);
748 // Clear the new commits storage so we can use it later
749 newCommitMap.clear();
/**
 * Walks the uncommitted transactions and selects those already decided: a transaction
 * qualifies when its sequence number is not newer than the last commit or the last abort
 * seen from its arbitrator.
 * NOTE(review): presumably the selected transactions are marked dead and removed through the
 * iterator -- confirm
 */
752 private void deleteDeadUncommittedTransactions() {
753 // Make dead the transactions
754 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
755 Transaction prevtrans = i.next().getValue();
756 long transArb = prevtrans.getArbitrator();
// && binds tighter than ||, so this reads (commit seen and covers it) || (abort seen and covers it)
758 if ((lastCommitSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastCommitSeenSeqNumMap.get(transArb)) ||
759 (lastAbortSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastAbortSeenSeqNumMap.get(transArb))) {
/**
 * Rebuilds speculativeTable: the committed state with every uncommitted transaction whose
 * guard passes applied on top, oldest first.
 *
 * With no uncommitted transactions the speculative table simply aliases the committed
 * table. When the oldest uncommitted transaction is newer than the last one applied, the
 * table is rebuilt from a copy of the committed table; otherwise the existing speculative
 * table is copied and only transactions newer than lastUncommittedTransaction are applied.
 */
766 private void createSpeculativeTable() {
768 if (uncommittedTransactionsMap.keySet().size() == 0) {
769 speculativeTable = commitedTable; // Ok that they are the same object
// NOTE(review): raw Map type; the copies hold IoTString -> KeyValue
773 Map speculativeTableTmp = null;
774 List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
776 // Sort from oldest to newest transaction
777 Collections.sort(utSeqNums);
// Full rebuild: start again from the committed state
779 if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
780 speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
782 for (Long key : utSeqNums) {
783 Transaction trans = uncommittedTransactionsMap.get(key);
785 lastUncommittedTransaction = key;
788 if (trans.getGuard().evaluate(speculativeTableTmp.values())) {
789 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
790 speculativeTableTmp.put(kv.getKey(), kv);
794 } catch (Exception e) {
// Incremental update: extend the previous speculative state
799 speculativeTableTmp = new HashMap<IoTString, KeyValue>(speculativeTable);
801 for (Long key : utSeqNums) {
// already applied in an earlier pass
803 if (key <= lastUncommittedTransaction) {
807 lastUncommittedTransaction = key;
809 Transaction trans = uncommittedTransactionsMap.get(key);
812 if (trans.getGuard().evaluate(speculativeTableTmp.values())) {
813 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
814 speculativeTableTmp.put(kv.getKey(), kv);
818 } catch (Exception e) {
824 speculativeTable = speculativeTableTmp;
// expectedsize: number of slots a gapped batch must contain (checked by checkNumSlots)
// currmaxsize: slot capacity announced by the most recent TableStatus entry seen while processing a batch
827 private int expectedsize, currmaxsize;
/**
 * Verifies that the server sent exactly as many slots as expected.
 *
 * @param numslots number of slots received (shadows the numslots field)
 * @throws Error when the count differs from expectedsize
 */
829 private void checkNumSlots(int numslots) {
830 if (numslots != expectedsize) {
831 throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
/**
 * Resets the batch-size bookkeeping before a batch is processed: expectedsize becomes the
 * smaller of the first sequence number and numslots, and currmaxsize becomes numslots.
 *
 * @param firstsequencenumber sequence number of the first slot in the batch
 */
835 private void initExpectedSize(long firstsequencenumber) {
836 long prevslots = firstsequencenumber;
837 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
838 currmaxsize = numslots;
/**
 * Called once per processed slot; keeps expectedsize from exceeding currmaxsize.
 * NOTE(review): presumably expectedsize is incremented before the clamp -- confirm
 */
841 private void updateExpectedSize() {
843 if (expectedsize > currmaxsize) {
844 expectedsize = currmaxsize;
/**
 * Records a new maximum slot count for the batch being processed.
 *
 * @param newmaxsize slot count to store in currmaxsize
 */
848 private void updateCurrMaxSize(int newmaxsize) {
849 currmaxsize = newmaxsize;
/**
 * Makes currmaxsize the table's slot count: resizes the buffer when the count changed,
 * updates numslots, and draws a fresh resize threshold for the new size.
 */
852 private void commitNewMaxSize() {
853 if (numslots != currmaxsize) {
854 buffer.resize(currmaxsize);
857 numslots = currmaxsize;
858 setResizeThreshold();
/**
 * Handles a LastMessage entry by recording it as the latest message of the machine it names.
 * Updates to this machine's own record are not accepted through this path
 * (acceptupdatestolocal is false).
 *
 * @param entry the LastMessage entry
 * @param machineSet machines not yet heard from in this batch; the entry's machine is removed
 */
861 private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
862 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
/**
 * Handles a RejectedMessage entry.
 *
 * First checks every slot we can see in the entry's sequence range: a slot's author must
 * equal the entry's machine id exactly when the entry's equal flag is set. Then works out
 * which other machines have not yet sent anything newer than the rejected range and
 * registers the entry on their watch lists.
 *
 * @param entry the RejectedMessage entry
 * @param indexer lookup over the new slots and the local buffer
 * @throws Error when a slot in the range contradicts the entry's equal flag
 */
865 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
866 long oldseqnum = entry.getOldSeqNum();
867 long newseqnum = entry.getNewSeqNum();
868 boolean isequal = entry.getEqual();
869 long machineid = entry.getMachineID();
870 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
871 Slot slot = indexer.getSlot(seqnum);
873 long slotmachineid = slot.getMachineID();
874 if (isequal != (slotmachineid == machineid)) {
875 throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
880 HashSet<Long> watchset = new HashSet<Long>();
881 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
882 long entry_mid = lastmsg_entry.getKey();
883 /* We've seen it, don't need to continue to watch. Our next
884 * message will implicitly acknowledge it. */
885 if (entry_mid == localmachineid)
887 Pair<Long, Liveness> v = lastmsg_entry.getValue();
888 long entry_seqn = v.getFirst();
// this machine's latest message predates the rejection, so it still has to see it
889 if (entry_seqn < newseqnum) {
890 addWatchList(entry_mid, entry);
891 watchset.add(entry_mid);
894 if (watchset.isEmpty())
897 entry.setWatchSet(watchset);
/**
 * Handles a NewKey entry: records the entry's machine as arbitrator of the key and stores
 * the entry as the latest NewKey for that key.
 * NOTE(review): presumably a replaced NewKey entry is marked dead -- confirm
 *
 * @param entry the NewKey entry
 */
900 private void processEntry(NewKey entry) {
901 arbitratorTable.put(entry.getKey(), entry.getMachineID());
903 NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
905 if (oldNewKey != null) {
/**
 * Handles a Transaction entry by storing it as uncommitted under its sequence number.
 * NOTE(review): presumably a replaced copy is marked dead -- confirm
 *
 * @param entry the Transaction entry
 */
910 private void processEntry(Transaction entry) {
911 Transaction prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
913 // Duplicate so delete old copy
914 if (prevTrans != null) {
/**
 * Handles an Abort entry. The abort is kept in abortMap only while the machine it is
 * addressed to has not yet sent a message at or past the aborted transaction's sequence
 * number; in every case the abort's sequence number is recorded as the last abort seen
 * for its arbitrator.
 *
 * @param entry the Abort entry
 */
919 private void processEntry(Abort entry) {
// NOTE(review): lastmessagetable.get(...) returns null for a machine with no recorded message -- confirm callers rule that out
921 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
922 // Abort has not been seen yet so we need to keep track of it
924 Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
925 if (prevAbort != null) {
926 prevAbort.setDead(); // delete old version of the duplicate
929 // The machine already saw this so it is dead
933 lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
/**
 * Handles a Commit entry by staging it in newCommitMap for proccessAllNewCommits().
 * An earlier staged copy with the same transaction sequence number is marked dead.
 *
 * @param entry the Commit entry
 * @param s slot the entry came from (not used in the lines shown here)
 */
936 private void processEntry(Commit entry, Slot s) {
937 Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry);
938 if (prevCommit != null) {
939 prevCommit.setDead();
/**
 * Handles a TableStatus entry: adopts its slot count as the current maximum size and
 * makes it the live table status, marking the previous one dead.
 *
 * @param entry the TableStatus entry
 */
943 private void processEntry(TableStatus entry) {
944 int newnumslots = entry.getMaxSlots();
945 updateCurrMaxSize(newnumslots);
946 if (lastTableStatus != null)
947 lastTableStatus.setDead();
948 lastTableStatus = entry;
/**
 * Registers a rejected-message entry on a machine's watch list, creating the machine's
 * set on first use.
 *
 * @param machineid machine that still has to acknowledge the entry
 * @param entry the RejectedMessage being watched
 */
951 private void addWatchList(long machineid, RejectedMessage entry) {
952 HashSet<RejectedMessage> entries = watchlist.get(machineid);
954 watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
/**
 * Records that a machine's latest message is the one at seqnum and updates everything
 * that depends on it.
 *
 * Effects visible here: the machine is struck from machineSet; watched rejected-message
 * entries with a newer-or-equal seqnum lose this machine as a watcher; a message from
 * this machine itself is marked dead immediately; aborts addressed to the machine at or
 * before seqnum are selected for cleanup; the previous last-message record of a remote
 * machine is marked dead; and sequence numbers are sanity-checked.
 *
 * @param machineid machine the message belongs to
 * @param seqnum sequence number of the message
 * @param liveness the Slot or LastMessage carrying the information
 * @param acceptupdatestolocal when false, a differing sequence number for this machine is an error
 * @param machineSet machines not yet heard from in the current batch
 * @throws Error "Unrecognized type" when liveness (or the stored record) is neither LastMessage nor Slot
 * @throws Error on a local sequence-number mismatch or a remote sequence-number rollback
 */
958 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
959 machineSet.remove(machineid);
961 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
962 if (watchset != null) {
963 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
964 RejectedMessage rm = rmit.next();
// the machine has now sent something at or past the rejected range
965 if (rm.getNewSeqNum() <= seqnum) {
966 /* Remove it from our watchlist */
968 /* Decrement machines that need to see this notification */
969 rm.removeWatcher(machineid);
974 if (machineid == localmachineid) {
975 /* Our own messages are immediately dead. */
976 if (liveness instanceof LastMessage) {
977 ((LastMessage)liveness).setDead();
978 } else if (liveness instanceof Slot) {
979 ((Slot)liveness).setDead();
981 throw new Error("Unrecognized type");
985 // Set dead the abort
986 for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
987 Abort abort = i.next().getValue();
989 if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
// put() hands back the previous record, if any, for the checks below
995 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
996 if (lastmsgentry == null)
999 long lastmsgseqnum = lastmsgentry.getFirst();
1000 Liveness lastentry = lastmsgentry.getSecond();
1001 if (machineid != localmachineid) {
1002 if (lastentry instanceof LastMessage) {
1003 ((LastMessage)lastentry).setDead();
1004 } else if (lastentry instanceof Slot) {
1005 ((Slot)lastentry).setDead();
1007 throw new Error("Unrecognized type");
1011 if (machineid == localmachineid) {
1012 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1013 throw new Error("Server Error: Mismatch on local machine sequence number");
1015 if (lastmsgseqnum > seqnum)
1016 throw new Error("Server Error: Rollback on remote machine sequence number");
/**
 * Processes one slot: records it as the latest message of its author, then dispatches
 * each entry to the processEntry overload matching its type.
 *
 * @param indexer lookup over the new slots and the local buffer (needed for rejected messages)
 * @param slot slot to process
 * @param acceptupdatestolocal forwarded to updateLastMessage for the slot itself
 * @param machineSet machines not yet heard from in the current batch
 * @throws Error "Unrecognized type: ..." for an entry type with no handler
 */
1020 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1021 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1022 for (Entry entry : slot.getEntries()) {
1023 switch (entry.getType()) {
1025 case Entry.TypeNewKey:
1026 processEntry((NewKey)entry);
1029 case Entry.TypeCommit:
1030 processEntry((Commit)entry, slot);
1033 case Entry.TypeAbort:
1034 processEntry((Abort)entry);
1037 case Entry.TypeTransaction:
1038 processEntry((Transaction)entry);
1041 case Entry.TypeLastMessage:
1042 processEntry((LastMessage)entry, machineSet);
1045 case Entry.TypeRejectedMessage:
1046 processEntry((RejectedMessage)entry, indexer);
1049 case Entry.TypeTableStatus:
1050 processEntry((TableStatus)entry);
1054 throw new Error("Unrecognized type: " + entry.getType());
/**
 * Verifies that each new slot's previous-HMAC field matches the HMAC of the slot that
 * precedes it, whenever that predecessor is available through the indexer.
 *
 * @param indexer lookup over the new slots and the local buffer
 * @param newslots slots to verify
 * @throws Error "Server Error: Invalid HMAC Chain..." on the first mismatch
 */
1059 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1060 for (int i = 0; i < newslots.length; i++) {
1061 Slot currslot = newslots[i];
1062 Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
// a missing predecessor is skipped rather than treated as an error
1063 if (prevslot != null &&
1064 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1065 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);