package iotcloud; import java.util.Map; import java.util.Set; import java.util.List; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.nio.ByteBuffer; class Transaction { private Map parts = null; private Set missingParts = null; private List partsPendingSend = null; private boolean isComplete = false; private boolean hasLastPart = false; private Set keyValueGuardSet = null; private Set keyValueUpdateSet = null; private boolean isDead = false; private long sequenceNumber = -1; private long clientLocalSequenceNumber = -1; private long arbitratorId = -1; private long machineId = -1; private Pair transactionId = null; private int nextPartToSend = 0; private boolean didSendAPartToServer = false; private TransactionStatus transactionStatus = null; private boolean hadServerFailure = false; public Transaction() { parts = new HashMap(); keyValueGuardSet = new HashSet(); keyValueUpdateSet = new HashSet(); partsPendingSend = new ArrayList(); } public void addPartEncode(TransactionPart newPart) { parts.put(newPart.getPartNumber(), newPart); partsPendingSend.add(newPart.getPartNumber()); // Get the sequence number and other important information sequenceNumber = newPart.getSequenceNumber(); arbitratorId = newPart.getArbitratorId(); transactionId = newPart.getTransactionId(); clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); machineId = newPart.getMachineId(); isComplete = true; } public void addPartDecode(TransactionPart newPart) { if (isDead) { // If dead then just kill this part and move on newPart.setDead(); return; } // Get the sequence number and other important information sequenceNumber = newPart.getSequenceNumber(); arbitratorId = newPart.getArbitratorId(); transactionId = newPart.getTransactionId(); clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); machineId = newPart.getMachineId(); TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); if (previoslySeenPart != null) { // Set dead the old one since the new one is a rescued version of this part previoslySeenPart.setDead(); } else if (newPart.isLastPart()) { missingParts = new HashSet(); hasLastPart = true; for (int i = 0; i < newPart.getPartNumber(); i++) { if (parts.get(i) == null) { missingParts.add(i); } } } if (!isComplete && hasLastPart) { // We have seen this part so remove it from the set of missing parts missingParts.remove(newPart.getPartNumber()); // Check if all the parts have been seen if (missingParts.size() == 0) { // We have all the parts isComplete = true; // Decode all the parts and create the key value guard and update sets decodeTransactionData(); } } } public void addUpdateKV(KeyValue kv) { keyValueUpdateSet.add(kv); } public void addGuardKV(KeyValue kv) { keyValueGuardSet.add(kv); } public long getSequenceNumber() { return sequenceNumber; } public void setSequenceNumber(long _sequenceNumber) { sequenceNumber = _sequenceNumber; for (Integer i : parts.keySet()) { parts.get(i).setSequenceNumber(sequenceNumber); } } public long getClientLocalSequenceNumber() { return clientLocalSequenceNumber; } public Map getParts() { return parts; } public boolean didSendAPartToServer() { return didSendAPartToServer; } public void resetNextPartToSend() { nextPartToSend = 0; } public TransactionPart getNextPartToSend() { if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) { return null; } TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend)); nextPartToSend++; return part; } public void setServerFailure() { hadServerFailure = true; } public boolean getServerFailure() { return hadServerFailure; } public void resetServerFailure() { hadServerFailure = false; } public void setTransactionStatus(TransactionStatus _transactionStatus) { transactionStatus = _transactionStatus; } public TransactionStatus getTransactionStatus() { return transactionStatus; } public void removeSentParts(List sentParts) { nextPartToSend = 0; if(partsPendingSend.removeAll(sentParts)) { didSendAPartToServer = true; transactionStatus.setTransactionSequenceNumber(sequenceNumber); } } public boolean didSendAllParts() { return partsPendingSend.isEmpty(); } public Set getKeyValueUpdateSet() { return keyValueUpdateSet; } public int getNumberOfParts() { return parts.size(); } public long getMachineId() { return machineId; } public long getArbitrator() { return arbitratorId; } public boolean isComplete() { return isComplete; } public Pair getId() { return transactionId; } public void setDead() { if (isDead) { // Already dead return; } // Set dead isDead = true; // Make all the parts of this transaction dead for (Integer partNumber : parts.keySet()) { TransactionPart part = parts.get(partNumber); part.setDead(); } } public TransactionPart getPart(int index) { return parts.get(index); } private void decodeTransactionData() { // Calculate the size of the data section int dataSize = 0; for (int i = 0; i < parts.keySet().size(); i++) { TransactionPart tp = parts.get(i); dataSize += tp.getDataSize(); } byte[] combinedData = new byte[dataSize]; int currentPosition = 0; // Stitch all the data sections together for (int i = 0; i < parts.keySet().size(); i++) { TransactionPart tp = parts.get(i); System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize()); currentPosition += tp.getDataSize(); } // Decoder Object ByteBuffer bbDecode = ByteBuffer.wrap(combinedData); // Decode how many key value pairs need to be decoded int numberOfKVGuards = bbDecode.getInt(); int numberOfKVUpdates = bbDecode.getInt(); // Decode all the guard key values for (int i = 0; i < numberOfKVGuards; i++) { KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); keyValueGuardSet.add(kv); } // Decode all the updates key values for (int i = 0; i < numberOfKVUpdates; i++) { KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); keyValueUpdateSet.add(kv); } } public boolean evaluateGuard(Map committedKeyValueTable, Map speculatedKeyValueTable, Map pendingTransactionSpeculatedKeyValueTable) { for (KeyValue kvGuard : keyValueGuardSet) { // First check if the key is in the speculative table, this is the value of the latest assumption KeyValue kv = null; // If we have a speculation table then use it first if (pendingTransactionSpeculatedKeyValueTable != null) { kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey()); } // If we have a speculation table then use it first if ((kv == null) && (speculatedKeyValueTable != null)) { kv = speculatedKeyValueTable.get(kvGuard.getKey()); } if (kv == null) { // if it is not in the speculative table then check the committed table and use that // value as our latest assumption kv = committedKeyValueTable.get(kvGuard.getKey()); } if (kvGuard.getValue() != null) { if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) { if (kv != null) { System.out.println(kvGuard.getValue() + " " + kv.getValue()); } else { System.out.println(kvGuard.getValue() + " " + kv); } return false; } } else { if (kv != null) { return false; } } } return true; } }