more edits
[iotcloud.git] / version2 / src / C / Commit.cc
1 #include "commit.h"
2
3 Commit::Commit() :
4         parts(new HashMap<int32_t, CommitPart *>()),
5         missingParts(NULL),
6         fldisComplete(false),
7         hasLastPart(false),
8         keyValueUpdateSet(new HashSet<KeyValue *>()),
9         isDead(false),
10         sequenceNumber(-1),
11         machineId(-1),
12         transactionSequenceNumber(-1),
13         liveKeys(new Hashset<IoTString *>) {
14 }
15
16
17 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
18         parts(new HashMap<int32_t, CommitPart *>()),
19         missingParts(NULL),
20         fldisComplete(true),
21         hasLastPart(false),
22         keyValueUpdateSet(new HashSet<KeyValue *>()),
23         isDead(false),
24         sequenceNumber(_sequenceNumber),
25         machineId(_machineId),
26         transactionSequenceNumber(_transactionSequenceNumber),
27         liveKeys(new Hashset<IoTString *>) {
28 }
29
30 void Commit::addPartDecode(CommitPart newPart) {
31
32         if (isDead) {
33                 // If dead then just kill this part and move on
34                 newPart.setDead();
35                 return;
36         }
37
38         CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
39         
40         if (previoslySeenPart != NULL) {
41                 // Set dead the old one since the new one is a rescued version of this part
42                 previoslySeenPart.setDead();
43         } else if (newPart.isLastPart()) {
44                 missingParts = new HashSet<Integer>();
45                 hasLastPart = true;
46                 
47                 for (int i = 0; i < newPart.getPartNumber(); i++) {
48                         if (parts.get(i) == NULL) {
49                                 missingParts.add(i);
50                         }
51                 }
52         }
53         
54         if (!fldisComplete && hasLastPart) {
55                 
56                 // We have seen this part so remove it from the set of missing parts
57                 missingParts.remove(newPart.getPartNumber());
58                 
59                 // Check if all the parts have been seen
60                 if (missingParts.size() == 0) {
61                         
62                         // We have all the parts
63                         fldisComplete = true;
64                         
65                         // Decode all the parts and create the key value guard and update sets
66                         decodeCommitData();
67                         
68                         // Get the sequence number and arbitrator of this transaction
69                         sequenceNumber = parts.get(0).getSequenceNumber();
70                         machineId = parts.get(0).getMachineId();
71                         transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
72                 }
73         }
74 }
75
76     int64_t getSequenceNumber() {
77         return sequenceNumber;
78     }
79
80     int64_t getTransactionSequenceNumber() {
81         return transactionSequenceNumber;
82     }
83
84     Map<Integer, CommitPart> getParts() {
85         return parts;
86     }
87
88     void addKV(KeyValue kv) {
89         keyValueUpdateSet.add(kv);
90         liveKeys.add(kv.getKey());
91     }
92
93     void invalidateKey(IoTString key) {
94         liveKeys.remove(key);
95
96         if (liveKeys.size() == 0) {
97             setDead();
98         }
99     }
100
101     Set<KeyValue> getKeyValueUpdateSet() {
102         return keyValueUpdateSet;
103     }
104
105 int32_t getNumberOfParts() {
106         return parts.size();
107 }
108
109     void setDead() {
110         if (isDead) {
111             // Already dead
112             return;
113         }
114
115         // Set dead
116         isDead = true;
117
118         // Make all the parts of this transaction dead
119         for (Integer partNumber : parts.keySet()) {
120             CommitPart part = parts.get(partNumber);
121             part.setDead();
122         }
123     }
124
125     CommitPart getPart(int index) {
126         return parts.get(index);
127     }
128
129     void createCommitParts() {
130
131         parts.clear();
132
133         // Convert to chars
134         char[] charData = convertDataToBytes();
135
136
137         int commitPartCount = 0;
138         int currentPosition = 0;
139         int remaining = charData.length;
140
141         while (remaining > 0) {
142
143             Boolean isLastPart = false;
144             // determine how much to copy
145             int copySize = CommitPart.MAX_NON_HEADER_SIZE;
146             if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
147                 copySize = remaining;
148                 isLastPart = true; // last bit of data so last part
149             }
150
151             // Copy to a smaller version
152             char[] partData = new char[copySize];
153             System.arraycopy(charData, currentPosition, partData, 0, copySize);
154
155             CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
156             parts.put(part.getPartNumber(), part);
157
158             // Update position, count and remaining
159             currentPosition += copySize;
160             commitPartCount++;
161             remaining -= copySize;
162         }
163     }
164
165     void decodeCommitData() {
166
167         // Calculate the size of the data section
168         int dataSize = 0;
169         for (int i = 0; i < parts.keySet().size(); i++) {
170             CommitPart tp = parts.get(i);
171             dataSize += tp.getDataSize();
172         }
173
174         char[] combinedData = new char[dataSize];
175         int currentPosition = 0;
176
177         // Stitch all the data sections together
178         for (int i = 0; i < parts.keySet().size(); i++) {
179             CommitPart tp = parts.get(i);
180             System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
181             currentPosition += tp.getDataSize();
182         }
183
184         // Decoder Object
185         ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
186
187         // Decode how many key value pairs need to be decoded
188         int numberOfKVUpdates = bbDecode.getInt();
189
190         // Decode all the updates key values
191         for (int i = 0; i < numberOfKVUpdates; i++) {
192             KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
193             keyValueUpdateSet.add(kv);
194             liveKeys.add(kv.getKey());
195         }
196     }
197
198     char[] convertDataToBytes() {
199
200         // Calculate the size of the data
201         int sizeOfData = sizeof(int32_t); // Number of Update KV's
202         for (KeyValue kv : keyValueUpdateSet) {
203             sizeOfData += kv.getSize();
204         }
205
206         // Data handlers and storage
207         char[] dataArray = new char[sizeOfData];
208         ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
209
210         // Encode the size of the updates and guard sets
211         bbEncode.putInt(keyValueUpdateSet.size());
212
213         // Encode all the updates
214         for (KeyValue kv : keyValueUpdateSet) {
215             kv.encode(bbEncode);
216         }
217
218         return bbEncode.array();
219     }
220
221     void setKVsMap(Map<IoTString, KeyValue> newKVs) {
222         keyValueUpdateSet.clear();
223         liveKeys.clear();
224
225         keyValueUpdateSet.addAll(newKVs.values());
226         liveKeys.addAll(newKVs.keySet());
227
228     }
229
230
231     static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) {
232
233         if (older == NULL) {
234             return newer;
235         } else if (newer == NULL) {
236             return older;
237         }
238
239         Map<IoTString, KeyValue> kvSet = new HashMap<IoTString, KeyValue>();
240         for (KeyValue kv : older.getKeyValueUpdateSet()) {
241             kvSet.put(kv.getKey(), kv);
242         }
243
244         for (KeyValue kv : newer.getKeyValueUpdateSet()) {
245             kvSet.put(kv.getKey(), kv);
246         }
247
248         int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber();
249
250         if (transactionSequenceNumber == -1) {
251             transactionSequenceNumber = older.getTransactionSequenceNumber();
252         }
253
254         Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
255
256         newCommit.setKVsMap(kvSet);
257
258         return newCommit;
259     }
260 }