4 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.nio.ByteBuffer;
11 private Map<Integer, CommitPart> parts = null;
12 private Set<Integer> missingParts = null;
13 private boolean isComplete = false;
14 private boolean hasLastPart = false;
15 private Set<KeyValue> keyValueUpdateSet = null;
16 private boolean isDead = false;
17 private long sequenceNumber = -1;
18 private long machineId = -1;
19 private long transactionSequenceNumber = -1;
21 private Set<IoTString> liveKeys = null;
24 parts = new HashMap<Integer, CommitPart>();
25 keyValueUpdateSet = new HashSet<KeyValue>();
27 liveKeys = new HashSet<IoTString>();
30 public Commit(long _sequenceNumber, long _machineId, long _transactionSequenceNumber) {
31 parts = new HashMap<Integer, CommitPart>();
32 keyValueUpdateSet = new HashSet<KeyValue>();
34 liveKeys = new HashSet<IoTString>();
36 sequenceNumber = _sequenceNumber;
37 machineId = _machineId;
38 transactionSequenceNumber = _transactionSequenceNumber;
43 public void addPartDecode(CommitPart newPart) {
46 // If dead then just kill this part and move on
51 CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
53 if (previoslySeenPart != null) {
54 // Set dead the old one since the new one is a rescued version of this part
55 previoslySeenPart.setDead();
56 } else if (newPart.isLastPart()) {
57 missingParts = new HashSet<Integer>();
60 for (int i = 0; i < newPart.getPartNumber(); i++) {
61 if (parts.get(i) == null) {
67 if (!isComplete && hasLastPart) {
69 // We have seen this part so remove it from the set of missing parts
70 missingParts.remove(newPart.getPartNumber());
72 // Check if all the parts have been seen
73 if (missingParts.size() == 0) {
75 // We have all the parts
78 // Decode all the parts and create the key value guard and update sets
81 // Get the sequence number and arbitrator of this transaction
82 sequenceNumber = parts.get(0).getSequenceNumber();
83 machineId = parts.get(0).getMachineId();
84 transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
89 public long getSequenceNumber() {
90 return sequenceNumber;
93 public long getTransactionSequenceNumber() {
94 return transactionSequenceNumber;
97 public Map<Integer, CommitPart> getParts() {
101 public void addKV(KeyValue kv) {
102 keyValueUpdateSet.add(kv);
103 liveKeys.add(kv.getKey());
106 public void invalidateKey(IoTString key) {
107 liveKeys.remove(key);
109 if (liveKeys.size() == 0) {
114 public Set<KeyValue> getKeyValueUpdateSet() {
115 return keyValueUpdateSet;
118 public int getNumberOfParts() {
122 public long getMachineId() {
126 public boolean isComplete() {
130 public boolean isLive() {
134 public void setDead() {
143 // Make all the parts of this transaction dead
144 for (Integer partNumber : parts.keySet()) {
145 CommitPart part = parts.get(partNumber);
150 public CommitPart getPart(int index) {
151 return parts.get(index);
154 public void createCommitParts() {
159 byte[] byteData = convertDataToBytes();
162 int commitPartCount = 0;
163 int currentPosition = 0;
164 int remaining = byteData.length;
166 while (remaining > 0) {
168 Boolean isLastPart = false;
169 // determine how much to copy
170 int copySize = CommitPart.MAX_NON_HEADER_SIZE;
171 if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
172 copySize = remaining;
173 isLastPart = true; // last bit of data so last part
176 // Copy to a smaller version
177 byte[] partData = new byte[copySize];
178 System.arraycopy(byteData, currentPosition, partData, 0, copySize);
180 CommitPart part = new CommitPart(null, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
181 parts.put(part.getPartNumber(), part);
183 // Update position, count and remaining
184 currentPosition += copySize;
186 remaining -= copySize;
190 private void decodeCommitData() {
192 // Calculate the size of the data section
194 for (int i = 0; i < parts.keySet().size(); i++) {
195 CommitPart tp = parts.get(i);
196 dataSize += tp.getDataSize();
199 byte[] combinedData = new byte[dataSize];
200 int currentPosition = 0;
202 // Stitch all the data sections together
203 for (int i = 0; i < parts.keySet().size(); i++) {
204 CommitPart tp = parts.get(i);
205 System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
206 currentPosition += tp.getDataSize();
210 ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
212 // Decode how many key value pairs need to be decoded
213 int numberOfKVUpdates = bbDecode.getInt();
215 // Decode all the updates key values
216 for (int i = 0; i < numberOfKVUpdates; i++) {
217 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
218 keyValueUpdateSet.add(kv);
219 liveKeys.add(kv.getKey());
223 private byte[] convertDataToBytes() {
225 // Calculate the size of the data
226 int sizeOfData = Integer.BYTES; // Number of Update KV's
227 for (KeyValue kv : keyValueUpdateSet) {
228 sizeOfData += kv.getSize();
231 // Data handlers and storage
232 byte[] dataArray = new byte[sizeOfData];
233 ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
235 // Encode the size of the updates and guard sets
236 bbEncode.putInt(keyValueUpdateSet.size());
238 // Encode all the updates
239 for (KeyValue kv : keyValueUpdateSet) {
243 return bbEncode.array();
246 private void setKVsMap(Map<IoTString, KeyValue> newKVs) {
247 keyValueUpdateSet.clear();
250 keyValueUpdateSet.addAll(newKVs.values());
251 liveKeys.addAll(newKVs.keySet());
256 public static Commit merge(Commit newer, Commit older, long newSequenceNumber) {
260 } else if (newer == null) {
264 Map<IoTString, KeyValue> kvSet = new HashMap<IoTString, KeyValue>();
265 for (KeyValue kv : older.getKeyValueUpdateSet()) {
266 kvSet.put(kv.getKey(), kv);
269 for (KeyValue kv : newer.getKeyValueUpdateSet()) {
270 kvSet.put(kv.getKey(), kv);
273 long transactionSequenceNumber = newer.getTransactionSequenceNumber();
275 if (transactionSequenceNumber == -1) {
276 transactionSequenceNumber = older.getTransactionSequenceNumber();
279 Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
281 newCommit.setKVsMap(kvSet);