X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2Fjava%2Fiotcloud%2FTransaction.java;h=82280c0fb38daab2c7340fb9bcb392ff62ed5879;hp=90ef4d4fc385b269308cbc6d0716da9ece60ad96;hb=8f2cd2d576d466dd791db72a6c54348d69af8541;hpb=a1fd28d4b49b14e9ee970cb6acaeeba3e069d541 diff --git a/version2/src/java/iotcloud/Transaction.java b/version2/src/java/iotcloud/Transaction.java index 90ef4d4..82280c0 100644 --- a/version2/src/java/iotcloud/Transaction.java +++ b/version2/src/java/iotcloud/Transaction.java @@ -1,154 +1,283 @@ package iotcloud; -import java.nio.ByteBuffer; +import java.util.Map; import java.util.Set; +import java.util.List; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; -import java.util.Map; +import java.nio.ByteBuffer; -class Transaction extends Entry { +class Transaction { - private long seqnum; - private long machineid; - private Set keyValueUpdateSet = null; + private Map parts = null; + private Set missingParts = null; + private List partsPendingSend = null; + private boolean isComplete = false; private Set keyValueGuardSet = null; - private Long arbitrator; + private Set keyValueUpdateSet = null; + private boolean isDead = false; + private long sequenceNumber = -1; + private long clientLocalSequenceNumber = -1; + private long arbitratorId = -1; + private long machineId = -1; + private Pair transactionId = null; + + private int nextPartToSend = 0; + private boolean didSendAPartToServer = false; + + private TransactionStatus transactionStatus = null; + + public Transaction() { + parts = new HashMap(); + keyValueGuardSet = new HashSet(); + keyValueUpdateSet = new HashSet(); + partsPendingSend = new ArrayList(); + } + + public void addPartEncode(TransactionPart newPart) { + parts.put(newPart.getPartNumber(), newPart); + partsPendingSend.add(newPart.getPartNumber()); + + // Get the sequence number and other important information + sequenceNumber = newPart.getSequenceNumber(); + arbitratorId = newPart.getArbitratorId(); + transactionId = newPart.getTransactionId(); + clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); + machineId = newPart.getMachineId(); + + isComplete = true; + } + + public void addPartDecode(TransactionPart newPart) { + + if (isDead) { + // If dead then just kill this part and move on + newPart.setDead(); + return; + } + + // Get the sequence number and other important information + sequenceNumber = newPart.getSequenceNumber(); + arbitratorId = newPart.getArbitratorId(); + transactionId = newPart.getTransactionId(); + clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber(); + machineId = newPart.getMachineId(); + + TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); + + if (previoslySeenPart != null) { + // Set dead the old one since the new one is a rescued version of this part + previoslySeenPart.setDead(); + } else if (newPart.isLastPart()) { + missingParts = new HashSet(); - public Transaction(Slot slot, long _seqnum, long _machineid, Long _arbitrator, Set _keyValueUpdateSet, Set _keyValueGuardSet) { - super(slot); - seqnum = _seqnum; - machineid = _machineid; - arbitrator = _arbitrator; - // keyValueUpdateSet = new HashSet(); - // keyValueGuardSet = new HashSet(); + for (int i = 0; i < newPart.getPartNumber(); i++) { + if (parts.get(i) == null) { + missingParts.add(i); + } + } + } - // for (KeyValue kv : _keyValueUpdateSet) { - // KeyValue kvCopy = kv.getCopy(); - // keyValueUpdateSet.add(kvCopy); - // } + if (!isComplete) { - // for (KeyValue kv : _keyValueGuardSet) { - // KeyValue kvCopy = kv.getCopy(); - // keyValueGuardSet.add(kvCopy); - // } + // We have seen this part so remove it from the set of missing parts + missingParts.remove(newPart.getPartNumber()); - keyValueUpdateSet = _keyValueUpdateSet; - keyValueGuardSet = _keyValueGuardSet; + // Check if all the parts have been seen + if (missingParts.size() == 0) { + + // We have all the parts + isComplete = true; + + // Decode all the parts and create the key value guard and update sets + decodeTransactionData(); + } + } } - public long getMachineID() { - return machineid; + public void addUpdateKV(KeyValue kv) { + keyValueUpdateSet.add(kv); } - public long getArbitrator() { - return arbitrator; + public void addGuardKV(KeyValue kv) { + keyValueGuardSet.add(kv); } + public long getSequenceNumber() { - return seqnum; + return sequenceNumber; } + public void setSequenceNumber(long _sequenceNumber) { + sequenceNumber = _sequenceNumber; - public Set getkeyValueUpdateSet() { - return keyValueUpdateSet; + for (Integer i : parts.keySet()) { + parts.get(i).setSequenceNumber(sequenceNumber); + } } - public Set getkeyValueGuardSet() { - return keyValueGuardSet; + public long getClientLocalSequenceNumber() { + return clientLocalSequenceNumber; } - public boolean evaluateGuard(Map keyValTableCommitted, Map keyValTableSpeculative) { - for (KeyValue kvGuard : keyValueGuardSet) { + public Map getParts() { + return parts; + } - // First check if the key is in the speculative table, this is the value of the latest assumption - KeyValue kv = null; - // If we have a speculation table then use it first - if (keyValTableSpeculative != null) { - kv = keyValTableSpeculative.get(kvGuard.getKey()); - } - - if (kv == null) { + public boolean didSendAPartToServer() { + return didSendAPartToServer; + } - // if it is not in the speculative table then check the committed table and use that - // value as our latest assumption - kv = keyValTableCommitted.get(kvGuard.getKey()); - } + public void resetNextPartToSend() { + nextPartToSend = 0; + } - if (kvGuard.getValue() != null) { - if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) { - return false; - } - } else { - if (kv != null) { - return false; - } - } + public TransactionPart getNextPartToSend() { + if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) { + return null; } - return true; + TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend)); + nextPartToSend++; + return part; + } + + public void setTransactionStatus(TransactionStatus _transactionStatus) { + transactionStatus = _transactionStatus; } - public byte getType() { - return Entry.TypeTransaction; + public TransactionStatus getTransactionStatus() { + return transactionStatus; } - public int getSize() { - int size = 3 * Long.BYTES + Byte.BYTES; // seq, machine id, entry type - size += Integer.BYTES; // number of KV's - size += Integer.BYTES; // number of Guard KV's + public void removeSentParts(List sentParts) { + nextPartToSend = 0; + partsPendingSend.removeAll(sentParts); + didSendAPartToServer = true; + transactionStatus.setTransactionSequenceNumber(sequenceNumber); + } - // Size of each KV - for (KeyValue kv : keyValueUpdateSet) { - size += kv.getSize(); - } + public boolean didSendAllParts() { + return partsPendingSend.isEmpty(); + } - // Size of each Guard KV - for (KeyValue kv : keyValueGuardSet) { - size += kv.getSize(); - } - return size; + public Set getKeyValueUpdateSet() { + return keyValueUpdateSet; + } + + public int getNumberOfParts() { + return parts.size(); + } + + public long getMachineId() { + return machineId; + } + + public long getArbitrator() { + return arbitratorId; + } + + public boolean isComplete() { + return isComplete; } - public void encode(ByteBuffer bb) { - bb.put(Entry.TypeTransaction); - bb.putLong(seqnum); - bb.putLong(machineid); - bb.putLong(arbitrator); + public Pair getId() { + return transactionId; + } - bb.putInt(keyValueUpdateSet.size()); - for (KeyValue kv : keyValueUpdateSet) { - kv.encode(bb); + public void setDead() { + if (isDead) { + // Already dead + return; } - bb.putInt(keyValueGuardSet.size()); - for (KeyValue kv : keyValueGuardSet) { - kv.encode(bb); + // Set dead + isDead = true; + + // Make all the parts of this transaction dead + for (Integer partNumber : parts.keySet()) { + TransactionPart part = parts.get(partNumber); + part.setDead(); } } - static Entry decode(Slot slot, ByteBuffer bb) { - long seqnum = bb.getLong(); - long machineid = bb.getLong(); - long arbitrator = bb.getLong(); - int numberOfKeys = bb.getInt(); + public TransactionPart getPart(int index) { + return parts.get(index); + } + + private void decodeTransactionData() { - Set kvSetUpdates = new HashSet(); - for (int i = 0; i < numberOfKeys; i++) { - KeyValue kv = KeyValue.decode(bb); - kvSetUpdates.add(kv); + // Calculate the size of the data section + int dataSize = 0; + for (int i = 0; i < parts.keySet().size(); i++) { + TransactionPart tp = parts.get(i); + dataSize += tp.getDataSize(); } - int numberOfGuards = bb.getInt(); - Set kvSetGuards = new HashSet(); - for (int i = 0; i < numberOfGuards; i++) { - KeyValue kv = KeyValue.decode(bb); - kvSetGuards.add(kv); + byte[] combinedData = new byte[dataSize]; + int currentPosition = 0; + + // Stitch all the data sections together + for (int i = 0; i < parts.keySet().size(); i++) { + TransactionPart tp = parts.get(i); + System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize()); + currentPosition += tp.getDataSize(); } - return new Transaction(slot, seqnum, machineid, arbitrator, kvSetUpdates, kvSetGuards); + // Decoder Object + ByteBuffer bbDecode = ByteBuffer.wrap(combinedData); + + // Decode how many key value pairs need to be decoded + int numberOfKVGuards = bbDecode.getInt(); + int numberOfKVUpdates = bbDecode.getInt(); + + // Decode all the guard key values + for (int i = 0; i < numberOfKVGuards; i++) { + KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); + keyValueGuardSet.add(kv); + } + + // Decode all the updates key values + for (int i = 0; i < numberOfKVUpdates; i++) { + KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); + keyValueUpdateSet.add(kv); + } } - public Entry getCopy(Slot s) { - return new Transaction(s, seqnum, machineid, arbitrator, keyValueUpdateSet, keyValueGuardSet); + public boolean evaluateGuard(Map committedKeyValueTable, Map speculatedKeyValueTable, Map pendingTransactionSpeculatedKeyValueTable) { + for (KeyValue kvGuard : keyValueGuardSet) { + + // First check if the key is in the speculative table, this is the value of the latest assumption + KeyValue kv = null; + + // If we have a speculation table then use it first + if (pendingTransactionSpeculatedKeyValueTable != null) { + kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey()); + } + + // If we have a speculation table then use it first + if ((kv == null) && (speculatedKeyValueTable != null)) { + kv = speculatedKeyValueTable.get(kvGuard.getKey()); + } + + if (kv == null) { + // if it is not in the speculative table then check the committed table and use that + // value as our latest assumption + kv = committedKeyValueTable.get(kvGuard.getKey()); + } + + if (kvGuard.getValue() != null) { + if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) { + return false; + } + } else { + if (kv != null) { + return false; + } + } + } + return true; } } \ No newline at end of file