edits
[iotcloud.git] / version2 / src / C / Commit.h
1
2
3 class Commit {
4
5     private Map<Integer, CommitPart> parts = NULL;
6     private Set<Integer> missingParts = NULL;
7     private bool isComplete = false;
8     private bool hasLastPart = false;
9     private Set<KeyValue> keyValueUpdateSet = NULL;
10     private bool isDead = false;
11     private int64_t sequenceNumber = -1;
12     private int64_t machineId = -1;
13     private int64_t transactionSequenceNumber = -1;
14
15     private Set<IoTString> liveKeys = NULL;
16
17     public Commit() {
18         parts = new HashMap<Integer, CommitPart>();
19         keyValueUpdateSet = new HashSet<KeyValue>();
20
21         liveKeys = new HashSet<IoTString>();
22     }
23
24     public Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) {
25         parts = new HashMap<Integer, CommitPart>();
26         keyValueUpdateSet = new HashSet<KeyValue>();
27
28         liveKeys = new HashSet<IoTString>();
29
30         sequenceNumber = _sequenceNumber;
31         machineId = _machineId;
32         transactionSequenceNumber = _transactionSequenceNumber;
33         isComplete = true;
34     }
35
36
37     public void addPartDecode(CommitPart newPart) {
38
39         if (isDead) {
40             // If dead then just kill this part and move on
41             newPart.setDead();
42             return;
43         }
44
45         CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
46
47         if (previoslySeenPart != NULL) {
48             // Set dead the old one since the new one is a rescued version of this part
49             previoslySeenPart.setDead();
50         } else if (newPart.isLastPart()) {
51             missingParts = new HashSet<Integer>();
52             hasLastPart = true;
53
54             for (int i = 0; i < newPart.getPartNumber(); i++) {
55                 if (parts.get(i) == NULL) {
56                     missingParts.add(i);
57                 }
58             }
59         }
60
61         if (!isComplete && hasLastPart) {
62
63             // We have seen this part so remove it from the set of missing parts
64             missingParts.remove(newPart.getPartNumber());
65
66             // Check if all the parts have been seen
67             if (missingParts.size() == 0) {
68
69                 // We have all the parts
70                 isComplete = true;
71
72                 // Decode all the parts and create the key value guard and update sets
73                 decodeCommitData();
74
75                 // Get the sequence number and arbitrator of this transaction
76                 sequenceNumber = parts.get(0).getSequenceNumber();
77                 machineId = parts.get(0).getMachineId();
78                 transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
79             }
80         }
81     }
82
83     public int64_t getSequenceNumber() {
84         return sequenceNumber;
85     }
86
87     public int64_t getTransactionSequenceNumber() {
88         return transactionSequenceNumber;
89     }
90
91     public Map<Integer, CommitPart> getParts() {
92         return parts;
93     }
94
95     public void addKV(KeyValue kv) {
96         keyValueUpdateSet.add(kv);
97         liveKeys.add(kv.getKey());
98     }
99
100     public void invalidateKey(IoTString key) {
101         liveKeys.remove(key);
102
103         if (liveKeys.size() == 0) {
104             setDead();
105         }
106     }
107
108     public Set<KeyValue> getKeyValueUpdateSet() {
109         return keyValueUpdateSet;
110     }
111
112     public int getNumberOfParts() {
113         return parts.size();
114     }
115
116     public int64_t getMachineId() {
117         return machineId;
118     }
119
120     public bool isComplete() {
121         return isComplete;
122     }
123
124     public bool isLive() {
125         return !isDead;
126     }
127
128     public void setDead() {
129         if (isDead) {
130             // Already dead
131             return;
132         }
133
134         // Set dead
135         isDead = true;
136
137         // Make all the parts of this transaction dead
138         for (Integer partNumber : parts.keySet()) {
139             CommitPart part = parts.get(partNumber);
140             part.setDead();
141         }
142     }
143
144     public CommitPart getPart(int index) {
145         return parts.get(index);
146     }
147
148     public void createCommitParts() {
149
150         parts.clear();
151
152         // Convert to chars
153         char[] charData = convertDataToBytes();
154
155
156         int commitPartCount = 0;
157         int currentPosition = 0;
158         int remaining = charData.length;
159
160         while (remaining > 0) {
161
162             Boolean isLastPart = false;
163             // determine how much to copy
164             int copySize = CommitPart.MAX_NON_HEADER_SIZE;
165             if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
166                 copySize = remaining;
167                 isLastPart = true; // last bit of data so last part
168             }
169
170             // Copy to a smaller version
171             char[] partData = new char[copySize];
172             System.arraycopy(charData, currentPosition, partData, 0, copySize);
173
174             CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
175             parts.put(part.getPartNumber(), part);
176
177             // Update position, count and remaining
178             currentPosition += copySize;
179             commitPartCount++;
180             remaining -= copySize;
181         }
182     }
183
184     private void decodeCommitData() {
185
186         // Calculate the size of the data section
187         int dataSize = 0;
188         for (int i = 0; i < parts.keySet().size(); i++) {
189             CommitPart tp = parts.get(i);
190             dataSize += tp.getDataSize();
191         }
192
193         char[] combinedData = new char[dataSize];
194         int currentPosition = 0;
195
196         // Stitch all the data sections together
197         for (int i = 0; i < parts.keySet().size(); i++) {
198             CommitPart tp = parts.get(i);
199             System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
200             currentPosition += tp.getDataSize();
201         }
202
203         // Decoder Object
204         ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
205
206         // Decode how many key value pairs need to be decoded
207         int numberOfKVUpdates = bbDecode.getInt();
208
209         // Decode all the updates key values
210         for (int i = 0; i < numberOfKVUpdates; i++) {
211             KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
212             keyValueUpdateSet.add(kv);
213             liveKeys.add(kv.getKey());
214         }
215     }
216
217     private char[] convertDataToBytes() {
218
219         // Calculate the size of the data
220         int sizeOfData = sizeof(int32_t); // Number of Update KV's
221         for (KeyValue kv : keyValueUpdateSet) {
222             sizeOfData += kv.getSize();
223         }
224
225         // Data handlers and storage
226         char[] dataArray = new char[sizeOfData];
227         ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
228
229         // Encode the size of the updates and guard sets
230         bbEncode.putInt(keyValueUpdateSet.size());
231
232         // Encode all the updates
233         for (KeyValue kv : keyValueUpdateSet) {
234             kv.encode(bbEncode);
235         }
236
237         return bbEncode.array();
238     }
239
240     private void setKVsMap(Map<IoTString, KeyValue> newKVs) {
241         keyValueUpdateSet.clear();
242         liveKeys.clear();
243
244         keyValueUpdateSet.addAll(newKVs.values());
245         liveKeys.addAll(newKVs.keySet());
246
247     }
248
249
250     public static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) {
251
252         if (older == NULL) {
253             return newer;
254         } else if (newer == NULL) {
255             return older;
256         }
257
258         Map<IoTString, KeyValue> kvSet = new HashMap<IoTString, KeyValue>();
259         for (KeyValue kv : older.getKeyValueUpdateSet()) {
260             kvSet.put(kv.getKey(), kv);
261         }
262
263         for (KeyValue kv : newer.getKeyValueUpdateSet()) {
264             kvSet.put(kv.getKey(), kv);
265         }
266
267         int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber();
268
269         if (transactionSequenceNumber == -1) {
270             transactionSequenceNumber = older.getTransactionSequenceNumber();
271         }
272
273         Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
274
275         newCommit.setKVsMap(kvSet);
276
277         return newCommit;
278     }
279 }