* also initialize the crypto stuff.
*/
public synchronized void initTable() throws ServerException {
- cloud.setSalt(); //Set the salt
+ cloud.initSecurity();
// Create the first insertion into the block chain which is the table status
Slot s = new Slot(this, 1, localMachineId);
}
}
+ updateLiveStateFromLocal();
+
return transactionStatus;
}
// Try to send to the server
ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
- // if (sendSlotsReturn.getSecond()) {
- // System.out.println("Second was true");
- // }
-
-
if (/*sendSlotsReturn.getSecond() || */sendSlotsReturn.getFirst()) {
// Did insert into the block chain
iter.remove();
}
}
+
+ for (Transaction transaction : transactionPartsSent.keySet()) {
- for (Transaction transaction : transactionPartsSent.keySet()) {
+ transaction.resetServerFailure();
+
// Update which transactions parts still need to be sent
transaction.removeSentParts(transactionPartsSent.get(transaction));
// Reset which transaction to send
for (Transaction transaction : transactionPartsSent.keySet()) {
transaction.resetNextPartToSend();
+ transaction.resetNextPartToSend();
// Set the transaction sequence number back to nothing
if (!transaction.didSendAPartToServer()) {
}
} catch (ServerException e) {
+ System.out.println("Server Failure: " + e.getType());
+
+
if (e.getType() != ServerException.TypeInputTimeout) {
// e.printStackTrace();
// Nothing was able to be sent to the server so just clear these data structures
for (Transaction transaction : transactionPartsSent.keySet()) {
+ transaction.resetNextPartToSend();
+
// Set the transaction sequence number back to nothing
if (!transaction.didSendAPartToServer()) {
transaction.setSequenceNumber(-1);
} else {
// There was a partial send to the server
hadPartialSendToServer = true;
+
+ // Nothing was able to be sent to the server so just clear these data structures
+ for (Transaction transaction : transactionPartsSent.keySet()) {
+ transaction.resetNextPartToSend();
+ transaction.setServerFailure();
+ }
}
pendingSendArbitrationEntriesToDelete.clear();
return newKey == null;
}
+ public synchronized boolean updateFromLocal(long machineId) {
+ Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
+ if (localCommunicationInformation == null) {
+ // Cant talk to that device locally so do nothing
+ return false;
+ }
+
+ // Get the size of the send data
+ int sendDataSize = Integer.BYTES + Long.BYTES;
+
+ Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
+ if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId) != null) {
+ lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(machineId);
+ }
+
+ byte[] sendData = new byte[sendDataSize];
+ ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
+
+ // Encode the data
+ bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
+ bbEncode.putInt(0);
+
+ // Send by local
+ byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+
+ if (returnData == null) {
+ // Could not contact server
+ return false;
+ }
+
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
+ int numberOfEntries = bbDecode.getInt();
+
+ for (int i = 0; i < numberOfEntries; i++) {
+ byte type = bbDecode.get();
+ if (type == Entry.TypeAbort) {
+ Abort abort = (Abort)Abort.decode(null, bbDecode);
+ processEntry(abort);
+ } else if (type == Entry.TypeCommitPart) {
+ CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
+ processEntry(commitPart);
+ }
+ }
+
+ updateLiveStateFromLocal();
+
+ return true;
+ }
+
private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
// Get the devices local communications
part.encode(bbEncode);
}
-
-
-
-
-
-
-
-
-
-
-
- System.out.println("================================");
- System.out.println("Sending Locally");
- for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
- System.out.println(kv);
- }
-
-
-
-
-
-
-
-
-
-
-
// Send by local
byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
-
- System.out.println("--------------------------------");
- System.out.println();
-
if (returnData == null) {
// Could not contact server
return new Pair<Boolean, Boolean>(true, false);
status.setStatus(TransactionStatus.StatusAborted);
}
} else {
+ TransactionStatus status = transaction.getTransactionStatus();
if (foundAbort) {
- TransactionStatus status = transaction.getTransactionStatus();
status.setStatus(TransactionStatus.StatusAborted);
- return new Pair<Boolean, Boolean>(false, false);
+ } else {
+ status.setStatus(TransactionStatus.StatusCommitted);
}
}
continue;
}
- System.out.println("---");
- for (KeyValue kv : commit.getKeyValueUpdateSet()) {
- System.out.println("Sending Commit Locally: " + kv);
- }
- System.out.println("---");
-
unseenArbitrations.addAll(commit.getParts().values());
for (CommitPart commitPart : commit.getParts().values()) {
}
}
- // Insert as many transactions as possible while keeping order
- for (Transaction transaction : pendingTransactionQueue) {
+ if (pendingTransactionQueue.size() > 0) {
+
+ Transaction transaction = pendingTransactionQueue.get(0);
// Set the transaction sequence number if it has yet to be inserted into the block chain
- if (!transaction.didSendAPartToServer()) {
+ if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
transaction.setSequenceNumber(slot.getSequenceNumber());
}
- boolean ranOutOfSpace = false;
-
while (true) {
TransactionPart part = transaction.getNextPartToSend();
if (slot.hasSpace(part)) {
slot.addEntry(part);
-
List<Integer> partsSent = transactionPartsSent.get(transaction);
if (partsSent == null) {
partsSent = new ArrayList<Integer>();
transactionPartsSent.put(transaction, partsSent);
}
-
partsSent.add(part.getPartNumber());
transactionPartsSent.put(transaction, partsSent);
-
} else {
- ranOutOfSpace = true;
break;
}
}
-
- if (ranOutOfSpace) {
- break;
- }
}
+
+ // // Insert as many transactions as possible while keeping order
+ // for (Transaction transaction : pendingTransactionQueue) {
+
+ // // Set the transaction sequence number if it has yet to be inserted into the block chain
+ // if (!transaction.didSendAPartToServer()) {
+ // transaction.setSequenceNumber(slot.getSequenceNumber());
+ // }
+
+ // boolean ranOutOfSpace = false;
+
+ // while (true) {
+ // TransactionPart part = transaction.getNextPartToSend();
+
+ // if (part == null) {
+ // // Ran out of parts to send for this transaction so move on
+ // break;
+ // }
+
+ // if (slot.hasSpace(part)) {
+ // slot.addEntry(part);
+ // List<Integer> partsSent = transactionPartsSent.get(transaction);
+ // if (partsSent == null) {
+ // partsSent = new ArrayList<Integer>();
+ // transactionPartsSent.put(transaction, partsSent);
+ // }
+ // partsSent.add(part.getPartNumber());
+ // transactionPartsSent.put(transaction, partsSent);
+
+ // for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+ // System.out.println("Inserted Into Slot: " + kv);
+ // }
+
+ // ranOutOfSpace = true;
+ // break;
+
+ // } else {
+ // ranOutOfSpace = true;
+ // break;
+ // }
+ // }
+
+ // if (ranOutOfSpace) {
+ // break;
+ // }
+ // }
+
// Fill the remainder of the slot with rescue data
doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
// Add that part to the transaction
transaction.addPartDecode(part);
+
+ if (transaction.isComplete()) {
+ for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+ System.out.println("Got Live Transaction " + kv + " " + part.getSequenceNumber());
+ }
+ }
}
}
}
}
+
+ for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+ System.out.println("Arbitrating on: " + kv);
+ }
+
if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
// Guard evaluated as true
// Update what the last transaction committed was for use in batch commit
lastTransactionCommitted = transaction.getSequenceNumber();
-
} else {
// Guard evaluated was false so create abort
// Insert the abort so we can process
processEntry(newAbort);
+
+ for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+ System.out.println("Abort From Server!!!!!! " + kv);
+ }
}
}
}
}
-
if ((newCommit != null) || (generatedAborts.size() > 0)) {
ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
pendingSendArbitrationRounds.add(arbitrationRound);
if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) {
if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
// We've have already seen this from the server
-
- System.out.println("Local Arbitrate Seen Already from server, rejected");
return new Pair<Boolean, Boolean>(false, false);
}
}
*/
private boolean compactArbitrationData() {
-
if (pendingSendArbitrationRounds.size() < 2) {
// Nothing to compact so do nothing
return false;
return false;
}
+ // private boolean compactArbitrationData() {
+ // return false;
+ // }
/**
* Update all the commits and the committed tables, sets dead the dead transactions
- System.out.println("============");
// Update the committed table of keys and which commit is using which key
for (KeyValue kv : commit.getKeyValueUpdateSet()) {
- System.out.println("Committing: " + kv);
committedKeyValueTable.put(kv.getKey(), kv);
liveCommitsByKeyTable.put(kv.getKey(), commit);
}
- System.out.println("--------------");
- System.out.println();
}
}
*/
private void processEntry(Abort entry) {
+
+ if (entry.getTransactionSequenceNumber() != -1) {
+ // update the transaction status if it was sent to the server
+ TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
+ if (status != null) {
+ status.setStatus(TransactionStatus.StatusAborted);
+ }
+ }
+
// Abort has not been seen by the client it is for yet so we need to keep track of it
Abort previouslySeenAbort = liveAbortTable.put(entry.getAbortId(), entry);
if (previouslySeenAbort != null) {
}
- if (entry.getTransactionSequenceNumber() != -1) {
- // update the transaction status if it was sent to the server
- TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
- if (status != null) {
- status.setStatus(TransactionStatus.StatusAborted);
- }
- }
+
// Update the last arbitration data that we have seen so far
if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) {
// Is larger
lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
}
-
} else {
// Never seen any data from this arbitrator so record the first one
lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());