Fixes to tex doc; Code Updates
authorAli Younis <ayounis@uci.edu>
Thu, 1 Dec 2016 07:08:32 +0000 (23:08 -0800)
committerAli Younis <ayounis@uci.edu>
Thu, 1 Dec 2016 07:08:32 +0000 (23:08 -0800)
doc2/iotcloud.tex
src2/java/iotcloud/Commit.java
src2/java/iotcloud/Entry.java
src2/java/iotcloud/PendingTransaction.java
src2/java/iotcloud/Table.java
src2/java/iotcloud/Transaction.java

index e3ae679335db5eca99b30253d73daf8d68a3c54b..1d760d7a29214fe945e5ee91cb902f320d9a97e2 100644 (file)
@@ -1499,12 +1499,12 @@ This rescue is not mandatory.  This is trying to fill the remaining portion of t
         \EndIf\\\r
     \r
         \If{$\lnot$\Call{EvaluateGuard}{$Guard_t, CurrKV$}}\r
-            \State $abort_{de} \gets $\Call{CreateAbort}{$seq_t, id_t$}\r
+            \State $abortde \gets $\Call{CreateAbort}{$seq_t, id_t$}\r
             \LeftComment{No more space so we cant arbitrate any further}\r
-            \If($\lnot$\Call{DeHasSpace}{$DE_a, abort_{de}$})\r
+            \If($lnot$\Call{DeHasSpace}{$DE_a, abortde$})\r
                 \State \Return{$DE_a$}\r
             \EndIf\r
