6 import java.util.ArrayList;
7 import java.util.HashMap;
8 import java.util.HashSet;
9 import java.nio.ByteBuffer;
// Parts of this transaction, keyed by part number (see the parts.put(...) calls
// in addPartEncode and addPartDecode).
13 private Map<Integer, TransactionPart> parts = null;
// Part numbers still missing on the decode side. In the visible code this is
// only allocated in addPartDecode once the last part has been seen.
14 private Set<Integer> missingParts = null;
// Part numbers queued by addPartEncode and removed again by removeSentParts.
15 private List<Integer> partsPendingSend = null;
// NOTE(review): the assignment to this flag is not visible in this excerpt;
// presumably set once every part has been received -- confirm.
16 private boolean isComplete = false;
// Guard key-values, filled by addGuardKV and decodeTransactionData and read by evaluateGuard.
17 private Set<KeyValue> keyValueGuardSet = null;
// Update key-values, filled by addUpdateKV and decodeTransactionData.
18 private Set<KeyValue> keyValueUpdateSet = null;
// NOTE(review): the assignment to this flag is not visible in this excerpt;
// presumably set by setDead() -- confirm.
19 private boolean isDead = false;
// The five header fields below are overwritten with the values carried by
// whichever part was added most recently (addPartEncode / addPartDecode).
20 private long sequenceNumber = -1;
21 private long clientLocalSequenceNumber = -1;
22 private long arbitratorId = -1;
23 private long machineId = -1;
24 private Pair<Long, Long> transactionId = null;
// Index into partsPendingSend used by getNextPartToSend.
26 private int nextPartToSend = 0;
// Set to true by removeSentParts.
27 private boolean didSendAPartToServer = false;
// Starts as null; assigned through setTransactionStatus and dereferenced in removeSentParts.
29 private TransactionStatus transactionStatus = null;
// Creates an empty transaction: allocates the part map, the guard and update
// key-value sets, and the list of part numbers pending send.
31 public Transaction() {
32 parts = new HashMap<Integer, TransactionPart>();
33 keyValueGuardSet = new HashSet<KeyValue>();
34 keyValueUpdateSet = new HashSet<KeyValue>();
35 partsPendingSend = new ArrayList<Integer>();
// Registers a part on the encode (sending) side: stores it under its part
// number and queues that part number in partsPendingSend.
// newPart: the part to add. Its sequence number, arbitrator id, transaction id,
// client-local sequence number and machine id overwrite the values cached on
// this transaction.
38 public void addPartEncode(TransactionPart newPart) {
39 parts.put(newPart.getPartNumber(), newPart);
40 partsPendingSend.add(newPart.getPartNumber());
42 // Get the sequence number and other important information
43 sequenceNumber = newPart.getSequenceNumber();
44 arbitratorId = newPart.getArbitratorId();
45 transactionId = newPart.getTransactionId();
46 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
47 machineId = newPart.getMachineId();
// Adds a part on the decode (receiving) side. In the visible code it caches the
// part's header fields, stores the part under its part number, tracks which
// part numbers are still missing, and calls decodeTransactionData() after the
// "no missing parts" check.
// NOTE(review): several lines of this method are not visible in this excerpt
// (including the body of the "dead" check and the body of the scan loop), so
// the notes here describe only the statements shown.
52 public void addPartDecode(TransactionPart newPart) {
55 // If dead then just kill this part and move on
60 // Get the sequence number and other important information
61 sequenceNumber = newPart.getSequenceNumber();
62 arbitratorId = newPart.getArbitratorId();
63 transactionId = newPart.getTransactionId();
64 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
65 machineId = newPart.getMachineId();
// Map.put returns the part previously stored under this part number, or null
// if there was none.
67 TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
69 if (previoslySeenPart != null) {
70 // Set dead the old one since the new one is a rescued version of this part
71 previoslySeenPart.setDead();
72 } else if (newPart.isLastPart()) {
// First sighting of the last part: start a fresh missing-part set and scan
// every lower part number for ones that are not stored yet.
73 missingParts = new HashSet<Integer>();
75 for (int i = 0; i < newPart.getPartNumber(); i++) {
76 if (parts.get(i) == null) {
84 // We have seen this part so remove it from the set of missing parts
// NOTE(review): in the visible code missingParts is only allocated in the
// isLastPart branch above; confirm it cannot still be null when this runs.
85 missingParts.remove(newPart.getPartNumber());
87 // Check if all the parts have been seen
88 if (missingParts.size() == 0) {
90 // We have all the parts
93 // Decode all the parts and create the key value guard and update sets
94 decodeTransactionData();
// Adds a key-value pair to this transaction's update set.
// kv: the update to record; as the target is a Set, an equal entry is not added twice.
99 public void addUpdateKV(KeyValue kv) {
100 keyValueUpdateSet.add(kv);
// Adds a key-value pair to this transaction's guard set.
// kv: the guard to record; as the target is a Set, an equal entry is not added twice.
103 public void addGuardKV(KeyValue kv) {
104 keyValueGuardSet.add(kv);
// Returns the cached sequence number (-1 until a part is added or
// setSequenceNumber is called).
108 public long getSequenceNumber() {
109 return sequenceNumber;
// Sets this transaction's sequence number and pushes the same value into
// every part currently stored in the part map.
// _sequenceNumber: the new sequence number.
112 public void setSequenceNumber(long _sequenceNumber) {
113 sequenceNumber = _sequenceNumber;
115 for (Integer i : parts.keySet()) {
116 parts.get(i).setSequenceNumber(sequenceNumber);
// Returns the client-local sequence number copied from the most recently
// added part (-1 if no part has been added).
120 public long getClientLocalSequenceNumber() {
121 return clientLocalSequenceNumber;
// Accessor for the part map.
// NOTE(review): the body is not visible in this excerpt; presumably returns the
// parts field directly (no copy) -- confirm.
124 public Map<Integer, TransactionPart> getParts() {
// Returns the didSendAPartToServer flag, which removeSentParts sets to true.
128 public boolean didSendAPartToServer() {
129 return didSendAPartToServer;
// NOTE(review): the body is not visible in this excerpt; presumably resets the
// nextPartToSend index used by getNextPartToSend -- confirm.
132 public void resetNextPartToSend() {
// Looks up the next part to send: nextPartToSend indexes into partsPendingSend,
// and the part number found there is used as the key into the part map.
// NOTE(review): the branch taken when nothing is left to send, and the
// statements after the lookup, are not visible in this excerpt.
136 public TransactionPart getNextPartToSend() {
// True when the pending list is empty or the index has reached its end.
137 if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
140 TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
// Stores the status object for this transaction; removeSentParts later
// dereferences it.
// _transactionStatus: the status object to associate with this transaction.
145 public void setTransactionStatus(TransactionStatus _transactionStatus) {
146 transactionStatus = _transactionStatus;
// Returns the status object set via setTransactionStatus, or null if none was set.
149 public TransactionStatus getTransactionStatus() {
150 return transactionStatus;
// Removes the given part numbers from the pending-send list, records that at
// least one part has been sent, and copies the current sequence number into
// the transaction status.
// sentParts: part numbers that were sent; removeAll drops every pending entry
// equal to one of them.
// NOTE(review): one line of this method is not visible in this excerpt.
153 public void removeSentParts(List<Integer> sentParts) {
155 partsPendingSend.removeAll(sentParts);
156 didSendAPartToServer = true;
// NOTE(review): transactionStatus starts as null and is only assigned in
// setTransactionStatus; confirm callers always set it before this runs.
157 transactionStatus.setTransactionSequenceNumber(sequenceNumber);
// Returns true when no part numbers remain in the pending-send list.
160 public boolean didSendAllParts() {
161 return partsPendingSend.isEmpty();
// Returns the internal update set itself (not a copy), so changes made by the
// caller are visible to this transaction.
164 public Set<KeyValue> getKeyValueUpdateSet() {
165 return keyValueUpdateSet;
// NOTE(review): the body is not visible in this excerpt; presumably returns the
// number of entries in the part map -- confirm.
168 public int getNumberOfParts() {
// NOTE(review): the body is not visible in this excerpt; presumably returns the
// machineId field -- confirm.
172 public long getMachineId() {
// NOTE(review): the body is not visible in this excerpt; presumably returns the
// arbitratorId field -- confirm.
176 public long getArbitrator() {
// NOTE(review): the body is not visible in this excerpt; presumably returns the
// isComplete field -- confirm.
180 public boolean isComplete() {
// Returns the transaction id copied from the most recently added part, or null
// if no part has been added yet.
184 public Pair<Long, Long> getId() {
185 return transactionId;
// Marks this transaction as dead; the visible part of the method walks every
// stored part.
// NOTE(review): the opening statements and the loop body's action on each part
// are not visible in this excerpt; presumably each part is set dead as well --
// confirm.
188 public void setDead() {
197 // Make all the parts of this transaction dead
198 for (Integer partNumber : parts.keySet()) {
199 TransactionPart part = parts.get(partNumber);
// Returns the part stored under the given part number, or null if no part with
// that number is stored.
// index: the part number used as the map key (not a list position).
204 public TransactionPart getPart(int index) {
205 return parts.get(index);
// Rebuilds the transaction payload from its parts and decodes it into the
// guard and update sets. The data sections of parts 0..n-1 (n = number of
// stored parts) are concatenated in part-number order. The combined buffer
// starts with two ints -- the guard count, then the update count -- followed
// by that many encoded guards and then that many encoded updates.
// NOTE(review): both loops look parts up by 0..n-1, so they rely on the part
// numbers being contiguous from 0; a gap would make parts.get(i) return null.
// The visible call site (addPartDecode) runs this only after its missing-part
// check. The declaration of dataSize is not visible in this excerpt.
208 private void decodeTransactionData() {
210 // Calculate the size of the data section
212 for (int i = 0; i < parts.keySet().size(); i++) {
213 TransactionPart tp = parts.get(i);
214 dataSize += tp.getDataSize();
217 byte[] combinedData = new byte[dataSize];
// Write offset into combinedData; advanced by each part's data size.
218 int currentPosition = 0;
220 // Stitch all the data sections together
221 for (int i = 0; i < parts.keySet().size(); i++) {
222 TransactionPart tp = parts.get(i);
223 System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
224 currentPosition += tp.getDataSize();
228 ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
230 // Decode how many key value pairs need to be decoded
231 int numberOfKVGuards = bbDecode.getInt();
232 int numberOfKVUpdates = bbDecode.getInt();
234 // Decode all the guard key values
235 for (int i = 0; i < numberOfKVGuards; i++) {
236 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
237 keyValueGuardSet.add(kv);
240 // Decode all the updates key values
241 for (int i = 0; i < numberOfKVUpdates; i++) {
242 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
243 keyValueUpdateSet.add(kv);
247 public boolean evaluateGuard(Map<IoTString, KeyValue> committedKeyValueTable, Map<IoTString, KeyValue> speculatedKeyValueTable, Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
248 for (KeyValue kvGuard : keyValueGuardSet) {
250 // First check if the key is in the speculative table, this is the value of the latest assumption
253 // If we have a pending-transaction speculation table then use it first
254 if (pendingTransactionSpeculatedKeyValueTable != null) {
255 kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
258 // If the pending-transaction table had no entry, fall back to the speculated table
259 if ((kv == null) && (speculatedKeyValueTable != null)) {
260 kv = speculatedKeyValueTable.get(kvGuard.getKey());
264 // if it is not in the speculative table then check the committed table and use that
265 // value as our latest assumption
266 kv = committedKeyValueTable.get(kvGuard.getKey());
269 if (kvGuard.getValue() != null) {
270 if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) {