X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2Fjava%2Fiotcloud%2FTable.java;h=25b677565f89d8db56c69e4ed30c4d52e1ba6e9c;hp=976422381d11087bf60e6ade37f9b9d03eb0fd6a;hb=9c3fa5cbce287df14626d262bd0179e994338869;hpb=8f2cd2d576d466dd791db72a6c54348d69af8541 diff --git a/version2/src/java/iotcloud/Table.java b/version2/src/java/iotcloud/Table.java index 9764223..25b6775 100644 --- a/version2/src/java/iotcloud/Table.java +++ b/version2/src/java/iotcloud/Table.java @@ -11,6 +11,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.ArrayList; import java.util.Collections; +import java.nio.ByteBuffer; /** * IoTTable data structure. Provides client interface. @@ -20,15 +21,13 @@ import java.util.Collections; final public class Table { - /* Constants */ - static final int FREE_SLOTS = 10; // Number of slots that should be kept free + static final int FREE_SLOTS = 2; // Number of slots that should be kept free // 10 static final int SKIP_THRESHOLD = 10; static final double RESIZE_MULTIPLE = 1.2; static final double RESIZE_THRESHOLD = 0.75; static final int REJECTED_THRESHOLD = 5; - /* Helper Objects */ private SlotBuffer buffer = null; private CloudComm cloud = null; @@ -45,18 +44,32 @@ final public class Table { private long oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry private long localMachineId = 0; // Machine ID of this client device private long sequenceNumber = 0; // Largest sequence number a client has received - private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server - private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server + private long localSequenceNumber = 0; + + // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server + // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on - private long localCommitSequenceNumber = 0; + private long localArbitrationSequenceNumber = 0; + private boolean hadPartialSendToServer = false; + private boolean attemptedToSendToServer = false; + private long expectedsize; + private boolean didFindTableStatus = false; + private long currMaxSize = 0; + + private Slot lastSlotAttemptedToSend = null; + private boolean lastIsNewKey = false; + private int lastNewSize = 0; + private Map> lastTransactionPartsSent = null; + private List lastPendingSendArbitrationEntriesToDelete = null; + private NewKey lastNewKey = null; + /* Data Structures */ private Map committedKeyValueTable = null; // Table of committed key value pairs private Map speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value private Map pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions - private Map liveNewKeyTable = null; // Table of live new keys private HashMap> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage); private HashMap> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet @@ -71,20 +84,21 @@ final public class Table { private Map liveCommitsByKeyTable = null; private Map lastCommitSeenSequenceNumberByArbitratorTable = null; private Vector rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server - private List pendingTransactionQueue = null; - private List pendingSendArbitrationEntries = null; + private List pendingSendArbitrationRounds = null; private List pendingSendArbitrationEntriesToDelete = null; private Map> transactionPartsSent = null; private Map outstandingTransactionStatus = null; + private Map liveAbortsGeneratedByLocal = null; + private Set> offlineTransactionsCommittedAndAtServer = null; + private Map> localCommunicationTable = null; + private Map lastTransactionSeenFromMachineFromServer = null; + private Map lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null; - - - - public Table(String baseurl, String password, long _localMachineId) { + public Table(String baseurl, String password, long _localMachineId, int listeningPort) { localMachineId = _localMachineId; - cloud = new CloudComm(this, baseurl, password); + cloud = new CloudComm(this, baseurl, password, listeningPort); init(); } @@ -127,25 +141,111 @@ final public class Table { lastCommitSeenSequenceNumberByArbitratorTable = new HashMap(); rejectedSlotList = new Vector(); pendingTransactionQueue = new ArrayList(); - pendingSendArbitrationEntries = new ArrayList(); pendingSendArbitrationEntriesToDelete = new ArrayList(); transactionPartsSent = new HashMap>(); outstandingTransactionStatus = new HashMap(); + liveAbortsGeneratedByLocal = new HashMap(); + offlineTransactionsCommittedAndAtServer = new HashSet>(); + localCommunicationTable = new HashMap>(); + lastTransactionSeenFromMachineFromServer = new HashMap(); + pendingSendArbitrationRounds = new ArrayList(); + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap(); + // Other init stuff numberOfSlots = buffer.capacity(); setResizeThreshold(); } + // TODO: delete method + public synchronized void printSlots() { + long o = buffer.getOldestSeqNum(); + long n = buffer.getNewestSeqNum(); + + int[] types = new int[10]; + + int num = 0; + + int livec = 0; + int deadc = 0; + + int casdasd = 0; + + int liveslo = 0; + + for (long i = o; i < (n + 1); i++) { + Slot s = buffer.getSlot(i); + + + if (s.isLive()) { + liveslo++; + } + + Vector entries = s.getEntries(); + + for (Entry e : entries) { + if (e.isLive()) { + int type = e.getType(); + + + if (type == 6) { + RejectedMessage rej = (RejectedMessage)e; + casdasd++; + + System.out.println(rej.getMachineID()); + } + + + types[type] = types[type] + 1; + num++; + livec++; + } else { + deadc++; + } + } + } + + for (int i = 0; i < 10; i++) { + System.out.println(i + " " + types[i]); + } + System.out.println("Live count: " + livec); + System.out.println("Live Slot count: " + liveslo); + + System.out.println("Dead count: " + deadc); + System.out.println("Old: " + o); + System.out.println("New: " + n); + System.out.println("Size: " + buffer.size()); + // System.out.println("Commits: " + liveCommitsTable.size()); + System.out.println("pendingTrans: " + pendingTransactionQueue.size()); + System.out.println("Trans Status Out: " + outstandingTransactionStatus.size()); + + for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) { + System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k)); + } + + + for (Long a : liveCommitsTable.keySet()) { + for (Long b : liveCommitsTable.get(a).keySet()) { + for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) { + System.out.print(kv + " "); + } + System.out.print("|| "); + } + System.out.println(); + } + + } + /** * Initialize the table by inserting a table status as the first entry into the table status * also initialize the crypto stuff. */ public synchronized void initTable() throws ServerException { - cloud.setSalt(); //Set the salt + cloud.initSecurity(); // Create the first insertion into the block chain which is the table status - Slot s = new Slot(this, 1, localMachineId); + Slot s = new Slot(this, 1, localMachineId, localSequenceNumber); + localSequenceNumber++; TableStatus status = new TableStatus(s, numberOfSlots); s.addEntry(status); Slot[] array = cloud.putSlot(s, numberOfSlots); @@ -154,6 +254,9 @@ final public class Table { array = new Slot[] {s}; // update local block chain validateAndUpdate(array, true); + } else if (array.length == 1) { + // in case we did push the slot BUT we failed to init it + validateAndUpdate(array, true); } else { throw new Error("Error on initialization"); } @@ -166,6 +269,9 @@ final public class Table { // Just pull the latest slots from the server Slot[] newslots = cloud.getSlots(sequenceNumber + 1); validateAndUpdate(newslots, true); + sendToServer(null); + updateLiveTransactionsAndStatus(); + } // public String toString() { @@ -182,10 +288,18 @@ final public class Table { // return retString; // } + public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) { + localCommunicationTable.put(arbitrator, new Pair(hostName, portNumber)); + } + public synchronized Long getArbitrator(IoTString key) { return arbitratorTable.get(key); } + public synchronized void close() { + cloud.close(); + } + public synchronized IoTString getCommitted(IoTString key) { KeyValue kv = committedKeyValueTable.get(key); @@ -266,17 +380,28 @@ final public class Table { } } - public synchronized void update() { + public synchronized boolean update() { try { Slot[] newSlots = cloud.getSlots(sequenceNumber + 1); validateAndUpdate(newSlots, false); sendToServer(null); + + + updateLiveTransactionsAndStatus(); + + return true; } catch (Exception e) { - e.printStackTrace(); + // e.printStackTrace(); + + for (Long m : localCommunicationTable.keySet()) { + updateFromLocal(m); + } } + + return false; } - public synchronized boolean createNewKey(IoTString keyName, long machineId) { + public synchronized boolean createNewKey(IoTString keyName, long machineId) throws ServerException { while (true) { if (arbitratorTable.get(keyName) != null) { // There is already an arbitrator @@ -284,6 +409,7 @@ final public class Table { } NewKey newKey = new NewKey(null, keyName, machineId); + if (sendToServer(newKey)) { // If successfully inserted return true; @@ -291,7 +417,7 @@ final public class Table { } } - public void startTransaction() { + public synchronized void startTransaction() { // Create a new transaction, invalidates any old pending transactions. pendingTransactionBuilder = new PendingTransaction(localMachineId); } @@ -342,7 +468,37 @@ final public class Table { pendingTransactionBuilder = new PendingTransaction(localMachineId); - sendToServer(null); + try { + sendToServer(null); + } catch (ServerException e) { + + Set arbitratorTriedAndFailed = new HashSet(); + for (Iterator iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) { + Transaction transaction = iter.next(); + + if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) { + // Already contacted this client so ignore all attempts to contact this client + // to preserve ordering for arbitrator + continue; + } + + Pair sendReturn = sendTransactionToLocal(transaction); + + if (sendReturn.getFirst()) { + // Failed to contact over local + arbitratorTriedAndFailed.add(transaction.getArbitrator()); + } else { + // Successful contact or should not contact + + if (sendReturn.getSecond()) { + // did arbitrate + iter.remove(); + } + } + } + } + + updateLiveStateFromLocal(); return transactionStatus; } @@ -369,20 +525,224 @@ final public class Table { bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower); } - private boolean sendToServer(NewKey newKey) { + public long getLocalSequenceNumber() { + return localSequenceNumber; + } + + + boolean lastInsertedNewKey = false; + + private boolean sendToServer(NewKey newKey) throws ServerException { + + boolean fromRetry = false; + + try { + if (hadPartialSendToServer) { + Slot[] newSlots = cloud.getSlots(sequenceNumber + 1); + if (newSlots.length == 0) { + fromRetry = true; + ThreeTuple sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); + + if (sendSlotsReturn.getFirst()) { + if (newKey != null) { + if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { + newKey = null; + } + } + + for (Transaction transaction : lastTransactionPartsSent.keySet()) { + transaction.resetServerFailure(); + + // Update which transactions parts still need to be sent + transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); + + // Add the transaction status to the outstanding list + outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); + + // Update the transaction status + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); + + // Check if all the transaction parts were successfully sent and if so then remove it from pending + if (transaction.didSendAllParts()) { + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); + pendingTransactionQueue.remove(transaction); + } + } + } else { + + newSlots = sendSlotsReturn.getThird(); + + boolean isInserted = false; + for (Slot s : newSlots) { + if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { + isInserted = true; + break; + } + } + + for (Slot s : newSlots) { + if (isInserted) { + break; + } + + // Process each entry in the slot + for (Entry entry : s.getEntries()) { + + if (entry.getType() == Entry.TypeLastMessage) { + LastMessage lastMessage = (LastMessage)entry; + if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) { + isInserted = true; + break; + } + } + } + } + + if (isInserted) { + if (newKey != null) { + if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { + newKey = null; + } + } + + for (Transaction transaction : lastTransactionPartsSent.keySet()) { + transaction.resetServerFailure(); + + // Update which transactions parts still need to be sent + transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); + + // Add the transaction status to the outstanding list + outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); + + // Update the transaction status + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); + + // Check if all the transaction parts were successfully sent and if so then remove it from pending + if (transaction.didSendAllParts()) { + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); + pendingTransactionQueue.remove(transaction); + } else { + transaction.resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction.didSendAPartToServer()) { + transaction.setSequenceNumber(-1); + } + } + } + } + } + + for (Transaction transaction : lastTransactionPartsSent.keySet()) { + transaction.resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction.didSendAPartToServer()) { + transaction.setSequenceNumber(-1); + } + } + + if (sendSlotsReturn.getThird().length != 0) { + // insert into the local block chain + validateAndUpdate(sendSlotsReturn.getThird(), true); + } + // continue; + } else { + boolean isInserted = false; + for (Slot s : newSlots) { + if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { + isInserted = true; + break; + } + } + + for (Slot s : newSlots) { + if (isInserted) { + break; + } + + // Process each entry in the slot + for (Entry entry : s.getEntries()) { + + if (entry.getType() == Entry.TypeLastMessage) { + LastMessage lastMessage = (LastMessage)entry; + if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) { + isInserted = true; + break; + } + } + } + } + + if (isInserted) { + if (newKey != null) { + if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { + newKey = null; + } + } + + for (Transaction transaction : lastTransactionPartsSent.keySet()) { + transaction.resetServerFailure(); + + // Update which transactions parts still need to be sent + transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); + + // Add the transaction status to the outstanding list + outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); + + // Update the transaction status + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); + + // Check if all the transaction parts were successfully sent and if so then remove it from pending + if (transaction.didSendAllParts()) { + transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); + pendingTransactionQueue.remove(transaction); + } else { + transaction.resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction.didSendAPartToServer()) { + transaction.setSequenceNumber(-1); + } + } + } + } else { + for (Transaction transaction : lastTransactionPartsSent.keySet()) { + transaction.resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction.didSendAPartToServer()) { + transaction.setSequenceNumber(-1); + } + } + } + + // insert into the local block chain + validateAndUpdate(newSlots, true); + } + } + } catch (ServerException e) { + throw e; + } + + try { // While we have stuff that needs inserting into the block chain - while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationEntries.size() > 0) || (newKey != null)) { + while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) { - // try { - // Thread.sleep(300); - // } catch (Exception e) { + fromRetry = false; - // } + if (hadPartialSendToServer) { + throw new Error("Should Be error free"); + } + + + + // If there is a new key with same name then end + if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) { + return false; + } // Create the slot - Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC()); + Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber); + localSequenceNumber++; // Try to fill the slot with data ThreeTuple fillSlotsReturn = fillSlot(slot, false, newKey); @@ -396,7 +756,7 @@ final public class Table { transaction.resetNextPartToSend(); // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer()) { + if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { transaction.setSequenceNumber(-1); } } @@ -409,19 +769,41 @@ final public class Table { fillSlot(slot, true, newKey); } - // Try to send to the server - Pair sendSlotsReturn = sendSlotsToServer(slot, newSize); + lastSlotAttemptedToSend = slot; + lastIsNewKey = (newKey != null); + lastInsertedNewKey = insertedNewKey; + lastNewSize = newSize; + lastNewKey = newKey; + lastTransactionPartsSent = new HashMap>(transactionPartsSent); + lastPendingSendArbitrationEntriesToDelete = new ArrayList(pendingSendArbitrationEntriesToDelete); + + + ThreeTuple sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null); if (sendSlotsReturn.getFirst()) { + // Did insert into the block chain - // New Key was successfully inserted into the block chain so dont want to insert it again - newKey = null; + if (insertedNewKey) { + // This slot was what was inserted not a previous slot + + // New Key was successfully inserted into the block chain so dont want to insert it again + newKey = null; + } // Remove the aborts and commit parts that were sent from the pending to send queue - pendingSendArbitrationEntries.removeAll(pendingSendArbitrationEntriesToDelete); + for (Iterator iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) { + ArbitrationRound round = iter.next(); + round.removeParts(pendingSendArbitrationEntriesToDelete); + + if (round.isDoneSending()) { + // Sent all the parts + iter.remove(); + } + } for (Transaction transaction : transactionPartsSent.keySet()) { + transaction.resetServerFailure(); // Update which transactions parts still need to be sent transaction.removeSentParts(transactionPartsSent.get(transaction)); @@ -439,12 +821,43 @@ final public class Table { } } } else { + + // if (!sendSlotsReturn.getSecond()) { + // for (Transaction transaction : lastTransactionPartsSent.keySet()) { + // transaction.resetServerFailure(); + // } + // } else { + // for (Transaction transaction : lastTransactionPartsSent.keySet()) { + // transaction.resetServerFailure(); + + // // Update which transactions parts still need to be sent + // transaction.removeSentParts(transactionPartsSent.get(transaction)); + + // // Add the transaction status to the outstanding list + // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); + + // // Update the transaction status + // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); + + // // Check if all the transaction parts were successfully sent and if so then remove it from pending + // if (transaction.didSendAllParts()) { + // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); + // pendingTransactionQueue.remove(transaction); + + // for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber()); + // } + // } + // } + // } + // Reset which transaction to send for (Transaction transaction : transactionPartsSent.keySet()) { transaction.resetNextPartToSend(); + // transaction.resetNextPartToSend(); // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer()) { + if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { transaction.setSequenceNumber(-1); } } @@ -454,60 +867,369 @@ final public class Table { pendingSendArbitrationEntriesToDelete.clear(); transactionPartsSent.clear(); - if (sendSlotsReturn.getSecond().length != 0) { + if (sendSlotsReturn.getThird().length != 0) { // insert into the local block chain - validateAndUpdate(sendSlotsReturn.getSecond(), true); + validateAndUpdate(sendSlotsReturn.getThird(), true); } } + } catch (ServerException e) { if (e.getType() != ServerException.TypeInputTimeout) { - e.printStackTrace(); + // e.printStackTrace(); // Nothing was able to be sent to the server so just clear these data structures for (Transaction transaction : transactionPartsSent.keySet()) { + transaction.resetNextPartToSend(); + // Set the transaction sequence number back to nothing - if (!transaction.didSendAPartToServer()) { + if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { transaction.setSequenceNumber(-1); } } - - pendingSendArbitrationEntriesToDelete.clear(); - transactionPartsSent.clear(); } else { // There was a partial send to the server + hadPartialSendToServer = true; + + + // if (!fromRetry) { + // lastTransactionPartsSent = new HashMap>(transactionPartsSent); + // lastPendingSendArbitrationEntriesToDelete = new ArrayList(pendingSendArbitrationEntriesToDelete); + // } + + // Nothing was able to be sent to the server so just clear these data structures + for (Transaction transaction : transactionPartsSent.keySet()) { + transaction.resetNextPartToSend(); + transaction.setServerFailure(); + } } + + pendingSendArbitrationEntriesToDelete.clear(); + transactionPartsSent.clear(); + + throw e; } return newKey == null; } - private Pair sendSlotsToServer(Slot slot, int newSize) throws ServerException { + private synchronized boolean updateFromLocal(long machineId) { + Pair localCommunicationInformation = localCommunicationTable.get(machineId); + if (localCommunicationInformation == null) { + // Cant talk to that device locally so do nothing + return false; + } + + // Get the size of the send data + int sendDataSize = Integer.BYTES + Long.BYTES; + + Long lastArbitrationDataLocalSequenceNumber = (long) - 1; + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) { + lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId); + } + + byte[] sendData = new byte[sendDataSize]; + ByteBuffer bbEncode = ByteBuffer.wrap(sendData); + + // Encode the data + bbEncode.putLong(lastArbitrationDataLocalSequenceNumber); + bbEncode.putInt(0); + + // Send by local + byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + localSequenceNumber++; + + if (returnData == null) { + // Could not contact server + return false; + } + + // Decode the data + ByteBuffer bbDecode = ByteBuffer.wrap(returnData); + int numberOfEntries = bbDecode.getInt(); + + for (int i = 0; i < numberOfEntries; i++) { + byte type = bbDecode.get(); + if (type == Entry.TypeAbort) { + Abort abort = (Abort)Abort.decode(null, bbDecode); + processEntry(abort); + } else if (type == Entry.TypeCommitPart) { + CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode); + processEntry(commitPart); + } + } + + updateLiveStateFromLocal(); + + return true; + } + + private Pair sendTransactionToLocal(Transaction transaction) { + + // Get the devices local communications + Pair localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator()); + + if (localCommunicationInformation == null) { + // Cant talk to that device locally so do nothing + return new Pair(true, false); + } + + // Get the size of the send data + int sendDataSize = Integer.BYTES + Long.BYTES; + for (TransactionPart part : transaction.getParts().values()) { + sendDataSize += part.getSize(); + } + + Long lastArbitrationDataLocalSequenceNumber = (long) - 1; + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != null) { + lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()); + } + + // Make the send data size + byte[] sendData = new byte[sendDataSize]; + ByteBuffer bbEncode = ByteBuffer.wrap(sendData); + + // Encode the data + bbEncode.putLong(lastArbitrationDataLocalSequenceNumber); + bbEncode.putInt(transaction.getParts().size()); + for (TransactionPart part : transaction.getParts().values()) { + part.encode(bbEncode); + } + + + // Send by local + byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + localSequenceNumber++; + + if (returnData == null) { + // Could not contact server + return new Pair(true, false); + } + + // Decode the data + ByteBuffer bbDecode = ByteBuffer.wrap(returnData); + boolean didCommit = bbDecode.get() == 1; + boolean couldArbitrate = bbDecode.get() == 1; + int numberOfEntries = bbDecode.getInt(); + boolean foundAbort = false; + + for (int i = 0; i < numberOfEntries; i++) { + byte type = bbDecode.get(); + if (type == Entry.TypeAbort) { + Abort abort = (Abort)Abort.decode(null, bbDecode); + + if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) { + foundAbort = true; + } + + processEntry(abort); + } else if (type == Entry.TypeCommitPart) { + CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode); + processEntry(commitPart); + } + } + + updateLiveStateFromLocal(); + + if (couldArbitrate) { + TransactionStatus status = transaction.getTransactionStatus(); + if (didCommit) { + status.setStatus(TransactionStatus.StatusCommitted); + } else { + status.setStatus(TransactionStatus.StatusAborted); + } + } else { + TransactionStatus status = transaction.getTransactionStatus(); + if (foundAbort) { + status.setStatus(TransactionStatus.StatusAborted); + } else { + status.setStatus(TransactionStatus.StatusCommitted); + } + } + + return new Pair(false, true); + } + + public synchronized byte[] acceptDataFromLocal(byte[] data) { + + // Decode the data + ByteBuffer bbDecode = ByteBuffer.wrap(data); + long lastArbitratedSequenceNumberSeen = bbDecode.getLong(); + int numberOfParts = bbDecode.getInt(); + + // If we did commit a transaction or not + boolean didCommit = false; + boolean couldArbitrate = false; + + if (numberOfParts != 0) { + + // decode the transaction + Transaction transaction = new Transaction(); + for (int i = 0; i < numberOfParts; i++) { + bbDecode.get(); + TransactionPart newPart = (TransactionPart)TransactionPart.decode(null, bbDecode); + transaction.addPartDecode(newPart); + } + + // Arbitrate on transaction and pull relevant return data + Pair localArbitrateReturn = arbitrateOnLocalTransaction(transaction); + couldArbitrate = localArbitrateReturn.getFirst(); + didCommit = localArbitrateReturn.getSecond(); + + updateLiveStateFromLocal(); + + // Transaction was sent to the server so keep track of it to prevent double commit + if (transaction.getSequenceNumber() != -1) { + offlineTransactionsCommittedAndAtServer.add(transaction.getId()); + } + } + + // The data to send back + int returnDataSize = 0; + List unseenArbitrations = new ArrayList(); + + // Get the aborts to send back + List abortLocalSequenceNumbers = new ArrayList(liveAbortsGeneratedByLocal.keySet()); + Collections.sort(abortLocalSequenceNumbers); + for (Long localSequenceNumber : abortLocalSequenceNumbers) { + if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { + continue; + } + + Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber); + unseenArbitrations.add(abort); + returnDataSize += abort.getSize(); + } + + // Get the commits to send back + Map commitForClientTable = liveCommitsTable.get(localMachineId); + if (commitForClientTable != null) { + List commitLocalSequenceNumbers = new ArrayList(commitForClientTable.keySet()); + Collections.sort(commitLocalSequenceNumbers); + + for (Long localSequenceNumber : commitLocalSequenceNumbers) { + Commit commit = commitForClientTable.get(localSequenceNumber); + + if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { + continue; + } + + unseenArbitrations.addAll(commit.getParts().values()); + + for (CommitPart commitPart : commit.getParts().values()) { + returnDataSize += commitPart.getSize(); + } + } + } + + // Number of arbitration entries to decode + returnDataSize += 2 * Integer.BYTES; + + // Boolean of did commit or not + if (numberOfParts != 0) { + returnDataSize += Byte.BYTES; + } + + // Data to send Back + byte[] returnData = new byte[returnDataSize]; + ByteBuffer bbEncode = ByteBuffer.wrap(returnData); + + if (numberOfParts != 0) { + if (didCommit) { + bbEncode.put((byte)1); + } else { + bbEncode.put((byte)0); + } + if (couldArbitrate) { + bbEncode.put((byte)1); + } else { + bbEncode.put((byte)0); + } + } + + bbEncode.putInt(unseenArbitrations.size()); + for (Entry entry : unseenArbitrations) { + entry.encode(bbEncode); + } + + + localSequenceNumber++; + return returnData; + } + + private ThreeTuple sendSlotsToServer(Slot slot, int newSize, boolean isNewKey) throws ServerException { - boolean inserted = true; + boolean attemptedToSendToServerTmp = attemptedToSendToServer; + attemptedToSendToServer = true; + + boolean inserted = false; + boolean lastTryInserted = false; Slot[] array = cloud.putSlot(slot, newSize); if (array == null) { array = new Slot[] {slot}; rejectedSlotList.clear(); + inserted = true; } else { if (array.length == 0) { throw new Error("Server Error: Did not send any slots"); } - rejectedSlotList.add(slot.getSequenceNumber()); - inserted = false; + + // if (attemptedToSendToServerTmp) { + if (hadPartialSendToServer) { + + boolean isInserted = false; + for (Slot s : array) { + if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { + isInserted = true; + break; + } + } + + for (Slot s : array) { + if (isInserted) { + break; + } + + // Process each entry in the slot + for (Entry entry : s.getEntries()) { + + if (entry.getType() == Entry.TypeLastMessage) { + LastMessage lastMessage = (LastMessage)entry; + + if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) { + isInserted = true; + break; + } + } + } + } + + if (!isInserted) { + rejectedSlotList.add(slot.getSequenceNumber()); + lastTryInserted = false; + } else { + lastTryInserted = true; + } + } else { + rejectedSlotList.add(slot.getSequenceNumber()); + lastTryInserted = false; + } } - return new Pair(inserted, array); + return new ThreeTuple(inserted, lastTryInserted, array); } /** * Returns false if a resize was needed */ private ThreeTuple fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) { + + int newSize = 0; if (liveSlotCount > bufferResizeThreshold) { resize = true; //Resize is forced + } if (resize) { @@ -536,6 +1258,7 @@ final public class Table { if (newKeyEntry != null) { newKeyEntry.setSlot(slot); if (slot.hasSpace(newKeyEntry)) { + slot.addEntry(newKeyEntry); inserted = true; } @@ -545,33 +1268,48 @@ final public class Table { transactionPartsSent.clear(); pendingSendArbitrationEntriesToDelete.clear(); - // Insert pending arbitration data - for (Entry arbitrationData : pendingSendArbitrationEntries) { + for (ArbitrationRound round : pendingSendArbitrationRounds) { + boolean isFull = false; + round.generateParts(); + List parts = round.getParts(); + + // Insert pending arbitration data + for (Entry arbitrationData : parts) { + + // If it is an abort then we need to set some information + if (arbitrationData instanceof Abort) { + ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber()); + } + + if (!slot.hasSpace(arbitrationData)) { + // No space so cant do anything else with these data entries + isFull = true; + break; + } - // If it is an abort then we need to set some information - if (arbitrationData instanceof Abort) { - ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber()); + // Add to this current slot and add it to entries to delete + slot.addEntry(arbitrationData); + pendingSendArbitrationEntriesToDelete.add(arbitrationData); } - if (!slot.hasSpace(arbitrationData)) { - // No space so cant do anything else with these data entries + if (isFull) { break; } - - // Add to this current slot and add it to entries to delete - slot.addEntry(arbitrationData); - pendingSendArbitrationEntriesToDelete.add(arbitrationData); } - // Insert as many transactions as possible while keeping order - for (Transaction transaction : pendingTransactionQueue) { + if (pendingTransactionQueue.size() > 0) { + + Transaction transaction = pendingTransactionQueue.get(0); // Set the transaction sequence number if it has yet to be inserted into the block chain - if (!transaction.didSendAPartToServer()) { + // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) { + // transaction.setSequenceNumber(slot.getSequenceNumber()); + // } + + if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) { transaction.setSequenceNumber(slot.getSequenceNumber()); } - boolean ranOutOfSpace = false; while (true) { TransactionPart part = transaction.getNextPartToSend(); @@ -583,25 +1321,17 @@ final public class Table { if (slot.hasSpace(part)) { slot.addEntry(part); - List partsSent = transactionPartsSent.get(transaction); if (partsSent == null) { partsSent = new ArrayList(); transactionPartsSent.put(transaction, partsSent); } - partsSent.add(part.getPartNumber()); transactionPartsSent.put(transaction, partsSent); - } else { - ranOutOfSpace = true; break; } } - - if (ranOutOfSpace) { - break; - } } // Fill the remainder of the slot with rescue data @@ -619,7 +1349,7 @@ final public class Table { long old_seqn = rejectedSlotList.firstElement(); if (rejectedSlotList.size() > REJECTED_THRESHOLD) { long new_seqn = rejectedSlotList.lastElement(); - RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, new_seqn, false); + RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); s.addEntry(rm); } else { long prev_seqn = -1; @@ -634,7 +1364,7 @@ final public class Table { } /* Generate rejected message entry for missing messages */ if (prev_seqn != -1) { - RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, prev_seqn, false); + RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false); s.addEntry(rm); } /* Generate rejected message entries for present messages */ @@ -642,7 +1372,7 @@ final public class Table { long curr_seqn = rejectedSlotList.get(i); Slot s_msg = buffer.getSlot(curr_seqn); long machineid = s_msg.getMachineID(); - RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true); + RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true); s.addEntry(rm); } } @@ -738,11 +1468,6 @@ final public class Table { return; } - // Reset the table status declared sizes - smallestTableStatusSeen = -1; - largestTableStatusSeen = -1; - - // Make sure all slots are newer than the last largest slot this client has seen long firstSeqNum = newSlots[0].getSequenceNumber(); if (firstSeqNum <= sequenceNumber) { @@ -762,6 +1487,8 @@ final public class Table { // Process each slots data for (Slot slot : newSlots) { processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); + + updateExpectedSize(); } // If there is a gap, check to see if the server sent us everything. @@ -795,6 +1522,12 @@ final public class Table { sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber(); updateLiveStateFromServer(); + + // No Need to remember after we pulled from the server + offlineTransactionsCommittedAndAtServer.clear(); + + // This is invalidated now + hadPartialSendToServer = false; } private void updateLiveStateFromServer() { @@ -827,6 +1560,37 @@ final public class Table { updatePendingTransactionSpeculativeTable(didCommitOrSpeculate); } + private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) { + // if (didFindTableStatus) { + // return; + // } + long prevslots = firstSequenceNumber; + + + if (didFindTableStatus) { + // expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : expectedsize; + // System.out.println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots); + + } else { + expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots; + // System.out.println("Here: " + expectedsize); + } + + // System.out.println(numberOfSlots); + + didFindTableStatus = true; + currMaxSize = numberOfSlots; + } + + private void updateExpectedSize() { + expectedsize++; + + if (expectedsize > currMaxSize) { + expectedsize = currMaxSize; + } + } + + /** * Check the size of the block chain to make sure there are enough slots sent back by the server. * This is only called when we have a gap between the slots that we have locally and the slots @@ -834,41 +1598,30 @@ final public class Table { * status message */ private void checkNumSlots(int numberOfSlots) { - - // We only have 1 size so we must have this many slots - if (largestTableStatusSeen == smallestTableStatusSeen) { - if (numberOfSlots != smallestTableStatusSeen) { - throw new Error("Server Error: Server did not send all slots. Expected: " + smallestTableStatusSeen + " Received:" + numberOfSlots); - } - } else { - // We have more than 1 - if (numberOfSlots < smallestTableStatusSeen) { - throw new Error("Server Error: Server did not send all slots. Expected at least: " + smallestTableStatusSeen + " Received:" + numberOfSlots); - } + if (numberOfSlots != expectedsize) { + throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numberOfSlots); } } + private void updateCurrMaxSize(int newmaxsize) { + currMaxSize = newmaxsize; + } + + /** * Update the size of of the local buffer if it is needed. */ private void commitNewMaxSize() { - - int currMaxSize = 0; - - if (largestTableStatusSeen == -1) { - // No table status seen so the current max size does not change - currMaxSize = numberOfSlots; - } else { - currMaxSize = largestTableStatusSeen; - } + didFindTableStatus = false; // Resize the local slot buffer if (numberOfSlots != currMaxSize) { - buffer.resize(currMaxSize); + buffer.resize((int)currMaxSize); } // Change the number of local slots to the new size - numberOfSlots = currMaxSize; + numberOfSlots = (int)currMaxSize; + // Recalculate the resize threshold since the size of the local buffer has changed setResizeThreshold(); @@ -920,7 +1673,10 @@ final public class Table { newTransactionParts.clear(); } - public void arbitrateFromServer() { + + private long lastSeqNumArbOn = 0; + + private void arbitrateFromServer() { if (liveTransactionBySequenceNumberTable.size() == 0) { // Nothing to arbitrate on so move on @@ -936,15 +1692,27 @@ final public class Table { // The last transaction arbitrated on long lastTransactionCommitted = -1; + Set generatedAborts = new HashSet(); for (Long transactionSequenceNumber : transactionSequenceNumbers) { Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber); + + // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction if (transaction.getArbitrator() != localMachineId) { continue; } + if (transactionSequenceNumber < lastSeqNumArbOn) { + continue; + } + + if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) { + // We have seen this already locally so dont commit again + continue; + } + if (!transaction.isComplete()) { // Will arbitrate in incorrect order if we continue so just break @@ -953,6 +1721,16 @@ final public class Table { } + // update the largest transaction seen by arbitrator from server + if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) { + lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber()); + } else { + Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()); + if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) { + lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber()); + } + } + if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) { // Guard evaluated as true @@ -962,8 +1740,7 @@ final public class Table { } // Update what the last transaction committed was for use in batch commit - lastTransactionCommitted = transaction.getSequenceNumber(); - + lastTransactionCommitted = transactionSequenceNumber; } else { // Guard evaluated was false so create abort @@ -972,22 +1749,29 @@ final public class Table { transaction.getClientLocalSequenceNumber(), transaction.getSequenceNumber(), transaction.getMachineId(), - transaction.getArbitrator()); + transaction.getArbitrator(), + localArbitrationSequenceNumber); + localArbitrationSequenceNumber++; - // Add the abort to the queue of aborts to send out - pendingSendArbitrationEntries.add(newAbort); + generatedAborts.add(newAbort); // Insert the abort so we can process processEntry(newAbort); } + + lastSeqNumArbOn = transactionSequenceNumber; + + // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber); } + Commit newCommit = null; + // If there is something to commit if (speculativeTableTmp.size() != 0) { // Create the commit and increment the commit sequence number - Commit newCommit = new Commit(localCommitSequenceNumber, localMachineId, lastTransactionCommitted); - localCommitSequenceNumber++; + newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted); + localArbitrationSequenceNumber++; // Add all the new keys to the commit for (KeyValue kv : speculativeTableTmp.values()) { @@ -998,34 +1782,57 @@ final public class Table { newCommit.createCommitParts(); // Append all the commit parts to the end of the pending queue waiting for sending to the server - pendingSendArbitrationEntries.addAll(newCommit.getParts().values()); // Insert the commit so we can process it for (CommitPart commitPart : newCommit.getParts().values()) { processEntry(commitPart); } } + + if ((newCommit != null) || (generatedAborts.size() > 0)) { + ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts); + pendingSendArbitrationRounds.add(arbitrationRound); + + if (compactArbitrationData()) { + ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); + if (newArbitrationRound.getCommit() != null) { + for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { + processEntry(commitPart); + } + } + } + } } - public void arbitrateOnLocalTransaction(Transaction transaction) { + private Pair arbitrateOnLocalTransaction(Transaction transaction) { // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction if (transaction.getArbitrator() != localMachineId) { - return; + return new Pair(false, false); } if (!transaction.isComplete()) { // Will arbitrate in incorrect order if we continue so just break // Most likely this - return; + return new Pair(false, false); + } + + if (transaction.getMachineId() != localMachineId) { + // dont do this check for local transactions + if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) { + if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) { + // We've have already seen this from the server + return new Pair(false, false); + } + } } if (transaction.evaluateGuard(committedKeyValueTable, null, null)) { // Guard evaluated as true // Create the commit and increment the commit sequence number - Commit newCommit = new Commit(localCommitSequenceNumber, localMachineId, -1); - localCommitSequenceNumber++; + Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1); + localArbitrationSequenceNumber++; // Update the local changes so we can make the commit for (KeyValue kv : transaction.getKeyValueUpdateSet()) { @@ -1036,23 +1843,164 @@ final public class Table { newCommit.createCommitParts(); // Append all the commit parts to the end of the pending queue waiting for sending to the server - pendingSendArbitrationEntries.addAll(newCommit.getParts().values()); + ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet()); + pendingSendArbitrationRounds.add(arbitrationRound); - // Insert the commit so we can process it - for (CommitPart commitPart : newCommit.getParts().values()) { - processEntry(commitPart); + if (compactArbitrationData()) { + ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); + for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { + processEntry(commitPart); + } + } else { + // Insert the commit so we can process it + for (CommitPart commitPart : newCommit.getParts().values()) { + processEntry(commitPart); + } } - TransactionStatus status = transaction.getTransactionStatus(); - status.setStatus(TransactionStatus.StatusCommitted); + if (transaction.getMachineId() == localMachineId) { + TransactionStatus status = transaction.getTransactionStatus(); + if (status != null) { + status.setStatus(TransactionStatus.StatusCommitted); + } + } + updateLiveStateFromLocal(); + return new Pair(true, true); } else { - // Guard evaluated was false so create abort - TransactionStatus status = transaction.getTransactionStatus(); - status.setStatus(TransactionStatus.StatusAborted); + + if (transaction.getMachineId() == localMachineId) { + // For locally created messages update the status + + // Guard evaluated was false so create abort + TransactionStatus status = transaction.getTransactionStatus(); + if (status != null) { + status.setStatus(TransactionStatus.StatusAborted); + } + } else { + Set addAbortSet = new HashSet(); + + + // Create the abort + Abort newAbort = new Abort(null, + transaction.getClientLocalSequenceNumber(), + -1, + transaction.getMachineId(), + transaction.getArbitrator(), + localArbitrationSequenceNumber); + localArbitrationSequenceNumber++; + + addAbortSet.add(newAbort); + + + // Append all the commit parts to the end of the pending queue waiting for sending to the server + ArbitrationRound arbitrationRound = new ArbitrationRound(null, addAbortSet); + pendingSendArbitrationRounds.add(arbitrationRound); + + if (compactArbitrationData()) { + ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); + for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { + processEntry(commitPart); + } + } + } + + updateLiveStateFromLocal(); + return new Pair(true, false); } } + /** + * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates + */ + private boolean compactArbitrationData() { + + if (pendingSendArbitrationRounds.size() < 2) { + // Nothing to compact so do nothing + return false; + } + + ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); + if (lastRound.didSendPart()) { + return false; + } + + boolean hadCommit = (lastRound.getCommit() == null); + boolean gotNewCommit = false; + + int numberToDelete = 1; + while (numberToDelete < pendingSendArbitrationRounds.size()) { + ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1); + + if (round.isFull() || round.didSendPart()) { + // Stop since there is a part that cannot be compacted and we need to compact in order + break; + } + + if (round.getCommit() == null) { + + // Try compacting aborts only + int newSize = round.getCurrentSize() + lastRound.getAbortsCount(); + if (newSize > ArbitrationRound.MAX_PARTS) { + // Cant compact since it would be too large + break; + } + lastRound.addAborts(round.getAborts()); + } else { + + // Create a new larger commit + Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber); + localArbitrationSequenceNumber++; + + // Create the commit parts so that we can count them + newCommit.createCommitParts(); + + // Calculate the new size of the parts + int newSize = newCommit.getNumberOfParts(); + newSize += lastRound.getAbortsCount(); + newSize += round.getAbortsCount(); + + if (newSize > ArbitrationRound.MAX_PARTS) { + // Cant compact since it would be too large + break; + } + + // Set the new compacted part + lastRound.setCommit(newCommit); + lastRound.addAborts(round.getAborts()); + gotNewCommit = true; + } + + numberToDelete++; + } + + if (numberToDelete != 1) { + // If there is a compaction + + // Delete the previous pieces that are now in the new compacted piece + if (numberToDelete == pendingSendArbitrationRounds.size()) { + pendingSendArbitrationRounds.clear(); + } else { + for (int i = 0; i < numberToDelete; i++) { + pendingSendArbitrationRounds.removeIndex(pendingSendArbitrationRounds.size() - 1); + } + } + + // Add the new compacted into the pending to send list + pendingSendArbitrationRounds.add(lastRound); + + // Should reinsert into the commit processor + if (hadCommit && gotNewCommit) { + return true; + } + } + + return false; + } + // private boolean compactArbitrationData() { + // return false; + // } + /** * Update all the commits and the committed tables, sets dead the dead transactions */ @@ -1111,6 +2059,12 @@ final public class Table { List commitSequenceNumbers = new ArrayList(commitForClientTable.keySet()); Collections.sort(commitSequenceNumbers); + // Get the last commit seen from this arbitrator + long lastCommitSeenSequenceNumber = -1; + if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != null) { + lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId); + } + // Go through each new commit one by one for (int i = 0; i < commitSequenceNumbers.size(); i++) { Long commitSequenceNumber = commitSequenceNumbers.get(i); @@ -1130,16 +2084,28 @@ final public class Table { } } - // Get the last commit seen from this arbitrator - long lastCommitSeenSequenceNumber = -1; - if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) { - lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()); - } - - + // Update the last transaction that was updated if we can + if (commit.getTransactionSequenceNumber() != -1) { + Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId()); + // Update the last transaction sequence number that the arbitrator arbitrated on + if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) { + lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber()); + } + } + // Update the last arbitration data that we have seen so far + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) { + long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()); + if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) { + // Is larger + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber()); + } + } else { + // Never seen any data from this arbitrator so record the first one + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber()); + } // We have already seen this commit before so need to do the full processing on this commit if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) { @@ -1157,7 +2123,6 @@ final public class Table { continue; } - // If we got here then this is a brand new commit and needs full processing // Get what commits should be edited, these are the commits that have live values for their keys @@ -1186,16 +2151,12 @@ final public class Table { } // Update the last seen sequence number from this arbitrator - lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber()); - - // Update the last transaction that was updated if we can - if (commit.getTransactionSequenceNumber() != -1) { - Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) { - lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber()); + if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) { + if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) { + lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber()); } + } else { + lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber()); } // We processed a new commit that we havent seen before @@ -1367,7 +2328,6 @@ final public class Table { } } - /** * Process this slot, entry by entry. Also update the latest message sent by slot */ @@ -1405,7 +2365,7 @@ final public class Table { break; case Entry.TypeTableStatus: - processEntry((TableStatus)entry); + processEntry((TableStatus)entry, slot.getSequenceNumber()); break; default: @@ -1443,8 +2403,11 @@ final public class Table { * keeps track of the largest and smallest table status seen in this current round * of updating the local copy of the block chain */ - private void processEntry(TableStatus entry) { + private void processEntry(TableStatus entry, long seq) { int newNumSlots = entry.getMaxSlots(); + updateCurrMaxSize(newNumSlots); + + initExpectedSize(seq, newNumSlots); if (liveTableStatus != null) { // We have a larger table status so the old table status is no longer alive @@ -1453,14 +2416,6 @@ final public class Table { // Make this new table status the latest alive table status liveTableStatus = entry; - - if ((smallestTableStatusSeen == -1) || (newNumSlots < smallestTableStatusSeen)) { - smallestTableStatusSeen = newNumSlots; - } - - if ((largestTableStatusSeen == -1) || (newNumSlots > largestTableStatusSeen)) { - largestTableStatusSeen = newNumSlots; - } } /** @@ -1471,6 +2426,7 @@ final public class Table { long newSeqNum = entry.getNewSeqNum(); boolean isequal = entry.getEqual(); long machineId = entry.getMachineID(); + long seq = entry.getSequenceNumber(); // Check if we have messages that were supposed to be rejected in our local block chain @@ -1506,7 +2462,7 @@ final public class Table { Pair lastMessageValue = lastMessageEntry.getValue(); long entrySequenceNumber = lastMessageValue.getFirst(); - if (entrySequenceNumber < newSeqNum) { + if (entrySequenceNumber < seq) { // Add this rejected message to the set of messages that this machine ID did not see yet addWatchList(lastMessageEntryMachineId, entry); @@ -1531,25 +2487,52 @@ final public class Table { */ private void processEntry(Abort entry) { + + if (entry.getTransactionSequenceNumber() != -1) { + // update the transaction status if it was sent to the server + TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber()); + if (status != null) { + status.setStatus(TransactionStatus.StatusAborted); + } + } + // Abort has not been seen by the client it is for yet so we need to keep track of it Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry); if (previouslySeenAbort != null) { previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version } + if (entry.getTransactionArbitrator() == localMachineId) { + liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry); + } + if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) { // The machine already saw this so it is dead entry.setDead(); - liveAbortTable.remove(entry); + liveAbortTable.remove(entry.getAbortId()); + + if (entry.getTransactionArbitrator() == localMachineId) { + liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber()); + } + return; } - // update the transaction status - TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber()); - if (status != null) { - status.setStatus(TransactionStatus.StatusAborted); + + + // Update the last arbitration data that we have seen so far + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) { + + long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()); + if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) { + // Is larger + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber()); + } + } else { + // Never seen any data from this arbitrator so record the first one + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber()); } @@ -1602,10 +2585,25 @@ final public class Table { * Process new commit entries and save them for future use. Delete duplicates */ private void processEntry(CommitPart entry) { + + + // Update the last transaction that was updated if we can + if (entry.getTransactionSequenceNumber() != -1) { + Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId()); + + // Update the last transaction sequence number that the arbitrator arbitrated on + if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) { + lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber()); + } + } + + + + Map, CommitPart> commitPart = newCommitParts.get(entry.getMachineId()); if (commitPart == null) { - // Dont have a table for this machine Id yet so make one + // Don't have a table for this machine Id yet so make one commitPart = new HashMap, CommitPart>(); newCommitParts.put(entry.getMachineId(), commitPart); } @@ -1640,7 +2638,7 @@ final public class Table { RejectedMessage rm = rmit.next(); // If this machine Id has seen this rejected message... - if (rm.getNewSeqNum() <= seqNum) { + if (rm.getSequenceNumber() <= seqNum) { // Remove it from our watchlist rmit.remove(); @@ -1658,6 +2656,10 @@ final public class Table { if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) { abort.setDead(); i.remove(); + + if (abort.getTransactionArbitrator() == localMachineId) { + liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber()); + } } } @@ -1698,9 +2700,17 @@ final public class Table { // Make sure the server is not playing any games if (machineId == localMachineId) { - // We were not making any updates and we had a machine mismatch - if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) { - throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + seqNum + " got: " + lastMessageSeqNum); + if (hadPartialSendToServer) { + // We were not making any updates and we had a machine mismatch + if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) { + throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum); + } + + } else { + // We were not making any updates and we had a machine mismatch + if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) { + throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum); + } } } else { if (lastMessageSeqNum > seqNum) { @@ -1735,4 +2745,4 @@ final public class Table { throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot); } } -} \ No newline at end of file +}