Changes
[iotcloud.git] / version2 / src / java / iotcloud / Commit.java
1 package iotcloud;
2
3 import java.util.Map;
4 import java.util.HashMap;
5 import java.util.Set;
6 import java.util.HashSet;
7 import java.nio.ByteBuffer;
8
9 class Commit {
10
11     private Map<Integer, CommitPart> parts = null;
12     private Set<Integer> missingParts = null;
13     private boolean isComplete = false;
14     private boolean hasLastPart = false;
15     private Set<KeyValue> keyValueUpdateSet = null;
16     private boolean isDead = false;
17     private long sequenceNumber = -1;
18     private long machineId = -1;
19     private long transactionSequenceNumber = -1;
20
21     private Set<IoTString> liveKeys = null;
22
23     public Commit() {
24         parts = new HashMap<Integer, CommitPart>();
25         keyValueUpdateSet = new HashSet<KeyValue>();
26
27         liveKeys = new HashSet<IoTString>();
28     }
29
30     public Commit(long _sequenceNumber, long _machineId, long _transactionSequenceNumber) {
31         parts = new HashMap<Integer, CommitPart>();
32         keyValueUpdateSet = new HashSet<KeyValue>();
33
34         liveKeys = new HashSet<IoTString>();
35
36         sequenceNumber = _sequenceNumber;
37         machineId = _machineId;
38         transactionSequenceNumber = _transactionSequenceNumber;
39         isComplete = true;
40     }
41
42
43     public void addPartDecode(CommitPart newPart) {
44
45         if (isDead) {
46             // If dead then just kill this part and move on
47             newPart.setDead();
48             return;
49         }
50
51         CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
52
53         if (previoslySeenPart != null) {
54             // Set dead the old one since the new one is a rescued version of this part
55             previoslySeenPart.setDead();
56         } else if (newPart.isLastPart()) {
57             missingParts = new HashSet<Integer>();
58             hasLastPart = true;
59
60             for (int i = 0; i < newPart.getPartNumber(); i++) {
61                 if (parts.get(i) == null) {
62                     missingParts.add(i);
63                 }
64             }
65         }
66
67         if (!isComplete && hasLastPart) {
68
69             // We have seen this part so remove it from the set of missing parts
70             missingParts.remove(newPart.getPartNumber());
71
72             // Check if all the parts have been seen
73             if (missingParts.size() == 0) {
74
75                 // We have all the parts
76                 isComplete = true;
77
78                 // Decode all the parts and create the key value guard and update sets
79                 decodeCommitData();
80
81                 // Get the sequence number and arbitrator of this transaction
82                 sequenceNumber = parts.get(0).getSequenceNumber();
83                 machineId = parts.get(0).getMachineId();
84                 transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
85             }
86         }
87     }
88
89     public long getSequenceNumber() {
90         return sequenceNumber;
91     }
92
93     public long getTransactionSequenceNumber() {
94         return transactionSequenceNumber;
95     }
96
97     public Map<Integer, CommitPart> getParts() {
98         return parts;
99     }
100
101     public void addKV(KeyValue kv) {
102         keyValueUpdateSet.add(kv);
103         liveKeys.add(kv.getKey());
104     }
105
106     public void invalidateKey(IoTString key) {
107         liveKeys.remove(key);
108
109         if (liveKeys.size() == 0) {
110             setDead();
111         }
112     }
113
114     public Set<KeyValue> getKeyValueUpdateSet() {
115         return keyValueUpdateSet;
116     }
117
118     public int getNumberOfParts() {
119         return parts.size();
120     }
121
122     public long getMachineId() {
123         return machineId;
124     }
125
126     public boolean isComplete() {
127         return isComplete;
128     }
129
130     public boolean isLive() {
131         return !isDead;
132     }
133
134     public void setDead() {
135         if (isDead) {
136             // Already dead
137             return;
138         }
139
140         // Set dead
141         isDead = true;
142
143         // Make all the parts of this transaction dead
144         for (Integer partNumber : parts.keySet()) {
145             CommitPart part = parts.get(partNumber);
146             part.setDead();
147         }
148     }
149
150     public CommitPart getPart(int index) {
151         return parts.get(index);
152     }
153
154     public void createCommitParts() {
155
156         parts.clear();
157
158         // Convert to bytes
159         byte[] byteData = convertDataToBytes();
160
161
162         int commitPartCount = 0;
163         int currentPosition = 0;
164         int remaining = byteData.length;
165
166         while (remaining > 0) {
167
168             Boolean isLastPart = false;
169             // determine how much to copy
170             int copySize = CommitPart.MAX_NON_HEADER_SIZE;
171             if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
172                 copySize = remaining;
173                 isLastPart = true; // last bit of data so last part
174             }
175
176             // Copy to a smaller version
177             byte[] partData = new byte[copySize];
178             System.arraycopy(byteData, currentPosition, partData, 0, copySize);
179
180             CommitPart part = new CommitPart(null, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
181             parts.put(part.getPartNumber(), part);
182
183             // Update position, count and remaining
184             currentPosition += copySize;
185             commitPartCount++;
186             remaining -= copySize;
187         }
188     }
189
190     private void decodeCommitData() {
191
192         // Calculate the size of the data section
193         int dataSize = 0;
194         for (int i = 0; i < parts.keySet().size(); i++) {
195             CommitPart tp = parts.get(i);
196             dataSize += tp.getDataSize();
197         }
198
199         byte[] combinedData = new byte[dataSize];
200         int currentPosition = 0;
201
202         // Stitch all the data sections together
203         for (int i = 0; i < parts.keySet().size(); i++) {
204             CommitPart tp = parts.get(i);
205             System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
206             currentPosition += tp.getDataSize();
207         }
208
209         // Decoder Object
210         ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
211
212         // Decode how many key value pairs need to be decoded
213         int numberOfKVUpdates = bbDecode.getInt();
214
215         // Decode all the updates key values
216         for (int i = 0; i < numberOfKVUpdates; i++) {
217             KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
218             keyValueUpdateSet.add(kv);
219             liveKeys.add(kv.getKey());
220         }
221     }
222
223     private byte[] convertDataToBytes() {
224
225         // Calculate the size of the data
226         int sizeOfData = Integer.BYTES; // Number of Update KV's
227         for (KeyValue kv : keyValueUpdateSet) {
228             sizeOfData += kv.getSize();
229         }
230
231         // Data handlers and storage
232         byte[] dataArray = new byte[sizeOfData];
233         ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
234
235         // Encode the size of the updates and guard sets
236         bbEncode.putInt(keyValueUpdateSet.size());
237
238         // Encode all the updates
239         for (KeyValue kv : keyValueUpdateSet) {
240             kv.encode(bbEncode);
241         }
242
243         return bbEncode.array();
244     }
245
246     private void setKVsMap(Map<IoTString, KeyValue> newKVs) {
247         keyValueUpdateSet.clear();
248         liveKeys.clear();
249
250         keyValueUpdateSet.addAll(newKVs.values());
251         liveKeys.addAll(newKVs.keySet());
252
253     }
254
255
256     public static Commit merge(Commit newer, Commit older, long newSequenceNumber) {
257
258         if (older == null) {
259             return newer;
260         } else if (newer == null) {
261             return older;
262         }
263
264         Map<IoTString, KeyValue> kvSet = new HashMap<IoTString, KeyValue>();
265         for (KeyValue kv : older.getKeyValueUpdateSet()) {
266             kvSet.put(kv.getKey(), kv);
267         }
268
269         for (KeyValue kv : newer.getKeyValueUpdateSet()) {
270             kvSet.put(kv.getKey(), kv);
271         }
272
273         long transactionSequenceNumber = newer.getTransactionSequenceNumber();
274
275         if (transactionSequenceNumber == -1) {
276             transactionSequenceNumber = older.getTransactionSequenceNumber();
277         }
278
279         Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
280
281         newCommit.setKVsMap(kvSet);
282
283         return newCommit;
284     }
285 }