import java.util.Set;
import java.util.Collection;
import java.util.Collections;
+import java.nio.ByteBuffer;
/**
final public class Table {
private int numslots; //number of slots stored in buffer
- //table of key-value pairs
- //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
-
// machine id -> (sequence number, Slot or LastMessage); records last message by each client
private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
// machine id -> ...
private TableStatus lastTableStatus;
static final int FREE_SLOTS = 10; //number of slots that should be kept free
static final int SKIP_THRESHOLD = 10;
- public long liveslotcount = 0; // TODO: MAKE PRIVATE
+ private long liveslotcount = 0;
private int chance;
static final double RESIZE_MULTIPLE = 1.2;
static final double RESIZE_THRESHOLD = 0.75;
static final int REJECTED_THRESHOLD = 5;
- public int resizethreshold; // TODO: MAKE PRIVATE
+ private int resizethreshold;
private long lastliveslotseqn; //smallest sequence number with a live entry
private Random random = new Random();
- private long lastCommitSeenSeqNum = 0; // sequence number of the last commit that was seen
+ private long lastUncommittedTransaction = 0;
- private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
- private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
- private List<Commit> commitList = null; // List of all the most recent live commits
- private List<Long> commitListSeqNum = null; // List of all the most recent live commits trans sequence numbers
+ private int smallestTableStatusSeen = -1;
+ private int largestTableStatusSeen = -1;
+ private int lastSeenPendingTransactionSpeculateIndex = 0;
+ private int commitSequenceNumber = 0;
+ private long localTransactionSequenceNumber = 0;
- private Set<Abort> abortSet = null; // Set of the live aborts
- public Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV TODO: Make Private
+ private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
+ private LinkedList<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
+ private Map<Long, Map<Long, Commit>> commitMap = null; // List of all the most recent live commits
+ private Map<Long, Abort> abortMap = null; // Set of the live aborts
+ private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
+ private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
- public Map<Long, Transaction> uncommittedTransactionsMap = null; // TODO: make private
+ private Map<Long, Transaction> uncommittedTransactionsMap = null;
private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
- // private Set<Abort> arbitratorTable = null; // Table of arbitrators
- private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
-
-
-
- public Table(String baseurl, String password, long _localmachineid) {
+ private Map<Long, Map<Long, Commit>> newCommitMap = null; // Map of all the new commits
+ private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
+ private Map<Long, Long> lastCommitSeenTransSeqNumMap = null; // transaction sequence number of the last commit that was seen grouped by arbitrator
+ private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator
+ private Map<IoTString, KeyValue> pendingTransSpeculativeTable = null;
+ private List<Commit> pendingCommitsList = null;
+ private List<Commit> pendingCommitsToDelete = null;
+ private Map<Long, LocalComm> localCommunicationChannels;
+ private Map<Long, TransactionStatus> transactionStatusMap = null;
+ private Map<Long, TransactionStatus> transactionStatusNotSentMap = null;
+
+
+ public Table(String hostname, String baseurl, String password, long _localmachineid) {
localmachineid = _localmachineid;
buffer = new SlotBuffer();
numslots = buffer.capacity();
setResizeThreshold();
sequencenumber = 0;
- cloud = new CloudComm(this, baseurl, password);
+ cloud = new CloudComm(this, hostname, baseurl, password);
lastliveslotseqn = 1;
setupDataStructs();
private void setupDataStructs() {
pendingTransQueue = new LinkedList<PendingTransaction>();
- commitList = new LinkedList<Commit>();
- abortSet = new HashSet<Abort>();
+ commitMap = new HashMap<Long, Map<Long, Commit>>();
+ abortMap = new HashMap<Long, Abort>();
+ committedMapByKey = new HashMap<IoTString, Commit>();
commitedTable = new HashMap<IoTString, KeyValue>();
speculativeTable = new HashMap<IoTString, KeyValue>();
uncommittedTransactionsMap = new HashMap<Long, Transaction>();
arbitratorTable = new HashMap<IoTString, Long>();
newKeyTable = new HashMap<IoTString, NewKey>();
- newCommitMap = new HashMap<Long, Commit> ();
+ newCommitMap = new HashMap<Long, Map<Long, Commit>>();
+ lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
+ lastCommitSeenTransSeqNumMap = new HashMap<Long, Long>();
+ lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
+ pendingTransSpeculativeTable = new HashMap<IoTString, KeyValue>();
+ pendingCommitsList = new LinkedList<Commit>();
+ pendingCommitsToDelete = new LinkedList<Commit>();
+ localCommunicationChannels = new HashMap<Long, LocalComm>();
+ transactionStatusMap = new HashMap<Long, TransactionStatus>();
+ transactionStatusNotSentMap = new HashMap<Long, TransactionStatus>();
+ }
+
+ public void initTable() throws ServerException {
+ cloud.setSalt();//Set the salt
+ Slot s = new Slot(this, 1, localmachineid);
+ TableStatus status = new TableStatus(s, numslots);
+ s.addEntry(status);
+ Slot[] array = cloud.putSlot(s, numslots);
+ if (array == null) {
+ array = new Slot[] {s};
+ /* update data structure */
+ validateandupdate(array, true);
+ } else {
+ throw new Error("Error on initialization");
+ }
}
- public void rebuild() {
+ public void rebuild() throws ServerException {
Slot[] newslots = cloud.getSlots(sequencenumber + 1);
validateandupdate(newslots, true);
}
System.out.println("Old: " + o);
System.out.println("New: " + n);
System.out.println("Size: " + buffer.size());
- System.out.println("Commits Map: " + commitedTable.size());
- System.out.println("Commits List: " + commitList.size());
+ System.out.println("Commits Key Map: " + commitedTable.size());
+ // System.out.println("Commits Live Map: " + commitMap.size());
+ System.out.println("Pending: " + pendingTransQueue.size());
+
+ // List<IoTString> strList = new ArrayList<IoTString>();
+ // for (int i = 0; i < 100; i++) {
+ // String keyA = "a" + i;
+ // String keyB = "b" + i;
+ // String keyC = "c" + i;
+ // String keyD = "d" + i;
+
+ // IoTString iKeyA = new IoTString(keyA);
+ // IoTString iKeyB = new IoTString(keyB);
+ // IoTString iKeyC = new IoTString(keyC);
+ // IoTString iKeyD = new IoTString(keyD);
+
+ // strList.add(iKeyA);
+ // strList.add(iKeyB);
+ // strList.add(iKeyC);
+ // strList.add(iKeyD);
+ // }
+
+
+ // for (Long l : commitMap.keySet()) {
+ // for (Long l2 : commitMap.get(l).keySet()) {
+ // for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) {
+ // strList.remove(kv.getKey());
+ // System.out.print(kv.getKey() + " ");
+ // }
+ // }
+ // }
+
+ // System.out.println();
+ // System.out.println();
+
+ // for (IoTString s : strList) {
+ // System.out.print(s + " ");
+ // }
+ // System.out.println();
+ // System.out.println(strList.size());
+ }
+
+ public long getId() {
+ return localmachineid;
+ }
+
+ public boolean hasConnection() {
+ return cloud.hasConnection();
+ }
+
+ public String toString() {
+ String retString = " Committed Table: \n";
+ retString += "---------------------------\n";
+ retString += commitedTable.toString();
+
+ retString += "\n\n";
+
+ retString += " Speculative Table: \n";
+ retString += "---------------------------\n";
+ retString += speculativeTable.toString();
+
+ return retString;
+ }
+
+ public void addLocalComm(long machineId, LocalComm lc) {
+ localCommunicationChannels.put(machineId, lc);
+ }
+ public Long getArbitrator(IoTString key) {
+ return arbitratorTable.get(key);
}
public IoTString getCommitted(IoTString key) {
}
public IoTString getSpeculative(IoTString key) {
- KeyValue kv = speculativeTable.get(key);
+ KeyValue kv = pendingTransSpeculativeTable.get(key);
+
+ if (kv == null) {
+ kv = speculativeTable.get(key);
+ }
+
+ if (kv == null) {
+ kv = commitedTable.get(key);
+ }
+
if (kv != null) {
return kv.getValue();
} else {
}
}
- public void initTable() {
- cloud.setSalt();//Set the salt
- Slot s = new Slot(this, 1, localmachineid);
- TableStatus status = new TableStatus(s, numslots);
- s.addEntry(status);
- Slot[] array = cloud.putSlot(s, numslots);
- if (array == null) {
- array = new Slot[] {s};
- /* update data structure */
- validateandupdate(array, true);
+ public IoTString getCommittedAtomic(IoTString key) {
+ KeyValue kv = commitedTable.get(key);
+
+ if (arbitratorTable.get(key) == null) {
+ throw new Error("Key not Found.");
+ }
+
+ // Make sure new key value pair matches the current arbitrator
+ if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
+ // TODO: Maybe not throw en error
+ throw new Error("Not all Key Values Match Arbitrator.");
+ }
+
+ if (kv != null) {
+ pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
+ return kv.getValue();
} else {
- throw new Error("Error on initialization");
+ pendingTransBuild.addKVGuard(new KeyValue(key, null));
+ return null;
}
}
- public String toString() {
- String retString = " Committed Table: \n";
- retString += "---------------------------\n";
- retString += commitedTable.toString();
+ public IoTString getSpeculativeAtomic(IoTString key) {
- retString += "\n\n";
+ if (arbitratorTable.get(key) == null) {
+ throw new Error("Key not Found.");
+ }
- retString += " Speculative Table: \n";
- retString += "---------------------------\n";
- retString += speculativeTable.toString();
+ // Make sure new key value pair matches the current arbitrator
+ if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
+ // TODO: Maybe not throw en error
+ throw new Error("Not all Key Values Match Arbitrator.");
+ }
- return retString;
+ KeyValue kv = pendingTransSpeculativeTable.get(key);
+
+ if (kv == null) {
+ kv = speculativeTable.get(key);
+ }
+
+ if (kv == null) {
+ kv = commitedTable.get(key);
+ }
+
+ if (kv != null) {
+ pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
+ return kv.getValue();
+ } else {
+ pendingTransBuild.addKVGuard(new KeyValue(key, null));
+ return null;
+ }
}
- public void startTransaction() {
- // Create a new transaction, invalidates any old pending transactions.
- pendingTransBuild = new PendingTransaction();
+ public Pair<Boolean, Boolean> update() {
+
+ boolean gotLatestFromServer = false;
+ boolean didSendLocal = false;
+
+ try {
+ Slot[] newslots = cloud.getSlots(sequencenumber + 1);
+ validateandupdate(newslots, false);
+ gotLatestFromServer = true;
+
+ if (!pendingTransQueue.isEmpty()) {
+
+ // We have a pending transaction so do full insertion
+ processPendingTrans();
+ } else {
+
+ // We dont have a pending transaction so do minimal effort
+ updateWithNotPendingTrans();
+ }
+
+ didSendLocal = true;
+
+ } catch (Exception e) {
+ // could not update so do nothing
+ }
+
+ return new Pair<Boolean, Boolean>(gotLatestFromServer, didSendLocal);
}
- public void commitTransaction() {
+ public Boolean updateFromLocal(long arb) {
+ LocalComm lc = localCommunicationChannels.get(arb);
+ if (lc == null) {
+ // Cant talk directly to arbitrator so cant do anything
+ return false;
+ }
- if (pendingTransBuild.getKVUpdates().size() == 0) {
- // If no updates are made then there is no point inserting into the chain
- return;
+ byte[] array = new byte[Long.BYTES ];
+ ByteBuffer bbEncode = ByteBuffer.wrap(array);
+ Long lastSeenCommit = lastCommitSeenSeqNumMap.get(arb);
+ if (lastSeenCommit != null) {
+ bbEncode.putLong(lastSeenCommit);
+ } else {
+ bbEncode.putLong(0);
}
- // Add the pending transaction to the queue
- pendingTransQueue.add(pendingTransBuild);
+ byte[] data = lc.sendDataToLocalDevice(arb, bbEncode.array());
- while (!pendingTransQueue.isEmpty()) {
- if (tryput( pendingTransQueue.peek(), false)) {
- pendingTransQueue.poll();
- }
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(data);
+ boolean didCommit = bbDecode.get() == 1;
+ int numberOfCommites = bbDecode.getInt();
+
+ List<Commit> newCommits = new LinkedList<Commit>();
+ for (int i = 0; i < numberOfCommites; i++ ) {
+ bbDecode.get();
+ Commit com = (Commit)Commit.decode(null, bbDecode);
+ newCommits.add(com);
}
+
+
+ for (Commit commit : newCommits) {
+ // Prepare to process the commit
+ processEntry(commit);
+ }
+
+ boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+ // Go through all uncommitted transactions and kill the ones that are dead
+ deleteDeadUncommittedTransactions();
+
+ // Speculate on key value pairs
+ didCommitOrSpeculate |= createSpeculativeTable();
+ createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+ return true;
+ }
+
+ public void startTransaction() {
+ // Create a new transaction, invalidates any old pending transactions.
+ pendingTransBuild = new PendingTransaction();
}
public void addKV(IoTString key, IoTString value) {
// Make sure new key value pair matches the current arbitrator
if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
// TODO: Maybe not throw en error
- throw new Error("Not all Key Values Match.");
+ throw new Error("Not all Key Values Match Arbitrator.");
}
KeyValue kv = new KeyValue(key, value);
pendingTransBuild.addKV(kv);
}
- public void addGuard(Guard guard) {
- pendingTransBuild.addGuard(guard);
- }
+ public TransactionStatus commitTransaction() {
- public void update() {
+ if (pendingTransBuild.getKVUpdates().size() == 0) {
- Slot[] newslots = cloud.getSlots(sequencenumber + 1);
+ // transaction with no updates will have no effect on the system
+ return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
+ }
- validateandupdate(newslots, false);
+ TransactionStatus transStatus = null;
- if (uncommittedTransactionsMap.keySet().size() > 0) {
+ if (pendingTransBuild.getArbitrator() != localmachineid) {
- boolean doEnd = false;
- boolean needResize = false;
- while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
- boolean resize = needResize;
- needResize = false;
+ // set the local sequence number so we can recognize this transaction later
+ pendingTransBuild.setMachineLocalTransSeqNum(localTransactionSequenceNumber);
+ localTransactionSequenceNumber++;
- Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
- int newsize = 0;
- if (liveslotcount > resizethreshold) {
- resize = true; //Resize is forced
- }
+ transStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransBuild.getArbitrator());
+ transactionStatusNotSentMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus);
- if (resize) {
- newsize = (int) (numslots * RESIZE_MULTIPLE);
- TableStatus status = new TableStatus(s, newsize);
- s.addEntry(status);
- }
+ // Add the pending transaction to the queue
+ pendingTransQueue.add(pendingTransBuild);
- doRejectedMessages(s);
- ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+ for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) {
+ PendingTransaction pt = pendingTransQueue.get(i);
- // Resize was needed so redo call
- if (retTup.getFirst()) {
- needResize = true;
- continue;
- }
-
- // Extract working variables
- boolean seenliveslot = retTup.getSecond();
- long seqn = retTup.getThird();
+ if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
- // Did need to arbitrate
- doEnd = !doArbitration(s);
+ lastSeenPendingTransactionSpeculateIndex = i;
- doOptionalRescue(s, seenliveslot, seqn, resize);
+ for (KeyValue kv : pt.getKVUpdates()) {
+ pendingTransSpeculativeTable.put(kv.getKey(), kv);
+ }
- int max = 0;
- if (resize) {
- max = newsize;
}
+ }
+ } else {
+ Transaction ut = new Transaction(null,
+ -1,
+ localmachineid,
+ pendingTransBuild.getArbitrator(),
+ pendingTransBuild.getKVUpdates(),
+ pendingTransBuild.getKVGuard());
- Slot[] array = cloud.putSlot(s, max);
- if (array == null) {
- array = new Slot[] {s};
- rejectedmessagelist.clear();
- } else {
- if (array.length == 0)
- throw new Error("Server Error: Did not send any slots");
- rejectedmessagelist.add(s.getSequenceNumber());
- doEnd = false;
- }
+ Pair<Boolean, List<Commit>> retData = doLocalUpdateAndArbitrate(ut, lastCommitSeenSeqNumMap.get(localmachineid));
- /* update data structure */
- validateandupdate(array, true);
+ if (retData.getFirst()) {
+ transStatus = new TransactionStatus(TransactionStatus.StatusCommitted, pendingTransBuild.getArbitrator());
+ } else {
+ transStatus = new TransactionStatus(TransactionStatus.StatusAborted, pendingTransBuild.getArbitrator());
+ }
+ }
+
+ // Try to insert transactions if possible
+ if (!pendingTransQueue.isEmpty()) {
+ // We have a pending transaction so do full insertion
+ processPendingTrans();
+ } else {
+ try {
+ // We dont have a pending transaction so do minimal effort
+ updateWithNotPendingTrans();
+ } catch (Exception e) {
+ // Do nothing
}
}
+
+ // reset it so next time is fresh
+ pendingTransBuild = new PendingTransaction();
+
+ return transStatus;
}
- public boolean createNewKey(IoTString keyName, long machineId) {
+ public boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
while (true) {
if (arbitratorTable.get(keyName) != null) {
}
if (tryput(keyName, machineId, false)) {
-
// If successfully inserted
return true;
}
}
}
+ private void processPendingTrans() {
+
+ boolean sentAllPending = false;
+ try {
+ while (!pendingTransQueue.isEmpty()) {
+ if (tryput( pendingTransQueue.peek(), false)) {
+ pendingTransQueue.poll();
+ }
+ }
+
+ // if got here then all pending transactions were sent
+ sentAllPending = true;
+ } catch (Exception e) {
+ // There was a connection error
+ sentAllPending = false;
+ }
+
+ if (!sentAllPending) {
+
+ for (Iterator<PendingTransaction> i = pendingTransQueue.iterator(); i.hasNext(); ) {
+ PendingTransaction pt = i.next();
+ LocalComm lc = localCommunicationChannels.get(pt.getArbitrator());
+ if (lc == null) {
+ // Cant talk directly to arbitrator so cant do anything
+ continue;
+ }
+
+
+ Transaction ut = new Transaction(null,
+ -1,
+ localmachineid,
+ pendingTransBuild.getArbitrator(),
+ pendingTransBuild.getKVUpdates(),
+ pendingTransBuild.getKVGuard());
+
+
+ Pair<Boolean, List<Commit>> retData = sendTransactionToLocal(ut, lc);
+
+ for (Commit commit : retData.getSecond()) {
+ // Prepare to process the commit
+ processEntry(commit);
+ }
+
+ boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+ // Go through all uncommitted transactions and kill the ones that are dead
+ deleteDeadUncommittedTransactions();
+
+ // Speculate on key value pairs
+ didCommitOrSpeculate |= createSpeculativeTable();
+ createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+ if (retData.getFirst()) {
+ TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
+ if (transStatus != null) {
+ transStatus.setStatus(TransactionStatus.StatusCommitted);
+ }
+
+ } else {
+ TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
+ if (transStatus != null) {
+ transStatus.setStatus(TransactionStatus.StatusAborted);
+ }
+ }
+ i.remove();
+ }
+ }
+ }
+
+ private void updateWithNotPendingTrans() throws ServerException {
+
+ boolean doEnd = false;
+ boolean needResize = false;
+ while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0) || (pendingCommitsList.size() > 0)) ) {
+ boolean resize = needResize;
+ needResize = false;
+
+ Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+ int newsize = 0;
+ if (liveslotcount > resizethreshold) {
+ resize = true; //Resize is forced
+ }
+
+ if (resize) {
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ TableStatus status = new TableStatus(s, newsize);
+ s.addEntry(status);
+ }
+
+ doRejectedMessages(s);
+
+ ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+
+ // Resize was needed so redo call
+ if (retTup.getFirst()) {
+ needResize = true;
+ continue;
+ }
+
+ // Extract working variables
+ boolean seenliveslot = retTup.getSecond();
+ long seqn = retTup.getThird();
+
+ // Did need to arbitrate
+ doEnd = !doArbitration(s);
+
+ doOptionalRescue(s, seenliveslot, seqn, resize);
+
+ int max = 0;
+ if (resize) {
+ max = newsize;
+ }
+
+ Slot[] array = cloud.putSlot(s, max);
+ if (array == null) {
+ array = new Slot[] {s};
+ rejectedmessagelist.clear();
+
+ // Delete pending commits that were sent to the cloud
+ deletePendingCommits();
+
+ } else {
+ if (array.length == 0)
+ throw new Error("Server Error: Did not send any slots");
+ rejectedmessagelist.add(s.getSequenceNumber());
+ doEnd = false;
+ }
+
+ /* update data structure */
+ validateandupdate(array, true);
+ }
+ }
+
+ private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) {
+
+ // encode the request
+ byte[] array = new byte[Long.BYTES + ut.getSize()];
+ ByteBuffer bbEncode = ByteBuffer.wrap(array);
+ Long lastSeenCommit = lastCommitSeenSeqNumMap.get(ut.getArbitrator());
+ if (lastSeenCommit != null) {
+ bbEncode.putLong(lastSeenCommit);
+ } else {
+ bbEncode.putLong(0);
+ }
+ ut.encode(bbEncode);
+
+ byte[] data = lc.sendDataToLocalDevice(ut.getArbitrator(), bbEncode.array());
+
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(data);
+ boolean didCommit = bbDecode.get() == 1;
+ int numberOfCommites = bbDecode.getInt();
+
+ List<Commit> newCommits = new LinkedList<Commit>();
+ for (int i = 0; i < numberOfCommites; i++ ) {
+ bbDecode.get();
+ Commit com = (Commit)Commit.decode(null, bbDecode);
+ newCommits.add(com);
+ }
+
+ return new Pair<Boolean, List<Commit>>(didCommit, newCommits);
+ }
+
+ public byte[] localCommInput(byte[] data) {
+
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(data);
+ long lastSeenCommit = bbDecode.getLong();
+
+ Transaction ut = null;
+ if (data.length != Long.BYTES) {
+ bbDecode.get();
+ ut = (Transaction)Transaction.decode(null, bbDecode);
+ }
+ // Do the local update and arbitrate
+ Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
+
+ // Calculate the size of the response
+ int size = Byte.BYTES + Integer.BYTES;
+ for (Commit com : returnData.getSecond()) {
+ size += com.getSize();
+ }
+
+ // encode the response
+ byte[] array = new byte[size];
+ ByteBuffer bbEncode = ByteBuffer.wrap(array);
+ if (returnData.getFirst()) {
+ bbEncode.put((byte)1);
+ } else {
+ bbEncode.put((byte)0);
+ }
+ bbEncode.putInt(returnData.getSecond().size());
+
+ for (Commit com : returnData.getSecond()) {
+ com.encode(bbEncode);
+ }
+
+ return bbEncode.array();
+ }
+
+ private Pair<Boolean, List<Commit>> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) {
+
+ List<Commit> returnCommits = new ArrayList<Commit>();
+
+ if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) {
+ // There is a commit that the other client has not seen yet
+
+ Map<Long, Commit> cm = commitMap.get(localmachineid);
+ if (cm != null) {
+
+ List<Long> commitKeys = new ArrayList<Long>(cm.keySet());
+ Collections.sort(commitKeys);
+
+
+ for (int i = (commitKeys.size() - 1); i >= 0; i--) {
+ Commit com = cm.get(commitKeys.get(i));
+
+ if (com.getSequenceNumber() <= lastCommitSeen) {
+ break;
+ }
+ returnCommits.add((Commit)com.getCopy(null));
+ }
+ }
+ }
+
+
+ if ((ut == null) || (ut.getArbitrator() != localmachineid)) {
+ // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
+ // or there is no transaction to process
+ return new Pair<Boolean, List<Commit>>(false, returnCommits);
+ }
+
+ if (!ut.evaluateGuard(commitedTable, null)) {
+ // Guard evaluated as false so return only the commits that the other device has not seen yet
+ return new Pair<Boolean, List<Commit>>(false, returnCommits);
+ }
+
+ // create the commit
+ Commit commit = new Commit(null,
+ -1,
+ commitSequenceNumber,
+ ut.getArbitrator(),
+ ut.getkeyValueUpdateSet());
+ commitSequenceNumber = commitSequenceNumber + 1;
+
+ // Add to the pending commits list
+ pendingCommitsList.add(commit);
+
+ // Add this commit so we can send it back
+ returnCommits.add(commit);
+
+ // Prepare to process the commit
+ processEntry(commit);
+
+ boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+ // Go through all uncommitted transactions and kill the ones that are dead
+ deleteDeadUncommittedTransactions();
+
+ // Speculate on key value pairs
+ didCommitOrSpeculate |= createSpeculativeTable();
+ createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+ return new Pair<Boolean, List<Commit>>(true, returnCommits);
+ }
+
public void decrementLiveCount() {
liveslotcount--;
- // System.out.println("Decrement Live Count");
}
private void setResizeThreshold() {
resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
}
- private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
+ private boolean tryput(PendingTransaction pendingTrans, boolean resize) throws ServerException {
Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
int newsize = 0;
if (liveslotcount > resizethreshold) {
resize = true; //Resize is forced
- System.out.println("Live count resize: " + liveslotcount + " " + resizethreshold);
-
}
if (resize) {
newsize = (int) (numslots * RESIZE_MULTIPLE);
-
- System.out.println("New Size: " + newsize + " old: " + buffer.oldestseqn); // TODO remove
-
TableStatus status = new TableStatus(s, newsize);
s.addEntry(status);
}
doRejectedMessages(s);
-
-
ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
// Resize was needed so redo call
Transaction trans = new Transaction(s,
s.getSequenceNumber(),
localmachineid,
+ pendingTrans.getArbitrator(),
pendingTrans.getKVUpdates(),
- pendingTrans.getGuard());
+ pendingTrans.getKVGuard());
boolean insertedTrans = false;
if (s.hasSpace(trans)) {
s.addEntry(trans);
}
doOptionalRescue(s, seenliveslot, seqn, resize);
- insertedTrans = doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
+ Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedTrans, resize, newsize);
+
+ if (sendRetData.getFirst()) {
+ // update the status and change what the sequence number is for the
+ TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTrans.getMachineLocalTransSeqNum());
+ transStatus.setStatus(TransactionStatus.StatusSent);
+ transStatus.setSentTransaction();
+ transactionStatusMap.put(trans.getSequenceNumber(), transStatus);
+ }
+
- if (insertedTrans) {
- // System.out.println("Inserted: " + trans.getSequenceNumber());
+ if (sendRetData.getSecond().length != 0) {
+ // insert into the local block chain
+ validateandupdate(sendRetData.getSecond(), true);
}
- return insertedTrans;
+ return sendRetData.getFirst();
}
- private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
+ private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException {
Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
int newsize = 0;
if (liveslotcount > resizethreshold) {
boolean seenliveslot = retTup.getSecond();
long seqn = retTup.getThird();
-
doArbitration(s);
NewKey newKey = new NewKey(s, keyName, arbMachineid);
}
doOptionalRescue(s, seenliveslot, seqn, resize);
- return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize);
+ Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedNewKey, resize, newsize);
+
+ if (sendRetData.getSecond().length != 0) {
+ // insert into the local block chain
+ validateandupdate(sendRetData.getSecond(), true);
+ }
+
+ return sendRetData.getFirst();
}
private void doRejectedMessages(Slot s) {
} else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
if (!resize) {
System.out.println("B"); //?
-
- System.out.println("==============================NEEEEDDDD RESIZING");
return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
}
}
}
private boolean doArbitration(Slot s) {
- // Arbitrate
- Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+ // flag whether we have finished all arbitration
+ boolean stillHasArbitration = false;
+
+ pendingCommitsToDelete.clear();
+
+ // First add queue commits
+ for (Commit commit : pendingCommitsList) {
+ if (s.hasSpace(commit)) {
+ s.addEntry(commit);
+ pendingCommitsToDelete.add(commit);
+ } else {
+ // Ran out of space so move on but still not done
+ stillHasArbitration = true;
+ return stillHasArbitration;
+ }
+ }
+
+ // Arbitrate
+ Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
// Sort from oldest to newest
Collections.sort(transSeqNums);
-
- boolean didNeedArbitration = false;
for (Long transNum : transSeqNums) {
Transaction ut = uncommittedTransactionsMap.get(transNum);
- KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
// Check if this machine arbitrates for this transaction
- if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+ if (ut.getArbitrator() != localmachineid ) {
continue;
}
// we did have something to arbitrate on
- didNeedArbitration = true;
+ stillHasArbitration = true;
Entry newEntry = null;
- try {
- if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
- // Guard evaluated as true
+ if (ut.evaluateGuard(commitedTable, speculativeTableTmp)) {
+ // Guard evaluated as true
- // update the local tmp current key set
- for (KeyValue kv : ut.getkeyValueUpdateSet()) {
- speculativeTableTmp.put(kv.getKey(), kv);
- }
+ // update the local tmp current key set
+ for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+ speculativeTableTmp.put(kv.getKey(), kv);
+ }
- // create the commit
- newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
- } else {
- // Guard was false
+ // create the commit
+ newEntry = new Commit(s,
+ ut.getSequenceNumber(),
+ commitSequenceNumber,
+ ut.getArbitrator(),
+ ut.getkeyValueUpdateSet());
+ commitSequenceNumber = commitSequenceNumber + 1;
+ } else {
+ // Guard was false
- // create the abort
- newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
- }
- } catch (Exception e) {
- e.printStackTrace();
+ // create the abort
+ newEntry = new Abort(s,
+ ut.getSequenceNumber(),
+ ut.getMachineID(),
+ ut.getArbitrator());
}
if ((newEntry != null) && s.hasSpace(newEntry)) {
}
}
- return didNeedArbitration;
+ return stillHasArbitration;
+ }
+
+ private void deletePendingCommits() {
+ for (Commit com : pendingCommitsToDelete) {
+ pendingCommitsList.remove(com);
+ }
+ pendingCommitsToDelete.clear();
}
private void doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
}
}
- private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) {
+ private Pair<Boolean, Slot[]> doSendSlots(Slot s, boolean inserted, boolean resize, int newsize) throws ServerException {
int max = 0;
if (resize)
max = newsize;
+
Slot[] array = cloud.putSlot(s, max);
if (array == null) {
array = new Slot[] {s};
rejectedmessagelist.clear();
+
+ // Delete pending commits that were sent to the cloud
+ deletePendingCommits();
} else {
- if (array.length == 0)
- throw new Error("Server Error: Did not send any slots");
+ // if (array.length == 0)
+ // throw new Error("Server Error: Did not send any slots");
rejectedmessagelist.add(s.getSequenceNumber());
inserted = false;
}
- /* update data structure */
- validateandupdate(array, true);
-
- return inserted;
+ return new Pair<Boolean, Slot[]>(inserted, array);
}
private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
before decoding */
if (newslots.length == 0) return;
+ // Reset the table status declared sizes
+ smallestTableStatusSeen = -1;
+ largestTableStatusSeen = -1;
+
long firstseqnum = newslots[0].getSequenceNumber();
if (firstseqnum <= sequencenumber) {
throw new Error("Server Error: Sent older slots!");
HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
- initExpectedSize(firstseqnum);
+ // initExpectedSize(firstseqnum);
for (Slot slot : newslots) {
processSlot(indexer, slot, acceptupdatestolocal, machineSet);
- updateExpectedSize();
+ // updateExpectedSize();
}
- proccessAllNewCommits();
-
/* If there is a gap, check to see if the server sent us everything. */
if (firstseqnum != (sequencenumber + 1)) {
}
}
+
commitNewMaxSize();
/* Commit new to slots. */
}
sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
+ // Process all on key value pairs
+ boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+ // Go through all uncommitted transactions and kill the ones that are dead
+ deleteDeadUncommittedTransactions();
+
// Speculate on key value pairs
- createSpeculativeTable();
- }
+ didCommitOrSpeculate |= createSpeculativeTable();
- public void proccessAllNewCommits() {
+ createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+ }
+ private boolean proccessAllNewCommits() {
// Process only if there are commit
if (newCommitMap.keySet().size() == 0) {
- return;
+ return false;
}
+ boolean didProcessNewCommit = false;
- List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.keySet());
+ for (Long arb : newCommitMap.keySet()) {
- // Sort from oldest to newest commit
- Collections.sort(commitSeqNums);
+ List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.get(arb).keySet());
+
+ // Sort from oldest to newest commit
+ Collections.sort(commitSeqNums);
- // Go through each new commit one by one
- for (Long entrySeqNum : commitSeqNums) {
- Commit entry = newCommitMap.get(entrySeqNum);
+ // Go through each new commit one by one
+ for (Long entrySeqNum : commitSeqNums) {
+ Commit entry = newCommitMap.get(arb).get(entrySeqNum);
- if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
+ long lastCommitSeenSeqNum = -1;
+ if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
+ lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
+ }
+
+ if (entry.getSequenceNumber() <= lastCommitSeenSeqNum) {
+ Map<Long, Commit> cm = commitMap.get(arb);
+ if (cm == null) {
+ cm = new HashMap<Long, Commit>();
+ }
- // Remove any old commits
- for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
- Commit prevcommit = i.next();
+ Commit prevCommit = cm.put(entry.getSequenceNumber(), entry);
+ commitMap.put(arb, cm);
- if (entry.getTransSequenceNumber() == prevcommit.getTransSequenceNumber()) {
- prevcommit.setDead();
- i.remove();
+ if (prevCommit != null) {
+ prevCommit.setDead();
+
+ for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
+ committedMapByKey.put(kv.getKey(), entry);
+ }
}
+
+ continue;
}
- commitList.add(entry);
- continue;
- }
- // Remove any old commits
- for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
- Commit prevcommit = i.next();
- prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+ Set<Commit> commitsToEditSet = new HashSet<Commit>();
- if (!prevcommit.isLive()) {
- i.remove();
+ for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+ commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
}
- }
- // Add the new commit
- commitList.add(entry);
- lastCommitSeenSeqNum = entry.getTransSequenceNumber();
- // System.out.println("Last Seq Num: " + lastCommitSeenSeqNum);
+ commitsToEditSet.remove(null);
+
+ for (Commit prevCommit : commitsToEditSet) {
+
+ Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+
+ if (!prevCommit.isLive()) {
+ Map<Long, Commit> cm = commitMap.get(arb);
+
+ // remove it from the map so that it can be set as dead
+ if (cm != null) {
+ cm.remove(prevCommit.getSequenceNumber());
+ commitMap.put(arb, cm);
+ }
+ }
+ }
+
+ // Add the new commit
+ Map<Long, Commit> cm = commitMap.get(arb);
+ if (cm == null) {
+ cm = new HashMap<Long, Commit>();
+ }
+ cm.put(entry.getSequenceNumber(), entry);
+ commitMap.put(arb, cm);
+
+ lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getSequenceNumber());
+ // set the trans sequence number if we are able to
+ if (entry.getTransSequenceNumber() != -1) {
+ lastCommitSeenTransSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+ }
+
+ didProcessNewCommit = true;
- // Update the committed table list
- for (KeyValue kv : entry.getkeyValueUpdateSet()) {
- IoTString key = kv.getKey();
- commitedTable.put(key, kv);
+ // Update the committed table list
+ for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+ IoTString key = kv.getKey();
+ commitedTable.put(key, kv);
+ committedMapByKey.put(key, entry);
+ }
}
+ }
+ // Clear the new commits storage so we can use it later
+ newCommitMap.clear();
+
- long committedTransSeq = entry.getTransSequenceNumber();
+ // go through all saved transactions and update the status of those that can be updated
+ for (Iterator<Map.Entry<Long, TransactionStatus>> i = transactionStatusMap.entrySet().iterator(); i.hasNext();) {
+ Map.Entry<Long, TransactionStatus> entry = i.next();
+ long seqnum = entry.getKey();
+ TransactionStatus status = entry.getValue();
- // Make dead the transactions
- for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
- Transaction prevtrans = i.next().getValue();
+ if (status.getSentTransaction()) {
- if (prevtrans.getSequenceNumber() <= committedTransSeq) {
+ Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(status.getArbitrator());
+ Long abortSeqNum = lastAbortSeenSeqNumMap.get(status.getArbitrator());
+
+ if (((commitSeqNum != null) && (seqnum <= commitSeqNum)) ||
+ ((abortSeqNum != null) && (seqnum <= abortSeqNum))) {
+ status.setStatus(TransactionStatus.StatusCommitted);
i.remove();
- prevtrans.setDead();
}
}
}
+ return didProcessNewCommit;
+ }
- // Clear the new commits storage so we can use it later
- newCommitMap.clear();
+ private void deleteDeadUncommittedTransactions() {
+ // Make dead the transactions
+ for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
+ Transaction prevtrans = i.next().getValue();
+ long transArb = prevtrans.getArbitrator();
+
+ Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(transArb);
+ Long abortSeqNum = lastAbortSeenSeqNumMap.get(transArb);
+
+ if (((commitSeqNum != null) && (prevtrans.getSequenceNumber() <= commitSeqNum)) ||
+ ((abortSeqNum != null) && (prevtrans.getSequenceNumber() <= abortSeqNum))) {
+ i.remove();
+ prevtrans.setDead();
+ }
+ }
}
- private void createSpeculativeTable() {
- Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+ private boolean createSpeculativeTable() {
+ if (uncommittedTransactionsMap.keySet().size() == 0) {
+ return false;
+ }
+
+ Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
// Sort from oldest to newest commit
Collections.sort(utSeqNums);
+ if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
- for (Long key : utSeqNums) {
- Transaction trans = uncommittedTransactionsMap.get(key);
+ speculativeTable.clear();
+ lastUncommittedTransaction = -1;
- try {
- if (trans.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+ for (Long key : utSeqNums) {
+ Transaction trans = uncommittedTransactionsMap.get(key);
+
+ lastUncommittedTransaction = key;
+
+ if (trans.evaluateGuard(commitedTable, speculativeTableTmp)) {
for (KeyValue kv : trans.getkeyValueUpdateSet()) {
speculativeTableTmp.put(kv.getKey(), kv);
}
}
- } catch (Exception e) {
- e.printStackTrace();
+ }
+ } else {
+ for (Long key : utSeqNums) {
+
+ if (key <= lastUncommittedTransaction) {
+ continue;
+ }
+
+ lastUncommittedTransaction = key;
+
+ Transaction trans = uncommittedTransactionsMap.get(key);
+
+ if (trans.evaluateGuard(speculativeTable, speculativeTableTmp)) {
+ for (KeyValue kv : trans.getkeyValueUpdateSet()) {
+ speculativeTableTmp.put(kv.getKey(), kv);
+ }
+ }
}
}
- speculativeTable = speculativeTableTmp;
+ for (IoTString key : speculativeTableTmp.keySet()) {
+ speculativeTable.put(key, speculativeTableTmp.get(key));
+ }
+
+ return true;
+ }
+
+ private void createPendingTransactionSpeculativeTable(boolean didCommitOrSpeculate) {
+
+ if (didCommitOrSpeculate) {
+ pendingTransSpeculativeTable.clear();
+ lastSeenPendingTransactionSpeculateIndex = 0;
+
+ int index = 0;
+ for (PendingTransaction pt : pendingTransQueue) {
+ if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
+
+ lastSeenPendingTransactionSpeculateIndex = index;
+ index++;
+
+ for (KeyValue kv : pt.getKVUpdates()) {
+ pendingTransSpeculativeTable.put(kv.getKey(), kv);
+ }
+
+ }
+ }
+ }
}
private int expectedsize, currmaxsize;
private void checkNumSlots(int numslots) {
- if (numslots != expectedsize) {
- throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
+
+
+ // We only have 1 size so we must have this many slots
+ if (largestTableStatusSeen == smallestTableStatusSeen) {
+ if (numslots != smallestTableStatusSeen) {
+ throw new Error("Server Error: Server did not send all slots. Expected: " + smallestTableStatusSeen + " Received:" + numslots);
+ }
+ } else {
+ // We have more than 1
+ if (numslots < smallestTableStatusSeen) {
+ throw new Error("Server Error: Server did not send all slots. Expected at least: " + smallestTableStatusSeen + " Received:" + numslots);
+ }
}
+
+ // if (numslots != expectedsize) {
+ // throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
+ // }
}
private void initExpectedSize(long firstsequencenumber) {
}
private void commitNewMaxSize() {
+
+ if (largestTableStatusSeen == -1) {
+ currmaxsize = numslots;
+ } else {
+ currmaxsize = largestTableStatusSeen;
+ }
+
if (numslots != currmaxsize) {
- System.out.println("Resizing the buffer"); // TODO: Remove
buffer.resize(currmaxsize);
}
}
private void processEntry(Transaction entry) {
- Transaction prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
+
+ long arb = entry.getArbitrator();
+ Long comLast = lastCommitSeenTransSeqNumMap.get(arb);
+ Long abLast = lastAbortSeenSeqNumMap.get(arb);
+
+ Transaction prevTrans = null;
+
+ if ((comLast != null) && (comLast >= entry.getSequenceNumber())) {
+ prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
+ } else if ((abLast != null) && (abLast >= entry.getSequenceNumber())) {
+ prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
+ } else {
+ prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
+ }
// Duplicate so delete old copy
if (prevTrans != null) {
}
private void processEntry(Abort entry) {
-
if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
// Abort has not been seen yet so we need to keep track of it
- abortSet.add(entry);
+
+ Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
+ if (prevAbort != null) {
+ prevAbort.setDead(); // delete old version of the duplicate
+ }
+
+ if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) && (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
+ lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+ }
} else {
// The machine already saw this so it is dead
entry.setDead();
}
- // Make dead the transactions
- for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
- Transaction prevtrans = i.next().getValue();
-
- if (prevtrans.getSequenceNumber() <= entry.getTransSequenceNumber()) {
- i.remove();
- prevtrans.setDead();
- }
+ // Update the status of the transaction and remove it since we are done with this transaction
+ TransactionStatus status = transactionStatusMap.remove(entry.getTransSequenceNumber());
+ if (status != null) {
+ status.setStatus(TransactionStatus.StatusAborted);
}
}
- private void processEntry(Commit entry, Slot s) {
- Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry);
+ private void processEntry(Commit entry) {
+ Map<Long, Commit> arbMap = newCommitMap.get(entry.getTransArbitrator());
+
+ if (arbMap == null) {
+ arbMap = new HashMap<Long, Commit>();
+ }
+
+ Commit prevCommit = arbMap.put(entry.getSequenceNumber(), entry);
+ newCommitMap.put(entry.getTransArbitrator(), arbMap);
+
if (prevCommit != null) {
prevCommit.setDead();
}
private void processEntry(TableStatus entry) {
int newnumslots = entry.getMaxSlots();
- updateCurrMaxSize(newnumslots);
+ // updateCurrMaxSize(newnumslots);
if (lastTableStatus != null)
lastTableStatus.setDead();
lastTableStatus = entry;
- }
+ if ((smallestTableStatusSeen == -1) || (newnumslots < smallestTableStatusSeen)) {
+ smallestTableStatusSeen = newnumslots;
+ }
+
+ if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
+ largestTableStatusSeen = newnumslots;
+ }
+ }
private void addWatchList(long machineid, RejectedMessage entry) {
HashSet<RejectedMessage> entries = watchlist.get(machineid);
}
// Set dead the abort
- for (Iterator<Abort> ait = abortSet.iterator(); ait.hasNext(); ) {
- Abort abort = ait.next();
+ for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
+ Abort abort = i.next().getValue();
if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
abort.setDead();
- ait.remove();
+ i.remove();
}
}
-
Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
if (lastmsgentry == null)
return;
if (machineid == localmachineid) {
if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
- throw new Error("Server Error: Mismatch on local machine sequence number");
+ throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + seqnum + " got: " + lastmsgseqnum);
} else {
if (lastmsgseqnum > seqnum)
throw new Error("Server Error: Rollback on remote machine sequence number");
break;
case Entry.TypeCommit:
- processEntry((Commit)entry, slot);
+ processEntry((Commit)entry);
break;
case Entry.TypeAbort:
throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
}
}
-}
+}
\ No newline at end of file