6 import java.util.ArrayList;
7 import java.util.HashMap;
8 import java.util.HashSet;
9 import java.nio.ByteBuffer;
// Instance state of a transaction. The collection fields start out null here;
// the constructor allocates parts, the two key-value sets and partsPendingSend.

// Parts of this transaction, keyed by part number.
13 private Map<Integer, TransactionPart> parts = null;
// Allocated in addPartDecode once a part flagged as the last part arrives.
14 private Set<Integer> missingParts = null;
// Part numbers queued by addPartEncode and removed again by removeSentParts.
15 private List<Integer> partsPendingSend = null;
16 private boolean isComplete = false;
// Guard and update key-values; filled by addGuardKV/addUpdateKV or by decodeTransactionData.
17 private Set<KeyValue> keyValueGuardSet = null;
18 private Set<KeyValue> keyValueUpdateSet = null;
19 private boolean isDead = false;
// Identification fields; -1 / null until a part has been added, then copied
// from the most recently added part.
20 private long sequenceNumber = -1;
21 private long clientLocalSequenceNumber = -1;
22 private long arbitratorId = -1;
23 private long machineId = -1;
24 private Pair<Long, Long> transactionId = null;
// Index into partsPendingSend used by getNextPartToSend.
26 private int nextPartToSend = 0;
// Set to true by removeSentParts.
27 private boolean didSendAPartToServer = false;
// Null until setTransactionStatus is called.
29 private TransactionStatus transactionStatus = null;
// Toggled by setServerFailure / resetServerFailure.
31 private boolean hadServerFailure = false;
/**
 * Creates an empty transaction: allocates the part map, the guard and update
 * key-value sets and the pending-send list. missingParts is not allocated here.
 */
