X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2Fjava%2Fiotcloud%2FTable.java;h=be9defb91cef43d5921db27bd9f0ed3e9e3bb02e;hp=6aeb28e3628c57f611e56921717e83f924bad66a;hb=95c8a925e792895f5e4a5da7c08e2c592ee03ee9;hpb=b8cb12edb8ec00bcdb550303a2cc22c6d3fd7680 diff --git a/version2/src/java/iotcloud/Table.java b/version2/src/java/iotcloud/Table.java index 6aeb28e..be9defb 100644 --- a/version2/src/java/iotcloud/Table.java +++ b/version2/src/java/iotcloud/Table.java @@ -13,6 +13,7 @@ import java.util.List; import java.util.Set; import java.util.Collection; import java.util.Collections; +import java.nio.ByteBuffer; /** @@ -24,9 +25,6 @@ import java.util.Collections; final public class Table { private int numslots; //number of slots stored in buffer - //table of key-value pairs - //private HashMap table = new HashMap(); - // machine id -> (sequence number, Slot or LastMessage); records last message by each client private HashMap > lastmessagetable = new HashMap >(); // machine id -> ... @@ -52,10 +50,12 @@ final public class Table { private int smallestTableStatusSeen = -1; private int largestTableStatusSeen = -1; private int lastSeenPendingTransactionSpeculateIndex = 0; + private int commitSequenceNumber = 0; + private long localTransactionSequenceNumber = 0; private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building private LinkedList pendingTransQueue = null; // Queue of pending transactions - private Map commitMap = null; // List of all the most recent live commits + private Map> commitMap = null; // List of all the most recent live commits private Map abortMap = null; // Set of the live aborts private Map committedMapByKey = null; // Table of committed KV private Map commitedTable = null; // Table of committed KV @@ -63,20 +63,24 @@ final public class Table { private Map uncommittedTransactionsMap = null; private Map arbitratorTable = null; // Table of arbitrators private Map newKeyTable = null; // Table of speculative KV - private Map newCommitMap = null; // Map of all the new commits + private Map> newCommitMap = null; // Map of all the new commits private Map lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator - private Map lastAbortSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator + private Map lastCommitSeenTransSeqNumMap = null; // transaction sequence number of the last commit that was seen grouped by arbitrator + private Map lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator private Map pendingTransSpeculativeTable = null; + private List pendingCommitsList = null; + private List pendingCommitsToDelete = null; + private Map localCommunicationChannels; + private Map transactionStatusMap = null; - - public Table(String baseurl, String password, long _localmachineid) { + public Table(String hostname, String baseurl, String password, long _localmachineid) { localmachineid = _localmachineid; buffer = new SlotBuffer(); numslots = buffer.capacity(); setResizeThreshold(); sequencenumber = 0; - cloud = new CloudComm(this, baseurl, password); + cloud = new CloudComm(this, hostname, baseurl, password); lastliveslotseqn = 1; setupDataStructs(); @@ -95,7 +99,7 @@ final public class Table { private void setupDataStructs() { pendingTransQueue = new LinkedList(); - commitMap = new HashMap(); + commitMap = new HashMap>(); abortMap = new HashMap(); committedMapByKey = new HashMap(); commitedTable = new HashMap(); @@ -103,10 +107,30 @@ final public class Table { uncommittedTransactionsMap = new HashMap(); arbitratorTable = new HashMap(); newKeyTable = new HashMap(); - newCommitMap = new HashMap(); + newCommitMap = new HashMap>(); lastCommitSeenSeqNumMap = new HashMap(); + lastCommitSeenTransSeqNumMap = new HashMap(); lastAbortSeenSeqNumMap = new HashMap(); pendingTransSpeculativeTable = new HashMap(); + pendingCommitsList = new LinkedList(); + pendingCommitsToDelete = new LinkedList(); + localCommunicationChannels = new HashMap(); + transactionStatusMap = new HashMap(); + } + + public void initTable() throws ServerException { + cloud.setSalt();//Set the salt + Slot s = new Slot(this, 1, localmachineid); + TableStatus status = new TableStatus(s, numslots); + s.addEntry(status); + Slot[] array = cloud.putSlot(s, numslots); + if (array == null) { + array = new Slot[] {s}; + /* update data structure */ + validateandupdate(array, true); + } else { + throw new Error("Error on initialization"); + } } public void rebuild() throws ServerException { @@ -114,45 +138,112 @@ final public class Table { validateandupdate(newslots, true); } - // // TODO: delete method - // public void printSlots() { - // long o = buffer.getOldestSeqNum(); - // long n = buffer.getNewestSeqNum(); - - // int[] types = new int[10]; - - // int num = 0; - - // int livec = 0; - // int deadc = 0; - // for (long i = o; i < (n + 1); i++) { - // Slot s = buffer.getSlot(i); - - // Vector entries = s.getEntries(); - - // for (Entry e : entries) { - // if (e.isLive()) { - // int type = e.getType(); - // types[type] = types[type] + 1; - // num++; - // livec++; - // } else { - // deadc++; - // } - // } - // } - - // for (int i = 0; i < 10; i++) { - // System.out.println(i + " " + types[i]); - // } - // System.out.println("Live count: " + livec); - // System.out.println("Dead count: " + deadc); - // System.out.println("Old: " + o); - // System.out.println("New: " + n); - // System.out.println("Size: " + buffer.size()); - // System.out.println("Commits Map: " + commitedTable.size()); - // System.out.println("Commits List: " + commitMap.size()); - // } + // TODO: delete method + public void printSlots() { + long o = buffer.getOldestSeqNum(); + long n = buffer.getNewestSeqNum(); + + int[] types = new int[10]; + + int num = 0; + + int livec = 0; + int deadc = 0; + for (long i = o; i < (n + 1); i++) { + Slot s = buffer.getSlot(i); + + Vector entries = s.getEntries(); + + for (Entry e : entries) { + if (e.isLive()) { + int type = e.getType(); + types[type] = types[type] + 1; + num++; + livec++; + } else { + deadc++; + } + } + } + + for (int i = 0; i < 10; i++) { + System.out.println(i + " " + types[i]); + } + System.out.println("Live count: " + livec); + System.out.println("Dead count: " + deadc); + System.out.println("Old: " + o); + System.out.println("New: " + n); + System.out.println("Size: " + buffer.size()); + System.out.println("Commits Key Map: " + commitedTable.size()); + // System.out.println("Commits Live Map: " + commitMap.size()); + System.out.println("Pending: " + pendingTransQueue.size()); + + // List strList = new ArrayList(); + // for (int i = 0; i < 100; i++) { + // String keyA = "a" + i; + // String keyB = "b" + i; + // String keyC = "c" + i; + // String keyD = "d" + i; + + // IoTString iKeyA = new IoTString(keyA); + // IoTString iKeyB = new IoTString(keyB); + // IoTString iKeyC = new IoTString(keyC); + // IoTString iKeyD = new IoTString(keyD); + + // strList.add(iKeyA); + // strList.add(iKeyB); + // strList.add(iKeyC); + // strList.add(iKeyD); + // } + + + // for (Long l : commitMap.keySet()) { + // for (Long l2 : commitMap.get(l).keySet()) { + // for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) { + // strList.remove(kv.getKey()); + // System.out.print(kv.getKey() + " "); + // } + // } + // } + + // System.out.println(); + // System.out.println(); + + // for (IoTString s : strList) { + // System.out.print(s + " "); + // } + // System.out.println(); + // System.out.println(strList.size()); + } + + public long getId() { + return localmachineid; + } + + public boolean hasConnection() { + return cloud.hasConnection(); + } + + public String toString() { + String retString = " Committed Table: \n"; + retString += "---------------------------\n"; + retString += commitedTable.toString(); + + retString += "\n\n"; + + retString += " Speculative Table: \n"; + retString += "---------------------------\n"; + retString += speculativeTable.toString(); + + return retString; + } + + public void addLocalComm(long machineId, LocalComm lc) { + localCommunicationChannels.put(machineId, lc); + } + public Long getArbitrator(IoTString key) { + return arbitratorTable.get(key); + } public IoTString getCommitted(IoTString key) { KeyValue kv = commitedTable.get(key); @@ -234,37 +325,24 @@ final public class Table { } } - public Long getArbitrator(IoTString key) { - return arbitratorTable.get(key); - } - - public void initTable() throws ServerException { - cloud.setSalt();//Set the salt - Slot s = new Slot(this, 1, localmachineid); - TableStatus status = new TableStatus(s, numslots); - s.addEntry(status); - Slot[] array = cloud.putSlot(s, numslots); - if (array == null) { - array = new Slot[] {s}; - /* update data structure */ - validateandupdate(array, true); - } else { - throw new Error("Error on initialization"); - } - } + public void update() { + try { + Slot[] newslots = cloud.getSlots(sequencenumber + 1); + validateandupdate(newslots, false); - public String toString() { - String retString = " Committed Table: \n"; - retString += "---------------------------\n"; - retString += commitedTable.toString(); + if (!pendingTransQueue.isEmpty()) { - retString += "\n\n"; + // We have a pending transaction so do full insertion + processPendingTrans(); + } else { - retString += " Speculative Table: \n"; - retString += "---------------------------\n"; - retString += speculativeTable.toString(); + // We dont have a pending transaction so do minimal effort + updateWithNotPendingTrans(); + } - return retString; + } catch (Exception e) { + // could not update so do nothing + } } public void startTransaction() { @@ -272,150 +350,372 @@ final public class Table { pendingTransBuild = new PendingTransaction(); } - public void commitTransaction() throws ServerException { + public void addKV(IoTString key, IoTString value) { + + if (arbitratorTable.get(key) == null) { + throw new Error("Key not Found."); + } + + // Make sure new key value pair matches the current arbitrator + if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) { + // TODO: Maybe not throw en error + throw new Error("Not all Key Values Match Arbitrator."); + } + + KeyValue kv = new KeyValue(key, value); + pendingTransBuild.addKV(kv); + } + + public TransactionStatus commitTransaction() { if (pendingTransBuild.getKVUpdates().size() == 0) { - // If no updates are made then there is no point inserting into the chain - return; + + // transaction with no updates will have no effect on the system + return new TransactionStatus(TransactionStatus.StatusNoEffect, -1); } - // Add the pending transaction to the queue - pendingTransQueue.add(pendingTransBuild); + TransactionStatus transStatus = null; - for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) { - PendingTransaction pt = pendingTransQueue.get(i); + if (pendingTransBuild.getArbitrator() != localmachineid) { - if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) { + // set the local sequence number so we can recognize this transaction later + pendingTransBuild.setMachineLocalTransSeqNum(localTransactionSequenceNumber); + localTransactionSequenceNumber++; - lastSeenPendingTransactionSpeculateIndex = i; + transStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransBuild.getArbitrator()); + transactionStatusMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus); - for (KeyValue kv : pt.getKVUpdates()) { - pendingTransSpeculativeTable.put(kv.getKey(), kv); - } + // Add the pending transaction to the queue + pendingTransQueue.add(pendingTransBuild); - } - } + for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) { + PendingTransaction pt = pendingTransQueue.get(i); - // Delete since already inserted - pendingTransBuild = new PendingTransaction(); + if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) { - while (!pendingTransQueue.isEmpty()) { - if (tryput( pendingTransQueue.peek(), false)) { - pendingTransQueue.poll(); + lastSeenPendingTransactionSpeculateIndex = i; + + for (KeyValue kv : pt.getKVUpdates()) { + pendingTransSpeculativeTable.put(kv.getKey(), kv); + } + + } } - } - } + } else { + Transaction ut = new Transaction(null, + -1, + localmachineid, + pendingTransBuild.getArbitrator(), + pendingTransBuild.getKVUpdates(), + pendingTransBuild.getKVGuard()); - public void addKV(IoTString key, IoTString value) { + Pair> retData = doLocalUpdateAndArbitrate(ut, lastCommitSeenSeqNumMap.get(localmachineid)); - if (arbitratorTable.get(key) == null) { - throw new Error("Key not Found."); + if (retData.getFirst()) { + transStatus = new TransactionStatus(TransactionStatus.StatusCommitted, pendingTransBuild.getArbitrator()); + } else { + transStatus = new TransactionStatus(TransactionStatus.StatusAborted, pendingTransBuild.getArbitrator()); + } } - // Make sure new key value pair matches the current arbitrator - if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) { - // TODO: Maybe not throw en error - throw new Error("Not all Key Values Match Arbitrator."); + // Try to insert transactions if possible + if (!pendingTransQueue.isEmpty()) { + // We have a pending transaction so do full insertion + processPendingTrans(); + } else { + try { + // We dont have a pending transaction so do minimal effort + updateWithNotPendingTrans(); + } catch (Exception e) { + // Do nothing + } } - KeyValue kv = new KeyValue(key, value); - pendingTransBuild.addKV(kv); + // reset it so next time is fresh + pendingTransBuild = new PendingTransaction(); + + return transStatus; } - public void update() throws ServerException { - Slot[] newslots = cloud.getSlots(sequencenumber + 1); - validateandupdate(newslots, false); + public boolean createNewKey(IoTString keyName, long machineId) throws ServerException { - if (!pendingTransQueue.isEmpty()) { - System.out.println("Full Update"); + while (true) { + if (arbitratorTable.get(keyName) != null) { + // There is already an arbitrator + return false; + } - // We have a pending transaction so do full insertion + if (tryput(keyName, machineId, false)) { + // If successfully inserted + return true; + } + } + } + + private void processPendingTrans() { + boolean sentAllPending = false; + try { while (!pendingTransQueue.isEmpty()) { if (tryput( pendingTransQueue.peek(), false)) { pendingTransQueue.poll(); } } - } else { - // We dont have a pending transaction so do minimal effort - if (uncommittedTransactionsMap.keySet().size() > 0) { - - boolean doEnd = false; - boolean needResize = false; - while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) { - boolean resize = needResize; - needResize = false; - - Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC()); - int newsize = 0; - if (liveslotcount > resizethreshold) { - resize = true; //Resize is forced - } - if (resize) { - newsize = (int) (numslots * RESIZE_MULTIPLE); - TableStatus status = new TableStatus(s, newsize); - s.addEntry(status); - } + // if got here then all pending transactions were sent + sentAllPending = true; + } catch (Exception e) { + // There was a connection error + e.printStackTrace(); + sentAllPending = false; + } - doRejectedMessages(s); - ThreeTuple retTup = doMandatoryResuce(s, resize); + if (!sentAllPending) { - // Resize was needed so redo call - if (retTup.getFirst()) { - needResize = true; - continue; - } + for (Iterator i = pendingTransQueue.iterator(); i.hasNext(); ) { + PendingTransaction pt = i.next(); + LocalComm lc = localCommunicationChannels.get(pt.getArbitrator()); + if (lc == null) { + // Cant talk directly to arbitrator so cant do anything + continue; + } - // Extract working variables - boolean seenliveslot = retTup.getSecond(); - long seqn = retTup.getThird(); - // Did need to arbitrate - doEnd = !doArbitration(s); + Transaction ut = new Transaction(null, + -1, + localmachineid, + pendingTransBuild.getArbitrator(), + pendingTransBuild.getKVUpdates(), + pendingTransBuild.getKVGuard()); - doOptionalRescue(s, seenliveslot, seqn, resize); - int max = 0; - if (resize) { - max = newsize; - } + Pair> retData = sendTransactionToLocal(ut, lc); + + for (Commit commit : retData.getSecond()) { + // Prepare to process the commit + processEntry(commit); + } + + boolean didCommitOrSpeculate = proccessAllNewCommits(); + + // Go through all uncommitted transactions and kill the ones that are dead + deleteDeadUncommittedTransactions(); - Slot[] array = cloud.putSlot(s, max); - if (array == null) { - array = new Slot[] {s}; - rejectedmessagelist.clear(); - } else { - if (array.length == 0) - throw new Error("Server Error: Did not send any slots"); - rejectedmessagelist.add(s.getSequenceNumber()); - doEnd = false; + // Speculate on key value pairs + didCommitOrSpeculate |= createSpeculativeTable(); + createPendingTransactionSpeculativeTable(didCommitOrSpeculate); + + + if (retData.getFirst()) { + TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum()); + if (transStatus != null) { + transStatus.setStatus(TransactionStatus.StatusCommitted); } - /* update data structure */ - validateandupdate(array, true); + } else { + TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum()); + if (transStatus != null) { + transStatus.setStatus(TransactionStatus.StatusAborted); + } } } } } - public boolean createNewKey(IoTString keyName, long machineId) throws ServerException { + private void updateWithNotPendingTrans() throws ServerException { - while (true) { - if (arbitratorTable.get(keyName) != null) { - // There is already an arbitrator - return false; + boolean doEnd = false; + boolean needResize = false; + while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0) || (pendingCommitsList.size() > 0)) ) { + boolean resize = needResize; + needResize = false; + + Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC()); + int newsize = 0; + if (liveslotcount > resizethreshold) { + resize = true; //Resize is forced } - if (tryput(keyName, machineId, false)) { - // If successfully inserted - return true; + if (resize) { + newsize = (int) (numslots * RESIZE_MULTIPLE); + TableStatus status = new TableStatus(s, newsize); + s.addEntry(status); + } + + doRejectedMessages(s); + + ThreeTuple retTup = doMandatoryResuce(s, resize); + + // Resize was needed so redo call + if (retTup.getFirst()) { + needResize = true; + continue; } + + // Extract working variables + boolean seenliveslot = retTup.getSecond(); + long seqn = retTup.getThird(); + + // Did need to arbitrate + doEnd = !doArbitration(s); + + doOptionalRescue(s, seenliveslot, seqn, resize); + + int max = 0; + if (resize) { + max = newsize; + } + + Slot[] array = cloud.putSlot(s, max); + if (array == null) { + array = new Slot[] {s}; + rejectedmessagelist.clear(); + + // Delete pending commits that were sent to the cloud + deletePendingCommits(); + + } else { + if (array.length == 0) + throw new Error("Server Error: Did not send any slots"); + rejectedmessagelist.add(s.getSequenceNumber()); + doEnd = false; + } + + /* update data structure */ + validateandupdate(array, true); } } + private Pair> sendTransactionToLocal(Transaction ut, LocalComm lc) { + + // encode the request + byte[] array = new byte[Long.BYTES + ut.getSize()]; + ByteBuffer bbEncode = ByteBuffer.wrap(array); + Long lastSeenCommit = lastCommitSeenSeqNumMap.get(ut.getArbitrator()); + if (lastSeenCommit != null) { + bbEncode.putLong(lastSeenCommit); + } else { + bbEncode.putLong(0); + } + ut.encode(bbEncode); + + byte[] data = lc.sendDataToLocalDevice(ut.getArbitrator(), bbEncode.array()); + + // Decode the data + ByteBuffer bbDecode = ByteBuffer.wrap(data); + boolean didCommit = bbDecode.get() == 1; + int numberOfCommites = bbDecode.getInt(); + + List newCommits = new LinkedList(); + for (int i = 0; i < numberOfCommites; i++ ) { + bbDecode.get(); + Commit com = (Commit)Commit.decode(null, bbDecode); + newCommits.add(com); + } + + return new Pair>(didCommit, newCommits); + } + + public byte[] localCommInput(byte[] data) { + + // Decode the data + ByteBuffer bbDecode = ByteBuffer.wrap(data); + long lastSeenCommit = bbDecode.getLong(); + bbDecode.get(); + Transaction ut = (Transaction)Transaction.decode(null, bbDecode); + + // Do the local update and arbitrate + Pair> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit); + + // Calculate the size of the response + int size = Byte.BYTES + Integer.BYTES; + for (Commit com : returnData.getSecond()) { + size += com.getSize(); + } + + // encode the response + byte[] array = new byte[size]; + ByteBuffer bbEncode = ByteBuffer.wrap(array); + if (returnData.getFirst()) { + bbEncode.put((byte)1); + } else { + bbEncode.put((byte)0); + } + bbEncode.putInt(returnData.getSecond().size()); + + for (Commit com : returnData.getSecond()) { + com.encode(bbEncode); + } + + return bbEncode.array(); + } + + private Pair> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) { + + if (ut.getArbitrator() != localmachineid) { + // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator + return null; + } + + List returnCommits = new ArrayList(); + + if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) { + // There is a commit that the other client has not seen yet + + Map cm = commitMap.get(localmachineid); + if (cm != null) { + + List commitKeys = new ArrayList(cm.keySet()); + Collections.sort(commitKeys); + + + for (int i = (commitKeys.size() - 1); i >= 0; i--) { + Commit com = cm.get(commitKeys.get(i)); + + if (com.getSequenceNumber() <= lastCommitSeen) { + break; + } + returnCommits.add((Commit)com.getCopy(null)); + } + } + } + + if (!ut.evaluateGuard(commitedTable, null)) { + // Guard evaluated as false so return only the commits that the other device has not seen yet + return new Pair>(false, returnCommits); + } + + // create the commit + Commit commit = new Commit(null, + -1, + commitSequenceNumber, + ut.getArbitrator(), + ut.getkeyValueUpdateSet()); + commitSequenceNumber = commitSequenceNumber + 1; + + // Add to the pending commits list + pendingCommitsList.add(commit); + + // Add this commit so we can send it back + returnCommits.add(commit); + + // Prepare to process the commit + processEntry(commit); + + boolean didCommitOrSpeculate = proccessAllNewCommits(); + + // Go through all uncommitted transactions and kill the ones that are dead + deleteDeadUncommittedTransactions(); + + // Speculate on key value pairs + didCommitOrSpeculate |= createSpeculativeTable(); + createPendingTransactionSpeculativeTable(didCommitOrSpeculate); + + return new Pair>(true, returnCommits); + } + public void decrementLiveCount() { liveslotcount--; } @@ -467,7 +767,23 @@ final public class Table { } doOptionalRescue(s, seenliveslot, seqn, resize); - return doSendSlotsAndInsert(s, insertedTrans, resize, newsize); + Pair sendRetData = doSendSlots(s, insertedTrans, resize, newsize); + + if (sendRetData.getFirst()) { + // update the status and change what the sequence number is for the + TransactionStatus transStatus = transactionStatusMap.remove(pendingTrans.getMachineLocalTransSeqNum()); + transStatus.setStatus(TransactionStatus.StatusSent); + transStatus.setSentTransaction(); + transactionStatusMap.put(trans.getSequenceNumber(), transStatus); + } + + + if (sendRetData.getSecond().length != 0) { + // insert into the local block chain + validateandupdate(sendRetData.getSecond(), true); + } + + return sendRetData.getFirst(); } private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException { @@ -506,7 +822,14 @@ final public class Table { } doOptionalRescue(s, seenliveslot, seqn, resize); - return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize); + Pair sendRetData = doSendSlots(s, insertedNewKey, resize, newsize); + + if (sendRetData.getSecond().length != 0) { + // insert into the local block chain + validateandupdate(sendRetData.getSecond(), true); + } + + return sendRetData.getFirst(); } private void doRejectedMessages(Slot s) { @@ -588,6 +911,24 @@ final public class Table { } private boolean doArbitration(Slot s) { + + // flag whether we have finished all arbitration + boolean stillHasArbitration = false; + + pendingCommitsToDelete.clear(); + + // First add queue commits + for (Commit commit : pendingCommitsList) { + if (s.hasSpace(commit)) { + s.addEntry(commit); + pendingCommitsToDelete.add(commit); + } else { + // Ran out of space so move on but still not done + stillHasArbitration = true; + return stillHasArbitration; + } + } + // Arbitrate Map speculativeTableTmp = new HashMap(); List transSeqNums = new ArrayList(uncommittedTransactionsMap.keySet()); @@ -595,7 +936,6 @@ final public class Table { // Sort from oldest to newest Collections.sort(transSeqNums); - boolean didNeedArbitration = false; for (Long transNum : transSeqNums) { Transaction ut = uncommittedTransactionsMap.get(transNum); @@ -605,7 +945,7 @@ final public class Table { } // we did have something to arbitrate on - didNeedArbitration = true; + stillHasArbitration = true; Entry newEntry = null; @@ -618,12 +958,20 @@ final public class Table { } // create the commit - newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet()); + newEntry = new Commit(s, + ut.getSequenceNumber(), + commitSequenceNumber, + ut.getArbitrator(), + ut.getkeyValueUpdateSet()); + commitSequenceNumber = commitSequenceNumber + 1; } else { // Guard was false // create the abort - newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator()); + newEntry = new Abort(s, + ut.getSequenceNumber(), + ut.getMachineID(), + ut.getArbitrator()); } if ((newEntry != null) && s.hasSpace(newEntry)) { @@ -633,7 +981,14 @@ final public class Table { } } - return didNeedArbitration; + return stillHasArbitration; + } + + private void deletePendingCommits() { + for (Commit com : pendingCommitsToDelete) { + pendingCommitsList.remove(com); + } + pendingCommitsToDelete.clear(); } private void doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) { @@ -665,7 +1020,7 @@ final public class Table { } } - private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) throws ServerException { + private Pair doSendSlots(Slot s, boolean inserted, boolean resize, int newsize) throws ServerException { int max = 0; if (resize) max = newsize; @@ -674,6 +1029,9 @@ final public class Table { if (array == null) { array = new Slot[] {s}; rejectedmessagelist.clear(); + + // Delete pending commits that were sent to the cloud + deletePendingCommits(); } else { // if (array.length == 0) // throw new Error("Server Error: Did not send any slots"); @@ -681,11 +1039,7 @@ final public class Table { inserted = false; } - if (array.length != 0) { - validateandupdate(array, true); - } - - return inserted; + return new Pair(inserted, array); } private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) { @@ -745,79 +1099,112 @@ final public class Table { createPendingTransactionSpeculativeTable(didCommitOrSpeculate); } - public boolean proccessAllNewCommits() { - + private boolean proccessAllNewCommits() { // Process only if there are commit if (newCommitMap.keySet().size() == 0) { return false; } + boolean didProcessNewCommit = false; - List commitSeqNums = new ArrayList(newCommitMap.keySet()); - - // Sort from oldest to newest commit - Collections.sort(commitSeqNums); + for (Long arb : newCommitMap.keySet()) { - boolean didProcessNewCommit = false; + List commitSeqNums = new ArrayList(newCommitMap.get(arb).keySet()); - // Go through each new commit one by one - for (Long entrySeqNum : commitSeqNums) { - Commit entry = newCommitMap.get(entrySeqNum); + // Sort from oldest to newest commit + Collections.sort(commitSeqNums); - long lastCommitSeenSeqNum = -1; + // Go through each new commit one by one + for (Long entrySeqNum : commitSeqNums) { + Commit entry = newCommitMap.get(arb).get(entrySeqNum); - if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) { - lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()); - } + long lastCommitSeenSeqNum = -1; + if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) { + lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()); + } - if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) { + if (entry.getSequenceNumber() <= lastCommitSeenSeqNum) { + Map cm = commitMap.get(arb); + if (cm == null) { + cm = new HashMap(); + } - Commit prevCommit = commitMap.put(entry.getTransSequenceNumber(), entry); + Commit prevCommit = cm.put(entry.getSequenceNumber(), entry); + commitMap.put(arb, cm); - if (prevCommit != null) { - prevCommit.setDead(); + if (prevCommit != null) { + prevCommit.setDead(); - for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) { - committedMapByKey.put(kv.getKey(), entry); + for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) { + committedMapByKey.put(kv.getKey(), entry); + } } + + continue; } - continue; - } + Set commitsToEditSet = new HashSet(); - Set commitsToEditSet = new HashSet(); + for (KeyValue kv : entry.getkeyValueUpdateSet()) { + commitsToEditSet.add(committedMapByKey.get(kv.getKey())); + } - for (KeyValue kv : entry.getkeyValueUpdateSet()) { - commitsToEditSet.add(committedMapByKey.get(kv.getKey())); - } + commitsToEditSet.remove(null); - commitsToEditSet.remove(null); + for (Commit prevCommit : commitsToEditSet) { - for (Commit prevCommit : commitsToEditSet) { + Set deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet()); - Set deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet()); + if (!prevCommit.isLive()) { + Map cm = commitMap.get(arb); - if (!prevCommit.isLive()) { - commitMap.remove(prevCommit.getTransSequenceNumber()); + // remove it from the map so that it can be set as dead + if (cm != null) { + cm.remove(prevCommit.getSequenceNumber()); + commitMap.put(arb, cm); + } + } } - } - // Add the new commit - commitMap.put(entry.getTransSequenceNumber(), entry); - lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber()); - didProcessNewCommit = true; + // Add the new commit + Map cm = commitMap.get(arb); + if (cm == null) { + cm = new HashMap(); + } + cm.put(entry.getSequenceNumber(), entry); + commitMap.put(arb, cm); + + lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getSequenceNumber()); + + // set the trans sequence number if we are able to + if (entry.getTransSequenceNumber() != -1) { + lastCommitSeenTransSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber()); + } - // Update the committed table list - for (KeyValue kv : entry.getkeyValueUpdateSet()) { - IoTString key = kv.getKey(); - commitedTable.put(key, kv); + didProcessNewCommit = true; - committedMapByKey.put(key, entry); + // Update the committed table list + for (KeyValue kv : entry.getkeyValueUpdateSet()) { + IoTString key = kv.getKey(); + commitedTable.put(key, kv); + committedMapByKey.put(key, entry); + } } } - // Clear the new commits storage so we can use it later newCommitMap.clear(); + // go through all saved transactions and update the status of those that can be updated + for (Iterator> i = transactionStatusMap.entrySet().iterator(); i.hasNext();) { + Map.Entry entry = i.next(); + long seqnum = entry.getKey(); + TransactionStatus status = entry.getValue(); + + if ( status.getSentTransaction() && (lastCommitSeenTransSeqNumMap.get(status.getArbitrator()) != null) && (seqnum <= lastCommitSeenTransSeqNumMap.get(status.getArbitrator()))) { + status.setStatus(TransactionStatus.StatusCommitted); + i.remove(); + } + } + return didProcessNewCommit; } @@ -827,8 +1214,11 @@ final public class Table { Transaction prevtrans = i.next().getValue(); long transArb = prevtrans.getArbitrator(); - if ((lastCommitSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastCommitSeenSeqNumMap.get(transArb)) || - (lastAbortSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastAbortSeenSeqNumMap.get(transArb))) { + Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(transArb); + Long abortSeqNum = lastAbortSeenSeqNumMap.get(transArb); + + if (((commitSeqNum != null) && (prevtrans.getSequenceNumber() <= commitSeqNum)) || + ((abortSeqNum != null) && (prevtrans.getSequenceNumber() <= abortSeqNum))) { i.remove(); prevtrans.setDead(); } @@ -942,7 +1332,6 @@ final public class Table { private void updateExpectedSize() { expectedsize++; if (expectedsize > currmaxsize) { - System.out.println("Maxing Out: " + expectedsize + " " + currmaxsize); expectedsize = currmaxsize; } } @@ -1019,7 +1408,7 @@ final public class Table { private void processEntry(Transaction entry) { long arb = entry.getArbitrator(); - Long comLast = lastCommitSeenSeqNumMap.get(arb); + Long comLast = lastCommitSeenTransSeqNumMap.get(arb); Long abLast = lastAbortSeenSeqNumMap.get(arb); Transaction prevTrans = null; @@ -1039,7 +1428,6 @@ final public class Table { } private void processEntry(Abort entry) { - if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) { // Abort has not been seen yet so we need to keep track of it @@ -1047,18 +1435,32 @@ final public class Table { if (prevAbort != null) { prevAbort.setDead(); // delete old version of the duplicate } + + if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) && (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) { + lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber()); + } } else { // The machine already saw this so it is dead entry.setDead(); } - if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) && (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) { - lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber()); + // Update the status of the transaction and remove it since we are done with this transaction + TransactionStatus status = transactionStatusMap.remove(entry.getTransSequenceNumber()); + if (status != null) { + status.setStatus(TransactionStatus.StatusAborted); } } - private void processEntry(Commit entry, Slot s) { - Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry); + private void processEntry(Commit entry) { + Map arbMap = newCommitMap.get(entry.getTransArbitrator()); + + if (arbMap == null) { + arbMap = new HashMap(); + } + + Commit prevCommit = arbMap.put(entry.getSequenceNumber(), entry); + newCommitMap.put(entry.getTransArbitrator(), arbMap); + if (prevCommit != null) { prevCommit.setDead(); } @@ -1078,8 +1480,6 @@ final public class Table { if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) { largestTableStatusSeen = newnumslots; } - - // System.out.println("Table Stat: " + newnumslots + " large: " + largestTableStatusSeen + " small: " + smallestTableStatusSeen); } private void addWatchList(long machineid, RejectedMessage entry) { @@ -1161,7 +1561,7 @@ final public class Table { break; case Entry.TypeCommit: - processEntry((Commit)entry, slot); + processEntry((Commit)entry); break; case Entry.TypeAbort: