2 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.HashSet;
6 import java.util.Arrays;
7 import java.util.Vector;
8 import java.util.Random;
9 import java.util.Queue;
10 import java.util.LinkedList;
11 import java.util.ArrayList;
12 import java.util.List;
14 import java.util.Collection;
15 import java.util.Collections;
19 * IoTTable data structure. Provides client interface.
20 * @author Brian Demsky
24 final public class Table {
25 private int numslots; //number of slots stored in buffer
27 //table of key-value pairs
28 //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
30 // machine id -> (sequence number, Slot or LastMessage); records last message by each client
31 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
33 private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
34 private Vector<Long> rejectedmessagelist = new Vector<Long>();
35 private SlotBuffer buffer;
36 private CloudComm cloud;
37 private long sequencenumber; //Largest sequence number a client has received
38 private long localmachineid;
39 private TableStatus lastTableStatus;
40 static final int FREE_SLOTS = 10; //number of slots that should be kept free
41 static final int SKIP_THRESHOLD = 10;
42 private long liveslotcount = 0;
44 static final double RESIZE_MULTIPLE = 1.2;
45 static final double RESIZE_THRESHOLD = 0.75;
46 static final int REJECTED_THRESHOLD = 5;
47 private int resizethreshold;
48 private long lastliveslotseqn; //smallest sequence number with a live entry
49 private Random random = new Random();
50 private long lastUncommittedTransaction = 0;
52 private int smallestTableStatusSeen = -1;
53 private int largestTableStatusSeen = -1;
55 private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
56 private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
57 private Map<Long, Commit> commitMap = null; // List of all the most recent live commits
58 private Map<Long, Abort> abortMap = null; // Set of the live aborts
59 private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
60 private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
61 private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
62 private Map<Long, Transaction> uncommittedTransactionsMap = null;
63 private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
64 private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
65 private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
66 private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
67 private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
71 public Table(String baseurl, String password, long _localmachineid) {
72 localmachineid = _localmachineid;
73 buffer = new SlotBuffer();
74 numslots = buffer.capacity();
77 cloud = new CloudComm(this, baseurl, password);
83 public Table(CloudComm _cloud, long _localmachineid) {
84 localmachineid = _localmachineid;
85 buffer = new SlotBuffer();
86 numslots = buffer.capacity();
94 private void setupDataStructs() {
95 pendingTransQueue = new LinkedList<PendingTransaction>();
96 commitMap = new HashMap<Long, Commit>();
97 abortMap = new HashMap<Long, Abort>();
98 committedMapByKey = new HashMap<IoTString, Commit>();
99 commitedTable = new HashMap<IoTString, KeyValue>();
100 speculativeTable = new HashMap<IoTString, KeyValue>();
101 uncommittedTransactionsMap = new HashMap<Long, Transaction>();
102 arbitratorTable = new HashMap<IoTString, Long>();
103 newKeyTable = new HashMap<IoTString, NewKey>();
104 newCommitMap = new HashMap<Long, Commit>();
105 lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
106 lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
109 public void rebuild() throws ServerException {
110 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
111 validateandupdate(newslots, true);
114 // // TODO: delete method
115 // public void printSlots() {
116 // long o = buffer.getOldestSeqNum();
117 // long n = buffer.getNewestSeqNum();
119 // int[] types = new int[10];
125 // for (long i = o; i < (n + 1); i++) {
126 // Slot s = buffer.getSlot(i);
128 // Vector<Entry> entries = s.getEntries();
130 // for (Entry e : entries) {
132 // int type = e.getType();
133 // types[type] = types[type] + 1;
142 // for (int i = 0; i < 10; i++) {
143 // System.out.println(i + " " + types[i]);
145 // System.out.println("Live count: " + livec);
146 // System.out.println("Dead count: " + deadc);
147 // System.out.println("Old: " + o);
148 // System.out.println("New: " + n);
149 // System.out.println("Size: " + buffer.size());
150 // System.out.println("Commits Map: " + commitedTable.size());
151 // System.out.println("Commits List: " + commitMap.size());
154 public IoTString getCommitted(IoTString key) {
155 KeyValue kv = commitedTable.get(key);
157 return kv.getValue();
163 public IoTString getSpeculative(IoTString key) {
164 KeyValue kv = speculativeTable.get(key);
166 return kv.getValue();
172 public IoTString getCommittedAtomic(IoTString key) {
173 KeyValue kv = commitedTable.get(key);
175 if (arbitratorTable.get(key) == null) {
176 throw new Error("Key not Found.");
179 // Make sure new key value pair matches the current arbitrator
180 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
181 // TODO: Maybe not throw en error
182 throw new Error("Not all Key Values Match Arbitrator.");
186 pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
187 return kv.getValue();
189 pendingTransBuild.addKVGuard(new KeyValue(key, null));
194 public IoTString getSpeculativeAtomic(IoTString key) {
196 if (arbitratorTable.get(key) == null) {
197 throw new Error("Key not Found.");
200 // Make sure new key value pair matches the current arbitrator
201 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
202 // TODO: Maybe not throw en error
203 throw new Error("Not all Key Values Match Arbitrator.");
206 KeyValue kv = speculativeTable.get(key);
208 kv = commitedTable.get(key);
212 pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
213 return kv.getValue();
215 pendingTransBuild.addKVGuard(new KeyValue(key, null));
220 public Long getArbitrator(IoTString key) {
221 return arbitratorTable.get(key);
224 public void initTable() throws ServerException {
225 cloud.setSalt();//Set the salt
226 Slot s = new Slot(this, 1, localmachineid);
227 TableStatus status = new TableStatus(s, numslots);
229 Slot[] array = cloud.putSlot(s, numslots);
231 array = new Slot[] {s};
232 /* update data structure */
233 validateandupdate(array, true);
235 throw new Error("Error on initialization");
239 public String toString() {
240 String retString = " Committed Table: \n";
241 retString += "---------------------------\n";
242 retString += commitedTable.toString();
246 retString += " Speculative Table: \n";
247 retString += "---------------------------\n";
248 retString += speculativeTable.toString();
253 public void startTransaction() {
254 // Create a new transaction, invalidates any old pending transactions.
255 pendingTransBuild = new PendingTransaction();
258 public void commitTransaction() throws ServerException {
260 if (pendingTransBuild.getKVUpdates().size() == 0) {
261 // If no updates are made then there is no point inserting into the chain
265 // Add the pending transaction to the queue
266 pendingTransQueue.add(pendingTransBuild);
268 // Delete since already inserted
269 pendingTransBuild = new PendingTransaction();
271 while (!pendingTransQueue.isEmpty()) {
272 if (tryput( pendingTransQueue.peek(), false)) {
273 pendingTransQueue.poll();
278 public void addKV(IoTString key, IoTString value) {
280 if (arbitratorTable.get(key) == null) {
281 throw new Error("Key not Found.");
284 // Make sure new key value pair matches the current arbitrator
285 if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
286 // TODO: Maybe not throw en error
287 throw new Error("Not all Key Values Match Arbitrator.");
290 KeyValue kv = new KeyValue(key, value);
291 pendingTransBuild.addKV(kv);
294 public void update() throws ServerException {
296 Slot[] newslots = cloud.getSlots(sequencenumber + 1);
298 validateandupdate(newslots, false);
301 if (!pendingTransQueue.isEmpty()) {
302 // We have a pending transaction so do full insertion
304 while (!pendingTransQueue.isEmpty()) {
305 if (tryput( pendingTransQueue.peek(), false)) {
306 pendingTransQueue.poll();
310 // We dont have a pending transaction so do minimal effort
311 if (uncommittedTransactionsMap.keySet().size() > 0) {
313 boolean doEnd = false;
314 boolean needResize = false;
315 while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
316 boolean resize = needResize;
319 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
321 if (liveslotcount > resizethreshold) {
322 resize = true; //Resize is forced
326 newsize = (int) (numslots * RESIZE_MULTIPLE);
327 TableStatus status = new TableStatus(s, newsize);
331 doRejectedMessages(s);
333 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
335 // Resize was needed so redo call
336 if (retTup.getFirst()) {
341 // Extract working variables
342 boolean seenliveslot = retTup.getSecond();
343 long seqn = retTup.getThird();
345 // Did need to arbitrate
346 doEnd = !doArbitration(s);
348 doOptionalRescue(s, seenliveslot, seqn, resize);
355 Slot[] array = cloud.putSlot(s, max);
357 array = new Slot[] {s};
358 rejectedmessagelist.clear();
360 if (array.length == 0)
361 throw new Error("Server Error: Did not send any slots");
362 rejectedmessagelist.add(s.getSequenceNumber());
366 /* update data structure */
367 validateandupdate(array, true);
373 public boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
376 if (arbitratorTable.get(keyName) != null) {
377 // There is already an arbitrator
381 if (tryput(keyName, machineId, false)) {
382 // If successfully inserted
388 public void decrementLiveCount() {
392 private void setResizeThreshold() {
393 int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
394 resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
397 private boolean tryput(PendingTransaction pendingTrans, boolean resize) throws ServerException {
398 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
401 if (liveslotcount > resizethreshold) {
402 resize = true; //Resize is forced
406 newsize = (int) (numslots * RESIZE_MULTIPLE);
407 TableStatus status = new TableStatus(s, newsize);
411 doRejectedMessages(s);
413 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
415 // Resize was needed so redo call
416 if (retTup.getFirst()) {
417 return tryput(pendingTrans, true);
420 // Extract working variables
421 boolean seenliveslot = retTup.getSecond();
422 long seqn = retTup.getThird();
426 Transaction trans = new Transaction(s,
427 s.getSequenceNumber(),
429 pendingTrans.getArbitrator(),
430 pendingTrans.getKVUpdates(),
431 pendingTrans.getKVGuard());
432 boolean insertedTrans = false;
433 if (s.hasSpace(trans)) {
435 insertedTrans = true;
438 doOptionalRescue(s, seenliveslot, seqn, resize);
439 return doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
442 private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException {
443 Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
445 if (liveslotcount > resizethreshold) {
446 resize = true; //Resize is forced
450 newsize = (int) (numslots * RESIZE_MULTIPLE);
451 TableStatus status = new TableStatus(s, newsize);
455 doRejectedMessages(s);
456 ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
458 // Resize was needed so redo call
459 if (retTup.getFirst()) {
460 return tryput(keyName, arbMachineid, true);
463 // Extract working variables
464 boolean seenliveslot = retTup.getSecond();
465 long seqn = retTup.getThird();
469 NewKey newKey = new NewKey(s, keyName, arbMachineid);
471 boolean insertedNewKey = false;
472 if (s.hasSpace(newKey)) {
474 insertedNewKey = true;
477 doOptionalRescue(s, seenliveslot, seqn, resize);
478 return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize);
481 private void doRejectedMessages(Slot s) {
482 if (! rejectedmessagelist.isEmpty()) {
483 /* TODO: We should avoid generating a rejected message entry if
484 * there is already a sufficient entry in the queue (e.g.,
485 * equalsto value of true and same sequence number). */
487 long old_seqn = rejectedmessagelist.firstElement();
488 if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
489 long new_seqn = rejectedmessagelist.lastElement();
490 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
495 /* Go through list of missing messages */
496 for (; i < rejectedmessagelist.size(); i++) {
497 long curr_seqn = rejectedmessagelist.get(i);
498 Slot s_msg = buffer.getSlot(curr_seqn);
501 prev_seqn = curr_seqn;
503 /* Generate rejected message entry for missing messages */
504 if (prev_seqn != -1) {
505 RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
508 /* Generate rejected message entries for present messages */
509 for (; i < rejectedmessagelist.size(); i++) {
510 long curr_seqn = rejectedmessagelist.get(i);
511 Slot s_msg = buffer.getSlot(curr_seqn);
512 long machineid = s_msg.getMachineID();
513 RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
520 private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
521 long newestseqnum = buffer.getNewestSeqNum();
522 long oldestseqnum = buffer.getOldestSeqNum();
523 if (lastliveslotseqn < oldestseqnum)
524 lastliveslotseqn = oldestseqnum;
526 long seqn = lastliveslotseqn;
527 boolean seenliveslot = false;
528 long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
529 long threshold = firstiffull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
533 for (; seqn < threshold; seqn++) {
534 Slot prevslot = buffer.getSlot(seqn);
535 // Push slot number forward
537 lastliveslotseqn = seqn;
539 if (! prevslot.isLive())
542 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
543 for (Entry liveentry : liveentries) {
544 if (s.hasSpace(liveentry)) {
545 s.addEntry(liveentry);
546 } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
548 System.out.println("B"); //?
549 return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
556 return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
559 private boolean doArbitration(Slot s) {
561 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
562 List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
564 // Sort from oldest to newest
565 Collections.sort(transSeqNums);
567 boolean didNeedArbitration = false;
568 for (Long transNum : transSeqNums) {
569 Transaction ut = uncommittedTransactionsMap.get(transNum);
571 // Check if this machine arbitrates for this transaction
572 if (ut.getArbitrator() != localmachineid ) {
576 // we did have something to arbitrate on
577 didNeedArbitration = true;
579 Entry newEntry = null;
581 if (ut.evaluateGuard(commitedTable, speculativeTableTmp)) {
582 // Guard evaluated as true
584 // update the local tmp current key set
585 for (KeyValue kv : ut.getkeyValueUpdateSet()) {
586 speculativeTableTmp.put(kv.getKey(), kv);
590 newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet());
595 newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator());
598 if ((newEntry != null) && s.hasSpace(newEntry)) {
599 s.addEntry(newEntry);
605 return didNeedArbitration;
608 private void doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
609 /* now go through live entries from least to greatest sequence number until
610 * either all live slots added, or the slot doesn't have enough room
611 * for SKIP_THRESHOLD consecutive entries*/
613 long newestseqnum = buffer.getNewestSeqNum();
615 for (; seqn <= newestseqnum; seqn++) {
616 Slot prevslot = buffer.getSlot(seqn);
617 //Push slot number forward
619 lastliveslotseqn = seqn;
621 if (!prevslot.isLive())
624 Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
625 for (Entry liveentry : liveentries) {
626 if (s.hasSpace(liveentry))
627 s.addEntry(liveentry);
630 if (skipcount > SKIP_THRESHOLD)
637 private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) throws ServerException {
641 Slot[] array = cloud.putSlot(s, max);
643 array = new Slot[] {s};
644 rejectedmessagelist.clear();
646 if (array.length == 0)
647 throw new Error("Server Error: Did not send any slots");
648 rejectedmessagelist.add(s.getSequenceNumber());
652 validateandupdate(array, true);
656 private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
657 /* The cloud communication layer has checked slot HMACs already
659 if (newslots.length == 0) return;
661 // Reset the table status declared sizes
662 smallestTableStatusSeen = -1;
663 largestTableStatusSeen = -1;
665 long firstseqnum = newslots[0].getSequenceNumber();
666 if (firstseqnum <= sequencenumber) {
667 throw new Error("Server Error: Sent older slots!");
670 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
671 checkHMACChain(indexer, newslots);
673 HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
675 // initExpectedSize(firstseqnum);
676 for (Slot slot : newslots) {
677 processSlot(indexer, slot, acceptupdatestolocal, machineSet);
678 // updateExpectedSize();
681 /* If there is a gap, check to see if the server sent us everything. */
682 if (firstseqnum != (sequencenumber + 1)) {
685 checkNumSlots(newslots.length);
686 if (!machineSet.isEmpty()) {
687 throw new Error("Missing record for machines: " + machineSet);
694 /* Commit new to slots. */
695 for (Slot slot : newslots) {
696 buffer.putSlot(slot);
699 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
701 // Process all on key value pairs
702 proccessAllNewCommits();
704 // Go through all uncommitted transactions and kill the ones that are dead
705 deleteDeadUncommittedTransactions();
707 // Speculate on key value pairs
708 createSpeculativeTable();
711 public void proccessAllNewCommits() {
713 // Process only if there are commit
714 if (newCommitMap.keySet().size() == 0) {
718 List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.keySet());
720 // Sort from oldest to newest commit
721 Collections.sort(commitSeqNums);
723 // Go through each new commit one by one
724 for (Long entrySeqNum : commitSeqNums) {
725 Commit entry = newCommitMap.get(entrySeqNum);
727 long lastCommitSeenSeqNum = 0;
729 if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
730 lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
733 if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
735 Commit prevCommit = commitMap.put(entry.getTransSequenceNumber(), entry);
737 if (prevCommit != null) {
738 prevCommit.setDead();
740 for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
741 committedMapByKey.put(kv.getKey(), entry);
748 Set<Commit> commitsToEditSet = new HashSet<Commit>();
750 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
751 commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
754 commitsToEditSet.remove(null);
756 for (Commit prevCommit : commitsToEditSet) {
758 Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
760 if (!prevCommit.isLive()) {
761 commitMap.remove(prevCommit.getTransSequenceNumber());
765 // // Remove any old commits
766 // for (Iterator<Map.Entry<Long, Commit>> i = commitMap.entrySet().iterator(); i.hasNext();) {
767 // Commit prevCommit = i.next().getValue();
768 // prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
770 // if (!prevCommit.isLive()) {
775 // Remove any old commits
776 // for (Iterator<Map.Entry<Long, Commit>> i = commitMap.entrySet().iterator(); i.hasNext();) {
777 // Commit prevCommit = i.next().getValue();
779 // if (prevCommit.getTransArbitrator() != entry.getTransArbitrator()) {
783 // prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
785 // if (!prevCommit.isLive()) {
791 // Add the new commit
792 commitMap.put(entry.getTransSequenceNumber(), entry);
793 lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
795 // Update the committed table list
796 for (KeyValue kv : entry.getkeyValueUpdateSet()) {
797 IoTString key = kv.getKey();
798 commitedTable.put(key, kv);
800 committedMapByKey.put(key, entry);
804 // Clear the new commits storage so we can use it later
805 newCommitMap.clear();
808 private void deleteDeadUncommittedTransactions() {
809 // Make dead the transactions
810 for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
811 Transaction prevtrans = i.next().getValue();
812 long transArb = prevtrans.getArbitrator();
814 if ((lastCommitSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastCommitSeenSeqNumMap.get(transArb)) ||
815 (lastAbortSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastAbortSeenSeqNumMap.get(transArb))) {
822 private void createSpeculativeTable() {
824 if (uncommittedTransactionsMap.keySet().size() == 0) {
825 // speculativeTable = commitedTable; // Ok that they are the same object
829 Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
830 List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
832 // Sort from oldest to newest commit
833 Collections.sort(utSeqNums);
835 if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
837 speculativeTable.clear();
838 lastUncommittedTransaction = -1;
840 for (Long key : utSeqNums) {
841 Transaction trans = uncommittedTransactionsMap.get(key);
843 lastUncommittedTransaction = key;
845 if (trans.evaluateGuard(commitedTable, speculativeTableTmp)) {
846 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
847 speculativeTableTmp.put(kv.getKey(), kv);
853 for (Long key : utSeqNums) {
855 if (key <= lastUncommittedTransaction) {
859 lastUncommittedTransaction = key;
861 Transaction trans = uncommittedTransactionsMap.get(key);
863 if (trans.evaluateGuard(speculativeTable, speculativeTableTmp)) {
864 for (KeyValue kv : trans.getkeyValueUpdateSet()) {
865 speculativeTableTmp.put(kv.getKey(), kv);
871 for (IoTString key : speculativeTableTmp.keySet()) {
872 speculativeTable.put(key, speculativeTableTmp.get(key));
875 // speculativeTable = speculativeTableTmp;
878 private int expectedsize, currmaxsize;
880 private void checkNumSlots(int numslots) {
883 // We only have 1 size so we must have this many slots
884 if (largestTableStatusSeen == smallestTableStatusSeen) {
885 if (numslots != smallestTableStatusSeen) {
886 throw new Error("Server Error: Server did not send all slots. Expected: " + smallestTableStatusSeen + " Received:" + numslots);
889 // We have more than 1
890 if (numslots < smallestTableStatusSeen) {
891 throw new Error("Server Error: Server did not send all slots. Expected at least: " + smallestTableStatusSeen + " Received:" + numslots);
895 // if (numslots != expectedsize) {
896 // throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
900 private void initExpectedSize(long firstsequencenumber) {
901 long prevslots = firstsequencenumber;
902 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
903 currmaxsize = numslots;
906 private void updateExpectedSize() {
908 if (expectedsize > currmaxsize) {
909 System.out.println("Maxing Out: " + expectedsize + " " + currmaxsize);
910 expectedsize = currmaxsize;
914 private void updateCurrMaxSize(int newmaxsize) {
915 currmaxsize = newmaxsize;
918 private void commitNewMaxSize() {
920 if (largestTableStatusSeen == -1) {
921 currmaxsize = numslots;
923 currmaxsize = largestTableStatusSeen;
926 if (numslots != currmaxsize) {
927 buffer.resize(currmaxsize);
930 numslots = currmaxsize;
931 setResizeThreshold();
934 private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
935 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
938 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
939 long oldseqnum = entry.getOldSeqNum();
940 long newseqnum = entry.getNewSeqNum();
941 boolean isequal = entry.getEqual();
942 long machineid = entry.getMachineID();
943 for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
944 Slot slot = indexer.getSlot(seqnum);
946 long slotmachineid = slot.getMachineID();
947 if (isequal != (slotmachineid == machineid)) {
948 throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
953 HashSet<Long> watchset = new HashSet<Long>();
954 for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
955 long entry_mid = lastmsg_entry.getKey();
956 /* We've seen it, don't need to continue to watch. Our next
957 * message will implicitly acknowledge it. */
958 if (entry_mid == localmachineid)
960 Pair<Long, Liveness> v = lastmsg_entry.getValue();
961 long entry_seqn = v.getFirst();
962 if (entry_seqn < newseqnum) {
963 addWatchList(entry_mid, entry);
964 watchset.add(entry_mid);
967 if (watchset.isEmpty())
970 entry.setWatchSet(watchset);
973 private void processEntry(NewKey entry) {
974 arbitratorTable.put(entry.getKey(), entry.getMachineID());
976 NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
978 if (oldNewKey != null) {
983 private void processEntry(Transaction entry) {
985 long arb = entry.getArbitrator();
986 Long comLast = lastCommitSeenSeqNumMap.get(arb);
987 Long abLast = lastAbortSeenSeqNumMap.get(arb);
989 Transaction prevTrans = null;
991 if ((comLast != null) && (comLast >= entry.getSequenceNumber())) {
992 prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
993 } else if ((abLast != null) && (abLast >= entry.getSequenceNumber())) {
994 prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
996 prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
999 // Duplicate so delete old copy
1000 if (prevTrans != null) {
1001 prevTrans.setDead();
1005 private void processEntry(Abort entry) {
1007 if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
1008 // Abort has not been seen yet so we need to keep track of it
1010 Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
1011 if (prevAbort != null) {
1012 prevAbort.setDead(); // delete old version of the duplicate
1015 // The machine already saw this so it is dead
1019 if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) && (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
1020 lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
1024 private void processEntry(Commit entry, Slot s) {
1025 Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry);
1026 if (prevCommit != null) {
1027 prevCommit.setDead();
1031 private void processEntry(TableStatus entry) {
1032 int newnumslots = entry.getMaxSlots();
1033 // updateCurrMaxSize(newnumslots);
1034 if (lastTableStatus != null)
1035 lastTableStatus.setDead();
1036 lastTableStatus = entry;
1038 if ((smallestTableStatusSeen == -1) || (newnumslots < smallestTableStatusSeen)) {
1039 smallestTableStatusSeen = newnumslots;
1042 if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
1043 largestTableStatusSeen = newnumslots;
1046 // System.out.println("Table Stat: " + newnumslots + " large: " + largestTableStatusSeen + " small: " + smallestTableStatusSeen);
1049 private void addWatchList(long machineid, RejectedMessage entry) {
1050 HashSet<RejectedMessage> entries = watchlist.get(machineid);
1051 if (entries == null)
1052 watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
1056 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1057 machineSet.remove(machineid);
1059 HashSet<RejectedMessage> watchset = watchlist.get(machineid);
1060 if (watchset != null) {
1061 for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
1062 RejectedMessage rm = rmit.next();
1063 if (rm.getNewSeqNum() <= seqnum) {
1064 /* Remove it from our watchlist */
1066 /* Decrement machines that need to see this notification */
1067 rm.removeWatcher(machineid);
1072 if (machineid == localmachineid) {
1073 /* Our own messages are immediately dead. */
1074 if (liveness instanceof LastMessage) {
1075 ((LastMessage)liveness).setDead();
1076 } else if (liveness instanceof Slot) {
1077 ((Slot)liveness).setDead();
1079 throw new Error("Unrecognized type");
1083 // Set dead the abort
1084 for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
1085 Abort abort = i.next().getValue();
1087 if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
1093 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
1094 if (lastmsgentry == null)
1097 long lastmsgseqnum = lastmsgentry.getFirst();
1098 Liveness lastentry = lastmsgentry.getSecond();
1099 if (machineid != localmachineid) {
1100 if (lastentry instanceof LastMessage) {
1101 ((LastMessage)lastentry).setDead();
1102 } else if (lastentry instanceof Slot) {
1103 ((Slot)lastentry).setDead();
1105 throw new Error("Unrecognized type");
1109 if (machineid == localmachineid) {
1110 if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
1111 throw new Error("Server Error: Mismatch on local machine sequence number");
1113 if (lastmsgseqnum > seqnum)
1114 throw new Error("Server Error: Rollback on remote machine sequence number");
1118 private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
1119 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
1120 for (Entry entry : slot.getEntries()) {
1121 switch (entry.getType()) {
1123 case Entry.TypeNewKey:
1124 processEntry((NewKey)entry);
1127 case Entry.TypeCommit:
1128 processEntry((Commit)entry, slot);
1131 case Entry.TypeAbort:
1132 processEntry((Abort)entry);
1135 case Entry.TypeTransaction:
1136 processEntry((Transaction)entry);
1139 case Entry.TypeLastMessage:
1140 processEntry((LastMessage)entry, machineSet);
1143 case Entry.TypeRejectedMessage:
1144 processEntry((RejectedMessage)entry, indexer);
1147 case Entry.TypeTableStatus:
1148 processEntry((TableStatus)entry);
1152 throw new Error("Unrecognized type: " + entry.getType());
1157 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
1158 for (int i = 0; i < newslots.length; i++) {
1159 Slot currslot = newslots[i];
1160 Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
1161 if (prevslot != null &&
1162 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
1163 throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);