import java.nio.ByteBuffer;
import java.util.Set;
import java.util.HashSet;
+import java.util.Iterator;
/**
- * This Entry records the abort sent by a given machine.
+ * This Entry records the commit of a transaction.
* @author Ali Younis <ayounis@uci.edu>
* @version 1.0
*/
class Commit extends Entry {
- private long seqnum;
- private Set<KeyValue> keyValueUpdateSet;
+ private long seqnumtrans;
+ private Set<KeyValue> keyValueUpdateSet = null;
+ private Set<KeyValue> liveValues = null;
- public Commit(Slot slot, long _seqnum, long _machineid) {
+ public Commit(Slot slot, long _seqnumtrans, Set<KeyValue> _keyValueUpdateSet) {
super(slot);
- seqnum=_seqnum;
- machineid=_machineid;
+ seqnumtrans = _seqnumtrans;
+
+ keyValueUpdateSet = new HashSet<KeyValue>();
+ liveValues = new HashSet<KeyValue>();
+
+ for (KeyValue kv : _keyValueUpdateSet) {
+ KeyValue kvCopy = kv.getCopy();
+ keyValueUpdateSet.add(kvCopy);
+ liveValues.add(kvCopy);
+ }
}
- public long getSequenceNumber() {
- return seqnum;
+ public long getTransSequenceNumber() {
+ return seqnumtrans;
}
+ public Set<KeyValue> getkeyValueUpdateSet() {
+ return keyValueUpdateSet;
+ }
+
+ public byte getType() {
+ return Entry.TypeCommit;
+ }
+ public int getSize() {
+ int size = Long.BYTES + Byte.BYTES; // seq id, entry type
+ size += Integer.BYTES; // number of KV's
+ // Size of each KV
+ for (KeyValue kv : keyValueUpdateSet) {
+ size += kv.getSize();
+ }
+ return size;
+ }
static Entry decode(Slot slot, ByteBuffer bb) {
- long seqnum=bb.getLong();
- long machineid=bb.getLong();
- return new Abort(slot, seqnum, machineid);
+ long seqnumtrans = bb.getLong();
+ int numberOfKeys = bb.getInt();
+
+ Set<KeyValue> kvSet = new HashSet<KeyValue>();
+ for (int i = 0; i < numberOfKeys; i++) {
+ KeyValue kv = KeyValue.decode(bb);
+ kvSet.add(kv);
+ }
+
+ return new Commit(slot, seqnumtrans, kvSet);
}
public void encode(ByteBuffer bb) {
- bb.put(Entry.TypeAbort);
- bb.putLong(seqnum);
- bb.putLong(machineid);
- }
+ bb.put(Entry.TypeCommit);
+ bb.putLong(seqnumtrans);
+ bb.putInt(keyValueUpdateSet.size());
- public int getSize() {
- return 2*Long.BYTES+Byte.BYTES;
+ for (KeyValue kv : keyValueUpdateSet) {
+ kv.encode(bb);
+ }
}
- public byte getType() {
- return Entry.TypeAbort;
+ public Entry getCopy(Slot s) {
+ return new Commit(s, seqnumtrans, keyValueUpdateSet);
}
- public Entry getCopy(Slot s) {
- return new Abort(s, seqnum, machineid);
+ public void updateLiveKeys(Set<KeyValue> kvSet) {
+
+ if (!this.isLive())
+ return;
+
+ for (KeyValue kv1 : kvSet) {
+ for (Iterator<KeyValue> i = liveValues.iterator(); i.hasNext();) {
+ KeyValue kv2 = i.next();
+
+ if (kv1.getKey() == kv2.getKey()) {
+ liveValues.remove(kv2);
+ break;
+ }
+ }
+ }
+
+ if (liveValues.size() == 0)
+ this.setDead();
}
}
\ No newline at end of file