+ private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
+
+ // Get the devices local communications
+ Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
+
+ if (localCommunicationInformation == null) {
+ // Cant talk to that device locally so do nothing
+ return new Pair<Boolean, Boolean>(false, false);
+ }
+
+ // Get the size of the send data
+ int sendDataSize = Integer.BYTES + Long.BYTES;
+ for (TransactionPart part : transaction.getParts().values()) {
+ sendDataSize += part.getSize();
+ }
+
+ Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
+ if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != null) {
+ lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
+ }
+
+ // Make the send data size
+ byte[] sendData = new byte[sendDataSize];
+ ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
+
+ // Encode the data
+ bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
+ bbEncode.putInt(transaction.getParts().size());
+ for (TransactionPart part : transaction.getParts().values()) {
+ part.encode(bbEncode);
+ }
+
+
+
+
+
+
+
+
+
+
+
+
+ System.out.println("================================");
+ System.out.println("Sending Locally");
+ for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+ System.out.println(kv);
+ }
+
+
+
+
+
+
+
+
+
+
+
+ // Send by local
+ byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+
+
+ System.out.println("--------------------------------");
+ System.out.println();
+
+ if (returnData == null) {
+ // Could not contact server
+ return new Pair<Boolean, Boolean>(true, false);
+ }
+
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
+ boolean didCommit = bbDecode.get() == 1;
+ boolean couldArbitrate = bbDecode.get() == 1;
+ int numberOfEntries = bbDecode.getInt();
+ boolean foundAbort = false;
+
+ for (int i = 0; i < numberOfEntries; i++) {
+ byte type = bbDecode.get();
+ if (type == Entry.TypeAbort) {
+ Abort abort = (Abort)Abort.decode(null, bbDecode);
+
+ if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
+ foundAbort = true;
+ }
+
+ processEntry(abort);
+ } else if (type == Entry.TypeCommitPart) {
+ CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
+ processEntry(commitPart);
+ }
+ }
+
+ updateLiveStateFromLocal();
+
+ if (couldArbitrate) {
+ TransactionStatus status = transaction.getTransactionStatus();
+ if (didCommit) {
+ status.setStatus(TransactionStatus.StatusCommitted);
+ } else {
+ status.setStatus(TransactionStatus.StatusAborted);
+ }
+ } else {
+ if (foundAbort) {
+ TransactionStatus status = transaction.getTransactionStatus();
+ status.setStatus(TransactionStatus.StatusAborted);
+ return new Pair<Boolean, Boolean>(false, false);
+ }
+ }
+
+ return new Pair<Boolean, Boolean>(false, true);
+ }
+
+ public synchronized byte[] acceptDataFromLocal(byte[] data) {
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(data);
+ long lastArbitratedSequenceNumberSeen = bbDecode.getLong();
+ int numberOfParts = bbDecode.getInt();
+
+ // If we did commit a transaction or not
+ boolean didCommit = false;
+ boolean couldArbitrate = false;
+
+ if (numberOfParts != 0) {
+
+ // decode the transaction
+ Transaction transaction = new Transaction();
+ for (int i = 0; i < numberOfParts; i++) {
+ bbDecode.get();
+ TransactionPart newPart = (TransactionPart)TransactionPart.decode(null, bbDecode);
+ transaction.addPartDecode(newPart);
+ }
+
+ // Arbitrate on transaction and pull relevant return data
+ Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
+ couldArbitrate = localArbitrateReturn.getFirst();
+ didCommit = localArbitrateReturn.getSecond();
+
+ updateLiveStateFromLocal();
+
+ // Transaction was sent to the server so keep track of it to prevent double commit
+ if (transaction.getSequenceNumber() != -1) {
+ offlineTransactionsCommittedAndAtServer.add(transaction.getId());
+ }
+ }
+
+ // The data to send back
+ int returnDataSize = 0;
+ List<Entry> unseenArbitrations = new ArrayList<Entry>();
+
+ // Get the aborts to send back
+ List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
+ Collections.sort(abortLocalSequenceNumbers);
+ for (Long localSequenceNumber : abortLocalSequenceNumbers) {
+ if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
+ continue;
+ }
+
+ Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
+ unseenArbitrations.add(abort);
+ returnDataSize += abort.getSize();
+ }
+
+ // Get the commits to send back
+ Map<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
+ if (commitForClientTable != null) {
+ List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
+ Collections.sort(commitLocalSequenceNumbers);
+
+ for (Long localSequenceNumber : commitLocalSequenceNumbers) {
+ Commit commit = commitForClientTable.get(localSequenceNumber);
+
+ if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
+ continue;
+ }
+
+ System.out.println("---");
+ for (KeyValue kv : commit.getKeyValueUpdateSet()) {
+ System.out.println("Sending Commit Locally: " + kv);
+ }
+ System.out.println("---");
+
+ unseenArbitrations.addAll(commit.getParts().values());
+
+ for (CommitPart commitPart : commit.getParts().values()) {
+ returnDataSize += commitPart.getSize();
+ }
+ }
+ }
+
+ // Number of arbitration entries to decode
+ returnDataSize += 2 * Integer.BYTES;
+
+ // Boolean of did commit or not
+ if (numberOfParts != 0) {
+ returnDataSize += Byte.BYTES;
+ }
+
+ // Data to send Back
+ byte[] returnData = new byte[returnDataSize];
+ ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
+
+ if (numberOfParts != 0) {
+ if (didCommit) {
+ bbEncode.put((byte)1);
+ } else {
+ bbEncode.put((byte)0);
+ }
+ if (couldArbitrate) {
+ bbEncode.put((byte)1);
+ } else {
+ bbEncode.put((byte)0);
+ }
+ }
+
+ bbEncode.putInt(unseenArbitrations.size());
+ for (Entry entry : unseenArbitrations) {
+ entry.encode(bbEncode);
+ }
+
+ return returnData;
+ }
+
+ private ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, boolean isNewKey) throws ServerException {