Local communication support
[iotcloud.git] / version2 / src / java / iotcloud / Transaction.java
1 package iotcloud;
2
3 import java.util.Map;
4 import java.util.Set;
5 import java.util.List;
6 import java.util.ArrayList;
7 import java.util.HashMap;
8 import java.util.HashSet;
9 import java.nio.ByteBuffer;
10
11 class Transaction {
12
13     private Map<Integer, TransactionPart> parts = null;
14     private Set<Integer> missingParts = null;
15     private List<Integer> partsPendingSend = null;
16     private boolean isComplete = false;
17     private Set<KeyValue> keyValueGuardSet = null;
18     private Set<KeyValue> keyValueUpdateSet = null;
19     private boolean isDead = false;
20     private long sequenceNumber = -1;
21     private long clientLocalSequenceNumber = -1;
22     private long arbitratorId = -1;
23     private long machineId = -1;
24     private Pair<Long, Long> transactionId = null;
25
26     private int nextPartToSend = 0;
27     private boolean didSendAPartToServer = false;
28
29     private TransactionStatus transactionStatus = null;
30
31     public Transaction() {
32         parts = new HashMap<Integer, TransactionPart>();
33         keyValueGuardSet = new HashSet<KeyValue>();
34         keyValueUpdateSet = new HashSet<KeyValue>();
35         partsPendingSend = new ArrayList<Integer>();
36     }
37
38     public void addPartEncode(TransactionPart newPart) {
39         parts.put(newPart.getPartNumber(), newPart);
40         partsPendingSend.add(newPart.getPartNumber());
41
42         // Get the sequence number and other important information
43         sequenceNumber = newPart.getSequenceNumber();
44         arbitratorId = newPart.getArbitratorId();
45         transactionId = newPart.getTransactionId();
46         clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
47         machineId = newPart.getMachineId();
48
49         isComplete = true;
50     }
51
52     public void addPartDecode(TransactionPart newPart) {
53
54         if (isDead) {
55             // If dead then just kill this part and move on
56             newPart.setDead();
57             return;
58         }
59
60         // Get the sequence number and other important information
61         sequenceNumber = newPart.getSequenceNumber();
62         arbitratorId = newPart.getArbitratorId();
63         transactionId = newPart.getTransactionId();
64         clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
65         machineId = newPart.getMachineId();
66
67         TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
68
69         if (previoslySeenPart != null) {
70             // Set dead the old one since the new one is a rescued version of this part
71             previoslySeenPart.setDead();
72         } else if (newPart.isLastPart()) {
73             missingParts = new HashSet<Integer>();
74
75             for (int i = 0; i < newPart.getPartNumber(); i++) {
76                 if (parts.get(i) == null) {
77                     missingParts.add(i);
78                 }
79             }
80         }
81
82         if (!isComplete) {
83
84             // We have seen this part so remove it from the set of missing parts
85             missingParts.remove(newPart.getPartNumber());
86
87             // Check if all the parts have been seen
88             if (missingParts.size() == 0) {
89
90                 // We have all the parts
91                 isComplete = true;
92
93                 // Decode all the parts and create the key value guard and update sets
94                 decodeTransactionData();
95             }
96         }
97     }
98
99     public void addUpdateKV(KeyValue kv) {
100         keyValueUpdateSet.add(kv);
101     }
102
103     public void addGuardKV(KeyValue kv) {
104         keyValueGuardSet.add(kv);
105     }
106
107
108     public long getSequenceNumber() {
109         return sequenceNumber;
110     }
111
112     public void setSequenceNumber(long _sequenceNumber) {
113         sequenceNumber = _sequenceNumber;
114
115         for (Integer i : parts.keySet()) {
116             parts.get(i).setSequenceNumber(sequenceNumber);
117         }
118     }
119
120     public long getClientLocalSequenceNumber() {
121         return clientLocalSequenceNumber;
122     }
123
124     public Map<Integer, TransactionPart> getParts() {
125         return parts;
126     }
127
128     public boolean didSendAPartToServer() {
129         return didSendAPartToServer;
130     }
131
132     public void resetNextPartToSend() {
133         nextPartToSend = 0;
134     }
135
136     public TransactionPart getNextPartToSend() {
137         if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
138             return null;
139         }
140         TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
141         nextPartToSend++;
142         return part;
143     }
144
145     public void setTransactionStatus(TransactionStatus _transactionStatus) {
146         transactionStatus = _transactionStatus;
147     }
148
149     public TransactionStatus getTransactionStatus() {
150         return transactionStatus;
151     }
152
153     public void removeSentParts(List<Integer> sentParts) {
154         nextPartToSend = 0;
155         partsPendingSend.removeAll(sentParts);
156         didSendAPartToServer = true;
157         transactionStatus.setTransactionSequenceNumber(sequenceNumber);
158     }
159
160     public boolean didSendAllParts() {
161         return partsPendingSend.isEmpty();
162     }
163
164     public Set<KeyValue> getKeyValueUpdateSet() {
165         return keyValueUpdateSet;
166     }
167
168     public int getNumberOfParts() {
169         return parts.size();
170     }
171
172     public long getMachineId() {
173         return machineId;
174     }
175
176     public long getArbitrator() {
177         return arbitratorId;
178     }
179
180     public boolean isComplete() {
181         return isComplete;
182     }
183
184     public Pair<Long, Long> getId() {
185         return transactionId;
186     }
187
188     public void setDead() {
189         if (isDead) {
190             // Already dead
191             return;
192         }
193
194         // Set dead
195         isDead = true;
196
197         // Make all the parts of this transaction dead
198         for (Integer partNumber : parts.keySet()) {
199             TransactionPart part = parts.get(partNumber);
200             part.setDead();
201         }
202     }
203
204     public TransactionPart getPart(int index) {
205         return parts.get(index);
206     }
207
208     private void decodeTransactionData() {
209
210         // Calculate the size of the data section
211         int dataSize = 0;
212         for (int i = 0; i < parts.keySet().size(); i++) {
213             TransactionPart tp = parts.get(i);
214             dataSize += tp.getDataSize();
215         }
216
217         byte[] combinedData = new byte[dataSize];
218         int currentPosition = 0;
219
220         // Stitch all the data sections together
221         for (int i = 0; i < parts.keySet().size(); i++) {
222             TransactionPart tp = parts.get(i);
223             System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
224             currentPosition += tp.getDataSize();
225         }
226
227         // Decoder Object
228         ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
229
230         // Decode how many key value pairs need to be decoded
231         int numberOfKVGuards = bbDecode.getInt();
232         int numberOfKVUpdates = bbDecode.getInt();
233
234         // Decode all the guard key values
235         for (int i = 0; i < numberOfKVGuards; i++) {
236             KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
237             keyValueGuardSet.add(kv);
238         }
239
240         // Decode all the updates key values
241         for (int i = 0; i < numberOfKVUpdates; i++) {
242             KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
243             keyValueUpdateSet.add(kv);
244         }
245     }
246
247     public boolean evaluateGuard(Map<IoTString, KeyValue> committedKeyValueTable, Map<IoTString, KeyValue> speculatedKeyValueTable, Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
248         for (KeyValue kvGuard : keyValueGuardSet) {
249
250             // First check if the key is in the speculative table, this is the value of the latest assumption
251             KeyValue kv = null;
252
253             // If we have a speculation table then use it first
254             if (pendingTransactionSpeculatedKeyValueTable != null) {
255                 kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
256             }
257
258             // If we have a speculation table then use it first
259             if ((kv == null) && (speculatedKeyValueTable != null)) {
260                 kv = speculatedKeyValueTable.get(kvGuard.getKey());
261             }
262
263             if (kv == null) {
264                 // if it is not in the speculative table then check the committed table and use that
265                 // value as our latest assumption
266                 kv = committedKeyValueTable.get(kvGuard.getKey());
267             }
268
269             if (kvGuard.getValue() != null) {
270                 if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) {
271                     return false;
272                 }
273             } else {
274                 if (kv != null) {
275                     return false;
276                 }
277             }
278         }
279         return true;
280     }
281 }