82280c0fb38daab2c7340fb9bcb392ff62ed5879
[iotcloud.git] / version2 / src / java / iotcloud / Transaction.java
1 package iotcloud;
2
3 import java.util.Map;
4 import java.util.Set;
5 import java.util.List;
6 import java.util.ArrayList;
7 import java.util.HashMap;
8 import java.util.HashSet;
9 import java.nio.ByteBuffer;
10
11 class Transaction {
12
13     private Map<Integer, TransactionPart> parts = null;
14     private Set<Integer> missingParts = null;
15     private List<Integer> partsPendingSend = null;
16     private boolean isComplete = false;
17     private Set<KeyValue> keyValueGuardSet = null;
18     private Set<KeyValue> keyValueUpdateSet = null;
19     private boolean isDead = false;
20     private long sequenceNumber = -1;
21     private long clientLocalSequenceNumber = -1;
22     private long arbitratorId = -1;
23     private long machineId = -1;
24     private Pair<Long, Long> transactionId = null;
25
26     private int nextPartToSend = 0;
27     private boolean didSendAPartToServer = false;
28
29     private TransactionStatus transactionStatus = null;
30
31     public Transaction() {
32         parts = new HashMap<Integer, TransactionPart>();
33         keyValueGuardSet = new HashSet<KeyValue>();
34         keyValueUpdateSet = new HashSet<KeyValue>();
35         partsPendingSend = new ArrayList<Integer>();
36     }
37
38     public void addPartEncode(TransactionPart newPart) {
39         parts.put(newPart.getPartNumber(), newPart);
40         partsPendingSend.add(newPart.getPartNumber());
41
42         // Get the sequence number and other important information
43         sequenceNumber = newPart.getSequenceNumber();
44         arbitratorId = newPart.getArbitratorId();
45         transactionId = newPart.getTransactionId();
46         clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
47         machineId = newPart.getMachineId();
48
49         isComplete = true;
50     }
51
52     public void addPartDecode(TransactionPart newPart) {
53
54         if (isDead) {
55             // If dead then just kill this part and move on
56             newPart.setDead();
57             return;
58         }
59
60         // Get the sequence number and other important information
61         sequenceNumber = newPart.getSequenceNumber();
62         arbitratorId = newPart.getArbitratorId();
63         transactionId = newPart.getTransactionId();
64         clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
65         machineId = newPart.getMachineId();
66
67         TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
68
69         if (previoslySeenPart != null) {
70             // Set dead the old one since the new one is a rescued version of this part
71             previoslySeenPart.setDead();
72         } else if (newPart.isLastPart()) {
73             missingParts = new HashSet<Integer>();
74
75             for (int i = 0; i < newPart.getPartNumber(); i++) {
76                 if (parts.get(i) == null) {
77                     missingParts.add(i);
78                 }
79             }
80         }
81
82         if (!isComplete) {
83
84             // We have seen this part so remove it from the set of missing parts
85             missingParts.remove(newPart.getPartNumber());
86
87             // Check if all the parts have been seen
88             if (missingParts.size() == 0) {
89
90                 // We have all the parts
91                 isComplete = true;
92
93                 // Decode all the parts and create the key value guard and update sets
94                 decodeTransactionData();
95             }
96         }
97     }
98
99     public void addUpdateKV(KeyValue kv) {
100         keyValueUpdateSet.add(kv);
101     }
102
103     public void addGuardKV(KeyValue kv) {
104         keyValueGuardSet.add(kv);
105     }
106
107
108     public long getSequenceNumber() {
109         return sequenceNumber;
110     }
111
112     public void setSequenceNumber(long _sequenceNumber) {
113         sequenceNumber = _sequenceNumber;
114
115         for (Integer i : parts.keySet()) {
116             parts.get(i).setSequenceNumber(sequenceNumber);
117         }
118     }
119
120     public long getClientLocalSequenceNumber() {
121         return clientLocalSequenceNumber;
122     }
123
124     public Map<Integer, TransactionPart> getParts() {
125         return parts;
126     }
127
128
129     public boolean didSendAPartToServer() {
130         return didSendAPartToServer;
131     }
132
133     public void resetNextPartToSend() {
134         nextPartToSend = 0;
135     }
136
137     public TransactionPart getNextPartToSend() {
138         if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
139             return null;
140         }
141         TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
142         nextPartToSend++;
143         return part;
144     }
145
146     public void setTransactionStatus(TransactionStatus _transactionStatus) {
147         transactionStatus = _transactionStatus;
148     }
149
150     public TransactionStatus getTransactionStatus() {
151         return transactionStatus;
152     }
153
154     public void removeSentParts(List<Integer> sentParts) {
155         nextPartToSend = 0;
156         partsPendingSend.removeAll(sentParts);
157         didSendAPartToServer = true;
158         transactionStatus.setTransactionSequenceNumber(sequenceNumber);
159     }
160
161     public boolean didSendAllParts() {
162         return partsPendingSend.isEmpty();
163     }
164
165
166     public Set<KeyValue> getKeyValueUpdateSet() {
167         return keyValueUpdateSet;
168     }
169
170     public int getNumberOfParts() {
171         return parts.size();
172     }
173
174     public long getMachineId() {
175         return machineId;
176     }
177
178     public long getArbitrator() {
179         return arbitratorId;
180     }
181
182     public boolean isComplete() {
183         return isComplete;
184     }
185
186     public Pair<Long, Long> getId() {
187         return transactionId;
188     }
189
190     public void setDead() {
191         if (isDead) {
192             // Already dead
193             return;
194         }
195
196         // Set dead
197         isDead = true;
198
199         // Make all the parts of this transaction dead
200         for (Integer partNumber : parts.keySet()) {
201             TransactionPart part = parts.get(partNumber);
202             part.setDead();
203         }
204     }
205
206     public TransactionPart getPart(int index) {
207         return parts.get(index);
208     }
209
210     private void decodeTransactionData() {
211
212         // Calculate the size of the data section
213         int dataSize = 0;
214         for (int i = 0; i < parts.keySet().size(); i++) {
215             TransactionPart tp = parts.get(i);
216             dataSize += tp.getDataSize();
217         }
218
219         byte[] combinedData = new byte[dataSize];
220         int currentPosition = 0;
221
222         // Stitch all the data sections together
223         for (int i = 0; i < parts.keySet().size(); i++) {
224             TransactionPart tp = parts.get(i);
225             System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
226             currentPosition += tp.getDataSize();
227         }
228
229         // Decoder Object
230         ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
231
232         // Decode how many key value pairs need to be decoded
233         int numberOfKVGuards = bbDecode.getInt();
234         int numberOfKVUpdates = bbDecode.getInt();
235
236         // Decode all the guard key values
237         for (int i = 0; i < numberOfKVGuards; i++) {
238             KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
239             keyValueGuardSet.add(kv);
240         }
241
242         // Decode all the updates key values
243         for (int i = 0; i < numberOfKVUpdates; i++) {
244             KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
245             keyValueUpdateSet.add(kv);
246         }
247     }
248
249     public boolean evaluateGuard(Map<IoTString, KeyValue> committedKeyValueTable, Map<IoTString, KeyValue> speculatedKeyValueTable, Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
250         for (KeyValue kvGuard : keyValueGuardSet) {
251
252             // First check if the key is in the speculative table, this is the value of the latest assumption
253             KeyValue kv = null;
254
255             // If we have a speculation table then use it first
256             if (pendingTransactionSpeculatedKeyValueTable != null) {
257                 kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
258             }
259
260             // If we have a speculation table then use it first
261             if ((kv == null) && (speculatedKeyValueTable != null)) {
262                 kv = speculatedKeyValueTable.get(kvGuard.getKey());
263             }
264
265             if (kv == null) {
266                 // if it is not in the speculative table then check the committed table and use that
267                 // value as our latest assumption
268                 kv = committedKeyValueTable.get(kvGuard.getKey());
269             }
270
271             if (kvGuard.getValue() != null) {
272                 if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) {
273                     return false;
274                 }
275             } else {
276                 if (kv != null) {
277                     return false;
278                 }
279             }
280         }
281         return true;
282     }
283 }