- bool lastInsertedNewKey = false;
-
- bool sendToServer(NewKey newKey) throws ServerException {
-
- bool fromRetry = false;
-
- try {
- if (hadPartialSendToServer) {
- Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
- if (newSlots.length == 0) {
- fromRetry = true;
- ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
-
- if (sendSlotsReturn.getFirst()) {
- if (newKey != NULL) {
- if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
- newKey = NULL;
- }
- }
-
- for (Transaction transaction : lastTransactionPartsSent.keySet()) {
- transaction.resetServerFailure();
-
- // Update which transactions parts still need to be sent
- transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
-
- // Add the transaction status to the outstanding list
- outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
-
- // Update the transaction status
- transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
-
- // Check if all the transaction parts were successfully sent and if so then remove it from pending
- if (transaction.didSendAllParts()) {
- transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
- pendingTransactionQueue.remove(transaction);
- }
- }
- } else {
-
- newSlots = sendSlotsReturn.getThird();
-
- bool isInserted = false;
- for (Slot s : newSlots) {
- if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
- isInserted = true;
- break;
- }
- }
-
- for (Slot s : newSlots) {
- if (isInserted) {
- break;
- }
-
- // Process each entry in the slot
- for (Entry entry : s.getEntries()) {
-
- if (entry.getType() == Entry.TypeLastMessage) {
- LastMessage lastMessage = (LastMessage)entry;
- if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
- isInserted = true;
- break;
- }
- }
- }
- }
-
- if (isInserted) {
- if (newKey != NULL) {
- if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
- newKey = NULL;
- }
- }
-
- for (Transaction transaction : lastTransactionPartsSent.keySet()) {
- transaction.resetServerFailure();
-
- // Update which transactions parts still need to be sent
- transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
-
- // Add the transaction status to the outstanding list
- outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
-
- // Update the transaction status
- transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
-
- // Check if all the transaction parts were successfully sent and if so then remove it from pending
- if (transaction.didSendAllParts()) {
- transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
- pendingTransactionQueue.remove(transaction);
- } else {
- transaction.resetServerFailure();
- // Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer()) {
- transaction.setSequenceNumber(-1);
- }
- }
- }
- }
- }
-
- for (Transaction transaction : lastTransactionPartsSent.keySet()) {
- transaction.resetServerFailure();
- // Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer()) {
- transaction.setSequenceNumber(-1);
- }
- }
-
- if (sendSlotsReturn.getThird().length != 0) {
- // insert into the local block chain
- validateAndUpdate(sendSlotsReturn.getThird(), true);
- }
- // continue;
- } else {
- bool isInserted = false;
- for (Slot s : newSlots) {
- if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
- isInserted = true;
- break;
- }
- }
-
- for (Slot s : newSlots) {
- if (isInserted) {
- break;
- }
-
- // Process each entry in the slot
- for (Entry entry : s.getEntries()) {
-
- if (entry.getType() == Entry.TypeLastMessage) {
- LastMessage lastMessage = (LastMessage)entry;
- if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
- isInserted = true;
- break;
- }
- }
- }
- }
-
- if (isInserted) {
- if (newKey != NULL) {
- if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
- newKey = NULL;
- }
- }
-
- for (Transaction transaction : lastTransactionPartsSent.keySet()) {
- transaction.resetServerFailure();
-
- // Update which transactions parts still need to be sent
- transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
-
- // Add the transaction status to the outstanding list
- outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
-
- // Update the transaction status
- transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
-
- // Check if all the transaction parts were successfully sent and if so then remove it from pending
- if (transaction.didSendAllParts()) {
- transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
- pendingTransactionQueue.remove(transaction);
- } else {
- transaction.resetServerFailure();
- // Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer()) {
- transaction.setSequenceNumber(-1);
- }
- }
- }
- } else {
- for (Transaction transaction : lastTransactionPartsSent.keySet()) {
- transaction.resetServerFailure();
- // Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer()) {
- transaction.setSequenceNumber(-1);
- }
- }
- }
-
- // insert into the local block chain
- validateAndUpdate(newSlots, true);
- }
- }
- } catch (ServerException e) {
- throw e;
- }
-
-
-
- try {
- // While we have stuff that needs inserting into the block chain
- while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != NULL)) {
-
- fromRetry = false;
-
- if (hadPartialSendToServer) {
- throw new Error("Should Be error free");
- }
-
-
-
- // If there is a new key with same name then end
- if ((newKey != NULL) && (arbitratorTable.get(newKey.getKey()) != NULL)) {
- return false;
- }
-
- // Create the slot
- Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber);
- localSequenceNumber++;
-
- // Try to fill the slot with data
- ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
- bool needsResize = fillSlotsReturn.getFirst();
- int newSize = fillSlotsReturn.getSecond();
- Boolean insertedNewKey = fillSlotsReturn.getThird();
-
- if (needsResize) {
- // Reset which transaction to send
- for (Transaction transaction : transactionPartsSent.keySet()) {
- transaction.resetNextPartToSend();
-
- // Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
- transaction.setSequenceNumber(-1);
- }
- }
-
- // Clear the sent data since we are trying again
- pendingSendArbitrationEntriesToDelete.clear();
- transactionPartsSent.clear();
-
- // We needed a resize so try again
- fillSlot(slot, true, newKey);
- }
-
- lastSlotAttemptedToSend = slot;
- lastIsNewKey = (newKey != NULL);
- lastInsertedNewKey = insertedNewKey;
- lastNewSize = newSize;
- lastNewKey = newKey;
- lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
- lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
-
-
- ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
-
- if (sendSlotsReturn.getFirst()) {
-
- // Did insert into the block chain
-
- if (insertedNewKey) {
- // This slot was what was inserted not a previous slot
-
- // New Key was successfully inserted into the block chain so dont want to insert it again
- newKey = NULL;
- }
-
- // Remove the aborts and commit parts that were sent from the pending to send queue
- for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
- ArbitrationRound round = iter.next();
- round.removeParts(pendingSendArbitrationEntriesToDelete);
-
- if (round.isDoneSending()) {
- // Sent all the parts
- iter.remove();
- }
- }
-
- for (Transaction transaction : transactionPartsSent.keySet()) {
- transaction.resetServerFailure();
-
- // Update which transactions parts still need to be sent
- transaction.removeSentParts(transactionPartsSent.get(transaction));
-
- // Add the transaction status to the outstanding list
- outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
-
- // Update the transaction status
- transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
-
- // Check if all the transaction parts were successfully sent and if so then remove it from pending
- if (transaction.didSendAllParts()) {
- transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
- pendingTransactionQueue.remove(transaction);
- }
- }
- } else {
-
- // if (!sendSlotsReturn.getSecond()) {
- // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
- // transaction.resetServerFailure();
- // }
- // } else {
- // for (Transaction transaction : lastTransactionPartsSent.keySet()) {
- // transaction.resetServerFailure();
-
- // // Update which transactions parts still need to be sent
- // transaction.removeSentParts(transactionPartsSent.get(transaction));
-
- // // Add the transaction status to the outstanding list
- // outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
-
- // // Update the transaction status
- // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
-
- // // Check if all the transaction parts were successfully sent and if so then remove it from pending
- // if (transaction.didSendAllParts()) {
- // transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
- // pendingTransactionQueue.remove(transaction);
-
- // for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
- // System.out.println("Sent: " + kv + " from: " + localMachineId + " Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + " Claimed:" + transaction.getSequenceNumber());
- // }
- // }
- // }
- // }
-
- // Reset which transaction to send
- for (Transaction transaction : transactionPartsSent.keySet()) {
- transaction.resetNextPartToSend();
- // transaction.resetNextPartToSend();
-
- // Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
- transaction.setSequenceNumber(-1);
- }
- }
- }
-
- // Clear the sent data in preparation for next send
- pendingSendArbitrationEntriesToDelete.clear();
- transactionPartsSent.clear();
-
- if (sendSlotsReturn.getThird().length != 0) {
- // insert into the local block chain
- validateAndUpdate(sendSlotsReturn.getThird(), true);
- }
- }
-
- } catch (ServerException e) {
-
- if (e.getType() != ServerException.TypeInputTimeout) {
- // e.printStackTrace();
-
- // Nothing was able to be sent to the server so just clear these data structures
- for (Transaction transaction : transactionPartsSent.keySet()) {
- transaction.resetNextPartToSend();
-
- // Set the transaction sequence number back to nothing
- if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
- transaction.setSequenceNumber(-1);
- }
- }
- } else {
- // There was a partial send to the server
- hadPartialSendToServer = true;
-
-
- // if (!fromRetry) {
- // lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
- // lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
- // }
-
- // Nothing was able to be sent to the server so just clear these data structures
- for (Transaction transaction : transactionPartsSent.keySet()) {
- transaction.resetNextPartToSend();
- transaction.setServerFailure();
- }
- }
-
- pendingSendArbitrationEntriesToDelete.clear();
- transactionPartsSent.clear();
-
- throw e;
- }
-
- return newKey == NULL;