-            \State $DE_a \gets DE_a \cup abort_{de}$\r
+            \State $DE_a \gets DE_a \cup abortde$\r
         \Else\r
             \State $DKV \gets \{\tuple{k,v}| \tuple{k,v} \in KV \land \tuple{k',v'}\in KV_t \land k'=k\}$\r
             \State $KVTmp \gets (KV \setminus DKV) \cup KV'$\r
@@ -1847,7 +1847,7 @@ Creates a new key and specifies which machine ID is the arbitrator. If there is
             \State \Return{$false$} \Comment{Key already created}\r
         \EndIf\\\r
     \r
-        \State $success \gets$ \Call{TryInsertNewKey}{$k_a, id_a$}\r
+        \State $success \gets$ \Call{TryInsertNewKey}{$k_a, id_a, false$}\r
     \EndWhile\r
     \r
     \State \Return{$true$} \Comment{If got here then insertion was correct}\r
index c533c36bda67edd4cfc37044fa379f32943c7476..1ee044df3e7cca9f22eeb29f071f8666989e2f49 100644 (file)
@@ -15,7 +15,6 @@ import java.util.Iterator;
 class Commit extends Entry {
        private long seqnumtrans;
        private Set<KeyValue> keyValueUpdateSet = null;
-       private Set<KeyValue> liveValues = null;
 
 
        public Commit(Slot slot, long _seqnumtrans, Set<KeyValue> _keyValueUpdateSet) {
@@ -23,12 +22,10 @@ class Commit extends Entry {
                seqnumtrans = _seqnumtrans;
 
                keyValueUpdateSet = new HashSet<KeyValue>();
-               liveValues = new HashSet<KeyValue>();
 
                for (KeyValue kv : _keyValueUpdateSet) {
                        KeyValue kvCopy = kv.getCopy();
                        keyValueUpdateSet.add(kvCopy);
-                       liveValues.add(kvCopy);
                }
        }
 
@@ -89,17 +86,17 @@ class Commit extends Entry {
                        return;
 
                for (KeyValue kv1 : kvSet) {
-                       for (Iterator<KeyValue> i = liveValues.iterator(); i.hasNext();) {
+                       for (Iterator<KeyValue> i = keyValueUpdateSet.iterator(); i.hasNext();) {
                                KeyValue kv2 = i.next();
 
                                if (kv1.getKey() == kv2.getKey()) {
-                                       liveValues.remove(kv2);
+                                       keyValueUpdateSet.remove(kv2);
                                        break;
                                }
                        }
                }
 
-               if (liveValues.size() == 0)
+               if (keyValueUpdateSet.size() == 0)
                        this.setDead();
        }
 }
\ No newline at end of file
index af99192c25a6b83ac08ff60193ce1776f80f78f8..8734bb1740d363c9dbec8fa4b313ae9b98bd8dda 100644 (file)
@@ -68,6 +68,11 @@ abstract class Entry implements Liveness {
         */
 
        public void setDead() {
+
+               if (!islive ) {
+                       return; // already dead
+               }
+
                islive = false;
                parentslot.decrementLiveCount();
        }
index 5253a94f841cc7c36c5e70c88e78ee67a9a9400e..3d39e12559ac3270d1e9c35cb05187215ae904d8 100644 (file)
@@ -14,6 +14,7 @@ class PendingTransaction {
 
     private Set<KeyValue> keyValueUpdateSet;
     private Guard guard;
+    private long arbitrator = -1;
 
     public PendingTransaction() {
         keyValueUpdateSet = new HashSet<KeyValue>();
@@ -40,6 +41,17 @@ class PendingTransaction {
         keyValueUpdateSet.add(newKV);
     }
 
+    public boolean checkArbitrator(long arb) {
+        if (arbitrator == -1) {
+            arbitrator = arb;
+            return true;
+        }
+
+        return arb == arbitrator;
+    }
+
+
+
     /**
      * Get the key value update set
      *
index 732867fc4cd7bc0f49286f202cd5b43f9b31f659..1cdb3d5b11b789d3cb29afac838cb4e0d2ca11d4 100644 (file)
@@ -9,6 +9,8 @@ import java.util.Random;
 import java.util.Queue;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
+import java.util.Collection;
 
 /**
  * IoTTable data structure.  Provides client inferface.
@@ -20,7 +22,7 @@ final public class Table {
        private int numslots;   //number of slots stored in buffer
 
        //table of key-value pairs
-       private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
+       //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
 
        // machine id -> (sequence number, Slot or LastMessage); records last message by each client
        private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
@@ -46,10 +48,12 @@ final public class Table {
        private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
        private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
        private List<Commit> commitList = null; // List of all the most recent live commits
+       private Set<Abort> abortSet = null; // Set of the live aborts
        private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
+       private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
        private List<Transaction> uncommittedTransactionsList = null; //
        private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
-       private Set<Abort> arbitratorTable = null; // Table of arbitrators
+       // private Set<Abort> arbitratorTable = null; // Table of arbitrators
 
 
        public Table(String baseurl, String password, long _localmachineid) {
@@ -63,7 +67,9 @@ final public class Table {
 
                pendingTransQueue = new LinkedList<PendingTransaction>();
                commitList = new LinkedList<Commit>();
+               abortSet = new HashSet<Abort>();
                commitedTable = new HashMap<IoTString, KeyValue>();
+               speculativeTable = new HashMap<IoTString, KeyValue>();
                uncommittedTransactionsList = new LinkedList<Transaction>();
                arbitratorTable = new HashMap<IoTString, Long>();
        }
@@ -78,7 +84,9 @@ final public class Table {
 
                pendingTransQueue = new LinkedList<PendingTransaction>();
                commitList = new LinkedList<Commit>();
+               abortSet = new HashSet<Abort>();
                commitedTable = new HashMap<IoTString, KeyValue>();
+               speculativeTable = new HashMap<IoTString, KeyValue>();
                uncommittedTransactionsList = new LinkedList<Transaction>();
                arbitratorTable = new HashMap<IoTString, Long>();
        }
@@ -88,20 +96,27 @@ final public class Table {
                validateandupdate(newslots, true);
        }
 
-       public void update() {
-               Slot[] newslots = cloud.getSlots(sequencenumber + 1);
 
-               validateandupdate(newslots, false);
+
+       public IoTString getCommitted(IoTString key) {
+               KeyValue kv = commitedTable.get(key);
+               if (kv != null) {
+                       return kv.getValue();
+               } else {
+                       return null;
+               }
        }
 
-       public IoTString get(IoTString key) {
-               KeyValue kv = table.get(key);
-               if (kv != null)
+       public IoTString getSpeculative(IoTString key) {
+               KeyValue kv = speculativeTable.get(key);
+               if (kv != null) {
                        return kv.getValue();
-               else
+               } else {
                        return null;
+               }
        }
 
+
        public void initTable() {
                cloud.setSalt();//Set the salt
                Slot s = new Slot(this, 1, localmachineid);
@@ -118,9 +133,26 @@ final public class Table {
        }
 
        public String toString() {
-               return table.toString();
+
+
+               String retString = " Committed Table: \n";
+               retString += "---------------------------\n";
+               retString += commitedTable.toString();
+
+               retString += "\n\n";
+
+               retString += " Speculative Table: \n";
+               retString += "---------------------------\n";
+               retString += speculativeTable.toString();
+
+               return retString;
        }
 
+
+
+
+
+
        public void startTransaction() {
                // Create a new transaction, invalidates any old pending transactions.
                pendingTransBuild = new PendingTransaction();
@@ -128,6 +160,11 @@ final public class Table {
 
        public void commitTransaction() {
 
+               if (pendingTransBuild.getKVUpdates().size() == 0) {
+                       // If no updates are made then there is no point inserting into the chain
+                       return;
+               }
+
                // Add the pending transaction to the queue
                pendingTransQueue.add(pendingTransBuild);
 
@@ -139,15 +176,48 @@ final public class Table {
        }
 
        public void addKV(IoTString key, IoTString value) {
+
+               // Make sure new key value pair matches the current arbitrator
+               if (!pendingTransBuild.checkArbitrator( arbitratorTable.get(key))) {
+                       // TODO: Maybe not throw and error
+                       throw new Error("Not all Key Values match");
+               }
+
+
+
                KeyValue kv = new KeyValue(key, value);
                pendingTransBuild.addKV(kv);
        }
 
+
+       // TODo: FIx Guard
        public void addGuard(IoTString key, IoTString value) {
                KeyValue kv = new KeyValue(key, value);
                pendingTransBuild.addKV(kv);
        }
 
+       public void update() {
+               Slot[] newslots = cloud.getSlots(sequencenumber + 1);
+
+               validateandupdate(newslots, false);
+       }
+
+       public boolean createNewKey(IoTString keyName, long machineId) {
+
+               while (true) {
+                       if (arbitratorTable.get(keyName) != null) {
+                               // There is already an arbitrator
+                               return false;
+                       }
+
+                       if (tryput(keyName, machineId, false)) {
+
+                               // If successfully inserted
+                               return true;
+                       }
+               }
+       }
+
        void decrementLiveCount() {
                liveslotcount--;
        }
@@ -218,6 +288,8 @@ final public class Table {
                long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
                long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
 
+
+               // Mandatory Rescue
                for (; seqn < threshold; seqn++) {
                        Slot prevslot = buffer.getSlot(seqn);
                        //Push slot number forward
@@ -241,6 +313,46 @@ final public class Table {
                }
 
 
+               // Arbitrate
+               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+               for (Transaction ut : uncommittedTransactionsList) {
+
+                       KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
+                       // Check if this machine arbitrates for this transaction
+                       if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+                               continue;
+                       }
+
+                       Entry newEntry = null;
+
+                       try {
+                               if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+                                       // Guard evaluated as true
+
+                                       // update the local tmp current key set
+                                       for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+                                               speculativeTableTmp.put(kv.getKey(), kv);
+                                       }
+
+                                       // create the commit
+                                       newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
+                               } else {
+                                       // Guard was false
+
+                                       // create the abort
+                                       newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
+                               }
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
+
+                       if ((newEntry != null) && s.hasSpace(newEntry)) {
+                               s.addEntry(newEntry);
+                       } else {
+                               break;
+                       }
+               }
+
                Transaction trans = new Transaction(s,
                                                    s.getSequenceNumber(),
                                                    localmachineid,
@@ -298,14 +410,195 @@ final public class Table {
                return insertedTrans;
        }
 
+       private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
+               Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+               int newsize = 0;
+               if (liveslotcount > resizethreshold) {
+                       resize = true; //Resize is forced
+               }
+
+               if (resize) {
+                       newsize = (int) (numslots * RESIZE_MULTIPLE);
+                       TableStatus status = new TableStatus(s, newsize);
+                       s.addEntry(status);
+               }
+
+               if (! rejectedmessagelist.isEmpty()) {
+                       /* TODO: We should avoid generating a rejected message entry if
+                        * there is already a sufficient entry in the queue (e.g.,
+                        * equalsto value of true and same sequence number).  */
+
+                       long old_seqn = rejectedmessagelist.firstElement();
+                       if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+                               long new_seqn = rejectedmessagelist.lastElement();
+                               RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+                               s.addEntry(rm);
+                       } else {
+                               long prev_seqn = -1;
+                               int i = 0;
+                               /* Go through list of missing messages */
+                               for (; i < rejectedmessagelist.size(); i++) {
+                                       long curr_seqn = rejectedmessagelist.get(i);
+                                       Slot s_msg = buffer.getSlot(curr_seqn);
+                                       if (s_msg != null)
+                                               break;
+                                       prev_seqn = curr_seqn;
+                               }
+                               /* Generate rejected message entry for missing messages */
+                               if (prev_seqn != -1) {
+                                       RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+                                       s.addEntry(rm);
+                               }
+                               /* Generate rejected message entries for present messages */
+                               for (; i < rejectedmessagelist.size(); i++) {
+                                       long curr_seqn = rejectedmessagelist.get(i);
+                                       Slot s_msg = buffer.getSlot(curr_seqn);
+                                       long machineid = s_msg.getMachineID();
+                                       RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+                                       s.addEntry(rm);
+                               }
+                       }
+               }
+
+               long newestseqnum = buffer.getNewestSeqNum();
+               long oldestseqnum = buffer.getOldestSeqNum();
+               if (lastliveslotseqn < oldestseqnum)
+                       lastliveslotseqn = oldestseqnum;
+
+               long seqn = lastliveslotseqn;
+               boolean seenliveslot = false;
+               long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
+               long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
+
+
+               // Mandatory Rescue
+               for (; seqn < threshold; seqn++) {
+                       Slot prevslot = buffer.getSlot(seqn);
+                       //Push slot number forward
+                       if (! seenliveslot)
+                               lastliveslotseqn = seqn;
+
+                       if (! prevslot.isLive())
+                               continue;
+                       seenliveslot = true;
+                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+                       for (Entry liveentry : liveentries) {
+                               if (s.hasSpace(liveentry)) {
+                                       s.addEntry(liveentry);
+                               } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
+                                       if (!resize) {
+                                               System.out.print("B"); //?
+                                               return tryput(keyName, arbMachineid, true);
+                                       }
+                               }
+                       }
+               }
+
+
+               // Arbitrate
+               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+               for (Transaction ut : uncommittedTransactionsList) {
+
+                       KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
+                       // Check if this machine arbitrates for this transaction
+                       if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+                               continue;
+                       }
+
+                       Entry newEntry = null;
+
+                       try {
+                               if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+                                       // Guard evaluated as true
+
+                                       // update the local tmp current key set
+                                       for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+                                               speculativeTableTmp.put(kv.getKey(), kv);
+                                       }
+
+                                       // create the commit
+                                       newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
+                               } else {
+                                       // Guard was false
+
+                                       // create the abort
+                                       newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
+                               }
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
+
+                       if ((newEntry != null) && s.hasSpace(newEntry)) {
+                               s.addEntry(newEntry);
+                       } else {
+                               break;
+                       }
+               }
+
+
+               NewKey newKey = new NewKey(s, keyName, arbMachineid);
+
+               boolean insertedNewKey = false;
+               if (s.hasSpace(newKey)) {
+                       s.addEntry(newKey);
+                       insertedNewKey = true;
+               }
+
+               /* now go through live entries from least to greatest sequence number until
+                * either all live slots added, or the slot doesn't have enough room
+                * for SKIP_THRESHOLD consecutive entries*/
+               int skipcount = 0;
+               search:
+               for (; seqn <= newestseqnum; seqn++) {
+                       Slot prevslot = buffer.getSlot(seqn);
+                       //Push slot number forward
+                       if (!seenliveslot)
+                               lastliveslotseqn = seqn;
+
+                       if (!prevslot.isLive())
+                               continue;
+                       seenliveslot = true;
+                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+                       for (Entry liveentry : liveentries) {
+                               if (s.hasSpace(liveentry))
+                                       s.addEntry(liveentry);
+                               else {
+                                       skipcount++;
+                                       if (skipcount > SKIP_THRESHOLD)
+                                               break search;
+                               }
+                       }
+               }
+
+               int max = 0;
+               if (resize)
+                       max = newsize;
+               Slot[] array = cloud.putSlot(s, max);
+               if (array == null) {
+                       array = new Slot[] {s};
+                       rejectedmessagelist.clear();
+               }       else {
+                       if (array.length == 0)
+                               throw new Error("Server Error: Did not send any slots");
+                       rejectedmessagelist.add(s.getSequenceNumber());
+                       insertedNewKey = false;
+               }
+
+               /* update data structure */
+               validateandupdate(array, true);
+
+               return insertedNewKey;
+       }
+
        private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
                /* The cloud communication layer has checked slot HMACs already
                         before decoding */
                if (newslots.length == 0) return;
 
                long firstseqnum = newslots[0].getSequenceNumber();