33 public Transaction() {
34 parts = new HashMap<Integer, TransactionPart>();
35 keyValueGuardSet = new HashSet<KeyValue>();
36 keyValueUpdateSet = new HashSet<KeyValue>();
37 partsPendingSend = new ArrayList<Integer>();
/**
 * Adds a part on the encode path: stores it under its part number, queues that
 * part number in partsPendingSend, and copies the part's identifying fields onto
 * this transaction (overwriting whatever an earlier part set).
 *
 * @param newPart the part to add
 */
40 public void addPartEncode(TransactionPart newPart) {
41 parts.put(newPart.getPartNumber(), newPart);
42 partsPendingSend.add(newPart.getPartNumber());
44 // Get the sequence number and other important information
45 sequenceNumber = newPart.getSequenceNumber();
46 arbitratorId = newPart.getArbitratorId();
47 transactionId = newPart.getTransactionId();
48 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
49 machineId = newPart.getMachineId();
/**
 * Adds a part on the decode path. Copies the part's identifying fields onto this
 * transaction, stores the part under its part number, tracks which part numbers
 * are still missing once the last part is known, and calls decodeTransactionData()
 * when the missing set is empty.
 *
 * NOTE(review): several statements of this method are not visible in this view
 * (the dead-transaction early exit, the loop body, the completion bookkeeping);
 * the comments below describe only what is shown.
 *
 * @param newPart the part to add
 */
54 public void addPartDecode(TransactionPart newPart) {
57 // If dead then just kill this part and move on
62 // Get the sequence number and other important information
63 sequenceNumber = newPart.getSequenceNumber();
64 arbitratorId = newPart.getArbitratorId();
65 transactionId = newPart.getTransactionId();
66 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
67 machineId = newPart.getMachineId();
// Map.put returns the part previously stored under this number, or null if none.
69 TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
71 if (previoslySeenPart != null) {
72 // Set dead the old one since the new one is a rescued version of this part
73 previoslySeenPart.setDead();
74 } else if (newPart.isLastPart()) {
// The last part's number bounds the scan: every lower part number is checked.
75 missingParts = new HashSet<Integer>();
77 for (int i = 0; i < newPart.getPartNumber(); i++) {
// Presumably records i in missingParts when absent -- loop body not visible; confirm.
78 if (parts.get(i) == null) {
86 // We have seen this part so remove it from the set of missing parts
// NOTE(review): missingParts is only allocated in the last-part branch above;
// confirm a guard in the unseen lines prevents dereferencing it while null.
87 missingParts.remove(newPart.getPartNumber());
89 // Check if all the parts have been seen
90 if (missingParts.size() == 0) {
92 // We have all the parts
95 // Decode all the parts and create the key value guard and update sets
96 decodeTransactionData();
/**
 * Adds a key-value pair to this transaction's update set.
 *
 * @param kv the key-value to add
 */
101 public void addUpdateKV(KeyValue kv) {
102 keyValueUpdateSet.add(kv);
/**
 * Adds a key-value pair to this transaction's guard set.
 *
 * @param kv the key-value to add
 */
105 public void addGuardKV(KeyValue kv) {
106 keyValueGuardSet.add(kv);
/**
 * @return the transaction's sequence number; -1 until a part has been added or
 *         setSequenceNumber has been called
 */
110 public long getSequenceNumber() {
111 return sequenceNumber;
/**
 * Sets the transaction's sequence number and pushes the same value to every
 * part currently stored in the part map.
 *
 * @param _sequenceNumber the new sequence number
 */
114 public void setSequenceNumber(long _sequenceNumber) {
115 sequenceNumber = _sequenceNumber;
117 for (Integer i : parts.keySet()) {
118 parts.get(i).setSequenceNumber(sequenceNumber);
/**
 * @return the client-local sequence number copied from the most recently added
 *         part; -1 if no part has been added
 */
122 public long getClientLocalSequenceNumber() {
123 return clientLocalSequenceNumber;
/**
 * Accessor for the part-number-to-part map.
 * NOTE(review): the body is not visible in this view; presumably returns the
 * parts field directly (no defensive copy) -- confirm.
 */
126 public Map<Integer, TransactionPart> getParts() {
/**
 * @return the didSendAPartToServer flag, which removeSentParts sets to true
 */
130 public boolean didSendAPartToServer() {
131 return didSendAPartToServer;
/**
 * NOTE(review): the body is not visible in this view; presumably resets the
 * nextPartToSend index to its initial value of 0 -- confirm.
 */
134 public void resetNextPartToSend() {
/**
 * Looks up the next part that is pending send, using nextPartToSend as an index
 * into partsPendingSend.
 *
 * NOTE(review): the branch taken when nothing is left to send, the index
 * advance and the return statement are not visible in this view -- confirm.
 */
138 public TransactionPart getNextPartToSend() {
// Nothing pending, or the index has walked past the end of the pending list.
139 if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
// partsPendingSend holds part numbers, so the lookup goes through the parts map.
142 TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
/** Sets the hadServerFailure flag to true. */
148 public void setServerFailure() {
149 hadServerFailure = true;
/**
 * @return the current value of the hadServerFailure flag
 */
152 public boolean getServerFailure() {
153 return hadServerFailure;
/** Clears the hadServerFailure flag (sets it back to false). */
157 public void resetServerFailure() {
158 hadServerFailure = false;
/**
 * Attaches the status object for this transaction; removeSentParts later writes
 * the sequence number into it.
 *
 * @param _transactionStatus the status object to store
 */
162 public void setTransactionStatus(TransactionStatus _transactionStatus) {
163 transactionStatus = _transactionStatus;
/**
 * @return the stored status object; null if setTransactionStatus was never called
 */
166 public TransactionStatus getTransactionStatus() {
167 return transactionStatus;
/**
 * Removes the given part numbers from the pending-send list, records that at
 * least one part has been sent, and writes the current sequence number into the
 * transaction status.
 *
 * @param sentParts part numbers to remove from the pending-send list
 */
170 public void removeSentParts(List<Integer> sentParts) {
172 partsPendingSend.removeAll(sentParts);
173 didSendAPartToServer = true;
// NOTE(review): transactionStatus starts as null and is dereferenced here without
// a visible null check; confirm setTransactionStatus is always called first.
174 transactionStatus.setTransactionSequenceNumber(sequenceNumber);
/**
 * @return true when no part numbers remain in the pending-send list
 */
177 public boolean didSendAllParts() {
178 return partsPendingSend.isEmpty();
/**
 * @return the internal update key-value set itself (not a copy), so changes made
 *         by the caller are visible to this transaction
 */
181 public Set<KeyValue> getKeyValueUpdateSet() {
182 return keyValueUpdateSet;
/**
 * NOTE(review): the body is not visible in this view; presumably returns the
 * number of entries in the parts map -- confirm.
 */
185 public int getNumberOfParts() {
/**
 * NOTE(review): the body is not visible in this view; presumably returns the
 * machineId field -- confirm.
 */
189 public long getMachineId() {
/**
 * NOTE(review): the body is not visible in this view; presumably returns the
 * arbitratorId field -- confirm.
 */
193 public long getArbitrator() {
/**
 * NOTE(review): the body is not visible in this view; presumably returns the
 * isComplete field -- confirm.
 */
197 public boolean isComplete() {
/**
 * @return the transaction id pair copied from the most recently added part;
 *         null if no part has been added
 */
201 public Pair<Long, Long> getId() {
202 return transactionId;
/**
 * Marks this transaction dead and walks every stored part.
 *
 * NOTE(review): the statements before the loop and the loop body are not visible
 * in this view; per the existing comment each part is made dead -- confirm.
 */
205 public void setDead() {
214 // Make all the parts of this transaction dead
215 for (Integer partNumber : parts.keySet()) {
216 TransactionPart part = parts.get(partNumber);
/**
 * @param index the part number to look up
 * @return the part stored under that part number, or null if there is none
 */
221 public TransactionPart getPart(int index) {
222 return parts.get(index);
/**
 * Reassembles the data sections of all parts into one buffer and decodes it into
 * the guard and update key-value sets.
 *
 * Layout read from the combined buffer: an int count of guards, an int count of
 * updates, then that many guard KeyValues followed by that many update KeyValues.
 *
 * NOTE(review): both loops index parts by 0..size-1, so this assumes part numbers
 * are contiguous from 0; a gap would make parts.get(i) return null -- confirm the
 * caller only invokes this once every part is present.
 */
225 private void decodeTransactionData() {
227 // Calculate the size of the data section
229 for (int i = 0; i < parts.keySet().size(); i++) {
230 TransactionPart tp = parts.get(i);
231 dataSize += tp.getDataSize();
234 byte[] combinedData = new byte[dataSize];
235 int currentPosition = 0;
237 // Stitch all the data sections together
238 for (int i = 0; i < parts.keySet().size(); i++) {
239 TransactionPart tp = parts.get(i);
// Append this part's payload right after the previous part's payload.
240 System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
241 currentPosition += tp.getDataSize();
245 ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
247 // Decode how many key value pairs need to be decoded
248 int numberOfKVGuards = bbDecode.getInt();
249 int numberOfKVUpdates = bbDecode.getInt();
251 // Decode all the guard key values
252 for (int i = 0; i < numberOfKVGuards; i++) {
253 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
254 keyValueGuardSet.add(kv);
257 // Decode all the updates key values
258 for (int i = 0; i < numberOfKVUpdates; i++) {
259 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
260 keyValueUpdateSet.add(kv);
264 public boolean evaluateGuard(Map<IoTString, KeyValue> committedKeyValueTable, Map<IoTString, KeyValue> speculatedKeyValueTable, Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
265 for (KeyValue kvGuard : keyValueGuardSet) {
267 // First check if the key is in the speculative table, this is the value of the latest assumption
270 // If we have a speculation table then use it first
271 if (pendingTransactionSpeculatedKeyValueTable != null) {
272 kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
275 // Otherwise fall back to the speculated table, if one was provided
276 if ((kv == null) && (speculatedKeyValueTable != null)) {
277 kv = speculatedKeyValueTable.get(kvGuard.getKey());
281 // if it is not in the speculative table then check the committed table and use that
282 // value as our latest assumption
283 kv = committedKeyValueTable.get(kvGuard.getKey());
286 if (kvGuard.getValue() != null) {
287 if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) {
291 System.out.println(kvGuard.getValue() + " " + kv.getValue());
293 System.out.println(kvGuard.getValue() + " " + kv);
300 System.out.println("kvGuard was nulled: " + kv);