add makefile
[iotcloud.git] / src / java / iotcloud / Table.java
index ae4ed16..6cd2b32 100644 (file)
@@ -7,15 +7,15 @@ import javax.crypto.*;
 public class Table {
        int numslots;
        HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
-       HashMap<Long, Long> lastmessage=new HashMap<Long, Long>();
+       HashMap<Long, Pair<Long, Liveness>> lastmessagetable=new HashMap<Long, Pair<Long, Liveness>>();
        SlotBuffer buffer;
        CloudComm cloud;
        private Mac hmac;
        long sequencenumber;
-       long machineid;
+       long localmachineid;
        
-       public Table(String baseurl, String password, long _machineid) {
-               machineid=_machineid;
+       public Table(String baseurl, String password, long _localmachineid) {
+               localmachineid=_localmachineid;
                buffer = new SlotBuffer();
                sequencenumber = 1;
                initCloud(baseurl, password);
@@ -52,6 +52,18 @@ public class Table {
                validateandupdate(newslots);
        }
 
+       public IoTString get(IoTString key) {
+               KeyValue kv=table.get(key);
+               if (kv != null)
+                       return kv.getValue();
+               else
+                       return null;
+       }
+
+       public IoTString put(IoTString key, IoTString value) {
+               return null;
+       }
+       
        void validateandupdate(Slot[] newslots) {
                //The cloud communication layer has checked slot HMACs already
                //before decoding
@@ -70,44 +82,77 @@ public class Table {
                
        }
 
-       void processEntry(KeyValue entry, SlotIndexer indexer, Slot slot) {
+       void processEntry(KeyValue entry, SlotIndexer indexer) {
                IoTString key=entry.getKey();
                KeyValue oldvalue=table.get(key);
                if (oldvalue != null) {
-                       oldvalue.setDead();
+                       oldvalue.decrementLiveCount();
                }
                table.put(key, entry);
        }
 
-       void processEntry(LastMessage entry, SlotIndexer indexer, Slot slot) {
-               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), null, entry);
+       void processEntry(LastMessage entry, SlotIndexer indexer) {
+               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
        }
 
-       void processEntry(RejectedMessage entry, SlotIndexer indexer, Slot slot) {
-               
+       void processEntry(RejectedMessage entry, SlotIndexer indexer) {
+               long oldseqnum=entry.getOldSeqNum();
+               long newseqnum=entry.getNewSeqNum();
+               boolean isequal=entry.getEqual();
+               long machineid=entry.getMachineID();
+               for(long seqnum=oldseqnum;seqnum<=newseqnum;seqnum++) {
+                       Slot slot=indexer.getSlot(seqnum);
+                       if (slot != null) {
+                               long slotmachineid=slot.getMachineID();
+                               if (isequal!=(slotmachineid==machineid)) {
+                                       throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
+                               }
+                       }
+               }
        }
-
+       
        void processEntry(TableStatus entry, SlotIndexer indexer, Slot slot) {
-
+               
        }
 
-       void updateLastMessage(long machineid, long seqnum, Slot slot, LastMessage entry) {
+       void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
+               Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
+               if (lastmsgentry == null)
+                       return;
+
+               long lastmsgseqnum = lastmsgentry.getFirst();
+               Liveness lastentry = lastmsgentry.getSecond();
+               if (lastentry instanceof LastMessage) {
+                       ((LastMessage)lastentry).decrementLiveCount();
+               } else if (lastentry instanceof Slot) {
+                       ((Slot)lastentry).decrementLiveCount();
+               } else {
+                       throw new Error("Unrecognized type");
+               }
                
+               //Check that nothing funny happened
+               if (machineid == localmachineid) {
+                       if (lastmsgseqnum != seqnum)
+                               throw new Error("Server Error: Mismatch on local machine sequence number");
+               } else {
+                       if (lastmsgseqnum > seqnum)
+                               throw new Error("Server Error: Rolback on remote machine sequence number");
+               }
        }
        
        void processSlot(SlotIndexer indexer, Slot slot) {
-               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, null);
+               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
 
                for(Entry entry : slot.getEntries()) {
                        switch(entry.getType()) {
                        case Entry.TypeKeyValue:
-                               processEntry((KeyValue)entry, indexer, slot);
+                               processEntry((KeyValue)entry, indexer);
                                break;
                        case Entry.TypeLastMessage:
-                               processEntry((LastMessage)entry, indexer, slot);
+                               processEntry((LastMessage)entry, indexer);
                                break;
                        case Entry.TypeRejectedMessage:
-                               processEntry((RejectedMessage)entry, indexer, slot);
+                               processEntry((RejectedMessage)entry, indexer);
                                break;
                        case Entry.TypeTableStatus:
                                processEntry((TableStatus)entry, indexer, slot);