2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.ArrayList;
12 import java.util.List;
14 import java.util.Collection;
15 import java.util.Collections;
19 * IoTTable data structure. Provides client interface.
20 * @author Brian Demsky
24 final public class Table {
25 private int numslots; //number of slots stored in buffer
27 //table of key-value pairs
28 //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
30 // machine id -> (sequence number, Slot or LastMessage); records last message by each client
31 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
// machine id -> rejected-message entries registered for that machine (see addWatchList / updateLastMessage)
33 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
// sequence numbers of slots this client tried to send; consumed by doRejectedMessages
34 private Vector<Long> rejectedmessagelist = new Vector<Long>();
35 private SlotBuffer buffer;
36 private CloudComm cloud;
37 private long sequencenumber; //Largest sequence number a client has received
38 private long localmachineid;
// most recently processed TableStatus entry; the previous one is marked dead when replaced
39 private TableStatus lastTableStatus;
40 static final int FREE_SLOTS = 10; //number of slots that should be kept free
// doOptionalRescue compares its skip count against this value
41 static final int SKIP_THRESHOLD = 10;
42 public long liveslotcount = 0; // TODO: MAKE PRIVATE
// factor applied to numslots to compute the new size when a resize is requested
44 static final double RESIZE_MULTIPLE = 1.2;
// fraction of numslots used as the lower bound of the randomized resize threshold
45 static final double RESIZE_THRESHOLD = 0.75;
// when rejectedmessagelist holds more entries than this, one range RejectedMessage is generated
46 static final int REJECTED_THRESHOLD = 5;
47 public int resizethreshold; // TODO: MAKE PRIVATE
48 private long lastliveslotseqn; //smallest sequence number with a live entry
49 private Random random = new Random();
// sequence number of the newest uncommitted transaction already applied by createSpeculativeTable
50 private long lastUncommittedTransaction = 0;
52 private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
53 private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
54 private Map<Long, Commit> commitMap = null; // List of all the most recent live commits
55 private Map<Long, Abort> abortMap = null; // Set of the live aborts
56 private Map<IoTString, Commit> committedMapByKey = null; // key -> commit entry that most recently wrote that key
57 public Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV TODO: Make Private
58 private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
59 public Map<Long, Transaction> uncommittedTransactionsMap = null; // TODO: make private
60 private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
61 private Map<IoTString, NewKey> newKeyTable = null; // key -> NewKey entry that registered the key
62 // private Set<Abort> arbitratorTable = null; // Table of arbitrators
63 private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
64 private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
65 private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator
/**
 * Creates a table for the given local machine, with a fresh slot buffer and
 * a newly constructed CloudComm.
 *
 * @param baseurl base URL passed to the CloudComm constructor
 * @param password password passed to the CloudComm constructor
 * @param _localmachineid id of the machine this table runs on
 */
69 public Table(String baseurl, String password, long _localmachineid) {
70 localmachineid = _localmachineid;
71 buffer = new SlotBuffer();
// the initial table size is whatever capacity the new buffer reports
72 numslots = buffer.capacity();
75 cloud = new CloudComm(this, baseurl, password);
/**
 * Creates a table for the given local machine with a fresh slot buffer,
 * taking a caller-supplied CloudComm.
 *
 * @param _cloud cloud communication object supplied by the caller
 *        (NOTE(review): presumably stored in the cloud field - confirm)
 * @param _localmachineid id of the machine this table runs on
 */
81 public Table(CloudComm _cloud, long _localmachineid) {
82 localmachineid = _localmachineid;
83 buffer = new SlotBuffer();
// the initial table size is whatever capacity the new buffer reports
84 numslots = buffer.capacity();
/**
 * Allocates empty collections for the pending-transaction queue and for the
 * commit, abort, key/value, arbitrator and new-key bookkeeping maps.
 */
92 private void setupDataStructs() {
93 pendingTransQueue = new LinkedList<PendingTransaction>();
94 commitMap = new HashMap<Long, Commit>();
95 abortMap = new HashMap<Long, Abort>();
96 committedMapByKey = new HashMap<IoTString, Commit>();
97 commitedTable = new HashMap<IoTString, KeyValue>();
98 speculativeTable = new HashMap<IoTString, KeyValue>();
99 uncommittedTransactionsMap = new HashMap<Long, Transaction>();
100 arbitratorTable = new HashMap<IoTString, Long>();
101 newKeyTable = new HashMap<IoTString, NewKey>();
102 newCommitMap = new HashMap<Long, Commit>();
103 lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
104 lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
/**
 * Requests the slots starting at sequencenumber + 1 from the cloud and
 * applies them, with acceptupdatestolocal set to true.
 */
107 public void rebuild() {
108 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
109 validateandupdate(newslots, true);
// Debug helper: walks the buffer from its oldest to its newest sequence
// number, counts entries per entry type, and prints the counts together
// with buffer and commit-table sizes to standard output.
112 // TODO: delete method
113 public void printSlots() {
114 long o = buffer.getOldestSeqNum();
115 long n = buffer.getNewestSeqNum();
// histogram indexed by Entry.getType(); sized for type codes 0..9
117 int[] types = new int[10];
123 for (long i = o; i < (n + 1); i++) {
124 Slot s = buffer.getSlot(i);
126 Vector<Entry> entries = s.getEntries();
128 for (Entry e : entries) {
130 int type = e.getType();
131 types[type] = types[type] + 1;
140 for (int i = 0; i < 10; i++) {
141 System.out.println(i + " " + types[i]);
143 System.out.println("Live count: " + livec);
144 System.out.println("Dead count: " + deadc);
145 System.out.println("Old: " + o);
146 System.out.println("New: " + n);
147 System.out.println("Size: " + buffer.size());
148 System.out.println("Commits Map: " + commitedTable.size());
149 System.out.println("Commits List: " + commitMap.size());
/**
 * Looks up a key in the committed table.
 *
 * @param key key to look up
 * @return the value of the KeyValue stored for key in commitedTable
 */
152 public IoTString getCommitted(IoTString key) {
153 KeyValue kv = commitedTable.get(key);
155 return kv.getValue();
/**
 * Looks up a key in the speculative table.
 *
 * @param key key to look up
 * @return the value of the KeyValue stored for key in speculativeTable
 */
161 public IoTString getSpeculative(IoTString key) {
162 KeyValue kv = speculativeTable.get(key);
164 return kv.getValue();
/**
 * Initializes a new table in the cloud: sets the salt, builds the slot with
 * sequence number 1 for the local machine together with a TableStatus
 * carrying numslots, and sends the slot via cloud.putSlot.
 *
 * On the accepting path the slot itself is validated and applied locally;
 * the other path throws an Error with the message "Error on initialization".
 * NOTE(review): confirm which putSlot result selects each path.
 */
170 public void initTable() {
171 cloud.setSalt();//Set the salt
172 Slot s = new Slot(this, 1, localmachineid);
173 TableStatus status = new TableStatus(s, numslots);
175 Slot[] array = cloud.putSlot(s, numslots);
177 array = new Slot[] {s};
178 /* update data structure */
179 validateandupdate(array, true);
181 throw new Error("Error on initialization");
/**
 * Builds a text dump containing a headed section for the committed table
 * and one for the speculative table, each using the map's own toString().
 */
185 public String toString() {
186 String retString = " Committed Table: \n";
187 retString += "---------------------------\n";
188 retString += commitedTable.toString();
192 retString += " Speculative Table: \n";
193 retString += "---------------------------\n";
194 retString += speculativeTable.toString();
/**
 * Begins building a new transaction by replacing pendingTransBuild with a
 * fresh PendingTransaction; whatever was being built before is discarded.
 */
199 public void startTransaction() {
200 // Create a new transaction, invalidates any old pending transactions.
201 pendingTransBuild = new PendingTransaction();
/**
 * Queues the transaction currently being built and then drains the pending
 * queue: the head is offered to tryput (resize = false) and removed from
 * the queue once tryput reports success.
 *
 * A transaction with no key/value updates is special-cased first (see the
 * existing comment below).
 */
204 public void commitTransaction() {
206 if (pendingTransBuild.getKVUpdates().size() == 0) {
207 // If no updates are made then there is no point inserting into the chain
211 // Add the pending transaction to the queue
212 pendingTransQueue.add(pendingTransBuild);
// peek first, poll only after a successful put, so a failed attempt keeps the head queued
214 while (!pendingTransQueue.isEmpty()) {
215 if (tryput( pendingTransQueue.peek(), false)) {
216 pendingTransQueue.poll();
/**
 * Adds a key/value update to the transaction being built.
 *
 * @param key key to update; must already have an arbitrator registered
 * @param value new value for the key
 * @throws Error "Key not Found." if arbitratorTable has no entry for key
 * @throws Error "Not all Key Values Match." if the key's arbitrator is
 *         rejected by pendingTransBuild.checkArbitrator
 */
221 public void addKV(IoTString key, IoTString value) {
223 if (arbitratorTable.get(key) == null) {
224 throw new Error("Key not Found.");
227 // Make sure new key value pair matches the current arbitrator
228 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
229 // TODO: Maybe not throw an error
230 throw new Error("Not all Key Values Match.");
233 KeyValue kv = new KeyValue(key, value);
234 pendingTransBuild.addKV(kv);
/**
 * Attaches a guard to the transaction being built.
 *
 * @param guard guard forwarded to pendingTransBuild.addGuard
 */
237 public void addGuard(Guard guard) {
238 pendingTransBuild.addGuard(guard);
/**
 * Pulls the slots starting at sequencenumber + 1 from the cloud and applies
 * them with acceptupdatestolocal = false. Afterwards, while uncommitted
 * transactions remain and the previous round had something to arbitrate,
 * it builds a new slot (rejected-message entries, mandatory rescue,
 * arbitration results, optional rescue), sends it with cloud.putSlot and
 * applies the resulting slots.
 *
 * @throws Error "Server Error: Did not send any slots" when the array
 *         checked after putSlot is empty
 */
241 public void update() {
243 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
245 validateandupdate(newslots, false);
247 if (uncommittedTransactionsMap.keySet().size() > 0) {
249 boolean doEnd = false;
250 boolean needResize = false;
251 while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
252 boolean resize = needResize;
// the new slot chains to the newest known slot through that slot's HMAC
255 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
257 if (liveslotcount > resizethreshold) {
258 resize = true; //Resize is forced
262 newsize = (int) (numslots * RESIZE_MULTIPLE);
263 TableStatus status = new TableStatus(s, newsize);
267 doRejectedMessages(s);
269 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
271 // Resize was needed so redo call
272 if (retTup.getFirst()) {
277 // Extract working variables
278 boolean seenliveslot = retTup.getSecond();
279 long seqn = retTup.getThird();
281 // Did need to arbitrate
// stop looping once a round finds nothing for this machine to arbitrate
282 doEnd = !doArbitration(s);
284 doOptionalRescue(s, seenliveslot, seqn, resize);
291 Slot[] array = cloud.putSlot(s, max);
293 array = new Slot[] {s};
294 rejectedmessagelist.clear();
296 if (array.length == 0)
297 throw new Error("Server Error: Did not send any slots");
298 rejectedmessagelist.add(s.getSequenceNumber());
302 /* update data structure */
303 validateandupdate(array, true);
/**
 * Registers a new key with the given machine as its arbitrator by trying to
 * put a NewKey entry through tryput (resize = false). A key that already
 * has an arbitrator in arbitratorTable is special-cased first.
 *
 * @param keyName key to create
 * @param machineId machine id that should arbitrate the key
 * @return NOTE(review): presumably false when the key already has an
 *         arbitrator and true once the entry is inserted - confirm
 */
308 public boolean createNewKey(IoTString keyName, long machineId) {
311 if (arbitratorTable.get(keyName) != null) {
312 // There is already an arbitrator
316 if (tryput(keyName, machineId, false)) {
318 // If successfully inserted
// NOTE(review): by its name this should decrease liveslotcount by one - confirm against the body.
324 public void decrementLiveCount() {
326 // System.out.println("Decrement Live Count");
/**
 * Picks a new randomized resize threshold. With resize_lower being
 * RESIZE_THRESHOLD * numslots truncated to an int, the threshold becomes
 * resize_lower - 1 plus a random offset in [0, numslots - resize_lower).
 */
329 private void setResizeThreshold() {
330 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
331 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
/**
 * Builds a slot carrying the given pending transaction and tries to send it.
 *
 * The slot gets, in order: an optional TableStatus with the enlarged size
 * when resizing, rejected-message entries, mandatorily rescued live
 * entries, the Transaction itself when the slot has space for it, and
 * optionally rescued live entries. If the mandatory rescue reports that a
 * resize is needed, the method calls itself again with resize = true.
 *
 * @param pendingTrans transaction to place into the new slot
 * @param resize whether the table should be resized; forced to true when
 *        liveslotcount exceeds resizethreshold
 * @return the result of doSendSlotsAndInsert for the built slot
 */
334 private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
// the new slot chains to the newest known slot through that slot's HMAC
335 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
338 if (liveslotcount > resizethreshold) {
339 resize = true; //Resize is forced
343 newsize = (int) (numslots * RESIZE_MULTIPLE);
344 TableStatus status = new TableStatus(s, newsize);
348 doRejectedMessages(s);
350 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
352 // Resize was needed so redo call
353 if (retTup.getFirst()) {
354 return tryput(pendingTrans, true);
357 // Extract working variables
358 boolean seenliveslot = retTup.getSecond();
359 long seqn = retTup.getThird();
364 Transaction trans = new Transaction(s,
365 s.getSequenceNumber(),
367 pendingTrans.getArbitrator(),
368 pendingTrans.getKVUpdates(),
369 pendingTrans.getGuard());
370 boolean insertedTrans = false;
371 if (s.hasSpace(trans)) {
373 insertedTrans = true;
376 doOptionalRescue(s, seenliveslot, seqn, resize);
377 return doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
/**
 * Builds a slot carrying a NewKey entry and tries to send it. Follows the
 * same sequence as the PendingTransaction overload: optional TableStatus
 * for a resize, rejected-message entries, mandatory rescue (retrying with
 * resize = true when it reports that a resize is needed), the NewKey entry
 * when the slot has space, then optional rescue.
 *
 * @param keyName key to register
 * @param arbMachineid machine id recorded as arbitrator in the NewKey entry
 * @param resize whether the table should be resized; forced to true when
 *        liveslotcount exceeds resizethreshold
 * @return the result of doSendSlotsAndInsert for the built slot
 */
380 private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
// the new slot chains to the newest known slot through that slot's HMAC
381 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
383 if (liveslotcount > resizethreshold) {
384 resize = true; //Resize is forced
388 newsize = (int) (numslots * RESIZE_MULTIPLE);
389 TableStatus status = new TableStatus(s, newsize);
393 doRejectedMessages(s);
394 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
396 // Resize was needed so redo call
397 if (retTup.getFirst()) {
398 return tryput(keyName, arbMachineid, true);
401 // Extract working variables
402 boolean seenliveslot = retTup.getSecond();
403 long seqn = retTup.getThird();
408 NewKey newKey = new NewKey(s, keyName, arbMachineid);
410 boolean insertedNewKey = false;
411 if (s.hasSpace(newKey)) {
413 insertedNewKey = true;
416 doOptionalRescue(s, seenliveslot, seqn, resize);
417 return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize);
/**
 * Creates RejectedMessage entries for slot s from the sequence numbers in
 * rejectedmessagelist; does nothing when the list is empty.
 *
 * When the list holds more than REJECTED_THRESHOLD entries, a single entry
 * spanning the first to the last listed sequence number is created with the
 * equal flag false. Otherwise the list is walked in two passes: a leading
 * run summarised by one entry for the local machine with equal = false,
 * then one entry per remaining sequence number using the machine id of the
 * buffered slot with equal = true.
 *
 * @param s slot the RejectedMessage entries are created for
 */
420 private void doRejectedMessages(Slot s) {
421 if (! rejectedmessagelist.isEmpty()) {
422 /* TODO: We should avoid generating a rejected message entry if
423 * there is already a sufficient entry in the queue (e.g.,
424 * equalsto value of true and same sequence number). */
426 long old_seqn = rejectedmessagelist.firstElement();
427 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
428 long new_seqn = rejectedmessagelist.lastElement();
429 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
434 /* Go through list of missing messages */
435 for (; i < rejectedmessagelist.size(); i++) {
436 long curr_seqn = rejectedmessagelist.get(i);
437 Slot s_msg = buffer.getSlot(curr_seqn);
440 prev_seqn = curr_seqn;
442 /* Generate rejected message entry for missing messages */
443 if (prev_seqn != -1) {
444 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
447 /* Generate rejected message entries for present messages */
// i is not reset: this pass continues where the previous loop stopped
448 for (; i < rejectedmessagelist.size(); i++) {
449 long curr_seqn = rejectedmessagelist.get(i);
450 Slot s_msg = buffer.getSlot(curr_seqn);
451 long machineid = s_msg.getMachineID();
452 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
/**
 * Copies live entries out of the oldest buffered slots into slot s, so the
 * region below newestseqnum + 1 - numslots + FREE_SLOTS holds no live
 * entries that exist only there.
 *
 * Scanning starts at lastliveslotseqn (raised to the buffer's oldest
 * sequence number when it lags behind) and advances lastliveslotseqn as it
 * goes.
 *
 * @param s slot that receives the rescued entries
 * @param resize forwarded to Slot.getLiveEntries
 * @return a tuple of (resize needed, seenliveslot, sequence number reached);
 *         the first component is true only on the path that prints the
 *         "NEEEEDDDD RESIZING" message
 */
459 private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
460 long newestseqnum = buffer.getNewestSeqNum();
461 long oldestseqnum = buffer.getOldestSeqNum();
462 if (lastliveslotseqn < oldestseqnum)
463 lastliveslotseqn = oldestseqnum;
465 long seqn = lastliveslotseqn;
466 boolean seenliveslot = false;
467 long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
468 long threshold = firstiffull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
472 for (; seqn < threshold; seqn++) {
473 Slot prevslot = buffer.getSlot(seqn);
474 // Push slot number forward
476 lastliveslotseqn = seqn;
478 if (! prevslot.isLive())
481 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
482 for (Entry liveentry : liveentries) {
483 if (s.hasSpace(liveentry)) {
484 s.addEntry(liveentry);
485 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
487 System.out.println("B"); //?
490 System.out.println("==============================NEEEEDDDD RESIZING");
491 return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
498 return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
/**
 * Arbitrates the uncommitted transactions this machine is responsible for
 * and adds the resulting entries to slot s.
 *
 * Transactions are visited in ascending sequence-number order. The
 * arbitrator of a transaction is looked up through the key of the first
 * element of its key/value update set and compared with localmachineid.
 * For an arbitrated transaction the guard is evaluated against a working
 * copy of the committed table: when it holds, the updates are applied to
 * the working copy and a Commit entry is created; an Abort entry is
 * created on the other branch. The entry is added only if s has space.
 *
 * @param s slot that receives the Commit/Abort entries
 * @return true if at least one transaction was arbitrated by this machine
 */
501 private boolean doArbitration(Slot s) {
// working copy, so guards of later transactions see the effects of earlier commits
503 Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
505 List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
507 // Sort from oldest to newest
508 Collections.sort(transSeqNums);
511 boolean didNeedArbitration = false;
512 for (Long transNum : transSeqNums) {
513 Transaction ut = uncommittedTransactionsMap.get(transNum);
515 KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
516 // Check if this machine arbitrates for this transaction
517 if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
520 // if (localmachineid == 351) {
521 // System.out.println("Mis match Machine: " + localmachineid + " Key: " + keyVal.getKey().toString());
527 // if (localmachineid == 351) {
528 // System.out.println("Full Match Machine: " + localmachineid + " Key: " + keyVal.getKey().toString());
532 // we did have something to arbitrate on
533 didNeedArbitration = true;
535 Entry newEntry = null;
538 if ( ut.getGuard().evaluate(speculativeTableTmp.values())) {
539 // Guard evaluated as true
541 // update the local tmp current key set
542 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
543 speculativeTableTmp.put(kv.getKey(), kv);
547 newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet());
552 newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator());
554 } catch (Exception e) {
558 if ((newEntry != null) && s.hasSpace(newEntry)) {
559 s.addEntry(newEntry);
565 return didNeedArbitration;
/**
 * Opportunistically copies further live entries into slot s, continuing
 * from seqn up to the newest buffered sequence number and advancing
 * lastliveslotseqn along the way. Entries are added only while s has space
 * for them; a skip counter is compared against SKIP_THRESHOLD.
 *
 * @param s slot that receives the rescued entries
 * @param seenliveslot value carried over from doMandatoryResuce
 * @param seqn sequence number at which to resume scanning
 * @param resize forwarded to Slot.getLiveEntries
 */
568 private void doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
569 /* now go through live entries from least to greatest sequence number until
570 * either all live slots added, or the slot doesn't have enough room
571 * for SKIP_THRESHOLD consecutive entries*/
573 long newestseqnum = buffer.getNewestSeqNum();
575 for (; seqn <= newestseqnum; seqn++) {
576 Slot prevslot = buffer.getSlot(seqn);
577 //Push slot number forward
579 lastliveslotseqn = seqn;
581 if (!prevslot.isLive())
584 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
585 for (Entry liveentry : liveentries) {
586 if (s.hasSpace(liveentry))
587 s.addEntry(liveentry);
590 if (skipcount > SKIP_THRESHOLD)
/**
 * Sends slot s to the cloud with cloud.putSlot and applies the resulting
 * slots locally via validateandupdate (acceptupdatestolocal = true).
 *
 * On one path the slot itself becomes the array to validate and
 * rejectedmessagelist is cleared; on the other path the sequence number of
 * s is appended to rejectedmessagelist.
 * NOTE(review): confirm which putSlot result selects each path and how
 * inserted contributes to the return value.
 *
 * @param s slot to send
 * @param inserted whether the caller's own entry was placed into s
 * @param resize whether a resize was requested for this slot
 * @param newsize table size computed by the caller for a resize
 * @throws Error "Server Error: Did not send any slots" when the array
 *         checked after putSlot is empty
 */
597 private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) {
601 Slot[] array = cloud.putSlot(s, max);
603 array = new Slot[] {s};
604 rejectedmessagelist.clear();
606 if (array.length == 0)
607 throw new Error("Server Error: Did not send any slots");
608 rejectedmessagelist.add(s.getSequenceNumber());
613 // TODO remove Timers
614 // long startTime = System.currentTimeMillis();
615 /* update data structure */
616 validateandupdate(array, true);
617 // long endTime = System.currentTimeMillis();
619 // long diff = endTime - startTime;
621 // System.out.println("Time Taken: " + diff);
/**
 * Validates a batch of slots received from the cloud and applies it to the
 * local state. An empty batch is ignored.
 *
 * Steps: reject batches whose first sequence number is not newer than
 * sequencenumber; check the HMAC chain; process every slot while tracking
 * the expected slot count; when the batch does not start at
 * sequencenumber + 1, verify the slot count and that every machine known in
 * lastmessagetable was seen; store the slots in the buffer; advance
 * sequencenumber to the last slot; then process new commits, drop dead
 * uncommitted transactions and rebuild the speculative table.
 *
 * @param newslots slots received from the cloud, in sequence order
 * @param acceptupdatestolocal passed through to processSlot; controls
 *        whether a changed sequence number for the local machine is accepted
 * @throws Error "Server Error: Sent older slots!" or
 *         "Missing record for machines: ..." on the checks described above
 */
627 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
628 /* The cloud communication layer has checked slot HMACs already
630 if (newslots.length == 0) return;
632 long firstseqnum = newslots[0].getSequenceNumber();
633 if (firstseqnum <= sequencenumber) {
634 throw new Error("Server Error: Sent older slots!");
637 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
638 checkHMACChain(indexer, newslots);
640 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
642 initExpectedSize(firstseqnum);
643 for (Slot slot : newslots) {
644 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
645 updateExpectedSize();
649 boolean hasGap = false;
650 /* If there is a gap, check to see if the server sent us everything. */
651 if (firstseqnum != (sequencenumber + 1)) {
654 checkNumSlots(newslots.length);
655 if (!machineSet.isEmpty()) {
656 throw new Error("Missing record for machines: " + machineSet);
662 /* Commit new to slots. */
663 for (Slot slot : newslots) {
664 buffer.putSlot(slot);
667 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
669 // Process all on key value pairs
670 proccessAllNewCommits();
672 // Go through all uncommitted transactions and kill the ones that are dead
673 deleteDeadUncommittedTransactions();
675 // Speculate on key value pairs
676 createSpeculativeTable();
/**
 * Folds the commits collected in newCommitMap into the committed state, in
 * ascending order of transaction sequence number, then clears newCommitMap.
 *
 * A commit whose transaction sequence number is not newer than the last one
 * seen for its arbitrator is handled as a duplicate: it replaces the stored
 * commit with the same number and the replaced commit is marked dead.
 * For a newer commit, earlier commits that wrote any of the same keys have
 * those keys retired through updateLiveKeys and are removed from commitMap
 * once no longer live; the commit is then recorded in commitMap,
 * lastCommitSeenSeqNumMap, commitedTable and committedMapByKey.
 */
679 public void proccessAllNewCommits() {
681 // Process only if there are commit
682 if (newCommitMap.keySet().size() == 0) {
686 List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.keySet());
688 // Sort from oldest to newest commit
689 Collections.sort(commitSeqNums);
691 // Go through each new commit one by one
692 for (Long entrySeqNum : commitSeqNums) {
693 Commit entry = newCommitMap.get(entrySeqNum);
695 long lastCommitSeenSeqNum = 0;
697 if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
698 lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
701 if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
703 Commit prevCommit = commitMap.put(entry.getTransSequenceNumber(), entry);
705 if (prevCommit != null) {
706 prevCommit.setDead();
708 for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
709 committedMapByKey.put(kv.getKey(), entry);
716 Set<Commit> commitsToEditSet = new HashSet<Commit>();
718 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
719 commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
// keys with no earlier commit yield null from the lookup above; drop that placeholder
722 commitsToEditSet.remove(null);
724 for (Commit prevCommit : commitsToEditSet) {
726 Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
728 if (!prevCommit.isLive()) {
729 commitMap.remove(prevCommit.getTransSequenceNumber());
733 // // Remove any old commits
734 // for (Iterator<Map.Entry<Long, Commit>> i = commitMap.entrySet().iterator(); i.hasNext();) {
735 // Commit prevCommit = i.next().getValue();
736 // prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
738 // if (!prevCommit.isLive()) {
743 // Remove any old commits
744 // for (Iterator<Map.Entry<Long, Commit>> i = commitMap.entrySet().iterator(); i.hasNext();) {
745 // Commit prevCommit = i.next().getValue();
747 // if (prevCommit.getTransArbitrator() != entry.getTransArbitrator()) {
751 // prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
753 // if (!prevCommit.isLive()) {
759 // Add the new commit
760 commitMap.put(entry.getTransSequenceNumber(), entry);
761 lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
763 // Update the committed table list
764 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
765 IoTString key = kv.getKey();
766 commitedTable.put(key, kv);
768 committedMapByKey.put(key, entry);
772 // Clear the new commits storage so we can use it later
773 newCommitMap.clear();
/**
 * Walks uncommittedTransactionsMap with an explicit iterator and selects
 * the transactions that have already been decided: those whose sequence
 * number is not greater than the last commit or the last abort sequence
 * number recorded for their arbitrator.
 */
776 private void deleteDeadUncommittedTransactions() {
777 // Make dead the transactions
778 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
779 Transaction prevtrans = i.next().getValue();
780 long transArb = prevtrans.getArbitrator();
// && binds tighter than ||, so this reads (commit seen && covered) || (abort seen && covered)
782 if ((lastCommitSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastCommitSeenSeqNumMap.get(transArb)) ||
783 (lastAbortSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastAbortSeenSeqNumMap.get(transArb))) {
/**
 * Rebuilds speculativeTable from the committed table plus the uncommitted
 * transactions whose guards evaluate to true.
 *
 * With no uncommitted transactions, speculativeTable aliases commitedTable.
 * Otherwise transactions are visited in ascending sequence-number order.
 * If the oldest one is newer than lastUncommittedTransaction, the table is
 * rebuilt from a copy of commitedTable; the other branch starts from a copy
 * of the current speculativeTable and only looks at transactions newer than
 * lastUncommittedTransaction. lastUncommittedTransaction is advanced as
 * transactions are visited.
 */
790 private void createSpeculativeTable() {
792 if (uncommittedTransactionsMap.keySet().size() == 0) {
793 speculativeTable = commitedTable; // Ok that they are the same object
797 Map speculativeTableTmp = null;
798 List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
800 // Sort from oldest to newest commit
801 Collections.sort(utSeqNums);
803 if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
804 speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
806 for (Long key : utSeqNums) {
807 Transaction trans = uncommittedTransactionsMap.get(key);
809 lastUncommittedTransaction = key;
812 if (trans.getGuard().evaluate(speculativeTableTmp.values())) {
813 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
814 speculativeTableTmp.put(kv.getKey(), kv);
818 } catch (Exception e) {
823 speculativeTableTmp = new HashMap<IoTString, KeyValue>(speculativeTable);
825 for (Long key : utSeqNums) {
827 if (key <= lastUncommittedTransaction) {
831 lastUncommittedTransaction = key;
833 Transaction trans = uncommittedTransactionsMap.get(key);
836 if (trans.getGuard().evaluate(speculativeTableTmp.values())) {
837 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
838 speculativeTableTmp.put(kv.getKey(), kv);
842 } catch (Exception e) {
848 speculativeTable = speculativeTableTmp;
// expectedsize: number of slots the server is expected to have sent (checked by checkNumSlots);
// currmaxsize: table size in effect while a batch is processed (updated by TableStatus entries)
851 private int expectedsize, currmaxsize;
/**
 * Verifies that the server sent as many slots as expected.
 *
 * @param numslots number of slots received (shadows the numslots field)
 * @throws Error if numslots differs from expectedsize
 */
853 private void checkNumSlots(int numslots) {
854 if (numslots != expectedsize) {
855 throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
/**
 * Resets the expected-size tracking before a batch is processed:
 * expectedsize becomes the smaller of firstsequencenumber and numslots,
 * and currmaxsize starts at numslots.
 *
 * @param firstsequencenumber sequence number of the first slot in the batch
 */
859 private void initExpectedSize(long firstsequencenumber) {
860 long prevslots = firstsequencenumber;
861 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
862 currmaxsize = numslots;
// Called once per processed slot; caps expectedsize at currmaxsize.
// NOTE(review): presumably expectedsize is advanced by one before the cap is applied - confirm.
865 private void updateExpectedSize() {
867 if (expectedsize > currmaxsize) {
868 expectedsize = currmaxsize;
/**
 * Records a new maximum table size for the batch being processed.
 *
 * @param newmaxsize value stored into currmaxsize
 */
872 private void updateCurrMaxSize(int newmaxsize) {
873 currmaxsize = newmaxsize;
/**
 * Makes currmaxsize the table size: resizes the buffer when the size
 * differs from numslots, stores it in numslots and picks a new resize
 * threshold.
 */
876 private void commitNewMaxSize() {
877 if (numslots != currmaxsize) {
878 buffer.resize(currmaxsize);
881 numslots = currmaxsize;
882 setResizeThreshold();
/**
 * Handles a LastMessage entry by recording it as the last message of the
 * machine it names (acceptupdatestolocal = false).
 *
 * @param entry last-message entry to record
 * @param machineSet machines not yet seen in the current batch
 */
885 private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
886 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
/**
 * Handles a RejectedMessage entry.
 *
 * First, for every sequence number in [old, new] the slot found through the
 * indexer is checked: whether its machine id equals the entry's machine id
 * must agree with the entry's equal flag. Then the entry is put on the
 * watch list of every other machine whose last recorded sequence number is
 * below the entry's new sequence number, and that set of machines is stored
 * on the entry.
 *
 * @param entry rejected-message entry to validate and register
 * @param indexer lookup for slots by sequence number
 * @throws Error "Server Error: Trying to insert rejected message for slot N"
 *         when a slot contradicts the entry
 */
889 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
890 long oldseqnum = entry.getOldSeqNum();
891 long newseqnum = entry.getNewSeqNum();
892 boolean isequal = entry.getEqual();
893 long machineid = entry.getMachineID();
894 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
895 Slot slot = indexer.getSlot(seqnum);
897 long slotmachineid = slot.getMachineID();
898 if (isequal != (slotmachineid == machineid)) {
899 throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
904 HashSet<Long> watchset = new HashSet<Long>();
905 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
906 long entry_mid = lastmsg_entry.getKey();
907 /* We've seen it, don't need to continue to watch. Our next
908 * message will implicitly acknowledge it. */
909 if (entry_mid == localmachineid)
911 Pair<Long, Liveness> v = lastmsg_entry.getValue();
912 long entry_seqn = v.getFirst();
913 if (entry_seqn < newseqnum) {
914 addWatchList(entry_mid, entry);
915 watchset.add(entry_mid);
918 if (watchset.isEmpty())
921 entry.setWatchSet(watchset);
/**
 * Handles a NewKey entry: records the entry's machine id as arbitrator for
 * its key and stores the entry in newKeyTable, where it replaces any
 * earlier NewKey entry for the same key.
 *
 * @param entry new-key entry to record
 */
924 private void processEntry(NewKey entry) {
925 arbitratorTable.put(entry.getKey(), entry.getMachineID());
927 NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
929 if (oldNewKey != null) {
/**
 * Handles a Transaction entry by storing it in uncommittedTransactionsMap
 * under its sequence number; a previously stored transaction with the same
 * sequence number is treated as a duplicate.
 *
 * @param entry transaction entry to record
 */
934 private void processEntry(Transaction entry) {
935 Transaction prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
937 // Duplicate so delete old copy
938 if (prevTrans != null) {
/**
 * Handles an Abort entry.
 *
 * If the last sequence number recorded for the entry's machine is below the
 * aborted transaction's sequence number, the abort is kept in abortMap and
 * an earlier abort stored under the same number is marked dead. In every
 * case the abort's transaction sequence number is recorded for its
 * arbitrator in lastAbortSeenSeqNumMap.
 *
 * @param entry abort entry to record
 */
943 private void processEntry(Abort entry) {
945 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
946 // Abort has not been seen yet so we need to keep track of it
948 Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
949 if (prevAbort != null) {
950 prevAbort.setDead(); // delete old version of the duplicate
953 // The machine already saw this so it is dead
957 lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
/**
 * Handles a Commit entry by staging it in newCommitMap under its
 * transaction sequence number; a commit already staged under the same
 * number is marked dead.
 *
 * @param entry commit entry to stage
 * @param s slot the entry came from (not used in the lines shown here)
 */
960 private void processEntry(Commit entry, Slot s) {
961 Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry);
962 if (prevCommit != null) {
963 prevCommit.setDead();
/**
 * Handles a TableStatus entry: takes its max-slot count as the current
 * maximum size, marks the previously remembered TableStatus dead and
 * remembers this one instead.
 *
 * @param entry table-status entry to apply
 */
967 private void processEntry(TableStatus entry) {
968 int newnumslots = entry.getMaxSlots();
969 updateCurrMaxSize(newnumslots);
970 if (lastTableStatus != null)
971 lastTableStatus.setDead();
972 lastTableStatus = entry;
/**
 * Registers a rejected-message entry on the watch list of a machine,
 * creating the machine's set on first use.
 *
 * @param machineid machine whose watch list is extended
 * @param entry rejected-message entry to watch
 */
975 private void addWatchList(long machineid, RejectedMessage entry) {
976 HashSet<RejectedMessage> entries = watchlist.get(machineid);
978 watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
/**
 * Records the newest message seen from a machine and updates everything
 * that depends on it.
 *
 * The machine is removed from machineSet; watched rejected messages with a
 * new sequence number not above seqnum drop this machine as a watcher;
 * messages from the local machine are marked dead immediately; aborts for
 * this machine with a transaction sequence number not above seqnum are
 * selected; the lastmessagetable entry is replaced and, for remote
 * machines, the replaced entry is marked dead.
 *
 * @param machineid machine the message came from
 * @param seqnum sequence number of that message
 * @param liveness the Slot or LastMessage carrying the information
 * @param acceptupdatestolocal whether a changed sequence number for the
 *        local machine is acceptable
 * @param machineSet machines not yet seen in the current batch
 * @throws Error "Unrecognized type" if liveness (or the replaced entry) is
 *         neither a LastMessage nor a Slot
 * @throws Error on a local sequence-number mismatch when
 *         acceptupdatestolocal is false, or on a remote rollback
 */
982 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
983 machineSet.remove(machineid);
985 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
986 if (watchset != null) {
987 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
988 RejectedMessage rm = rmit.next();
989 if (rm.getNewSeqNum() <= seqnum) {
990 /* Remove it from our watchlist */
992 /* Decrement machines that need to see this notification */
993 rm.removeWatcher(machineid);
998 if (machineid == localmachineid) {
999 /* Our own messages are immediately dead. */
1000 if (liveness instanceof LastMessage) {
1001 ((LastMessage)liveness).setDead();
1002 } else if (liveness instanceof Slot) {
1003 ((Slot)liveness).setDead();
1005 throw new Error("Unrecognized type");
1009 // Set dead the abort
1010 for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
1011 Abort abort = i.next().getValue();
1013 if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
// put returns the previous mapping, which is inspected below
1019 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
1020 if (lastmsgentry == null)
1023 long lastmsgseqnum = lastmsgentry.getFirst();
1024 Liveness lastentry = lastmsgentry.getSecond();
1025 if (machineid != localmachineid) {
1026 if (lastentry instanceof LastMessage) {
1027 ((LastMessage)lastentry).setDead();
1028 } else if (lastentry instanceof Slot) {
1029 ((Slot)lastentry).setDead();
1031 throw new Error("Unrecognized type");
1035 if (machineid == localmachineid) {
1036 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1037 throw new Error("Server Error: Mismatch on local machine sequence number");
1039 if (lastmsgseqnum > seqnum)
1040 throw new Error("Server Error: Rollback on remote machine sequence number");
/**
 * Processes one received slot: records it as the last message of its
 * sending machine, then dispatches each entry in the slot to the
 * processEntry overload matching its type.
 *
 * @param indexer lookup for slots by sequence number (used for
 *        RejectedMessage entries)
 * @param slot slot to process
 * @param acceptupdatestolocal forwarded to updateLastMessage
 * @param machineSet machines not yet seen in the current batch
 * @throws Error "Unrecognized type: N" for an entry type without a handler
 */
1044 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1045 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1046 for (Entry entry : slot.getEntries()) {
1047 switch (entry.getType()) {
1049 case Entry.TypeNewKey:
1050 processEntry((NewKey)entry);
1053 case Entry.TypeCommit:
1054 processEntry((Commit)entry, slot);
1057 case Entry.TypeAbort:
1058 processEntry((Abort)entry);
1061 case Entry.TypeTransaction:
1062 processEntry((Transaction)entry);
1065 case Entry.TypeLastMessage:
1066 processEntry((LastMessage)entry, machineSet);
1069 case Entry.TypeRejectedMessage:
1070 processEntry((RejectedMessage)entry, indexer);
1073 case Entry.TypeTableStatus:
1074 processEntry((TableStatus)entry);
1078 throw new Error("Unrecognized type: " + entry.getType());
/**
 * Verifies that each new slot links to its predecessor: when the indexer
 * can supply the slot with the preceding sequence number, that slot's HMAC
 * must equal the new slot's previous-HMAC field. Slots whose predecessor is
 * not available are not checked.
 *
 * @param indexer lookup for slots by sequence number
 * @param newslots slots to verify
 * @throws Error "Server Error: Invalid HMAC Chain..." on a mismatch
 */
1083 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1084 for (int i = 0; i < newslots.length; i++) {
1085 Slot currslot = newslots[i];
1086 Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
1087 if (prevslot != null &&
1088 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1089 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);