X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=src%2Fjava%2Fiotcloud%2FTable.java;h=6cd2b323d9cf6a07789f2b8dcd9da318ad7da73e;hp=ae4ed164e2823d1ae6afae2d9a1cb1230d2cfb42;hb=9623ac4706e157eb7001dab1792ddb50f6a1b345;hpb=89e8371e2ca27ce4f581596497b786eb0a209678 diff --git a/src/java/iotcloud/Table.java b/src/java/iotcloud/Table.java index ae4ed16..6cd2b32 100644 --- a/src/java/iotcloud/Table.java +++ b/src/java/iotcloud/Table.java @@ -7,15 +7,15 @@ import javax.crypto.*; public class Table { int numslots; HashMap table=new HashMap(); - HashMap lastmessage=new HashMap(); + HashMap> lastmessagetable=new HashMap>(); SlotBuffer buffer; CloudComm cloud; private Mac hmac; long sequencenumber; - long machineid; + long localmachineid; - public Table(String baseurl, String password, long _machineid) { - machineid=_machineid; + public Table(String baseurl, String password, long _localmachineid) { + localmachineid=_localmachineid; buffer = new SlotBuffer(); sequencenumber = 1; initCloud(baseurl, password); @@ -52,6 +52,18 @@ public class Table { validateandupdate(newslots); } + public IoTString get(IoTString key) { + KeyValue kv=table.get(key); + if (kv != null) + return kv.getValue(); + else + return null; + } + + public IoTString put(IoTString key, IoTString value) { + return null; + } + void validateandupdate(Slot[] newslots) { //The cloud communication layer has checked slot HMACs already //before decoding @@ -70,44 +82,77 @@ public class Table { } - void processEntry(KeyValue entry, SlotIndexer indexer, Slot slot) { + void processEntry(KeyValue entry, SlotIndexer indexer) { IoTString key=entry.getKey(); KeyValue oldvalue=table.get(key); if (oldvalue != null) { - oldvalue.setDead(); + oldvalue.decrementLiveCount(); } table.put(key, entry); } - void processEntry(LastMessage entry, SlotIndexer indexer, Slot slot) { - updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), null, entry); + void processEntry(LastMessage entry, SlotIndexer indexer) { + updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry); } - void processEntry(RejectedMessage entry, SlotIndexer indexer, Slot slot) { - + void processEntry(RejectedMessage entry, SlotIndexer indexer) { + long oldseqnum=entry.getOldSeqNum(); + long newseqnum=entry.getNewSeqNum(); + boolean isequal=entry.getEqual(); + long machineid=entry.getMachineID(); + for(long seqnum=oldseqnum;seqnum<=newseqnum;seqnum++) { + Slot slot=indexer.getSlot(seqnum); + if (slot != null) { + long slotmachineid=slot.getMachineID(); + if (isequal!=(slotmachineid==machineid)) { + throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum); + } + } + } } - + void processEntry(TableStatus entry, SlotIndexer indexer, Slot slot) { - + } - void updateLastMessage(long machineid, long seqnum, Slot slot, LastMessage entry) { + void updateLastMessage(long machineid, long seqnum, Liveness liveness) { + Pair lastmsgentry = lastmessagetable.put(machineid, new Pair(seqnum, liveness)); + if (lastmsgentry == null) + return; + + long lastmsgseqnum = lastmsgentry.getFirst(); + Liveness lastentry = lastmsgentry.getSecond(); + if (lastentry instanceof LastMessage) { + ((LastMessage)lastentry).decrementLiveCount(); + } else if (lastentry instanceof Slot) { + ((Slot)lastentry).decrementLiveCount(); + } else { + throw new Error("Unrecognized type"); + } + //Check that nothing funny happened + if (machineid == localmachineid) { + if (lastmsgseqnum != seqnum) + throw new Error("Server Error: Mismatch on local machine sequence number"); + } else { + if (lastmsgseqnum > seqnum) + throw new Error("Server Error: Rolback on remote machine sequence number"); + } } void processSlot(SlotIndexer indexer, Slot slot) { - updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, null); + updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot); for(Entry entry : slot.getEntries()) { switch(entry.getType()) { case Entry.TypeKeyValue: - processEntry((KeyValue)entry, indexer, slot); + processEntry((KeyValue)entry, indexer); break; case Entry.TypeLastMessage: - processEntry((LastMessage)entry, indexer, slot); + processEntry((LastMessage)entry, indexer); break; case Entry.TypeRejectedMessage: - processEntry((RejectedMessage)entry, indexer, slot); + processEntry((RejectedMessage)entry, indexer); break; case Entry.TypeTableStatus: processEntry((TableStatus)entry, indexer, slot);