Local communication working
[iotcloud.git] / version2 / src / java / iotcloud / Transaction.java
1 package iotcloud;
2
3 import java.util.Map;
4 import java.util.Set;
5 import java.util.List;
6 import java.util.ArrayList;
7 import java.util.HashMap;
8 import java.util.HashSet;
9 import java.nio.ByteBuffer;
10
11 class Transaction {
12
13     private Map<Integer, TransactionPart> parts = null;
14     private Set<Integer> missingParts = null;
15     private List<Integer> partsPendingSend = null;
16     private boolean isComplete = false;
17     private Set<KeyValue> keyValueGuardSet = null;
18     private Set<KeyValue> keyValueUpdateSet = null;
19     private boolean isDead = false;
20     private long sequenceNumber = -1;
21     private long clientLocalSequenceNumber = -1;
22     private long arbitratorId = -1;
23     private long machineId = -1;
24     private Pair<Long, Long> transactionId = null;
25
26     private int nextPartToSend = 0;
27     private boolean didSendAPartToServer = false;
28
29     private TransactionStatus transactionStatus = null;
30
31     private boolean hadServerFailure = false;
32
33     public Transaction() {
34         parts = new HashMap<Integer, TransactionPart>();
35         keyValueGuardSet = new HashSet<KeyValue>();
36         keyValueUpdateSet = new HashSet<KeyValue>();
37         partsPendingSend = new ArrayList<Integer>();
38     }
39
40     public void addPartEncode(TransactionPart newPart) {
41         parts.put(newPart.getPartNumber(), newPart);
42         partsPendingSend.add(newPart.getPartNumber());
43
44         // Get the sequence number and other important information
45         sequenceNumber = newPart.getSequenceNumber();
46         arbitratorId = newPart.getArbitratorId();
47         transactionId = newPart.getTransactionId();
48         clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
49         machineId = newPart.getMachineId();
50
51         isComplete = true;
52     }
53
54     public void addPartDecode(TransactionPart newPart) {
55
56         if (isDead) {
57             // If dead then just kill this part and move on
58             newPart.setDead();
59             return;
60         }
61
62         // Get the sequence number and other important information
63         sequenceNumber = newPart.getSequenceNumber();
64         arbitratorId = newPart.getArbitratorId();
65         transactionId = newPart.getTransactionId();
66         clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
67         machineId = newPart.getMachineId();
68
69         TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
70
71         if (previoslySeenPart != null) {
72             // Set dead the old one since the new one is a rescued version of this part
73             previoslySeenPart.setDead();
74         } else if (newPart.isLastPart()) {
75             missingParts = new HashSet<Integer>();
76
77             for (int i = 0; i < newPart.getPartNumber(); i++) {
78                 if (parts.get(i) == null) {
79                     missingParts.add(i);
80                 }
81             }
82         }
83
84         if (!isComplete) {
85
86             // We have seen this part so remove it from the set of missing parts
87             missingParts.remove(newPart.getPartNumber());
88
89             // Check if all the parts have been seen
90             if (missingParts.size() == 0) {
91
92                 // We have all the parts
93                 isComplete = true;
94
95                 // Decode all the parts and create the key value guard and update sets
96                 decodeTransactionData();
97             }
98         }
99     }
100
101     public void addUpdateKV(KeyValue kv) {
102         keyValueUpdateSet.add(kv);
103     }
104
105     public void addGuardKV(KeyValue kv) {
106         keyValueGuardSet.add(kv);
107     }
108
109
110     public long getSequenceNumber() {
111         return sequenceNumber;
112     }
113
114     public void setSequenceNumber(long _sequenceNumber) {
115         sequenceNumber = _sequenceNumber;
116
117         for (Integer i : parts.keySet()) {
118             parts.get(i).setSequenceNumber(sequenceNumber);
119         }
120     }
121
122     public long getClientLocalSequenceNumber() {
123         return clientLocalSequenceNumber;
124     }
125
126     public Map<Integer, TransactionPart> getParts() {
127         return parts;
128     }
129
130     public boolean didSendAPartToServer() {
131         return didSendAPartToServer;
132     }
133
134     public void resetNextPartToSend() {
135         nextPartToSend = 0;
136     }
137
138     public TransactionPart getNextPartToSend() {
139         if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
140             return null;
141         }
142         TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
143         nextPartToSend++;
144         return part;
145     }
146
147
148     public void setServerFailure() {
149         hadServerFailure = true;
150     }
151
152     public boolean getServerFailure() {
153         return hadServerFailure;
154     }
155
156
157     public void resetServerFailure() {
158         hadServerFailure = false;
159     }
160
161
162     public void setTransactionStatus(TransactionStatus _transactionStatus) {
163         transactionStatus = _transactionStatus;
164     }
165
166     public TransactionStatus getTransactionStatus() {
167         return transactionStatus;
168     }
169
170     public void removeSentParts(List<Integer> sentParts) {
171         nextPartToSend = 0;
172         partsPendingSend.removeAll(sentParts);
173         didSendAPartToServer = true;
174         transactionStatus.setTransactionSequenceNumber(sequenceNumber);
175     }
176
177     public boolean didSendAllParts() {
178         return partsPendingSend.isEmpty();
179     }
180
181     public Set<KeyValue> getKeyValueUpdateSet() {
182         return keyValueUpdateSet;
183     }
184
185     public int getNumberOfParts() {
186         return parts.size();
187     }
188
189     public long getMachineId() {
190         return machineId;
191     }
192
193     public long getArbitrator() {
194         return arbitratorId;
195     }
196
197     public boolean isComplete() {
198         return isComplete;
199     }
200
201     public Pair<Long, Long> getId() {
202         return transactionId;
203     }
204
205     public void setDead() {
206         if (isDead) {
207             // Already dead
208             return;
209         }
210
211         // Set dead
212         isDead = true;
213
214         // Make all the parts of this transaction dead
215         for (Integer partNumber : parts.keySet()) {
216             TransactionPart part = parts.get(partNumber);
217             part.setDead();
218         }
219     }
220
221     public TransactionPart getPart(int index) {
222         return parts.get(index);
223     }
224
225     private void decodeTransactionData() {
226
227         // Calculate the size of the data section
228         int dataSize = 0;
229         for (int i = 0; i < parts.keySet().size(); i++) {
230             TransactionPart tp = parts.get(i);
231             dataSize += tp.getDataSize();
232         }
233
234         byte[] combinedData = new byte[dataSize];
235         int currentPosition = 0;
236
237         // Stitch all the data sections together
238         for (int i = 0; i < parts.keySet().size(); i++) {
239             TransactionPart tp = parts.get(i);
240             System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
241             currentPosition += tp.getDataSize();
242         }
243
244         // Decoder Object
245         ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
246
247         // Decode how many key value pairs need to be decoded
248         int numberOfKVGuards = bbDecode.getInt();
249         int numberOfKVUpdates = bbDecode.getInt();
250
251         // Decode all the guard key values
252         for (int i = 0; i < numberOfKVGuards; i++) {
253             KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
254             keyValueGuardSet.add(kv);
255         }
256
257         // Decode all the updates key values
258         for (int i = 0; i < numberOfKVUpdates; i++) {
259             KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
260             keyValueUpdateSet.add(kv);
261         }
262     }
263
264     public boolean evaluateGuard(Map<IoTString, KeyValue> committedKeyValueTable, Map<IoTString, KeyValue> speculatedKeyValueTable, Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
265         for (KeyValue kvGuard : keyValueGuardSet) {
266
267             // First check if the key is in the speculative table, this is the value of the latest assumption
268             KeyValue kv = null;
269
270             // If we have a speculation table then use it first
271             if (pendingTransactionSpeculatedKeyValueTable != null) {
272                 kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
273             }
274
275             // If we have a speculation table then use it first
276             if ((kv == null) && (speculatedKeyValueTable != null)) {
277                 kv = speculatedKeyValueTable.get(kvGuard.getKey());
278             }
279
280             if (kv == null) {
281                 // if it is not in the speculative table then check the committed table and use that
282                 // value as our latest assumption
283                 kv = committedKeyValueTable.get(kvGuard.getKey());
284             }
285
286             if (kvGuard.getValue() != null) {
287                 if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) {
288
289
290                     if (kv != null) {
291                         System.out.println(kvGuard.getValue() + "       " + kv.getValue());
292                     } else {
293                         System.out.println(kvGuard.getValue() + "       " + kv);
294                     }
295
296                     return false;
297                 }
298             } else {
299                 if (kv != null) {
300                     System.out.println("kvGuard was nulled:  " + kv);
301                     return false;
302                 }
303             }
304         }
305         return true;
306     }
307 }