/** * IoTTable data structure. Provides client interface. * @author Brian Demsky * @version 1.0 */ final class Table { /* Constants */ static final int FREE_SLOTS = 2; // Number of slots that should be kept free // 10 static final int SKIP_THRESHOLD = 10; static final double RESIZE_MULTIPLE = 1.2; static final double RESIZE_THRESHOLD = 0.75; static final int REJECTED_THRESHOLD = 5; /* Helper Objects */ SlotBuffer buffer = NULL; CloudComm cloud = NULL; Random random = NULL; TableStatus liveTableStatus = NULL; PendingTransaction pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction Transaction lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction Transaction firstPendingTransaction = NULL; // first transaction in the pending transaction list /* Variables */ int numberOfSlots = 0; // Number of slots stored in buffer int bufferResizeThreshold = 0; // Threshold on the number of live slots before a resize is needed int64_t liveSlotCount = 0; // Number of currently live slots int64_t oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry int64_t localMachineId = 0; // Machine ID of this client device int64_t sequenceNumber = 0; // Largest sequence number a client has received int64_t localSequenceNumber = 0; // int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server // int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on int64_t localArbitrationSequenceNumber = 0; bool hadPartialSendToServer = false; bool 
attemptedToSendToServer = false; int64_t expectedsize; bool didFindTableStatus = false; int64_t currMaxSize = 0; Slot lastSlotAttemptedToSend = NULL; bool lastIsNewKey = false; int lastNewSize = 0; Hashtable> lastTransactionPartsSent = NULL; List lastPendingSendArbitrationEntriesToDelete = NULL; NewKey lastNewKey = NULL; /* Data Structures */ Hashtable committedKeyValueTable = NULL; // Table of committed key value pairs Hashtable speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value Hashtable pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions Hashtable liveNewKeyTable = NULL; // Table of live new keys Hashtable> lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage); Hashtable> rejectedMessageWatchListTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet Hashtable arbitratorTable = NULL; // Table of keys and their arbitrators Hashtable, Abort> liveAbortTable = NULL; // Table live abort messages Hashtable, TransactionPart>> newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server Hashtable, CommitPart>> newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server Hashtable lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on Hashtable liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number Hashtable, Transaction> liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID Hashtable> liveCommitsTable = NULL; Hashtable liveCommitsByKeyTable = NULL; Hashtable lastCommitSeenSequenceNumberByArbitratorTable = NULL; Vector rejectedSlotList = NULL; // List of rejected slots that have yet to be sent to the 
server List pendingTransactionQueue = NULL; List pendingSendArbitrationRounds = NULL; List pendingSendArbitrationEntriesToDelete = NULL; Hashtable> transactionPartsSent = NULL; Hashtable outstandingTransactionStatus = NULL; Hashtable liveAbortsGeneratedByLocal = NULL; Set> offlineTransactionsCommittedAndAtServer = NULL; Hashtable> localCommunicationTable = NULL; Hashtable lastTransactionSeenFromMachineFromServer = NULL; Hashtable lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL; Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) { localMachineId = _localMachineId; cloud = new CloudComm(this, baseurl, password, listeningPort); init(); } Table(CloudComm _cloud, int64_t _localMachineId) { localMachineId = _localMachineId; cloud = _cloud; init(); } /** * Init all the stuff needed for for table usage */ void init() { // Init helper objects random = new Random(); buffer = new SlotBuffer(); // Set Variables oldestLiveSlotSequenceNumver = 1; // init data structs committedKeyValueTable = new Hashtable(); speculatedKeyValueTable = new Hashtable(); pendingTransactionSpeculatedKeyValueTable = new Hashtable(); liveNewKeyTable = new Hashtable(); lastMessageTable = new Hashtable>(); rejectedMessageWatchListTable = new Hashtable>(); arbitratorTable = new Hashtable(); liveAbortTable = new Hashtable, Abort>(); newTransactionParts = new Hashtable, TransactionPart>>(); newCommitParts = new Hashtable, CommitPart>>(); lastArbitratedTransactionNumberByArbitratorTable = new Hashtable(); liveTransactionBySequenceNumberTable = new Hashtable(); liveTransactionByTransactionIdTable = new Hashtable, Transaction>(); liveCommitsTable = new Hashtable>(); liveCommitsByKeyTable = new Hashtable(); lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable(); rejectedSlotList = new Vector(); pendingTransactionQueue = new ArrayList(); pendingSendArbitrationEntriesToDelete = new ArrayList(); transactionPartsSent = new Hashtable>(); 
outstandingTransactionStatus = new Hashtable(); liveAbortsGeneratedByLocal = new Hashtable(); offlineTransactionsCommittedAndAtServer = new HashSet>(); localCommunicationTable = new Hashtable>(); lastTransactionSeenFromMachineFromServer = new Hashtable(); pendingSendArbitrationRounds = new ArrayList(); lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable(); // Other init stuff numberOfSlots = buffer.capacity(); setResizeThreshold(); } // TODO: delete method synchronized void printSlots() { int64_t o = buffer.getOldestSeqNum(); int64_t n = buffer.getNewestSeqNum(); int[] types = new int[10]; int num = 0; int livec = 0; int deadc = 0; int casdasd = 0; int liveslo = 0; for (int64_t i = o; i < (n + 1); i++) { Slot s = buffer.getSlot(i); if (s.isLive()) { liveslo++; } Vector entries = s.getEntries(); for (Entry e : entries) { if (e.isLive()) { int type = e.getType(); if (type == 6) { RejectedMessage rej = (RejectedMessage)e; casdasd++; System.out.println(rej.getMachineID()); } types[type] = types[type] + 1; num++; livec++; } else { deadc++; } } } for (int i = 0; i < 10; i++) { System.out.println(i + " " + types[i]); } System.out.println("Live count: " + livec); System.out.println("Live Slot count: " + liveslo); System.out.println("Dead count: " + deadc); System.out.println("Old: " + o); System.out.println("New: " + n); System.out.println("Size: " + buffer.size()); // System.out.println("Commits: " + liveCommitsTable.size()); System.out.println("pendingTrans: " + pendingTransactionQueue.size()); System.out.println("Trans Status Out: " + outstandingTransactionStatus.size()); for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) { System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k)); } for (Long a : liveCommitsTable.keySet()) { for (Long b : liveCommitsTable.get(a).keySet()) { for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) { System.out.print(kv + " "); } System.out.print("|| 
"); } System.out.println(); } } /** * Initialize the table by inserting a table status as the first entry into the table status * also initialize the crypto stuff. */ synchronized void initTable() throws ServerException { cloud.initSecurity(); // Create the first insertion into the block chain which is the table status Slot s = new Slot(this, 1, localMachineId, localSequenceNumber); localSequenceNumber++; TableStatus status = new TableStatus(s, numberOfSlots); s.addEntry(status); Slot[] array = cloud.putSlot(s, numberOfSlots); if (array == NULL) { array = new Slot[] {s}; // update local block chain validateAndUpdate(array, true); } else if (array.length == 1) { // in case we did push the slot BUT we failed to init it validateAndUpdate(array, true); } else { throw new Error("Error on initialization"); } } /** * Rebuild the table from scratch by pulling the latest block chain from the server. */ synchronized void rebuild() throws ServerException { // Just pull the latest slots from the server Slot[] newslots = cloud.getSlots(sequenceNumber + 1); validateAndUpdate(newslots, true); sendToServer(NULL); updateLiveTransactionsAndStatus(); } // String toString() { // String retString = " Committed Table: \n"; // retString += "---------------------------\n"; // retString += commitedTable.toString(); // retString += "\n\n"; // retString += " Speculative Table: \n"; // retString += "---------------------------\n"; // retString += speculativeTable.toString(); // return retString; // } synchronized void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber) { localCommunicationTable.put(arbitrator, new Pair(hostName, portNumber)); } synchronized Long getArbitrator(IoTString key) { return arbitratorTable.get(key); } synchronized void close() { cloud.close(); } synchronized IoTString getCommitted(IoTString key) { KeyValue kv = committedKeyValueTable.get(key); if (kv != NULL) { return kv.getValue(); } else { return NULL; } } synchronized IoTString 
getSpeculative(IoTString key) {
        // Most speculative value wins: pending-transaction speculation, then
        // speculation, then the committed value; NULL when the key is unknown.
        KeyValue kv = findMostSpeculativeKeyValue(key);
        return (kv != NULL) ? kv.getValue() : NULL;
    }

    /**
     * Reads the committed value of a key as part of the transaction being built
     * and records the value read as a guard on that transaction.
     *
     * @param key key to read
     * @return the committed value, or NULL when the key has no committed entry
     * @throws Error if the key has no arbitrator, or its arbitrator does not match
     *         the transaction being built
     */
    synchronized IoTString getCommittedAtomic(IoTString key) {
        KeyValue kv = committedKeyValueTable.get(key);
        requireArbitratorMatchesBuilder(key);
        return addGuardAndGetValue(key, kv);
    }

    /**
     * Reads the most speculative value of a key as part of the transaction being
     * built and records the value read as a guard on that transaction.
     *
     * @param key key to read
     * @return the value found, or NULL when the key has no entry in any table
     * @throws Error if the key has no arbitrator, or its arbitrator does not match
     *         the transaction being built
     */
    synchronized IoTString getSpeculativeAtomic(IoTString key) {
        requireArbitratorMatchesBuilder(key);
        return addGuardAndGetValue(key, findMostSpeculativeKeyValue(key));
    }

    /**
     * Pulls new slots from the server, applies them and pushes pending data.
     * When anything in that sequence throws, falls back to updating from every
     * locally reachable machine instead.
     *
     * @return true when the server round trip succeeded, false when the local
     *         fallback was used
     */
    synchronized bool update() {
        try {
            Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
            validateAndUpdate(newSlots, false);
            sendToServer(NULL);
            updateLiveTransactionsAndStatus();
            return true;
        } catch (Exception e) {
            // Deliberate best effort: the server path failed, so try the local peers
            for (Long m : localCommunicationTable.keySet()) {
                updateFromLocal(m);
            }
        }
        return false;
    }

    /**
     * Looks a key up in the pending-transaction speculation table, then the
     * speculation table, then the committed table, returning the first hit.
     */
    private KeyValue findMostSpeculativeKeyValue(IoTString key) {
        KeyValue kv = pendingTransactionSpeculatedKeyValueTable.get(key);
        if (kv == NULL) {
            kv = speculatedKeyValueTable.get(key);
        }
        if (kv == NULL) {
            kv = committedKeyValueTable.get(key);
        }
        return kv;
    }

    /**
     * Throws unless the key has an arbitrator and that arbitrator is accepted by
     * the transaction currently being built.
     */
    private void requireArbitratorMatchesBuilder(IoTString key) {
        Long arbitrator = arbitratorTable.get(key);
        if (arbitrator == NULL) {
            throw new Error("Key not Found.");
        }

        // Make sure new key value pair matches the current arbitrator
        if (!pendingTransactionBuilder.checkArbitrator(arbitrator)) {
            // TODO: Maybe not throw en error
            throw new Error("Not all Key Values Match Arbitrator.");
        }
    }

    /**
     * Adds a guard for the value that was read (NULL when kv is NULL) to the
     * transaction being built and returns that value.
     */
    private IoTString addGuardAndGetValue(IoTString key, KeyValue kv) {
        IoTString value = (kv != NULL) ? kv.getValue() : NULL;
        pendingTransactionBuilder.addKVGuard(new KeyValue(key, value));
        return value;
    }

    synchronized bool
createNewKey(IoTString keyName, int64_t machineId) throws ServerException { while (true) { if (arbitratorTable.get(keyName) != NULL) { // There is already an arbitrator return false; } NewKey newKey = new NewKey(NULL, keyName, machineId); if (sendToServer(newKey)) { // If successfully inserted return true; } } } synchronized void startTransaction() { // Create a new transaction, invalidates any old pending transactions. pendingTransactionBuilder = new PendingTransaction(localMachineId); } synchronized void addKV(IoTString key, IoTString value) { // Make sure it is a valid key if (arbitratorTable.get(key) == NULL) { throw new Error("Key not Found."); } // Make sure new key value pair matches the current arbitrator if (!pendingTransactionBuilder.checkArbitrator(arbitratorTable.get(key))) { // TODO: Maybe not throw en error throw new Error("Not all Key Values Match Arbitrator."); } // Add the key value to this transaction KeyValue kv = new KeyValue(key, value); pendingTransactionBuilder.addKV(kv); } synchronized TransactionStatus commitTransaction() { if (pendingTransactionBuilder.getKVUpdates().size() == 0) { // transaction with no updates will have no effect on the system return new TransactionStatus(TransactionStatus.StatusNoEffect, -1); } // Set the local transaction sequence number and increment pendingTransactionBuilder.setClientLocalSequenceNumber(localTransactionSequenceNumber); localTransactionSequenceNumber++; // Create the transaction status TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransactionBuilder.getArbitrator()); // Create the new transaction Transaction newTransaction = pendingTransactionBuilder.createTransaction(); newTransaction.setTransactionStatus(transactionStatus); if (pendingTransactionBuilder.getArbitrator() != localMachineId) { // Add it to the queue and invalidate the builder for safety pendingTransactionQueue.add(newTransaction); } else { arbitrateOnLocalTransaction(newTransaction); 
updateLiveStateFromLocal(); } pendingTransactionBuilder = new PendingTransaction(localMachineId); try { sendToServer(NULL); } catch (ServerException e) { Set arbitratorTriedAndFailed = new HashSet(); for (Iterator iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) { Transaction transaction = iter.next(); if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) { // Already contacted this client so ignore all attempts to contact this client // to preserve ordering for arbitrator continue; } Pair sendReturn = sendTransactionToLocal(transaction); if (sendReturn.getFirst()) { // Failed to contact over local arbitratorTriedAndFailed.add(transaction.getArbitrator()); } else { // Successful contact or should not contact if (sendReturn.getSecond()) { // did arbitrate iter.remove(); } } } } updateLiveStateFromLocal(); return transactionStatus; } /** * Get the machine ID for this client */ int64_t getMachineId() { return localMachineId; } /** * Decrement the number of live slots that we currently have */ void decrementLiveCount() { liveSlotCount--; } /** * Recalculate the new resize threshold */ void setResizeThreshold() { int resizeLower = (int) (RESIZE_THRESHOLD * numberOfSlots); bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower); } int64_t getLocalSequenceNumber() { return localSequenceNumber; } bool lastInsertedNewKey = false; bool sendToServer(NewKey newKey) throws ServerException { bool fromRetry = false; try { if (hadPartialSendToServer) { Slot[] newSlots = cloud.getSlots(sequenceNumber + 1); if (newSlots.length == 0) { fromRetry = true; ThreeTuple sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); if (sendSlotsReturn.getFirst()) { if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { newKey = NULL; } } for (Transaction transaction : lastTransactionPartsSent.keySet()) { 
transaction.resetServerFailure(); // Update which transactions parts still need to be sent transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); // Add the transaction status to the outstanding list outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); // Update the transaction status transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); // Check if all the transaction parts were successfully sent and if so then remove it from pending if (transaction.didSendAllParts()) { transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); pendingTransactionQueue.remove(transaction); } } } else { newSlots = sendSlotsReturn.getThird(); bool isInserted = false; for (Slot s : newSlots) { if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { isInserted = true; break; } } for (Slot s : newSlots) { if (isInserted) { break; } // Process each entry in the slot for (Entry entry : s.getEntries()) { if (entry.getType() == Entry.TypeLastMessage) { LastMessage lastMessage = (LastMessage)entry; if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) { isInserted = true; break; } } } } if (isInserted) { if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { newKey = NULL; } } for (Transaction transaction : lastTransactionPartsSent.keySet()) { transaction.resetServerFailure(); // Update which transactions parts still need to be sent transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); // Add the transaction status to the outstanding list outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); // Update the transaction status 
transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); // Check if all the transaction parts were successfully sent and if so then remove it from pending if (transaction.didSendAllParts()) { transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); pendingTransactionQueue.remove(transaction); } else { transaction.resetServerFailure(); // Set the transaction sequence number back to nothing if (!transaction.didSendAPartToServer()) { transaction.setSequenceNumber(-1); } } } } } for (Transaction transaction : lastTransactionPartsSent.keySet()) { transaction.resetServerFailure(); // Set the transaction sequence number back to nothing if (!transaction.didSendAPartToServer()) { transaction.setSequenceNumber(-1); } } if (sendSlotsReturn.getThird().length != 0) { // insert into the local block chain validateAndUpdate(sendSlotsReturn.getThird(), true); } // continue; } else { bool isInserted = false; for (Slot s : newSlots) { if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { isInserted = true; break; } } for (Slot s : newSlots) { if (isInserted) { break; } // Process each entry in the slot for (Entry entry : s.getEntries()) { if (entry.getType() == Entry.TypeLastMessage) { LastMessage lastMessage = (LastMessage)entry; if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) { isInserted = true; break; } } } } if (isInserted) { if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) { newKey = NULL; } } for (Transaction transaction : lastTransactionPartsSent.keySet()) { transaction.resetServerFailure(); // Update which transactions parts still need to be sent transaction.removeSentParts(lastTransactionPartsSent.get(transaction)); // Add the transaction status to the outstanding list 
outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); // Update the transaction status transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); // Check if all the transaction parts were successfully sent and if so then remove it from pending if (transaction.didSendAllParts()) { transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); pendingTransactionQueue.remove(transaction); } else { transaction.resetServerFailure(); // Set the transaction sequence number back to nothing if (!transaction.didSendAPartToServer()) { transaction.setSequenceNumber(-1); } } } } else { for (Transaction transaction : lastTransactionPartsSent.keySet()) { transaction.resetServerFailure(); // Set the transaction sequence number back to nothing if (!transaction.didSendAPartToServer()) { transaction.setSequenceNumber(-1); } } } // insert into the local block chain validateAndUpdate(newSlots, true); } } } catch (ServerException e) { throw e; } try { // While we have stuff that needs inserting into the block chain while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != NULL)) { fromRetry = false; if (hadPartialSendToServer) { throw new Error("Should Be error free"); } // If there is a new key with same name then end if ((newKey != NULL) && (arbitratorTable.get(newKey.getKey()) != NULL)) { return false; } // Create the slot Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber); localSequenceNumber++; // Try to fill the slot with data ThreeTuple fillSlotsReturn = fillSlot(slot, false, newKey); bool needsResize = fillSlotsReturn.getFirst(); int newSize = fillSlotsReturn.getSecond(); Boolean insertedNewKey = fillSlotsReturn.getThird(); if (needsResize) { // Reset which transaction to send for (Transaction transaction : transactionPartsSent.keySet()) { 
transaction.resetNextPartToSend(); // Set the transaction sequence number back to nothing if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { transaction.setSequenceNumber(-1); } } // Clear the sent data since we are trying again pendingSendArbitrationEntriesToDelete.clear(); transactionPartsSent.clear(); // We needed a resize so try again fillSlot(slot, true, newKey); } lastSlotAttemptedToSend = slot; lastIsNewKey = (newKey != NULL); lastInsertedNewKey = insertedNewKey; lastNewSize = newSize; lastNewKey = newKey; lastTransactionPartsSent = new Hashtable>(transactionPartsSent); lastPendingSendArbitrationEntriesToDelete = new ArrayList(pendingSendArbitrationEntriesToDelete); ThreeTuple sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); if (sendSlotsReturn.getFirst()) { // Did insert into the block chain if (insertedNewKey) { // This slot was what was inserted not a previous slot // New Key was successfully inserted into the block chain so dont want to insert it again newKey = NULL; } // Remove the aborts and commit parts that were sent from the pending to send queue for (Iterator iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) { ArbitrationRound round = iter.next(); round.removeParts(pendingSendArbitrationEntriesToDelete); if (round.isDoneSending()) { // Sent all the parts iter.remove(); } } for (Transaction transaction : transactionPartsSent.keySet()) { transaction.resetServerFailure(); // Update which transactions parts still need to be sent transaction.removeSentParts(transactionPartsSent.get(transaction)); // Add the transaction status to the outstanding list outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); // Update the transaction status transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); // Check if all the transaction parts were successfully sent and if so then remove it from pending if 
(transaction.didSendAllParts()) { transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); pendingTransactionQueue.remove(transaction); } } } else { // if (!sendSlotsReturn.getSecond()) { // for (Transaction transaction : lastTransactionPartsSent.keySet()) { // transaction.resetServerFailure(); // } // } else { // for (Transaction transaction : lastTransactionPartsSent.keySet()) { // transaction.resetServerFailure(); // // Update which transactions parts still need to be sent // transaction.removeSentParts(transactionPartsSent.get(transaction)); // // Add the transaction status to the outstanding list // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus()); // // Update the transaction status // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial); // // Check if all the transaction parts were successfully sent and if so then remove it from pending // if (transaction.didSendAllParts()) { // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully); // pendingTransactionQueue.remove(transaction); // for (KeyValue kv : transaction.getKeyValueUpdateSet()) { // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber()); // } // } // } // } // Reset which transaction to send for (Transaction transaction : transactionPartsSent.keySet()) { transaction.resetNextPartToSend(); // transaction.resetNextPartToSend(); // Set the transaction sequence number back to nothing if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { transaction.setSequenceNumber(-1); } } } // Clear the sent data in preparation for next send pendingSendArbitrationEntriesToDelete.clear(); transactionPartsSent.clear(); if (sendSlotsReturn.getThird().length != 0) { // insert into the local block chain validateAndUpdate(sendSlotsReturn.getThird(), true); } 
} } catch (ServerException e) { if (e.getType() != ServerException.TypeInputTimeout) { // e.printStackTrace(); // Nothing was able to be sent to the server so just clear these data structures for (Transaction transaction : transactionPartsSent.keySet()) { transaction.resetNextPartToSend(); // Set the transaction sequence number back to nothing if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) { transaction.setSequenceNumber(-1); } } } else { // There was a partial send to the server hadPartialSendToServer = true; // if (!fromRetry) { // lastTransactionPartsSent = new Hashtable>(transactionPartsSent); // lastPendingSendArbitrationEntriesToDelete = new ArrayList(pendingSendArbitrationEntriesToDelete); // } // Nothing was able to be sent to the server so just clear these data structures for (Transaction transaction : transactionPartsSent.keySet()) { transaction.resetNextPartToSend(); transaction.setServerFailure(); } } pendingSendArbitrationEntriesToDelete.clear(); transactionPartsSent.clear(); throw e; } return newKey == NULL; } synchronized bool updateFromLocal(int64_t machineId) { Pair localCommunicationInformation = localCommunicationTable.get(machineId); if (localCommunicationInformation == NULL) { // Cant talk to that device locally so do nothing return false; } // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); Long lastArbitrationDataLocalSequenceNumber = (int64_t) - 1; if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != NULL) { lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId); } char[] sendData = new char[sendDataSize]; ByteBuffer bbEncode = ByteBuffer.wrap(sendData); // Encode the data bbEncode.putLong(lastArbitrationDataLocalSequenceNumber); bbEncode.putInt(0); // Send by local char[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), 
localCommunicationInformation.getSecond()); localSequenceNumber++; if (returnData == NULL) { // Could not contact server return false; } // Decode the data ByteBuffer bbDecode = ByteBuffer.wrap(returnData); int numberOfEntries = bbDecode.getInt(); for (int i = 0; i < numberOfEntries; i++) { char type = bbDecode.get(); if (type == Entry.TypeAbort) { Abort abort = (Abort)Abort.decode(NULL, bbDecode); processEntry(abort); } else if (type == Entry.TypeCommitPart) { CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode); processEntry(commitPart); } } updateLiveStateFromLocal(); return true; } Pair sendTransactionToLocal(Transaction transaction) { // Get the devices local communications Pair localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator()); if (localCommunicationInformation == NULL) { // Cant talk to that device locally so do nothing return new Pair(true, false); } // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); for (TransactionPart part : transaction.getParts().values()) { sendDataSize += part.getSize(); } Long lastArbitrationDataLocalSequenceNumber = (int64_t) - 1; if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != NULL) { lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()); } // Make the send data size char[] sendData = new char[sendDataSize]; ByteBuffer bbEncode = ByteBuffer.wrap(sendData); // Encode the data bbEncode.putLong(lastArbitrationDataLocalSequenceNumber); bbEncode.putInt(transaction.getParts().size()); for (TransactionPart part : transaction.getParts().values()) { part.encode(bbEncode); } // Send by local char[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); localSequenceNumber++; if (returnData == NULL) { // Could not contact server return 
new Pair(true, false); } // Decode the data ByteBuffer bbDecode = ByteBuffer.wrap(returnData); bool didCommit = bbDecode.get() == 1; bool couldArbitrate = bbDecode.get() == 1; int numberOfEntries = bbDecode.getInt(); bool foundAbort = false; for (int i = 0; i < numberOfEntries; i++) { char type = bbDecode.get(); if (type == Entry.TypeAbort) { Abort abort = (Abort)Abort.decode(NULL, bbDecode); if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) { foundAbort = true; } processEntry(abort); } else if (type == Entry.TypeCommitPart) { CommitPart commitPart = (CommitPart)CommitPart.decode(NULL, bbDecode); processEntry(commitPart); } } updateLiveStateFromLocal(); if (couldArbitrate) { TransactionStatus status = transaction.getTransactionStatus(); if (didCommit) { status.setStatus(TransactionStatus.StatusCommitted); } else { status.setStatus(TransactionStatus.StatusAborted); } } else { TransactionStatus status = transaction.getTransactionStatus(); if (foundAbort) { status.setStatus(TransactionStatus.StatusAborted); } else { status.setStatus(TransactionStatus.StatusCommitted); } } return new Pair(false, true); } synchronized char[] acceptDataFromLocal(char[] data) { // Decode the data ByteBuffer bbDecode = ByteBuffer.wrap(data); int64_t lastArbitratedSequenceNumberSeen = bbDecode.getLong(); int numberOfParts = bbDecode.getInt(); // If we did commit a transaction or not bool didCommit = false; bool couldArbitrate = false; if (numberOfParts != 0) { // decode the transaction Transaction transaction = new Transaction(); for (int i = 0; i < numberOfParts; i++) { bbDecode.get(); TransactionPart newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode); transaction.addPartDecode(newPart); } // Arbitrate on transaction and pull relevant return data Pair localArbitrateReturn = arbitrateOnLocalTransaction(transaction); couldArbitrate = localArbitrateReturn.getFirst(); 
didCommit = localArbitrateReturn.getSecond(); updateLiveStateFromLocal(); // Transaction was sent to the server so keep track of it to prevent double commit if (transaction.getSequenceNumber() != -1) { offlineTransactionsCommittedAndAtServer.add(transaction.getId()); } } // The data to send back int returnDataSize = 0; List unseenArbitrations = new ArrayList(); // Get the aborts to send back List abortLocalSequenceNumbers = new ArrayList(liveAbortsGeneratedByLocal.keySet()); Collections.sort(abortLocalSequenceNumbers); for (Long localSequenceNumber : abortLocalSequenceNumbers) { if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { continue; } Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber); unseenArbitrations.add(abort); returnDataSize += abort.getSize(); } // Get the commits to send back Hashtable commitForClientTable = liveCommitsTable.get(localMachineId); if (commitForClientTable != NULL) { List commitLocalSequenceNumbers = new ArrayList(commitForClientTable.keySet()); Collections.sort(commitLocalSequenceNumbers); for (Long localSequenceNumber : commitLocalSequenceNumbers) { Commit commit = commitForClientTable.get(localSequenceNumber); if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { continue; } unseenArbitrations.addAll(commit.getParts().values()); for (CommitPart commitPart : commit.getParts().values()) { returnDataSize += commitPart.getSize(); } } } // Number of arbitration entries to decode returnDataSize += 2 * sizeof(int32_t); // Boolean of did commit or not if (numberOfParts != 0) { returnDataSize += sizeof(char); } // Data to send Back char[] returnData = new char[returnDataSize]; ByteBuffer bbEncode = ByteBuffer.wrap(returnData); if (numberOfParts != 0) { if (didCommit) { bbEncode.put((char)1); } else { bbEncode.put((char)0); } if (couldArbitrate) { bbEncode.put((char)1); } else { bbEncode.put((char)0); } } bbEncode.putInt(unseenArbitrations.size()); for (Entry entry : unseenArbitrations) { 
entry.encode(bbEncode); } localSequenceNumber++; return returnData; } ThreeTuple sendSlotsToServer(Slot slot, int newSize, bool isNewKey) throws ServerException { bool attemptedToSendToServerTmp = attemptedToSendToServer; attemptedToSendToServer = true; bool inserted = false; bool lastTryInserted = false; Slot[] array = cloud.putSlot(slot, newSize); if (array == NULL) { array = new Slot[] {slot}; rejectedSlotList.clear(); inserted = true; } else { if (array.length == 0) { throw new Error("Server Error: Did not send any slots"); } // if (attemptedToSendToServerTmp) { if (hadPartialSendToServer) { bool isInserted = false; for (Slot s : array) { if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { isInserted = true; break; } } for (Slot s : array) { if (isInserted) { break; } // Process each entry in the slot for (Entry entry : s.getEntries()) { if (entry.getType() == Entry.TypeLastMessage) { LastMessage lastMessage = (LastMessage)entry; if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) { isInserted = true; break; } } } } if (!isInserted) { rejectedSlotList.add(slot.getSequenceNumber()); lastTryInserted = false; } else { lastTryInserted = true; } } else { rejectedSlotList.add(slot.getSequenceNumber()); lastTryInserted = false; } } return new ThreeTuple(inserted, lastTryInserted, array); } /** * Returns false if a resize was needed */ ThreeTuple fillSlot(Slot slot, bool resize, NewKey newKeyEntry) { int newSize = 0; if (liveSlotCount > bufferResizeThreshold) { resize = true; //Resize is forced } if (resize) { newSize = (int) (numberOfSlots * RESIZE_MULTIPLE); TableStatus status = new TableStatus(slot, newSize); slot.addEntry(status); } // Fill with rejected slots first before doing anything else doRejectedMessages(slot); // Do mandatory rescue of entries ThreeTuple mandatoryRescueReturn = doMandatoryResuce(slot, resize); // Extract working variables bool 
needsResize = mandatoryRescueReturn.getFirst(); bool seenLiveSlot = mandatoryRescueReturn.getSecond(); int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird(); if (needsResize && !resize) { // We need to resize but we are not resizing so return false return new ThreeTuple(true, NULL, NULL); } bool inserted = false; if (newKeyEntry != NULL) { newKeyEntry.setSlot(slot); if (slot.hasSpace(newKeyEntry)) { slot.addEntry(newKeyEntry); inserted = true; } } // Clear the transactions, aborts and commits that were sent previously transactionPartsSent.clear(); pendingSendArbitrationEntriesToDelete.clear(); for (ArbitrationRound round : pendingSendArbitrationRounds) { bool isFull = false; round.generateParts(); List parts = round.getParts(); // Insert pending arbitration data for (Entry arbitrationData : parts) { // If it is an abort then we need to set some information if (arbitrationData instanceof Abort) { ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber()); } if (!slot.hasSpace(arbitrationData)) { // No space so cant do anything else with these data entries isFull = true; break; } // Add to this current slot and add it to entries to delete slot.addEntry(arbitrationData); pendingSendArbitrationEntriesToDelete.add(arbitrationData); } if (isFull) { break; } } if (pendingTransactionQueue.size() > 0) { Transaction transaction = pendingTransactionQueue.get(0); // Set the transaction sequence number if it has yet to be inserted into the block chain // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) { // transaction.setSequenceNumber(slot.getSequenceNumber()); // } if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) { transaction.setSequenceNumber(slot.getSequenceNumber()); } while (true) { TransactionPart part = transaction.getNextPartToSend(); if (part == NULL) { // Ran out of parts to send for this transaction so move on break; } if 
(slot.hasSpace(part)) { slot.addEntry(part); List partsSent = transactionPartsSent.get(transaction); if (partsSent == NULL) { partsSent = new ArrayList(); transactionPartsSent.put(transaction, partsSent); } partsSent.add(part.getPartNumber()); transactionPartsSent.put(transaction, partsSent); } else { break; } } } // Fill the remainder of the slot with rescue data doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize); return new ThreeTuple(false, newSize, inserted); } void doRejectedMessages(Slot s) { if (! rejectedSlotList.isEmpty()) { /* TODO: We should avoid generating a rejected message entry if * there is already a sufficient entry in the queue (e.g., * equalsto value of true and same sequence number). */ int64_t old_seqn = rejectedSlotList.firstElement(); if (rejectedSlotList.size() > REJECTED_THRESHOLD) { int64_t new_seqn = rejectedSlotList.lastElement(); RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); s.addEntry(rm); } else { int64_t prev_seqn = -1; int i = 0; /* Go through list of missing messages */ for (; i < rejectedSlotList.size(); i++) { int64_t curr_seqn = rejectedSlotList.get(i); Slot s_msg = buffer.getSlot(curr_seqn); if (s_msg != NULL) break; prev_seqn = curr_seqn; } /* Generate rejected message entry for missing messages */ if (prev_seqn != -1) { RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false); s.addEntry(rm); } /* Generate rejected message entries for present messages */ for (; i < rejectedSlotList.size(); i++) { int64_t curr_seqn = rejectedSlotList.get(i); Slot s_msg = buffer.getSlot(curr_seqn); int64_t machineid = s_msg.getMachineID(); RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true); s.addEntry(rm); } } } } ThreeTuple doMandatoryResuce(Slot slot, bool resize) { int64_t newestSequenceNumber = buffer.getNewestSeqNum(); int64_t 
oldestSequenceNumber = buffer.getOldestSeqNum(); if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) { oldestLiveSlotSequenceNumver = oldestSequenceNumber; } int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver; bool seenLiveSlot = false; int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full int64_t threshold = firstIfFull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point // Mandatory Rescue for (; currentSequenceNumber < threshold; currentSequenceNumber++) { Slot previousSlot = buffer.getSlot(currentSequenceNumber); // Push slot number forward if (! seenLiveSlot) { oldestLiveSlotSequenceNumver = currentSequenceNumber; } if (!previousSlot.isLive()) { continue; } // We have seen a live slot seenLiveSlot = true; // Get all the live entries for a slot Vector liveEntries = previousSlot.getLiveEntries(resize); // Iterate over all the live entries and try to rescue them for (Entry liveEntry : liveEntries) { if (slot.hasSpace(liveEntry)) { // Enough space to rescue the entry slot.addEntry(liveEntry); } else if (currentSequenceNumber == firstIfFull) { //if there's no space but the entry is about to fall off the queue System.out.println("B"); //? 
return new ThreeTuple(true, seenLiveSlot, currentSequenceNumber); } } } // Did not resize return new ThreeTuple(false, seenLiveSlot, currentSequenceNumber); } void doOptionalRescue(Slot s, bool seenliveslot, int64_t seqn, bool resize) { /* now go through live entries from least to greatest sequence number until * either all live slots added, or the slot doesn't have enough room * for SKIP_THRESHOLD consecutive entries*/ int skipcount = 0; int64_t newestseqnum = buffer.getNewestSeqNum(); search: for (; seqn <= newestseqnum; seqn++) { Slot prevslot = buffer.getSlot(seqn); //Push slot number forward if (!seenliveslot) oldestLiveSlotSequenceNumver = seqn; if (!prevslot.isLive()) continue; seenliveslot = true; Vector liveentries = prevslot.getLiveEntries(resize); for (Entry liveentry : liveentries) { if (s.hasSpace(liveentry)) s.addEntry(liveentry); else { skipcount++; if (skipcount > SKIP_THRESHOLD) break search; } } } } /** * Checks for malicious activity and updates the local copy of the block chain. 
*/ void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) { // The cloud communication layer has checked slot HMACs already before decoding if (newSlots.length == 0) { return; } // Make sure all slots are newer than the last largest slot this client has seen int64_t firstSeqNum = newSlots[0].getSequenceNumber(); if (firstSeqNum <= sequenceNumber) { throw new Error("Server Error: Sent older slots!"); } // Create an object that can access both new slots and slots in our local chain // without committing slots to our local chain SlotIndexer indexer = new SlotIndexer(newSlots, buffer); // Check that the HMAC chain is not broken checkHMACChain(indexer, newSlots); // Set to keep track of messages from clients HashSet machineSet = new HashSet(lastMessageTable.keySet()); // Process each slots data for (Slot slot : newSlots) { processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); updateExpectedSize(); } // If there is a gap, check to see if the server sent us everything. if (firstSeqNum != (sequenceNumber + 1)) { // Check the size of the slots that were sent down by the server. // Can only check the size if there was a gap checkNumSlots(newSlots.length); // Since there was a gap every machine must have pushed a slot or must have // a last message message. If not then the server is hiding slots if (!machineSet.isEmpty()) { throw new Error("Missing record for machines: " + machineSet); } } // Update the size of our local block chain. commitNewMaxSize(); // Commit new to slots to the local block chain. for (Slot slot : newSlots) { // Insert this slot into our local block chain copy. buffer.putSlot(slot); // Keep track of how many slots are currently live (have live data in them). 
liveSlotCount++; } // Get the sequence number of the latest slot in the system sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber(); updateLiveStateFromServer(); // No Need to remember after we pulled from the server offlineTransactionsCommittedAndAtServer.clear(); // This is invalidated now hadPartialSendToServer = false; } void updateLiveStateFromServer() { // Process the new transaction parts processNewTransactionParts(); // Do arbitration on new transactions that were received arbitrateFromServer(); // Update all the committed keys bool didCommitOrSpeculate = updateCommittedTable(); // Delete the transactions that are now dead updateLiveTransactionsAndStatus(); // Do speculations didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate); updatePendingTransactionSpeculativeTable(didCommitOrSpeculate); } void updateLiveStateFromLocal() { // Update all the committed keys bool didCommitOrSpeculate = updateCommittedTable(); // Delete the transactions that are now dead updateLiveTransactionsAndStatus(); // Do speculations didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate); updatePendingTransactionSpeculativeTable(didCommitOrSpeculate); } void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) { // if (didFindTableStatus) { // return; // } int64_t prevslots = firstSequenceNumber; if (didFindTableStatus) { // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize; // System.out.println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots); } else { expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? 
(int) prevslots : numberOfSlots; // System.out.println("Here: " + expectedsize); } // System.out.println(numberOfSlots); didFindTableStatus = true; currMaxSize = numberOfSlots; } void updateExpectedSize() { expectedsize++; if (expectedsize > currMaxSize) { expectedsize = currMaxSize; } } /** * Check the size of the block chain to make sure there are enough slots sent back by the server. * This is only called when we have a gap between the slots that we have locally and the slots * sent by the server therefore in the slots sent by the server there will be at least 1 Table * status message */ void checkNumSlots(int numberOfSlots) { if (numberOfSlots != expectedsize) { throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numberOfSlots); } } void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; } /** * Update the size of of the local buffer if it is needed. */ void commitNewMaxSize() { didFindTableStatus = false; // Resize the local slot buffer if (numberOfSlots != currMaxSize) { buffer.resize((int)currMaxSize); } // Change the number of local slots to the new size numberOfSlots = (int)currMaxSize; // Recalculate the resize threshold since the size of the local buffer has changed setResizeThreshold(); } /** * Process the new transaction parts from this latest round of slots received from the server */ void processNewTransactionParts() { if (newTransactionParts.size() == 0) { // Nothing new to process return; } // Iterate through all the machine Ids that we received new parts for for (Long machineId : newTransactionParts.keySet()) { Hashtable, TransactionPart> parts = newTransactionParts.get(machineId); // Iterate through all the parts for that machine Id for (Pair partId : parts.keySet()) { TransactionPart part = parts.get(partId); Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(part.getArbitratorId()); if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= 
part.getSequenceNumber())) { // Set dead the transaction part part.setDead(); continue; } // Get the transaction object for that sequence number Transaction transaction = liveTransactionBySequenceNumberTable.get(part.getSequenceNumber()); if (transaction == NULL) { // This is a new transaction that we dont have so make a new one transaction = new Transaction(); // Insert this new transaction into the live tables liveTransactionBySequenceNumberTable.put(part.getSequenceNumber(), transaction); liveTransactionByTransactionIdTable.put(part.getTransactionId(), transaction); } // Add that part to the transaction transaction.addPartDecode(part); } } // Clear all the new transaction parts in preparation for the next time the server sends slots newTransactionParts.clear(); } int64_t lastSeqNumArbOn = 0; void arbitrateFromServer() { if (liveTransactionBySequenceNumberTable.size() == 0) { // Nothing to arbitrate on so move on return; } // Get the transaction sequence numbers and sort from oldest to newest List transactionSequenceNumbers = new ArrayList(liveTransactionBySequenceNumberTable.keySet()); Collections.sort(transactionSequenceNumbers); // Collection of key value pairs that are Hashtable speculativeTableTmp = new Hashtable(); // The last transaction arbitrated on int64_t lastTransactionCommitted = -1; Set generatedAborts = new HashSet(); for (Long transactionSequenceNumber : transactionSequenceNumbers) { Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber); // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction if (transaction.getArbitrator() != localMachineId) { continue; } if (transactionSequenceNumber < lastSeqNumArbOn) { continue; } if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) { // We have seen this already locally so dont commit again continue; } if (!transaction.isComplete()) { // Will arbitrate in incorrect order if we continue so just break 
// Most likely this break; } // update the largest transaction seen by arbitrator from server if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == NULL) { lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber()); } else { Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()); if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) { lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber()); } } if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) { // Guard evaluated as true // Update the local changes so we can make the commit for (KeyValue kv : transaction.getKeyValueUpdateSet()) { speculativeTableTmp.put(kv.getKey(), kv); } // Update what the last transaction committed was for use in batch commit lastTransactionCommitted = transactionSequenceNumber; } else { // Guard evaluated was false so create abort // Create the abort Abort newAbort = new Abort(NULL, transaction.getClientLocalSequenceNumber(), transaction.getSequenceNumber(), transaction.getMachineId(), transaction.getArbitrator(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; generatedAborts.add(newAbort); // Insert the abort so we can process processEntry(newAbort); } lastSeqNumArbOn = transactionSequenceNumber; // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber); } Commit newCommit = NULL; // If there is something to commit if (speculativeTableTmp.size() != 0) { // Create the commit and increment the commit sequence number newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted); localArbitrationSequenceNumber++; // Add all the new keys to the commit for (KeyValue kv : speculativeTableTmp.values()) { newCommit.addKV(kv); } // create the commit parts 
newCommit.createCommitParts(); // Append all the commit parts to the end of the pending queue waiting for sending to the server // Insert the commit so we can process it for (CommitPart commitPart : newCommit.getParts().values()) { processEntry(commitPart); } } if ((newCommit != NULL) || (generatedAborts.size() > 0)) { ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts); pendingSendArbitrationRounds.add(arbitrationRound); if (compactArbitrationData()) { ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); if (newArbitrationRound.getCommit() != NULL) { for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { processEntry(commitPart); } } } } } Pair arbitrateOnLocalTransaction(Transaction transaction) { // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction if (transaction.getArbitrator() != localMachineId) { return new Pair(false, false); } if (!transaction.isComplete()) { // Will arbitrate in incorrect order if we continue so just break // Most likely this return new Pair(false, false); } if (transaction.getMachineId() != localMachineId) { // dont do this check for local transactions if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != NULL) { if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) { // We've have already seen this from the server return new Pair(false, false); } } } if (transaction.evaluateGuard(committedKeyValueTable, NULL, NULL)) { // Guard evaluated as true // Create the commit and increment the commit sequence number Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1); localArbitrationSequenceNumber++; // Update the local changes so we can make the commit for (KeyValue kv : transaction.getKeyValueUpdateSet()) { newCommit.addKV(kv); } // create the commit 
parts newCommit.createCommitParts(); // Append all the commit parts to the end of the pending queue waiting for sending to the server ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet()); pendingSendArbitrationRounds.add(arbitrationRound); if (compactArbitrationData()) { ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { processEntry(commitPart); } } else { // Insert the commit so we can process it for (CommitPart commitPart : newCommit.getParts().values()) { processEntry(commitPart); } } if (transaction.getMachineId() == localMachineId) { TransactionStatus status = transaction.getTransactionStatus(); if (status != NULL) { status.setStatus(TransactionStatus.StatusCommitted); } } updateLiveStateFromLocal(); return new Pair(true, true); } else { if (transaction.getMachineId() == localMachineId) { // For locally created messages update the status // Guard evaluated was false so create abort TransactionStatus status = transaction.getTransactionStatus(); if (status != NULL) { status.setStatus(TransactionStatus.StatusAborted); } } else { Set addAbortSet = new HashSet(); // Create the abort Abort newAbort = new Abort(NULL, transaction.getClientLocalSequenceNumber(), -1, transaction.getMachineId(), transaction.getArbitrator(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; addAbortSet.add(newAbort); // Append all the commit parts to the end of the pending queue waiting for sending to the server ArbitrationRound arbitrationRound = new ArbitrationRound(NULL, addAbortSet); pendingSendArbitrationRounds.add(arbitrationRound); if (compactArbitrationData()) { ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { processEntry(commitPart); } } } 
updateLiveStateFromLocal(); return new Pair(true, false); } } /** * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates */ bool compactArbitrationData() { if (pendingSendArbitrationRounds.size() < 2) { // Nothing to compact so do nothing return false; } ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); if (lastRound.didSendPart()) { return false; } bool hadCommit = (lastRound.getCommit() == NULL); bool gotNewCommit = false; int numberToDelete = 1; while (numberToDelete < pendingSendArbitrationRounds.size()) { ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1); if (round.isFull() || round.didSendPart()) { // Stop since there is a part that cannot be compacted and we need to compact in order break; } if (round.getCommit() == NULL) { // Try compacting aborts only int newSize = round.getCurrentSize() + lastRound.getAbortsCount(); if (newSize > ArbitrationRound.MAX_PARTS) { // Cant compact since it would be too large break; } lastRound.addAborts(round.getAborts()); } else { // Create a new larger commit Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; // Create the commit parts so that we can count them newCommit.createCommitParts(); // Calculate the new size of the parts int newSize = newCommit.getNumberOfParts(); newSize += lastRound.getAbortsCount(); newSize += round.getAbortsCount(); if (newSize > ArbitrationRound.MAX_PARTS) { // Cant compact since it would be too large break; } // Set the new compacted part lastRound.setCommit(newCommit); lastRound.addAborts(round.getAborts()); gotNewCommit = true; } numberToDelete++; } if (numberToDelete != 1) { // If there is a compaction // Delete the previous pieces that are now in the new compacted piece if 
(numberToDelete == pendingSendArbitrationRounds.size()) { pendingSendArbitrationRounds.clear(); } else { for (int i = 0; i < numberToDelete; i++) { pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1); } } // Add the new compacted into the pending to send list pendingSendArbitrationRounds.add(lastRound); // Should reinsert into the commit processor if (hadCommit && gotNewCommit) { return true; } } return false; } // bool compactArbitrationData() { // return false; // } /** * Update all the commits and the committed tables, sets dead the dead transactions */ bool updateCommittedTable() { if (newCommitParts.size() == 0) { // Nothing new to process return false; } // Iterate through all the machine Ids that we received new parts for for (Long machineId : newCommitParts.keySet()) { Hashtable, CommitPart> parts = newCommitParts.get(machineId); // Iterate through all the parts for that machine Id for (Pair partId : parts.keySet()) { CommitPart part = parts.get(partId); // Get the transaction object for that sequence number Hashtable commitForClientTable = liveCommitsTable.get(part.getMachineId()); if (commitForClientTable == NULL) { // This is the first commit from this device commitForClientTable = new Hashtable(); liveCommitsTable.put(part.getMachineId(), commitForClientTable); } Commit commit = commitForClientTable.get(part.getSequenceNumber()); if (commit == NULL) { // This is a new commit that we dont have so make a new one commit = new Commit(); // Insert this new commit into the live tables commitForClientTable.put(part.getSequenceNumber(), commit); } // Add that part to the commit commit.addPartDecode(part); } } // Clear all the new commits parts in preparation for the next time the server sends slots newCommitParts.clear(); // If we process a new commit keep track of it for future use bool didProcessANewCommit = false; // Process the commits one by one for (Long arbitratorId : liveCommitsTable.keySet()) { // Get all the commits for a 
specific arbitrator Hashtable commitForClientTable = liveCommitsTable.get(arbitratorId); // Sort the commits in order List commitSequenceNumbers = new ArrayList(commitForClientTable.keySet()); Collections.sort(commitSequenceNumbers); // Get the last commit seen from this arbitrator int64_t lastCommitSeenSequenceNumber = -1; if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != NULL) { lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId); } // Go through each new commit one by one for (int i = 0; i < commitSequenceNumbers.size(); i++) { Long commitSequenceNumber = commitSequenceNumbers.get(i); Commit commit = commitForClientTable.get(commitSequenceNumber); // Special processing if a commit is not complete if (!commit.isComplete()) { if (i == (commitSequenceNumbers.size() - 1)) { // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits break; } else { // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet). 
// Delete it and move on commit.setDead(); commitForClientTable.remove(commit.getSequenceNumber()); continue; } } // Update the last transaction that was updated if we can if (commit.getTransactionSequenceNumber() != -1) { Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId()); // Update the last transaction sequence number that the arbitrator arbitrated on if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) { lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber()); } } // Update the last arbitration data that we have seen so far if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != NULL) { int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()); if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) { // Is larger lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber()); } } else { // Never seen any data from this arbitrator so record the first one lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber()); } // We have already seen this commit before so need to do the full processing on this commit if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) { // Update the last transaction that was updated if we can if (commit.getTransactionSequenceNumber() != -1) { Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId()); // Update the last transaction sequence number that the arbitrator arbitrated on if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) { lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber()); } } continue; } 
// If we got here then this is a brand new commit and needs full processing
                // Get what commits should be edited, these are the commits that have live values for their keys
                Set commitsToEdit = new HashSet();
                for (KeyValue kv : commit.getKeyValueUpdateSet()) {
                    commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
                }
                commitsToEdit.remove(NULL); // remove NULL since it could be in this set

                // Update each previous commit that needs to be updated
                for (Commit previousCommit : commitsToEdit) {
                    // Only bother with live commits (TODO: Maybe remove this check)
                    if (previousCommit.isLive()) {
                        // Update which keys in the old commits are still live
                        for (KeyValue kv : commit.getKeyValueUpdateSet()) {
                            previousCommit.invalidateKey(kv.getKey());
                        }

                        // if the commit is now dead then remove it
                        // NOTE(review): earlier in this method entries are removed from commitForClientTable
                        // by commit.getSequenceNumber(); here the Commit object itself is passed as the key.
                        // Confirm this actually removes the entry.
                        if (!previousCommit.isLive()) {
                            commitForClientTable.remove(previousCommit);
                        }
                    }
                }

                // Update the last seen sequence number from this arbitrator
                if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != NULL) {
                    if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
                        lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
                    }
                } else {
                    lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
                }

                // We processed a new commit that we havent seen before
                didProcessANewCommit = true;

                // Update the committed table of keys and which commit is using which key
                for (KeyValue kv : commit.getKeyValueUpdateSet()) {
                    committedKeyValueTable.put(kv.getKey(), kv);
                    liveCommitsByKeyTable.put(kv.getKey(), commit);
                }
            }
        }

        return didProcessANewCommit;
    }

    /**
     * Create the speculative table from transactions that are still live and have come from the cloud.
     *
     * @param didProcessNewCommits true if new commits were processed in this round; forces the
     *        speculation to be redone from scratch
     * @return true if a speculation pass was run, false if there were no live transactions or
     *         no transactions after the last one speculated on
     */
    bool updateSpeculativeTable(bool didProcessNewCommits) {
        if (liveTransactionBySequenceNumberTable.keySet().size() == 0) {
            // There is nothing to speculate on
            return false;
        }

        // Create a list of the transaction sequence numbers and sort them from oldest to newest
        List transactionSequenceNumbersSorted = new ArrayList(liveTransactionBySequenceNumberTable.keySet());
        Collections.sort(transactionSequenceNumbersSorted);

        bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted.get(0) != oldestTransactionSequenceNumberSpeculatedOn;

        if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
            // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
            // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch

            // Start from scratch
            speculatedKeyValueTable.clear();
            lastTransactionSequenceNumberSpeculatedOn = -1;
            oldestTransactionSequenceNumberSpeculatedOn = -1;
        }

        // Remember the front of the transaction list
        oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted.get(0);

        // Find where to start arbitration from
        int startIndex = transactionSequenceNumbersSorted.indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1;

        if (startIndex >= transactionSequenceNumbersSorted.size()) {
            // Make sure we are not out of bounds
            return false; // did not speculate
        }

        Set incompleteTransactionArbitrator = new HashSet();
        // NOTE(review): didSkip starts out true, so the reset after the loop always runs and every
        // call re-speculates from scratch. Confirm whether it was meant to start as false.
        bool didSkip = true;

        for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
            int64_t transactionSequenceNumber = transactionSequenceNumbersSorted.get(i);
            Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);

            if (!transaction.isComplete()) {
                // If there is an incomplete transaction then there is nothing we can do
                // add this transactions arbitrator to the list of arbitrators we should ignore
                incompleteTransactionArbitrator.add(transaction.getArbitrator());
                didSkip = true;
                continue;
            }

            if (incompleteTransactionArbitrator.contains(transaction.getArbitrator())) {
                // An earlier transaction for this arbitrator was skipped, so skip this one as well
                continue;
            }

            lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;

            if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
                // Guard evaluated to true so update the speculative table
                for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
                    speculatedKeyValueTable.put(kv.getKey(), kv);
                }
            }
        }

        if (didSkip) {
            // Since there was a skip we need to redo the speculation next time around
            lastTransactionSequenceNumberSpeculatedOn = -1;
            oldestTransactionSequenceNumberSpeculatedOn = -1;
        }

        // We did some speculation
        return true;
    }

    /**
     * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer.
     *
     * @param didProcessNewCommitsOrSpeculate true if new commits were processed or the speculative
     *        table was rebuilt in this round; forces the pending speculation to be redone from scratch
     */
    void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
        if (pendingTransactionQueue.size() == 0) {
            // There is nothing to speculate on
            return;
        }

        if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue.get(0))) {
            // need to reset on the pending speculation
            lastPendingTransactionSpeculatedOn = NULL;
            firstPendingTransaction = pendingTransactionQueue.get(0);
            pendingTransactionSpeculatedKeyValueTable.clear();
        }

        // Find where to start arbitration from: resume right after the last pending transaction
        // that was speculated on. After a reset nothing has been speculated on yet, so start at
        // the head of the queue. Deriving the start from firstPendingTransaction (which is always
        // at index 0 at this point) would skip the head of the queue on every call.
        int startIndex = 0;
        if (lastPendingTransactionSpeculatedOn != NULL) {
            startIndex = pendingTransactionQueue.indexOf(lastPendingTransactionSpeculatedOn) + 1;
        }

        if (startIndex >= pendingTransactionQueue.size()) {
            // Make sure we are not out of bounds
            return;
        }

        for (int i = startIndex; i < pendingTransactionQueue.size(); i++) {
            Transaction transaction = pendingTransactionQueue.get(i);

            lastPendingTransactionSpeculatedOn = transaction;

            if (transaction.evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
                // Guard evaluated to true so update the speculative table
                for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
                    pendingTransactionSpeculatedKeyValueTable.put(kv.getKey(), kv);
                }
            }
        }
    }

    /**
     * Set dead and remove from the live transaction tables the transactions that are dead.
     * Also marks as committed, and stops tracking, the outstanding transaction statuses whose
     * arbitrator has already arbitrated at or past their sequence number.
     */
    void updateLiveTransactionsAndStatus() {
        // Go through each of the transactions
        for (Iterator> iter = liveTransactionBySequenceNumberTable.entrySet().iterator(); iter.hasNext();) {
            Transaction transaction = iter.next().getValue();

            // Check if the transaction is dead
            Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(transaction.getArbitrator());
            if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction.getSequenceNumber())) {
                // Set dead the transaction
                transaction.setDead();

                // Remove the transaction from the live table
                iter.remove();
                liveTransactionByTransactionIdTable.remove(transaction.getId());
            }
        }

        // Go through each of the outstanding transaction statuses
        for (Iterator> iter = outstandingTransactionStatus.entrySet().iterator(); iter.hasNext();) {
            TransactionStatus status = iter.next().getValue();

            // Check if the transaction is dead
            Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(status.getTransactionArbitrator());
            if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status.getTransactionSequenceNumber())) {
                // Set committed
                status.setStatus(TransactionStatus.StatusCommitted);

                // Remove
                iter.remove();
            }
        }
    }

    /**
     * Process this slot, entry by entry.
Also update the latest message sent by slot */ void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet machineSet) { // Update the last message seen updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet); // Process each entry in the slot for (Entry entry : slot.getEntries()) { switch (entry.getType()) { case Entry.TypeCommitPart: processEntry((CommitPart)entry); break; case Entry.TypeAbort: processEntry((Abort)entry); break; case Entry.TypeTransactionPart: processEntry((TransactionPart)entry); break; case Entry.TypeNewKey: processEntry((NewKey)entry); break; case Entry.TypeLastMessage: processEntry((LastMessage)entry, machineSet); break; case Entry.TypeRejectedMessage: processEntry((RejectedMessage)entry, indexer); break; case Entry.TypeTableStatus: processEntry((TableStatus)entry, slot.getSequenceNumber()); break; default: throw new Error("Unrecognized type: " + entry.getType()); } } } /** * Update the last message that was sent for a machine Id */ void processEntry(LastMessage entry, HashSet machineSet) { // Update what the last message received by a machine was updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet); } /** * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message) */ void processEntry(NewKey entry) { // Update the arbitrator table with the new key information arbitratorTable.put(entry.getKey(), entry.getMachineID()); // Update what the latest live new key is NewKey oldNewKey = liveNewKeyTable.put(entry.getKey(), entry); if (oldNewKey != NULL) { // Delete the old new key messages oldNewKey.setDead(); } } /** * Process new table status entries and set dead the old ones as new ones come in. 
* keeps track of the largest and smallest table status seen in this current round * of updating the local copy of the block chain */ void processEntry(TableStatus entry, int64_t seq) { int newNumSlots = entry.getMaxSlots(); updateCurrMaxSize(newNumSlots); initExpectedSize(seq, newNumSlots); if (liveTableStatus != NULL) { // We have a larger table status so the old table status is no int64_ter alive liveTableStatus.setDead(); } // Make this new table status the latest alive table status liveTableStatus = entry; } /** * Check old messages to see if there is a block chain violation. Also */ void processEntry(RejectedMessage entry, SlotIndexer indexer) { int64_t oldSeqNum = entry.getOldSeqNum(); int64_t newSeqNum = entry.getNewSeqNum(); bool isequal = entry.getEqual(); int64_t machineId = entry.getMachineID(); int64_t seq = entry.getSequenceNumber(); // Check if we have messages that were supposed to be rejected in our local block chain for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) { // Get the slot Slot slot = indexer.getSlot(seqNum); if (slot != NULL) { // If we have this slot make sure that it was not supposed to be a rejected slot int64_t slotMachineId = slot.getMachineID(); if (isequal != (slotMachineId == machineId)) { throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum); } } } // Create a list of clients to watch until they see this rejected message entry. HashSet deviceWatchSet = new HashSet(); for (Map.Entry> lastMessageEntry : lastMessageTable.entrySet()) { // Machine ID for the last message entry int64_t lastMessageEntryMachineId = lastMessageEntry.getKey(); // We've seen it, don't need to continue to watch. Our next // message will implicitly acknowledge it. 
if (lastMessageEntryMachineId == localMachineId) { continue; } Pair lastMessageValue = lastMessageEntry.getValue(); int64_t entrySequenceNumber = lastMessageValue.getFirst(); if (entrySequenceNumber < seq) { // Add this rejected message to the set of messages that this machine ID did not see yet addWatchList(lastMessageEntryMachineId, entry); // This client did not see this rejected message yet so add it to the watch set to monitor deviceWatchSet.add(lastMessageEntryMachineId); } } if (deviceWatchSet.isEmpty()) { // This rejected message has been seen by all the clients so entry.setDead(); } else { // We need to watch this rejected message entry.setWatchSet(deviceWatchSet); } } /** * Check if this abort is live, if not then save it so we can kill it later. * update the last transaction number that was arbitrated on. */ void processEntry(Abort entry) { if (entry.getTransactionSequenceNumber() != -1) { // update the transaction status if it was sent to the server TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber()); if (status != NULL) { status.setStatus(TransactionStatus.StatusAborted); } } // Abort has not been seen by the client it is for yet so we need to keep track of it Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry); if (previouslySeenAbort != NULL) { previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version } if (entry.getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry); } if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) { // The machine already saw this so it is dead entry.setDead(); liveAbortTable.remove(entry.getAbortId()); if (entry.getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber()); } return; } // Update 
the last arbitration data that we have seen so far if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != NULL) { int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()); if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) { // Is larger lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber()); } } else { // Never seen any data from this arbitrator so record the first one lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber()); } // Set dead a transaction if we can Transaction transactionToSetDead = liveTransactionByTransactionIdTable.remove(new Pair(entry.getTransactionMachineId(), entry.getTransactionClientLocalSequenceNumber())); if (transactionToSetDead != NULL) { liveTransactionBySequenceNumberTable.remove(transactionToSetDead.getSequenceNumber()); } // Update the last transaction sequence number that the arbitrator arbitrated on Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getTransactionArbitrator()); if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) { // Is a valid one if (entry.getTransactionSequenceNumber() != -1) { lastArbitratedTransactionNumberByArbitratorTable.put(entry.getTransactionArbitrator(), entry.getTransactionSequenceNumber()); } } } /** * Set dead the transaction part if that transaction is dead and keep track of all new parts */ void processEntry(TransactionPart entry) { // Check if we have already seen this transaction and set it dead OR if it is not alive Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getArbitratorId()); if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry.getSequenceNumber())) { // This transaction is dead, 
it was already committed or aborted entry.setDead(); return; } // This part is still alive Hashtable, TransactionPart> transactionPart = newTransactionParts.get(entry.getMachineId()); if (transactionPart == NULL) { // Dont have a table for this machine Id yet so make one transactionPart = new Hashtable, TransactionPart>(); newTransactionParts.put(entry.getMachineId(), transactionPart); } // Update the part and set dead ones we have already seen (got a rescued version) TransactionPart previouslySeenPart = transactionPart.put(entry.getPartId(), entry); if (previouslySeenPart != NULL) { previouslySeenPart.setDead(); } } /** * Process new commit entries and save them for future use. Delete duplicates */ void processEntry(CommitPart entry) { // Update the last transaction that was updated if we can if (entry.getTransactionSequenceNumber() != -1) { Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId()); // Update the last transaction sequence number that the arbitrator arbitrated on if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) { lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber()); } } Hashtable, CommitPart> commitPart = newCommitParts.get(entry.getMachineId()); if (commitPart == NULL) { // Don't have a table for this machine Id yet so make one commitPart = new Hashtable, CommitPart>(); newCommitParts.put(entry.getMachineId(), commitPart); } // Update the part and set dead ones we have already seen (got a rescued version) CommitPart previouslySeenPart = commitPart.put(entry.getPartId(), entry); if (previouslySeenPart != NULL) { previouslySeenPart.setDead(); } } /** * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them. * Updates the live aborts, removes those that are dead and sets them dead. 
* Check that the last message seen is correct and that there is no mismatch of our own last message or that * other clients have not had a rollback on the last message. */ void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet machineSet) { // We have seen this machine ID machineSet.remove(machineId); // Get the set of rejected messages that this machine Id is has not seen yet HashSet watchset = rejectedMessageWatchListTable.get(machineId); // If there is a rejected message that this machine Id has not seen yet if (watchset != NULL) { // Go through each rejected message that this machine Id has not seen yet for (Iterator rmit = watchset.iterator(); rmit.hasNext(); ) { RejectedMessage rm = rmit.next(); // If this machine Id has seen this rejected message... if (rm.getSequenceNumber() <= seqNum) { // Remove it from our watchlist rmit.remove(); // Decrement machines that need to see this notification rm.removeWatcher(machineId); } } } // Set dead the abort for (Iterator, Abort>> i = liveAbortTable.entrySet().iterator(); i.hasNext();) { Abort abort = i.next().getValue(); if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) { abort.setDead(); i.remove(); if (abort.getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber()); } } } if (machineId == localMachineId) { // Our own messages are immediately dead. 
if (liveness instanceof LastMessage) { ((LastMessage)liveness).setDead(); } else if (liveness instanceof Slot) { ((Slot)liveness).setDead(); } else { throw new Error("Unrecognized type"); } } // Get the old last message for this device Pair lastMessageEntry = lastMessageTable.put(machineId, new Pair(seqNum, liveness)); if (lastMessageEntry == NULL) { // If no last message then there is nothing else to process return; } int64_t lastMessageSeqNum = lastMessageEntry.getFirst(); Liveness lastEntry = lastMessageEntry.getSecond(); // If it is not our machine Id since we already set ours to dead if (machineId != localMachineId) { if (lastEntry instanceof LastMessage) { ((LastMessage)lastEntry).setDead(); } else if (lastEntry instanceof Slot) { ((Slot)lastEntry).setDead(); } else { throw new Error("Unrecognized type"); } } // Make sure the server is not playing any games if (machineId == localMachineId) { if (hadPartialSendToServer) { // We were not making any updates and we had a machine mismatch if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) { throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum); } } else { // We were not making any updates and we had a machine mismatch if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) { throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum); } } } else { if (lastMessageSeqNum > seqNum) { throw new Error("Server Error: Rollback on remote machine sequence number"); } } } /** * Add a rejected message entry to the watch set to keep track of which clients have seen that * rejected message entry and which have not. 
*/
    void addWatchList(int64_t machineId, RejectedMessage entry) {
        HashSet watchedByMachine = rejectedMessageWatchListTable.get(machineId);

        if (watchedByMachine == NULL) {
            // First rejected message watched for this machine ID: start a fresh set for it
            watchedByMachine = new HashSet();
            rejectedMessageWatchListTable.put(machineId, watchedByMachine);
        }

        watchedByMachine.add(entry);
    }

    /**
     * Check if the HMAC chain is not violated: for every new slot whose predecessor (by sequence
     * number) is known to the indexer, the predecessor's HMAC must equal the new slot's previous HMAC.
     *
     * @param indexer used to look up the predecessor of each new slot
     * @param newSlots the newly received slots to verify
     */
    void checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
        for (Slot currSlot : newSlots) {
            Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);

            if (prevSlot == NULL) {
                // Predecessor is not available, so there is nothing to compare against
                continue;
            }

            if (!Arrays.equals(prevSlot.getHMAC(), currSlot.getPrevHMAC())) {
                throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
            }
        }
    }
}