-               if (firstseqnum <= sequencenumber)
+               if (firstseqnum <= sequencenumber) {
                        throw new Error("Server Error: Sent older slots!");
+               }
 
                SlotIndexer indexer = new SlotIndexer(newslots, buffer);
                checkHMACChain(indexer, newslots);
@@ -320,9 +613,12 @@ final public class Table {
 
                /* If there is a gap, check to see if the server sent us everything. */
                if (firstseqnum != (sequencenumber + 1)) {
+
+                       // TODO: Check size
                        checkNumSlots(newslots.length);
-                       if (!machineSet.isEmpty())
+                       if (!machineSet.isEmpty()) {
                                throw new Error("Missing record for machines: " + machineSet);
+                       }
                }
 
                commitNewMaxSize();
@@ -333,13 +629,37 @@ final public class Table {
                        liveslotcount++;
                }
                sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
+
+               // Speculate on key value pairs
+               createSpeculativeTable();
+       }
+
+       private void createSpeculativeTable() {
+               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+
+               for (Transaction trans : uncommittedTransactionsList) {
+
+                       try {
+                               if (trans.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+                                       for (KeyValue kv : trans.getkeyValueUpdateSet()) {
+                                               speculativeTableTmp.put(kv.getKey(), kv);
+                                       }
+                               }
+
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
+               }
+
+               speculativeTable = speculativeTableTmp;
        }
 
        private int expectedsize, currmaxsize;
 
        private void checkNumSlots(int numslots) {
-               if (numslots != expectedsize)
+               if (numslots != expectedsize) {
                        throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numslots);
+               }
        }
 
        private void initExpectedSize(long firstsequencenumber) {
@@ -350,8 +670,9 @@ final public class Table {
 
        private void updateExpectedSize() {
                expectedsize++;
-               if (expectedsize > currmaxsize)
+               if (expectedsize > currmaxsize) {
                        expectedsize = currmaxsize;
+               }
        }
 
        private void updateCurrMaxSize(int newmaxsize) {
@@ -366,6 +687,10 @@ final public class Table {
                setResizeThreshold();
        }
 
+
+
+
+
        private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
                updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
        }
@@ -414,6 +739,16 @@ final public class Table {
        }
 
        private void processEntry(Abort entry) {
+
+
+               if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
+                       // Abort has not been seen yet so we need to keep track of it
+                       abortSet.add(entry);
+               } else {
+                       // The machine already saw this so it is dead
+                       entry.setDead();
+               }
+
                for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
                        Transaction prevtrans = i.next();
                        if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
@@ -456,13 +791,6 @@ final public class Table {
                }
        }
 
-       private void addWatchList(long machineid, RejectedMessage entry) {
-               HashSet<RejectedMessage> entries = watchlist.get(machineid);
-               if (entries == null)
-                       watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
-               entries.add(entry);
-       }
-
        private void processEntry(TableStatus entry) {
                int newnumslots = entry.getMaxSlots();
                updateCurrMaxSize(newnumslots);
@@ -471,6 +799,14 @@ final public class Table {
                lastTableStatus = entry;
        }
 
+
+       private void addWatchList(long machineid, RejectedMessage entry) {
+               HashSet<RejectedMessage> entries = watchlist.get(machineid);
+               if (entries == null)
+                       watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
+               entries.add(entry);
+       }
+
        private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
                machineSet.remove(machineid);
 
@@ -498,6 +834,16 @@ final public class Table {
                        }
                }
 
+               // Set dead the abort
+               for (Iterator<Abort> ait = abortSet.iterator(); ait.hasNext(); ) {
+                       Abort abort = ait.next();
+
+                       if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
+                               abort.setDead();
+                               ait.remove();
+                       }
+               }
+
 
                Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
                if (lastmsgentry == null)
index d25403efab5a8b1935433cb6010b322f0267c0fa..18b8721b235050fb537db8bdc1bea265b55b6d38 100644 (file)
@@ -36,6 +36,9 @@ class Transaction extends Entry {
         return keyValueUpdateSet;
     }
 
+    public Guard getGuard() {
+        return guard;
+    }
 
     public byte getType() {
         return Entry.TypeTransaction;