2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.ArrayList;
12 import java.util.List;
14 import java.util.Collection;
15 import java.util.Collections;
19 * IoTTable data structure. Provides client interface.
20 * @author Brian Demsky
24 final public class Table {
25 private int numslots; //number of slots stored in buffer
27 //table of key-value pairs
28 //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
30 // machine id -> (sequence number, Slot or LastMessage); records last message by each client
31 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
// machine id -> RejectedMessage entries registered for that machine by addWatchList
33 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
// sequence numbers read by doRejectedMessages; appended to or cleared after each cloud.putSlot call
34 private Vector<Long> rejectedmessagelist = new Vector<Long>();
// local store of slots, looked up by sequence number via getSlot
35 private SlotBuffer buffer;
// communication layer used to fetch (getSlots) and submit (putSlot) slots
36 private CloudComm cloud;
37 private long sequencenumber; //Largest sequence number a client has received
// id of this client machine; compared against slot and entry machine ids
38 private long localmachineid;
// most recent TableStatus entry processed; the previous one is set dead when it is replaced
39 private TableStatus lastTableStatus;
40 static final int FREE_SLOTS = 10; //number of slots that should be kept free
// doOptionalRescue compares its skip counter against this value
41 static final int SKIP_THRESHOLD = 10;
42 public long liveslotcount = 0; // TODO: MAKE PRIVATE
// factor applied to numslots when computing a new, larger table size
44 static final double RESIZE_MULTIPLE = 1.2;
// fraction of numslots used as the lower bound when picking resizethreshold
45 static final double RESIZE_THRESHOLD = 0.75;
// doRejectedMessages emits a single range entry once the rejected list is longer than this
46 static final int REJECTED_THRESHOLD = 5;
// a resize is forced when liveslotcount exceeds this value; chosen by setResizeThreshold
47 public int resizethreshold; // TODO: MAKE PRIVATE
48 private long lastliveslotseqn; //smallest sequence number with a live entry
// source of randomness for setResizeThreshold
49 private Random random = new Random();
50 private long lastCommitSeenSeqNum = 0; // sequence number of the last commit that was seen
52 private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
53 private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
54 private List<Commit> commitList = null; // List of all the most recent live commits
55 private List<Long> commitListSeqNum = null; // List of all the most recent live commits trans sequence numbers
57 private Set<Abort> abortSet = null; // Set of the live aborts
58 public Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV TODO: Make Private
59 private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
// transaction sequence number -> transaction that has not been committed or aborted yet
60 public Map<Long, Transaction> uncommittedTransactionsMap = null; // TODO: make private
61 private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators (key -> arbitrating machine id)
62 private Map<IoTString, NewKey> newKeyTable = null; // key -> most recent NewKey entry processed for that key
63 // private Set<Abort> arbitratorTable = null; // Table of arbitrators
64 private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
/**
 * Creates a table for the given client machine that talks to the cloud
 * through a newly constructed CloudComm.
 *
 * @param baseurl passed through to the CloudComm constructor
 * @param password passed through to the CloudComm constructor
 * @param _localmachineid id of this client machine
 */
68 public Table(String baseurl, String password, long _localmachineid) {
69 localmachineid = _localmachineid;
70 buffer = new SlotBuffer();
// The initial slot count is whatever capacity the fresh buffer reports.
71 numslots = buffer.capacity();
74 cloud = new CloudComm(this, baseurl, password);
/**
 * Creates a table for the given client machine using a caller-supplied
 * CloudComm instead of constructing one.
 *
 * @param _cloud communication layer to use
 *        (NOTE(review): presumably stored in the cloud field -- confirm)
 * @param _localmachineid id of this client machine
 */
80 public Table(CloudComm _cloud, long _localmachineid) {
81 localmachineid = _localmachineid;
82 buffer = new SlotBuffer();
// The initial slot count is whatever capacity the fresh buffer reports.
83 numslots = buffer.capacity();
/**
 * Allocates empty collections for the transaction, commit, abort, key and
 * arbitrator bookkeeping fields.
 * NOTE(review): commitListSeqNum is not allocated among these assignments.
 */
91 private void setupDataStructs() {
92 pendingTransQueue = new LinkedList<PendingTransaction>();
93 commitList = new LinkedList<Commit>();
94 abortSet = new HashSet<Abort>();
95 commitedTable = new HashMap<IoTString, KeyValue>();
96 speculativeTable = new HashMap<IoTString, KeyValue>();
97 uncommittedTransactionsMap = new HashMap<Long, Transaction>();
98 arbitratorTable = new HashMap<IoTString, Long>();
99 newKeyTable = new HashMap<IoTString, NewKey>();
100 newCommitMap = new HashMap<Long, Commit> ();
/**
 * Fetches every slot after the last known sequence number from the cloud
 * and applies them, accepting updates that concern the local machine.
 */
103 public void rebuild() {
104 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
// true = acceptupdatestolocal, unlike update() which passes false
105 validateandupdate(newslots, true);
108 // TODO: delete method
/**
 * Debug helper: walks every slot in the buffer from the oldest to the newest
 * sequence number, counts entries per entry type, and prints the counts
 * together with buffer and commit statistics to standard output.
 */
109 public void printSlots() {
110 long o = buffer.getOldestSeqNum();
111 long n = buffer.getNewestSeqNum();
// One counter per entry type; the index is the value returned by Entry.getType().
// NOTE(review): assumes type codes are in the range 0..9 -- confirm against Entry.
113 int[] types = new int[10];
// Inclusive range o..n
119 for (long i = o; i < (n + 1); i++) {
120 Slot s = buffer.getSlot(i);
122 Vector<Entry> entries = s.getEntries();
124 for (Entry e : entries) {
126 int type = e.getType();
127 types[type] = types[type] + 1;
// Dump the per-type histogram, one "<type> <count>" line per type.
136 for (int i = 0; i < 10; i++) {
137 System.out.println(i + " " + types[i]);
139 System.out.println("Live count: " + livec);
140 System.out.println("Dead count: " + deadc);
141 System.out.println("Old: " + o);
142 System.out.println("New: " + n);
143 System.out.println("Size: " + buffer.size());
144 System.out.println("Commits Map: " + commitedTable.size());
145 System.out.println("Commits List: " + commitList.size());
/**
 * Looks up a key in the committed table.
 *
 * @param key key to look up
 * @return the value of the committed KeyValue stored for the key
 *         (NOTE(review): behaviour for a key with no committed value should be confirmed)
 */
148 public IoTString getCommitted(IoTString key) {
149 KeyValue kv = commitedTable.get(key);
151 return kv.getValue();
/**
 * Looks up a key in the speculative table, which createSpeculativeTable
 * rebuilds from the committed table plus uncommitted transactions.
 *
 * @param key key to look up
 * @return the value of the speculative KeyValue stored for the key
 *         (NOTE(review): behaviour for a key with no speculative value should be confirmed)
 */
157 public IoTString getSpeculative(IoTString key) {
158 KeyValue kv = speculativeTable.get(key);
160 return kv.getValue();
/**
 * Initialises a new table on the server: sets the salt, builds the slot with
 * sequence number 1 for this machine together with a TableStatus created for
 * the current slot count, and submits it with cloud.putSlot.
 *
 * The code applies the single-slot array {s} through validateandupdate and
 * also has a path that throws Error("Error on initialization").
 * NOTE(review): which putSlot result selects which path should be confirmed.
 */
166 public void initTable() {
167 cloud.setSalt();//Set the salt
// First slot of the chain: sequence number 1, no previous HMAC argument.
168 Slot s = new Slot(this, 1, localmachineid);
169 TableStatus status = new TableStatus(s, numslots);
171 Slot[] array = cloud.putSlot(s, numslots);
173 array = new Slot[] {s};
174 /* update data structure */
175 validateandupdate(array, true);
177 throw new Error("Error on initialization");
/**
 * Builds a human-readable dump consisting of a "Committed Table" section
 * and a "Speculative Table" section, each rendered with the map's own
 * toString().
 */
181 public String toString() {
182 String retString = " Committed Table: \n";
183 retString += "---------------------------\n";
184 retString += commitedTable.toString();
188 retString += " Speculative Table: \n";
189 retString += "---------------------------\n";
190 retString += speculativeTable.toString();
/**
 * Begins building a new transaction. Any transaction that was being built
 * and has not been handed to commitTransaction is discarded, because the
 * builder field is simply overwritten.
 */
195 public void startTransaction() {
196 // Create a new transaction, invalidates any old pending transactions.
197 pendingTransBuild = new PendingTransaction();
/**
 * Queues the transaction currently being built and then drains the queue,
 * repeatedly calling tryput on the head of the queue and removing it once
 * tryput reports success.
 *
 * A transaction without any key-value updates is special-cased first (see
 * the comment on that branch).
 * NOTE(review): the loop only ends when the queue is empty, so it keeps
 * retrying for as long as tryput returns false.
 */
200 public void commitTransaction() {
202 if (pendingTransBuild.getKVUpdates().size() == 0) {
203 // If no updates are made then there is no point inserting into the chain
207 // Add the pending transaction to the queue
208 pendingTransQueue.add(pendingTransBuild);
// Retry the oldest queued transaction until it has been inserted.
210 while (!pendingTransQueue.isEmpty()) {
211 if (tryput( pendingTransQueue.peek(), false)) {
212 pendingTransQueue.poll();
/**
 * Adds a key-value update to the transaction currently being built.
 *
 * @param key key to update; must already have an arbitrator registered
 * @param value new value for the key
 * @throws Error "Key not Found." when arbitratorTable has no entry for the key
 * @throws Error "Not all Key Values Match." when the pending transaction's
 *         checkArbitrator rejects the key's arbitrator
 */
217 public void addKV(IoTString key, IoTString value) {
219 if (arbitratorTable.get(key) == null) {
220 throw new Error("Key not Found.");
223 // Make sure new key value pair matches the current arbitrator
224 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
225 // TODO: Maybe not throw an error
226 throw new Error("Not all Key Values Match.");
229 KeyValue kv = new KeyValue(key, value);
230 pendingTransBuild.addKV(kv);
/**
 * Attaches a guard to the transaction currently being built by delegating
 * to PendingTransaction.addGuard.
 *
 * @param guard guard to attach
 */
233 public void addGuard(Guard guard) {
234 pendingTransBuild.addGuard(guard);
/**
 * Pulls slots newer than the last known sequence number from the cloud and
 * applies them (without accepting updates for the local machine). While
 * uncommitted transactions remain, it then builds a new slot, fills it with
 * rejected-message entries, rescued live entries and arbitration results
 * (commits/aborts), submits it with cloud.putSlot and applies the outcome.
 *
 * The loop stops once doArbitration reports that nothing needed arbitration
 * or no uncommitted transactions are left.
 */
237 public void update() {
239 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
241 validateandupdate(newslots, false);
243 if (uncommittedTransactionsMap.keySet().size() > 0) {
245 boolean doEnd = false;
246 boolean needResize = false;
247 while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
248 boolean resize = needResize;
// New slot follows the newest known slot and chains onto its HMAC.
251 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
253 if (liveslotcount > resizethreshold) {
254 resize = true; //Resize is forced
// Grow the table by RESIZE_MULTIPLE and announce the new size via a TableStatus.
258 newsize = (int) (numslots * RESIZE_MULTIPLE);
259 TableStatus status = new TableStatus(s, newsize);
263 doRejectedMessages(s);
// retTup = (resize needed, seenliveslot, sequence number reached by the scan)
265 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
267 // Resize was needed so redo call
268 if (retTup.getFirst()) {
273 // Extract working variables
274 boolean seenliveslot = retTup.getSecond();
275 long seqn = retTup.getThird();
277 // Did need to arbitrate
278 doEnd = !doArbitration(s);
280 doOptionalRescue(s, seenliveslot, seqn, resize);
287 Slot[] array = cloud.putSlot(s, max);
289 array = new Slot[] {s};
290 rejectedmessagelist.clear();
292 if (array.length == 0)
293 throw new Error("Server Error: Did not send any slots");
294 rejectedmessagelist.add(s.getSequenceNumber());
298 /* update data structure */
299 validateandupdate(array, true);
/**
 * Registers a new key with the given machine as its arbitrator by inserting
 * a NewKey entry through tryput.
 *
 * @param keyName key to create
 * @param machineId machine id that should arbitrate the key
 * @return NOTE(review): presumably false when the key already has an
 *         arbitrator and true once the entry was inserted -- confirm
 */
304 public boolean createNewKey(IoTString keyName, long machineId) {
307 if (arbitratorTable.get(keyName) != null) {
308 // There is already an arbitrator
312 if (tryput(keyName, machineId, false)) {
314 // If successfully inserted
/**
 * NOTE(review): judging by the name, this lowers liveslotcount by one;
 * confirm against the full body.
 */
320 public void decrementLiveCount() {
322 // System.out.println("Decrement Live Count");
/**
 * Picks a randomised resize threshold for the current slot count.
 *
 * With resize_lower = (int) (RESIZE_THRESHOLD * numslots), the result is
 * resize_lower - 1 plus a random value in [0, numslots - resize_lower), i.e.
 * somewhere between resize_lower - 1 and numslots - 2 inclusive.
 * NOTE(review): Random.nextInt rejects a non-positive bound, so this relies
 * on numslots being greater than resize_lower.
 */
325 private void setResizeThreshold() {
326 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
327 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
/**
 * Tries to publish a pending transaction in a newly built slot.
 *
 * Builds the next slot (chained onto the newest known slot's HMAC), adds
 * rejected-message entries and mandatory rescues, wraps the pending
 * key-value updates and guard in a Transaction entry, adds optional rescues
 * and finally sends the slot.
 *
 * @param pendingTrans transaction to publish
 * @param resize whether a table resize is requested; forced to true when
 *        liveslotcount exceeds resizethreshold
 * @return the value returned by doSendSlotsAndInsert, or the result of the
 *         recursive retry with resize=true when the mandatory rescue
 *         reports that a resize is needed
 */
330 private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
331 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
334 if (liveslotcount > resizethreshold) {
335 resize = true; //Resize is forced
336 System.out.println("Live count resize: " + liveslotcount + " " + resizethreshold);
// Grow the table by RESIZE_MULTIPLE and announce the new size via a TableStatus.
341 newsize = (int) (numslots * RESIZE_MULTIPLE);
343 System.out.println("New Size: " + newsize + " old: " + buffer.oldestseqn); // TODO remove
345 TableStatus status = new TableStatus(s, newsize);
349 doRejectedMessages(s);
// retTup = (resize needed, seenliveslot, sequence number reached by the scan)
353 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
355 // Resize was needed so redo call
356 if (retTup.getFirst()) {
357 return tryput(pendingTrans, true);
360 // Extract working variables
361 boolean seenliveslot = retTup.getSecond();
362 long seqn = retTup.getThird();
// The transaction entry uses the new slot's sequence number.
366 Transaction trans = new Transaction(s,
367 s.getSequenceNumber(),
369 pendingTrans.getKVUpdates(),
370 pendingTrans.getGuard());
371 boolean insertedTrans = false;
372 if (s.hasSpace(trans)) {
374 insertedTrans = true;
377 doOptionalRescue(s, seenliveslot, seqn, resize);
378 insertedTrans = doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
381 // System.out.println("Inserted: " + trans.getSequenceNumber());
384 return insertedTrans;
/**
 * Tries to publish a NewKey entry (key name plus arbitrating machine) in a
 * newly built slot. Follows the same sequence as the PendingTransaction
 * overload: build slot, rejected messages, mandatory rescue, the entry
 * itself, optional rescue, send.
 *
 * @param keyName key being created
 * @param arbMachineid machine id that arbitrates the key
 * @param resize whether a table resize is requested; forced to true when
 *        liveslotcount exceeds resizethreshold
 * @return the value returned by doSendSlotsAndInsert, or the result of the
 *         recursive retry with resize=true when the mandatory rescue
 *         reports that a resize is needed
 */
387 private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
388 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
390 if (liveslotcount > resizethreshold) {
391 resize = true; //Resize is forced
// Grow the table by RESIZE_MULTIPLE and announce the new size via a TableStatus.
395 newsize = (int) (numslots * RESIZE_MULTIPLE);
396 TableStatus status = new TableStatus(s, newsize);
400 doRejectedMessages(s);
// retTup = (resize needed, seenliveslot, sequence number reached by the scan)
401 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
403 // Resize was needed so redo call
404 if (retTup.getFirst()) {
405 return tryput(keyName, arbMachineid, true);
408 // Extract working variables
409 boolean seenliveslot = retTup.getSecond();
410 long seqn = retTup.getThird();
415 NewKey newKey = new NewKey(s, keyName, arbMachineid);
417 boolean insertedNewKey = false;
418 if (s.hasSpace(newKey)) {
420 insertedNewKey = true;
423 doOptionalRescue(s, seenliveslot, seqn, resize);
424 return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize);
/**
 * Builds RejectedMessage entries for slot s from the sequence numbers held
 * in rejectedmessagelist. Does nothing when the list is empty.
 *
 * When the list is longer than REJECTED_THRESHOLD a single range entry from
 * the first to the last listed sequence number is built (local machine id,
 * equal flag false). The remaining code walks the list: it builds a range
 * entry (equal flag false) up to prev_seqn for the leading run of "missing"
 * messages, then one entry per remaining sequence number carrying the
 * machine id of the slot found in the buffer (equal flag true).
 *
 * @param s slot under construction that the entries are created for
 */
427 private void doRejectedMessages(Slot s) {
428 if (! rejectedmessagelist.isEmpty()) {
429 /* TODO: We should avoid generating a rejected message entry if
430 * there is already a sufficient entry in the queue (e.g.,
431 * equalsto value of true and same sequence number). */
433 long old_seqn = rejectedmessagelist.firstElement();
434 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
435 long new_seqn = rejectedmessagelist.lastElement();
436 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
441 /* Go through list of missing messages */
442 for (; i < rejectedmessagelist.size(); i++) {
443 long curr_seqn = rejectedmessagelist.get(i);
444 Slot s_msg = buffer.getSlot(curr_seqn);
447 prev_seqn = curr_seqn;
449 /* Generate rejected message entry for missing messages */
// -1 is the "no missing message seen" sentinel for prev_seqn.
450 if (prev_seqn != -1) {
451 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
454 /* Generate rejected message entries for present messages */
// Continues from the index where the previous loop stopped.
455 for (; i < rejectedmessagelist.size(); i++) {
456 long curr_seqn = rejectedmessagelist.get(i);
457 Slot s_msg = buffer.getSlot(curr_seqn);
458 long machineid = s_msg.getMachineID();
459 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
/**
 * Copies live entries from the oldest slots into the new slot s so that
 * they are not lost when those slots leave the buffer.
 *
 * Scans from lastliveslotseqn (clamped up to the buffer's oldest sequence
 * number) to just below threshold = newest + 1 - numslots + FREE_SLOTS,
 * advancing lastliveslotseqn as it goes.
 *
 * @param s slot under construction that receives the rescued entries
 * @param resize passed to Slot.getLiveEntries
 * @return a tuple of (resize needed, seenliveslot, sequence number reached).
 *         The first element is true when an entry of the slot that is about
 *         to fall off the queue does not fit into s; the caller is then
 *         expected to retry with a resize.
 */
466 private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
467 long newestseqnum = buffer.getNewestSeqNum();
468 long oldestseqnum = buffer.getOldestSeqNum();
// Never start scanning below what the buffer still holds.
469 if (lastliveslotseqn < oldestseqnum)
470 lastliveslotseqn = oldestseqnum;
472 long seqn = lastliveslotseqn;
473 boolean seenliveslot = false;
474 long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
475 long threshold = firstiffull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
479 for (; seqn < threshold; seqn++) {
480 Slot prevslot = buffer.getSlot(seqn);
481 // Push slot number forward
483 lastliveslotseqn = seqn;
485 if (! prevslot.isLive())
488 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
489 for (Entry liveentry : liveentries) {
490 if (s.hasSpace(liveentry)) {
491 s.addEntry(liveentry);
492 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
494 System.out.println("B"); //?
496 System.out.println("==============================NEEEEDDDD RESIZING");
497 return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
// Scan finished without needing a resize.
504 return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
/**
 * Arbitrates uncommitted transactions whose key is arbitrated by this
 * machine, oldest transaction first.
 *
 * Each transaction's guard is evaluated against a scratch copy of the
 * committed table that also carries the updates of transactions accepted
 * earlier in the same pass. A guard that evaluates to true yields a Commit
 * entry and applies the updates to the scratch copy; the other outcome
 * yields an Abort entry. The entry is added to s when it fits.
 *
 * Only the first key-value update of a transaction is used to determine
 * its arbitrator.
 *
 * NOTE(review): arbitratorTable.get(...) returns a Long that is unboxed for
 * the comparison with localmachineid, so a key without an arbitrator entry
 * would raise a NullPointerException here.
 * NOTE(review): speculativeTableTmp is declared with the raw Map type.
 *
 * @param s slot under construction that receives the Commit/Abort entries
 * @return true when at least one transaction was arbitrated by this machine
 */
507 private boolean doArbitration(Slot s) {
509 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
511 List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
513 // Sort from oldest to newest
514 Collections.sort(transSeqNums);
517 boolean didNeedArbitration = false;
518 for (Long transNum : transSeqNums) {
519 Transaction ut = uncommittedTransactionsMap.get(transNum);
521 KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
522 // Check if this machine arbitrates for this transaction
523 if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
527 // we did have something to arbitrate on
528 didNeedArbitration = true;
530 Entry newEntry = null;
533 if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
534 // Guard evaluated as true
536 // update the local tmp current key set
537 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
538 speculativeTableTmp.put(kv.getKey(), kv);
542 newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
547 newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
549 } catch (Exception e) {
// newEntry stays null when no decision could be produced.
553 if ((newEntry != null) && s.hasSpace(newEntry)) {
554 s.addEntry(newEntry);
560 return didNeedArbitration;
/**
 * Uses leftover room in slot s to copy further live entries forward,
 * continuing from seqn up to and including the newest slot in the buffer
 * and advancing lastliveslotseqn as it goes. Entries that do not fit are
 * counted, and the skip counter is compared against SKIP_THRESHOLD.
 *
 * @param s slot under construction that receives the rescued entries
 * @param seenliveslot value carried over from doMandatoryResuce
 * @param seqn sequence number to start scanning from
 * @param resize passed to Slot.getLiveEntries
 */
563 private void doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
564 /* now go through live entries from least to greatest sequence number until
565 * either all live slots added, or the slot doesn't have enough room
566 * for SKIP_THRESHOLD consecutive entries*/
568 long newestseqnum = buffer.getNewestSeqNum();
570 for (; seqn <= newestseqnum; seqn++) {
571 Slot prevslot = buffer.getSlot(seqn);
572 //Push slot number forward
574 lastliveslotseqn = seqn;
576 if (!prevslot.isLive())
579 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
580 for (Entry liveentry : liveentries) {
581 if (s.hasSpace(liveentry))
582 s.addEntry(liveentry);
585 if (skipcount > SKIP_THRESHOLD)
/**
 * Submits slot s with cloud.putSlot and applies the outcome to the local
 * data structures through validateandupdate.
 *
 * Two outcomes are handled: one replaces the result array with {s} and
 * clears rejectedmessagelist; the other requires a non-empty array from
 * the server and appends s's sequence number to rejectedmessagelist.
 * NOTE(review): presumably the first is the "server accepted the slot"
 * case and the second the "server rejected it" case -- confirm.
 *
 * @param s slot to send
 * @param inserted whether the caller managed to add its own entry to s
 * @param resize whether a resize was requested for this slot
 * @param newsize table size announced when resizing
 * @return NOTE(review): presumably true only when the slot was accepted and
 *         the caller's entry had been inserted -- confirm
 * @throws Error "Server Error: Did not send any slots" when the server's
 *         array is empty
 */
592 private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) {
596 Slot[] array = cloud.putSlot(s, max);
598 array = new Slot[] {s};
599 rejectedmessagelist.clear();
601 if (array.length == 0)
602 throw new Error("Server Error: Did not send any slots");
603 rejectedmessagelist.add(s.getSequenceNumber());
607 /* update data structure */
608 validateandupdate(array, true);
/**
 * Validates a batch of slots received from the server and folds them into
 * the local state.
 *
 * Steps: reject batches that start at or before the last known sequence
 * number, check the HMAC chain, process every slot's entries while tracking
 * the expected slot count, process the commits gathered on the way, run the
 * completeness checks when the batch does not start right after the last
 * known slot, store the slots in the buffer, advance sequencenumber and
 * rebuild the speculative table. An empty batch is a no-op.
 *
 * @param newslots slots received, in ascending sequence-number order
 *        (NOTE(review): the ordering is assumed from the use of the first
 *        and last array elements)
 * @param acceptupdatestolocal forwarded to processSlot/updateLastMessage;
 *        when false, a slot from the local machine with an unexpected
 *        sequence number is an error
 * @throws Error on any of the server-consistency violations named above
 */
613 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
614 /* The cloud communication layer has checked slot HMACs already
616 if (newslots.length == 0) return;
618 long firstseqnum = newslots[0].getSequenceNumber();
619 if (firstseqnum <= sequencenumber) {
620 throw new Error("Server Error: Sent older slots!");
623 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
624 checkHMACChain(indexer, newslots);
626 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
628 initExpectedSize(firstseqnum);
629 for (Slot slot : newslots) {
630 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
631 updateExpectedSize();
634 proccessAllNewCommits();
636 /* If there is a gap, check to see if the server sent us everything. */
637 if (firstseqnum != (sequencenumber + 1)) {
640 checkNumSlots(newslots.length);
// Machines still in machineSet had no message in this batch.
641 if (!machineSet.isEmpty()) {
642 throw new Error("Missing record for machines: " + machineSet);
648 /* Commit new to slots. */
649 for (Slot slot : newslots) {
650 buffer.putSlot(slot);
653 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
655 // Speculate on key value pairs
656 createSpeculativeTable();
/**
 * Applies the commits collected in newCommitMap, oldest first, and then
 * empties the map.
 *
 * A commit whose transaction sequence number is not newer than
 * lastCommitSeenSeqNum is treated as a re-sent copy: the stored commit with
 * the same transaction sequence number is set dead and the new copy is
 * added to commitList.
 *
 * For a newer commit, every stored commit has its live keys updated against
 * the new commit's key set and is checked for liveness. The new commit is
 * then appended, lastCommitSeenSeqNum advances, the committed table takes
 * the commit's key-value pairs, and uncommitted transactions with a
 * sequence number at or below the committed one are visited.
 * NOTE(review): presumably those transactions, and stored commits that are
 * no longer live, are removed -- confirm.
 */
659 public void proccessAllNewCommits() {
661 // Process only if there are commits
662 if (newCommitMap.keySet().size() == 0) {
666 List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.keySet());
668 // Sort from oldest to newest commit
669 Collections.sort(commitSeqNums);
671 // Go through each new commit one by one
672 for (Long entrySeqNum : commitSeqNums) {
673 Commit entry = newCommitMap.get(entrySeqNum);
// Commit for a transaction that is not newer than the last one seen.
675 if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
677 // Remove any old commits
678 for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
679 Commit prevcommit = i.next();
681 if (entry.getTransSequenceNumber() == prevcommit.getTransSequenceNumber()) {
682 prevcommit.setDead();
686 commitList.add(entry);
690 // Remove any old commits
691 for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
692 Commit prevcommit = i.next();
693 prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
695 if (!prevcommit.isLive()) {
700 // Add the new commit
701 commitList.add(entry);
702 lastCommitSeenSeqNum = entry.getTransSequenceNumber();
703 // System.out.println("Last Seq Num: " + lastCommitSeenSeqNum);
706 // Update the committed table list
707 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
708 IoTString key = kv.getKey();
709 commitedTable.put(key, kv);
712 long committedTransSeq = entry.getTransSequenceNumber();
714 // Make dead the transactions
715 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
716 Transaction prevtrans = i.next().getValue();
718 if (prevtrans.getSequenceNumber() <= committedTransSeq) {
726 // Clear the new commits storage so we can use it later
727 newCommitMap.clear();
/**
 * Rebuilds speculativeTable: starts from a copy of the committed table and
 * replays the uncommitted transactions in ascending sequence-number order,
 * applying the updates of each transaction whose guard evaluates to true
 * against the table built so far.
 * NOTE(review): speculativeTableTmp is declared with the raw Map type, so
 * the final assignment is an unchecked conversion.
 */
730 private void createSpeculativeTable() {
731 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
732 List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
734 // Sort from oldest to newest commit
735 Collections.sort(utSeqNums);
738 for (Long key : utSeqNums) {
739 Transaction trans = uncommittedTransactionsMap.get(key);
742 if (trans.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
743 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
744 speculativeTableTmp.put(kv.getKey(), kv);
748 } catch (Exception e) {
// Publish the rebuilt table in one step.
753 speculativeTable = speculativeTableTmp;
// expectedsize: number of slots a complete server response should contain (checked by checkNumSlots).
// currmaxsize: table size in effect while a batch is processed; updated from TableStatus entries.
756 private int expectedsize, currmaxsize;
/**
 * Verifies that the server sent as many slots as expected.
 *
 * @param numslots number of slots received (shadows the field of the same name)
 * @throws Error when the count differs from expectedsize
 */
758 private void checkNumSlots(int numslots) {
759 if (numslots != expectedsize) {
760 throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
/**
 * Resets the expected-size tracking before a batch is processed:
 * expectedsize becomes the smaller of firstsequencenumber and numslots, and
 * currmaxsize is reset to numslots.
 *
 * @param firstsequencenumber sequence number of the first slot in the batch
 */
764 private void initExpectedSize(long firstsequencenumber) {
765 long prevslots = firstsequencenumber;
// The (int) cast is safe because prevslots is below numslots on that branch.
766 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
767 currmaxsize = numslots;
/**
 * Called once per processed slot by validateandupdate; caps expectedsize
 * at currmaxsize.
 * NOTE(review): presumably expectedsize is also advanced by one per slot
 * before the cap is applied -- confirm.
 */
770 private void updateExpectedSize() {
772 if (expectedsize > currmaxsize) {
773 expectedsize = currmaxsize;
/**
 * Records the table size announced by a TableStatus entry for the batch
 * currently being processed.
 *
 * @param newmaxsize new maximum number of slots
 */
777 private void updateCurrMaxSize(int newmaxsize) {
778 currmaxsize = newmaxsize;
/**
 * Makes the size tracked in currmaxsize the table's actual size: resizes
 * the buffer when the size changed, stores the new numslots and draws a
 * fresh resize threshold for it.
 */
781 private void commitNewMaxSize() {
782 if (numslots != currmaxsize) {
783 System.out.println("Resizing the buffer"); // TODO: Remove
784 buffer.resize(currmaxsize);
787 numslots = currmaxsize;
// The threshold depends on numslots, so it is recomputed after the change.
788 setResizeThreshold();
/**
 * Handles a LastMessage entry by recording it as the latest message of the
 * machine it names. Updates to the local machine are never accepted on
 * this path (acceptupdatestolocal is passed as false).
 *
 * @param entry the LastMessage entry
 * @param machineSet machines still awaiting a record in this batch
 */
791 private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
792 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
/**
 * Handles a RejectedMessage entry.
 *
 * First checks the claim against the slots we know: for every sequence
 * number in [oldseqnum, newseqnum] that the indexer can resolve, the slot's
 * machine id must match the entry's machine id exactly when the entry's
 * equal flag is set; otherwise an Error is thrown.
 *
 * Then works out which other machines have not yet been seen past
 * newseqnum (their last message sequence number is below it) and registers
 * the entry on the watch list for each of them. A non-empty watch set is
 * attached to the entry.
 * NOTE(review): presumably the entry is set dead when nobody needs to
 * watch it -- confirm.
 *
 * @param entry the RejectedMessage entry
 * @param indexer lookup for slots by sequence number
 * @throws Error "Server Error: Trying to insert rejected message for slot N"
 */
795 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
796 long oldseqnum = entry.getOldSeqNum();
797 long newseqnum = entry.getNewSeqNum();
798 boolean isequal = entry.getEqual();
799 long machineid = entry.getMachineID();
800 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
801 Slot slot = indexer.getSlot(seqnum);
803 long slotmachineid = slot.getMachineID();
// The equal flag must agree with whether the slot came from machineid.
804 if (isequal != (slotmachineid == machineid)) {
805 throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
810 HashSet<Long> watchset = new HashSet<Long>();
811 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
812 long entry_mid = lastmsg_entry.getKey();
813 /* We've seen it, don't need to continue to watch. Our next
814 * message will implicitly acknowledge it. */
815 if (entry_mid == localmachineid)
817 Pair<Long, Liveness> v = lastmsg_entry.getValue();
818 long entry_seqn = v.getFirst();
819 if (entry_seqn < newseqnum) {
820 addWatchList(entry_mid, entry);
821 watchset.add(entry_mid);
824 if (watchset.isEmpty())
827 entry.setWatchSet(watchset);
/**
 * Handles a NewKey entry: records the entry's machine as arbitrator of the
 * key and stores the entry as the latest NewKey for that key.
 * NOTE(review): presumably a previously stored NewKey for the same key is
 * set dead -- confirm.
 *
 * @param entry the NewKey entry
 */
830 private void processEntry(NewKey entry) {
831 arbitratorTable.put(entry.getKey(), entry.getMachineID());
833 NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
835 if (oldNewKey != null) {
/**
 * Handles a Transaction entry by storing it in uncommittedTransactionsMap
 * under its sequence number. A previously stored transaction with the same
 * sequence number is a duplicate of this one.
 *
 * @param entry the Transaction entry
 */
840 private void processEntry(Transaction entry) {
841 Transaction prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
843 // Duplicate so delete old copy
844 if (prevTrans != null) {
/**
 * Handles an Abort entry.
 *
 * Compares the last message sequence number recorded for the abort's
 * machine with the aborted transaction's sequence number to decide whether
 * that machine has already seen the abort, then visits the uncommitted
 * transactions with a sequence number at or below the aborted one.
 *
 * NOTE(review): lastmessagetable.get(...) is dereferenced without a null
 * check, so an abort for a machine with no last-message record would raise
 * a NullPointerException.
 *
 * @param entry the Abort entry
 */
849 private void processEntry(Abort entry) {
851 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
852 // Abort has not been seen yet so we need to keep track of it
855 // The machine already saw this so it is dead
859 // Make dead the transactions
860 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
861 Transaction prevtrans = i.next().getValue();
863 if (prevtrans.getSequenceNumber() <= entry.getTransSequenceNumber()) {
/**
 * Handles a Commit entry by staging it in newCommitMap under its
 * transaction sequence number; the staged commits are applied later by
 * proccessAllNewCommits. A commit already staged for the same transaction
 * is set dead.
 *
 * @param entry the Commit entry
 * @param s slot the entry came from
 */
870 private void processEntry(Commit entry, Slot s) {
871 Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry);
872 if (prevCommit != null) {
873 prevCommit.setDead();
/**
 * Handles a TableStatus entry: adopts the announced maximum slot count for
 * the current batch and makes this entry the latest table status, setting
 * the previous one dead.
 *
 * @param entry the TableStatus entry
 */
877 private void processEntry(TableStatus entry) {
878 int newnumslots = entry.getMaxSlots();
879 updateCurrMaxSize(newnumslots);
// Only the most recent TableStatus stays live.
880 if (lastTableStatus != null)
881 lastTableStatus.setDead();
882 lastTableStatus = entry;
/**
 * Registers a RejectedMessage entry on the watch list of the given
 * machine, creating the machine's set on first use.
 *
 * @param machineid machine that still has to acknowledge the entry
 * @param entry rejected-message entry to watch
 */
886 private void addWatchList(long machineid, RejectedMessage entry) {
887 HashSet<RejectedMessage> entries = watchlist.get(machineid);
// Lazily create the per-machine set.
889 watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
/**
 * Records that the given machine has been seen at sequence number seqnum
 * and updates everything that depends on it.
 *
 * - Removes the machine from machineSet (it now has a record in this batch).
 * - For watched RejectedMessage entries of this machine whose newest
 *   sequence number is at or below seqnum, removes the machine as a watcher.
 * - If the message is from the local machine, sets it dead immediately.
 * - Visits aborts addressed to this machine whose transaction sequence
 *   number is at or below seqnum.
 * - Stores (seqnum, liveness) as the machine's last message. For a remote
 *   machine the previous record is set dead and a lower sequence number
 *   than before is an error. For the local machine a differing sequence
 *   number is an error unless acceptupdatestolocal is set.
 *
 * @param machineid machine the message belongs to
 * @param seqnum sequence number the machine was seen at
 * @param liveness the Slot or LastMessage carrying the information
 * @param acceptupdatestolocal whether a changed sequence number for the
 *        local machine is acceptable
 * @param machineSet machines still awaiting a record in this batch
 * @throws Error "Unrecognized type" when liveness (or the previous record)
 *         is neither a LastMessage nor a Slot
 * @throws Error on local sequence-number mismatch or remote rollback
 */
893 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
894 machineSet.remove(machineid);
896 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
897 if (watchset != null) {
898 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
899 RejectedMessage rm = rmit.next();
900 if (rm.getNewSeqNum() <= seqnum) {
901 /* Remove it from our watchlist */
903 /* Decrement machines that need to see this notification */
904 rm.removeWatcher(machineid);
909 if (machineid == localmachineid) {
910 /* Our own messages are immediately dead. */
911 if (liveness instanceof LastMessage) {
912 ((LastMessage)liveness).setDead();
913 } else if (liveness instanceof Slot) {
914 ((Slot)liveness).setDead();
916 throw new Error("Unrecognized type");
920 // Set dead the abort
921 for (Iterator<Abort> ait = abortSet.iterator(); ait.hasNext(); ) {
922 Abort abort = ait.next();
924 if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
// put returns the previous record for this machine, or null for a first sighting.
931 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
932 if (lastmsgentry == null)
935 long lastmsgseqnum = lastmsgentry.getFirst();
936 Liveness lastentry = lastmsgentry.getSecond();
// The superseded record of a remote machine is no longer needed.
937 if (machineid != localmachineid) {
938 if (lastentry instanceof LastMessage) {
939 ((LastMessage)lastentry).setDead();
940 } else if (lastentry instanceof Slot) {
941 ((Slot)lastentry).setDead();
943 throw new Error("Unrecognized type");
947 if (machineid == localmachineid) {
948 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
949 throw new Error("Server Error: Mismatch on local machine sequence number");
951 if (lastmsgseqnum > seqnum)
952 throw new Error("Server Error: Rollback on remote machine sequence number");
/**
 * Processes one received slot: records the slot itself as the latest
 * message of its machine, then dispatches every entry in the slot to the
 * processEntry overload matching its type.
 *
 * @param indexer lookup for slots by sequence number (needed by
 *        rejected-message processing)
 * @param slot slot to process
 * @param acceptupdatestolocal forwarded to updateLastMessage
 * @param machineSet machines still awaiting a record in this batch
 * @throws Error "Unrecognized type: N" for an entry type without a handler
 */
956 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
957 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
958 for (Entry entry : slot.getEntries()) {
959 switch (entry.getType()) {
961 case Entry.TypeNewKey:
962 processEntry((NewKey)entry);
965 case Entry.TypeCommit:
966 processEntry((Commit)entry, slot);
969 case Entry.TypeAbort:
970 processEntry((Abort)entry);
973 case Entry.TypeTransaction:
974 processEntry((Transaction)entry);
977 case Entry.TypeLastMessage:
978 processEntry((LastMessage)entry, machineSet);
981 case Entry.TypeRejectedMessage:
982 processEntry((RejectedMessage)entry, indexer);
985 case Entry.TypeTableStatus:
986 processEntry((TableStatus)entry);
990 throw new Error("Unrecognized type: " + entry.getType());
/**
 * Verifies that the received slots form an unbroken HMAC chain: for every
 * new slot whose predecessor (sequence number - 1) can be resolved through
 * the indexer, the predecessor's HMAC must equal the slot's recorded
 * previous HMAC. Slots without a resolvable predecessor are not checked.
 *
 * @param indexer lookup for slots by sequence number
 * @param newslots slots received from the server
 * @throws Error "Server Error: Invalid HMAC Chain..." on the first mismatch
 */
995 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
996 for (int i = 0; i < newslots.length; i++) {
997 Slot currslot = newslots[i];
998 Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
// Arrays.equals compares the HMAC bytes element by element.
999 if (prevslot != null &&
1000 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1001 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);