package iotcloud;
-import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
-import java.util.Iterator;
+import java.nio.ByteBuffer;
+
+class Commit {
+
+ private Map<Integer, CommitPart> parts = null;
+ private Set<Integer> missingParts = null;
+ private boolean isComplete = false;
+ private boolean hasLastPart = false;
+ private Set<KeyValue> keyValueUpdateSet = null;
+ private boolean isDead = false;
+ private long sequenceNumber = -1;
+ private long machineId = -1;
+ private long transactionSequenceNumber = -1;
+
+ private Set<IoTString> liveKeys = null;
+
+ public Commit() {
+ parts = new HashMap<Integer, CommitPart>();
+ keyValueUpdateSet = new HashSet<KeyValue>();
+
+ liveKeys = new HashSet<IoTString>();
+ }
+
+ public Commit(long _sequenceNumber, long _machineId, long _transactionSequenceNumber) {
+ parts = new HashMap<Integer, CommitPart>();
+ keyValueUpdateSet = new HashSet<KeyValue>();
+
+ liveKeys = new HashSet<IoTString>();
+
+ sequenceNumber = _sequenceNumber;
+ machineId = _machineId;
+ transactionSequenceNumber = _transactionSequenceNumber;
+ isComplete = true;
+ }
+
-/**
- * This Entry records the commit of a transaction.
- * @author Ali Younis <ayounis@uci.edu>
- * @version 1.0
- */
+ public void addPartDecode(CommitPart newPart) {
+ if (isDead) {
+ // If dead then just kill this part and move on
+ newPart.setDead();
+ return;
+ }
-class Commit extends Entry {
- private long seqnumtrans;
- private long seqnumcommit;
- private long transarbitrator;
-
- private Set<KeyValue> keyValueUpdateSet = null;
+ CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
+ if (previoslySeenPart != null) {
+ // Set dead the old one since the new one is a rescued version of this part
+ previoslySeenPart.setDead();
+ } else if (newPart.isLastPart()) {
+ missingParts = new HashSet<Integer>();
+ hasLastPart = true;
- public Commit(Slot slot, long _seqnumtrans, long _seqnumcommit, long _transarbitrator, Set<KeyValue> _keyValueUpdateSet) {
- super(slot);
- seqnumtrans = _seqnumtrans;
- seqnumcommit = _seqnumcommit;
- transarbitrator = _transarbitrator;
+ for (int i = 0; i < newPart.getPartNumber(); i++) {
+ if (parts.get(i) == null) {
+ missingParts.add(i);
+ }
+ }
+ }
- keyValueUpdateSet = new HashSet<KeyValue>();
+ if (!isComplete && hasLastPart) {
- for (KeyValue kv : _keyValueUpdateSet) {
- KeyValue kvCopy = kv.getCopy();
- keyValueUpdateSet.add(kvCopy);
- }
- }
+ // We have seen this part so remove it from the set of missing parts
+ missingParts.remove(newPart.getPartNumber());
- public long getTransSequenceNumber() {
- return seqnumtrans;
- }
- public long getSequenceNumber() {
- return seqnumcommit;
- }
+ // Check if all the parts have been seen
+ if (missingParts.size() == 0) {
- public long getTransArbitrator() {
- return transarbitrator;
- }
+ // We have all the parts
+ isComplete = true;
- public Set<KeyValue> getkeyValueUpdateSet() {
- return keyValueUpdateSet;
- }
+ // Decode all the parts and create the key value guard and update sets
+ decodeCommitData();
- public byte getType() {
- return Entry.TypeCommit;
- }
+ // Get the sequence number and arbitrator of this transaction
+ sequenceNumber = parts.get(0).getSequenceNumber();
+ machineId = parts.get(0).getMachineId();
+ transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
+ }
+ }
+ }
- public int getSize() {
- int size = 3 * Long.BYTES + Byte.BYTES; // seq id, entry type
- size += Integer.BYTES; // number of KV's
+ public long getSequenceNumber() {
+ return sequenceNumber;
+ }
- // Size of each KV
- for (KeyValue kv : keyValueUpdateSet) {
- size += kv.getSize();
- }
+ public long getTransactionSequenceNumber() {
+ return transactionSequenceNumber;
+ }
- return size;
- }
- static Entry decode(Slot slot, ByteBuffer bb) {
- long seqnumtrans = bb.getLong();
- long seqnumcommit = bb.getLong();
- long transarbitrator = bb.getLong();
- int numberOfKeys = bb.getInt();
+ public Map<Integer, CommitPart> getParts() {
+ return parts;
+ }
- Set<KeyValue> kvSet = new HashSet<KeyValue>();
- for (int i = 0; i < numberOfKeys; i++) {
- KeyValue kv = KeyValue.decode(bb);
- kvSet.add(kv);
- }
+ public void addKV(KeyValue kv) {
+ keyValueUpdateSet.add(kv);
+ liveKeys.add(kv.getKey());
+ }
- return new Commit(slot, seqnumtrans, seqnumcommit, transarbitrator, kvSet);
- }
+ public void invalidateKey(IoTString key) {
+ liveKeys.remove(key);
- public void encode(ByteBuffer bb) {
- bb.put(Entry.TypeCommit);
- bb.putLong(seqnumtrans);
- bb.putLong(seqnumcommit);
- bb.putLong(transarbitrator);
+ if (liveKeys.size() == 0) {
+ setDead();
+ }
+ }
- bb.putInt(keyValueUpdateSet.size());
+ public Set<KeyValue> getKeyValueUpdateSet() {
+ return keyValueUpdateSet;
+ }
- for (KeyValue kv : keyValueUpdateSet) {
- kv.encode(bb);
- }
- }
+ public int getNumberOfParts() {
+ return parts.size();
+ }
- public Entry getCopy(Slot s) {
- return new Commit(s, seqnumtrans, seqnumcommit, transarbitrator, keyValueUpdateSet);
- }
+ public long getMachineId() {
+ return machineId;
+ }
- public Set<KeyValue> updateLiveKeys(Set<KeyValue> kvSet) {
+ public boolean isComplete() {
+ return isComplete;
+ }
+
+ public boolean isLive() {
+ return !isDead;
+ }
+
+ public void setDead() {
+ if (isDead) {
+ // Already dead
+ return;
+ }
- if (!this.isLive())
- return new HashSet<KeyValue>();
+ // Set dead
+ isDead = true;
+
+ // Make all the parts of this transaction dead
+ for (Integer partNumber : parts.keySet()) {
+ CommitPart part = parts.get(partNumber);
+ part.setDead();
+ }
+ }
+
+ public CommitPart getPart(int index) {
+ return parts.get(index);
+ }
+
+ public void createCommitParts() {
+
+ parts.clear();
+
+ // Convert to bytes
+ byte[] byteData = convertDataToBytes();
+
+
+ int commitPartCount = 0;
+ int currentPosition = 0;
+ int remaining = byteData.length;
- Set<KeyValue> toDelete = new HashSet<KeyValue>();
+ while (remaining > 0) {
- for (KeyValue kv1 : kvSet) {
- for (Iterator<KeyValue> i = keyValueUpdateSet.iterator(); i.hasNext();) {
- KeyValue kv2 = i.next();
+ Boolean isLastPart = false;
+ // determine how much to copy
+ int copySize = CommitPart.MAX_NON_HEADER_SIZE;
+ if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
+ copySize = remaining;
+ isLastPart = true; // last bit of data so last part
+ }
+
+ // Copy to a smaller version
+ byte[] partData = new byte[copySize];
+ System.arraycopy(byteData, currentPosition, partData, 0, copySize);
+
+ CommitPart part = new CommitPart(null, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
+ parts.put(part.getPartNumber(), part);
+
+ // Update position, count and remaining
+ currentPosition += copySize;
+ commitPartCount++;
+ remaining -= copySize;
+ }
+ }
+
+ private void decodeCommitData() {
- if (kv1.getKey().equals(kv2.getKey())) {
- toDelete.add(kv2);
- i.remove();
- break;
- }
- }
- }
+ // Calculate the size of the data section
+ int dataSize = 0;
+ for (int i = 0; i < parts.keySet().size(); i++) {
+ CommitPart tp = parts.get(i);
+ dataSize += tp.getDataSize();
+ }
+
+ byte[] combinedData = new byte[dataSize];
+ int currentPosition = 0;
+
+ // Stitch all the data sections together
+ for (int i = 0; i < parts.keySet().size(); i++) {
+ CommitPart tp = parts.get(i);
+ System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
+ currentPosition += tp.getDataSize();
+ }
+
+ // Decoder Object
+ ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
+
+ // Decode how many key value pairs need to be decoded
+ int numberOfKVUpdates = bbDecode.getInt();
+
+ // Decode all the updates key values
+ for (int i = 0; i < numberOfKVUpdates; i++) {
+ KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
+ keyValueUpdateSet.add(kv);
+ liveKeys.add(kv.getKey());
+ }
+ }
+
+ private byte[] convertDataToBytes() {
+
+ // Calculate the size of the data
+ int sizeOfData = Integer.BYTES; // Number of Update KV's
+ for (KeyValue kv : keyValueUpdateSet) {
+ sizeOfData += kv.getSize();
+ }
+
+ // Data handlers and storage
+ byte[] dataArray = new byte[sizeOfData];
+ ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
+
+ // Encode the size of the updates and guard sets
+ bbEncode.putInt(keyValueUpdateSet.size());
+
+ // Encode all the updates
+ for (KeyValue kv : keyValueUpdateSet) {
+ kv.encode(bbEncode);
+ }
- if (keyValueUpdateSet.size() == 0) {
- this.setDead();
- }
-
- return toDelete;
- }
+ return bbEncode.array();
+ }
}
\ No newline at end of file