Offline support added
[iotcloud.git] / version2 / src / java / iotcloud / Commit.java
index 1ee044df3e7cca9f22eeb29f071f8666989e2f49..deeb501c8ef47ed51192393914c66a270790e5db 100644 (file)
@@ -14,12 +14,15 @@ import java.util.Iterator;
 
 class Commit extends Entry {
        private long seqnumtrans;
+       private long transarbitrator;
+
        private Set<KeyValue> keyValueUpdateSet = null;
 
 
-       public Commit(Slot slot, long _seqnumtrans, Set<KeyValue> _keyValueUpdateSet) {
+       public Commit(Slot slot, long _seqnumtrans, long _transarbitrator, Set<KeyValue> _keyValueUpdateSet) {
                super(slot);
                seqnumtrans = _seqnumtrans;
+               transarbitrator = _transarbitrator;
 
                keyValueUpdateSet = new HashSet<KeyValue>();
 
@@ -33,6 +36,10 @@ class Commit extends Entry {
                return seqnumtrans;
        }
 
+       public long getTransArbitrator() {
+               return transarbitrator;
+       }
+
        public Set<KeyValue> getkeyValueUpdateSet() {
                return keyValueUpdateSet;
        }
@@ -42,7 +49,7 @@ class Commit extends Entry {
        }
 
        public int getSize() {
-               int size = Long.BYTES + Byte.BYTES; // seq id, entry type
+               int size = 2 * Long.BYTES + Byte.BYTES; // seq id, entry type
                size += Integer.BYTES; // number of KV's
 
                // Size of each KV
@@ -55,6 +62,7 @@ class Commit extends Entry {
 
        static Entry decode(Slot slot, ByteBuffer bb) {
                long seqnumtrans = bb.getLong();
+               long transarbitrator = bb.getLong();
                int numberOfKeys = bb.getInt();
 
                Set<KeyValue> kvSet = new HashSet<KeyValue>();
@@ -63,12 +71,14 @@ class Commit extends Entry {
                        kvSet.add(kv);
                }
 
-               return new Commit(slot, seqnumtrans, kvSet);
+               return new Commit(slot, seqnumtrans, transarbitrator, kvSet);
        }
 
        public void encode(ByteBuffer bb) {
                bb.put(Entry.TypeCommit);
                bb.putLong(seqnumtrans);
+               bb.putLong(transarbitrator);
+
                bb.putInt(keyValueUpdateSet.size());
 
                for (KeyValue kv : keyValueUpdateSet) {
@@ -77,26 +87,32 @@ class Commit extends Entry {
        }
 
        public Entry getCopy(Slot s) {
-               return new Commit(s, seqnumtrans, keyValueUpdateSet);
+               return new Commit(s, seqnumtrans, transarbitrator, keyValueUpdateSet);
        }
 
-       public void updateLiveKeys(Set<KeyValue> kvSet) {
+       public Set<KeyValue> updateLiveKeys(Set<KeyValue> kvSet) {
 
                if (!this.isLive())
-                       return;
+                       return new HashSet<KeyValue>();
+
+               Set<KeyValue> toDelete = new HashSet<KeyValue>();
 
                for (KeyValue kv1 : kvSet) {
                        for (Iterator<KeyValue> i = keyValueUpdateSet.iterator(); i.hasNext();) {
                                KeyValue kv2 = i.next();
 
-                               if (kv1.getKey() == kv2.getKey()) {
-                                       keyValueUpdateSet.remove(kv2);
+                               if (kv1.getKey().equals(kv2.getKey())) {
+                                       toDelete.add(kv2);
+                                       i.remove();
                                        break;
                                }
                        }
                }
 
-               if (keyValueUpdateSet.size() == 0)
+               if (keyValueUpdateSet.size() == 0) {
                        this.setDead();
+               }
+
+               return toDelete;
        }
 }
\ No newline at end of file