class Commit extends Entry {
private long seqnumtrans;
+ private long transarbitrator;
+
private Set<KeyValue> keyValueUpdateSet = null;
- public Commit(Slot slot, long _seqnumtrans, Set<KeyValue> _keyValueUpdateSet) {
+ public Commit(Slot slot, long _seqnumtrans, long _transarbitrator, Set<KeyValue> _keyValueUpdateSet) {
super(slot);
seqnumtrans = _seqnumtrans;
+ transarbitrator = _transarbitrator;
keyValueUpdateSet = new HashSet<KeyValue>();
return seqnumtrans;
}
+ public long getTransArbitrator() {
+ return transarbitrator;
+ }
+
public Set<KeyValue> getkeyValueUpdateSet() {
return keyValueUpdateSet;
}
}
public int getSize() {
- int size = Long.BYTES + Byte.BYTES; // seq id, entry type
+ int size = 2 * Long.BYTES + Byte.BYTES; // seq id, entry type
size += Integer.BYTES; // number of KV's
// Size of each KV
static Entry decode(Slot slot, ByteBuffer bb) {
long seqnumtrans = bb.getLong();
+ long transarbitrator = bb.getLong();
int numberOfKeys = bb.getInt();
Set<KeyValue> kvSet = new HashSet<KeyValue>();
kvSet.add(kv);
}
- return new Commit(slot, seqnumtrans, kvSet);
+ return new Commit(slot, seqnumtrans, transarbitrator, kvSet);
}
public void encode(ByteBuffer bb) {
bb.put(Entry.TypeCommit);
bb.putLong(seqnumtrans);
+ bb.putLong(transarbitrator);
+
bb.putInt(keyValueUpdateSet.size());
for (KeyValue kv : keyValueUpdateSet) {
}
public Entry getCopy(Slot s) {
- return new Commit(s, seqnumtrans, keyValueUpdateSet);
+ return new Commit(s, seqnumtrans, transarbitrator, keyValueUpdateSet);
}
- public void updateLiveKeys(Set<KeyValue> kvSet) {
+ public Set<KeyValue> updateLiveKeys(Set<KeyValue> kvSet) {
if (!this.isLive())
- return;
+ return new HashSet<KeyValue>();
+
+ Set<KeyValue> toDelete = new HashSet<KeyValue>();
for (KeyValue kv1 : kvSet) {
for (Iterator<KeyValue> i = keyValueUpdateSet.iterator(); i.hasNext();) {
KeyValue kv2 = i.next();
- if (kv1.getKey() == kv2.getKey()) {
- keyValueUpdateSet.remove(kv2);
+ if (kv1.getKey().equals(kv2.getKey())) {
+ toDelete.add(kv2);
+ i.remove();
break;
}
}
}
- if (keyValueUpdateSet.size() == 0)
+ if (keyValueUpdateSet.size() == 0) {
this.setDead();
+ }
+
+ return toDelete;
}
}
\ No newline at end of file