+ private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) {
+
+ // encode the request
+ byte[] array = new byte[Long.BYTES + ut.getSize()];
+ ByteBuffer bbEncode = ByteBuffer.wrap(array);
+ Long lastSeenCommit = lastCommitSeenSeqNumMap.get(ut.getArbitrator());
+ if (lastSeenCommit != null) {
+ bbEncode.putLong(lastSeenCommit);
+ } else {
+ bbEncode.putLong(0);
+ }
+ ut.encode(bbEncode);
+
+ byte[] data = lc.sendDataToLocalDevice(ut.getArbitrator(), bbEncode.array());
+
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(data);
+ boolean didCommit = bbDecode.get() == 1;
+ int numberOfCommites = bbDecode.getInt();
+
+ List<Commit> newCommits = new LinkedList<Commit>();
+ for (int i = 0; i < numberOfCommites; i++ ) {
+ bbDecode.get();
+ Commit com = (Commit)Commit.decode(null, bbDecode);
+ newCommits.add(com);
+ }
+
+ return new Pair<Boolean, List<Commit>>(didCommit, newCommits);
+ }
+
+ public byte[] localCommInput(byte[] data) {
+
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(data);
+ long lastSeenCommit = bbDecode.getLong();
+ bbDecode.get();
+ Transaction ut = (Transaction)Transaction.decode(null, bbDecode);
+
+ // Do the local update and arbitrate
+ Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
+
+ // Calculate the size of the response
+ int size = Byte.BYTES + Integer.BYTES;
+ for (Commit com : returnData.getSecond()) {
+ size += com.getSize();
+ }
+
+ // encode the response
+ byte[] array = new byte[size];
+ ByteBuffer bbEncode = ByteBuffer.wrap(array);
+ if (returnData.getFirst()) {
+ bbEncode.put((byte)1);
+ } else {
+ bbEncode.put((byte)0);
+ }
+ bbEncode.putInt(returnData.getSecond().size());
+
+ for (Commit com : returnData.getSecond()) {
+ com.encode(bbEncode);
+ }
+
+ return bbEncode.array();
+ }
+
+ private Pair<Boolean, List<Commit>> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) {
+
+ if (ut.getArbitrator() != localmachineid) {
+ // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
+ return null;
+ }
+
+ List<Commit> returnCommits = new ArrayList<Commit>();
+
+ if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) {
+ // There is a commit that the other client has not seen yet
+
+ Map<Long, Commit> cm = commitMap.get(localmachineid);
+ if (cm != null) {
+
+ List<Long> commitKeys = new ArrayList<Long>(cm.keySet());
+ Collections.sort(commitKeys);
+
+
+ for (int i = (commitKeys.size() - 1); i >= 0; i--) {
+ Commit com = cm.get(commitKeys.get(i));
+
+ if (com.getSequenceNumber() <= lastCommitSeen) {
+ break;
+ }
+ returnCommits.add((Commit)com.getCopy(null));
+ }
+ }
+ }
+
+ if (!ut.evaluateGuard(commitedTable, null)) {
+ // Guard evaluated as false so return only the commits that the other device has not seen yet
+ return new Pair<Boolean, List<Commit>>(false, returnCommits);
+ }
+
+ // create the commit
+ Commit commit = new Commit(null,
+ -1,
+ commitSequenceNumber,
+ ut.getArbitrator(),
+ ut.getkeyValueUpdateSet());
+ commitSequenceNumber = commitSequenceNumber + 1;
+
+ // Add to the pending commits list
+ pendingCommitsList.add(commit);
+
+ // Add this commit so we can send it back
+ returnCommits.add(commit);
+
+ // Prepare to process the commit
+ processEntry(commit);
+
+ boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+ // Go through all uncommitted transactions and kill the ones that are dead
+ deleteDeadUncommittedTransactions();
+
+ // Speculate on key value pairs
+ didCommitOrSpeculate |= createSpeculativeTable();
+ createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+ return new Pair<Boolean, List<Commit>>(true, returnCommits);
+ }
+