add check for missing messages
[iotcloud.git] / src / java / iotcloud / Table.java
index c92aef93ec4b7df6c918fca27dec4e8093a77141..05a85f8267e4592a321c4d3cb0c77f1c52c3c8c2 100644 (file)
@@ -1,9 +1,15 @@
 package iotcloud;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Arrays;
 import java.util.Vector;
-import javax.crypto.spec.*;
-import javax.crypto.*;
+
+/**
+ * IoTTable data structure.  Provides client inferface.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
 
 final public class Table {
        private int numslots;
@@ -11,7 +17,6 @@ final public class Table {
        private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
        private SlotBuffer buffer;
        private CloudComm cloud;
-       private Mac hmac;
        private long sequencenumber;
        private long localmachineid;
        private TableStatus lastTableStatus;
@@ -23,7 +28,7 @@ final public class Table {
                buffer = new SlotBuffer();
                numslots = buffer.capacity();
                sequencenumber = 0;
-               initCloud(baseurl, password);
+               cloud=new CloudComm(baseurl, password);
        }
 
        public Table(CloudComm _cloud, long _localmachineid) {
@@ -34,30 +39,9 @@ final public class Table {
                cloud=_cloud;
        }
 
-       private void initCloud(String baseurl, String password) {
-               try {
-                       SecretKeySpec secret=getKey(password);
-                       Cipher encryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
-                       encryptCipher.init(Cipher.ENCRYPT_MODE, secret);
-                       Cipher decryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
-                       decryptCipher.init(Cipher.DECRYPT_MODE, secret);
-                       hmac = Mac.getInstance("HmacSHA256");
-                       hmac.init(secret);
-                       cloud=new CloudComm(baseurl, encryptCipher, decryptCipher, hmac);
-               } catch (Exception e) {
-                       throw new Error("Failed To Initialize Ciphers");
-               }
-       }
-
-       private SecretKeySpec getKey(String password) {
-               try {
-                       PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray());
-                       SecretKey key = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
-                       SecretKeySpec secret = new SecretKeySpec(key.getEncoded(), "AES");
-                       return secret;
-               } catch (Exception e) {
-                       throw new Error("Failed generating key.");
-               }
+       public void rebuild() {
+               Slot[] newslots=cloud.getSlots(sequencenumber+1);
+               validateandupdate(newslots, true);
        }
 
        public void update() {
@@ -81,7 +65,8 @@ final public class Table {
                Slot[] array=cloud.putSlot(s, numslots);
                if (array == null) {
                        array = new Slot[] {s};
-                       validateandupdate(array, true);                                                                         // update data structure
+                       /* update data structure */
+                       validateandupdate(array, true);
                } else {
                        throw new Error("Error on initialization");
                }
@@ -109,7 +94,7 @@ final public class Table {
                }
 
                if ((numslots - buffer.size()) < FREE_SLOTS) {
-                       //have to check whether we have enough free slots
+                       /* have to check whether we have enough free slots */
                        long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
                        seqn = fullfirstseqn < 1?1:fullfirstseqn;
                        for(int i=0; i < FREE_SLOTS; i++, seqn++) {
@@ -165,7 +150,8 @@ search:
                else
                        insertedkv=false;
 
-               validateandupdate(array, true);                                                 // update data structure
+               /* update data structure */
+               validateandupdate(array, true);
 
                return insertedkv;
        }
@@ -179,9 +165,9 @@ search:
        }
 
 
-       private void validateandupdate(Slot[] newslots, boolean isput) {
-               //The cloud communication layer has checked slot HMACs already
-               //before decoding
+       private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
+               /The cloud communication layer has checked slot HMACs already
+                        before decoding */
                if (newslots.length==0)
                        return;
 
@@ -192,19 +178,24 @@ search:
                SlotIndexer indexer = new SlotIndexer(newslots, buffer);
                checkHMACChain(indexer, newslots);
 
+               HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
+               
                initExpectedSize();
                for(Slot slot: newslots) {
                        updateExpectedSize();
-                       processSlot(indexer, slot, isput);
+                       processSlot(indexer, slot, acceptupdatestolocal, machineSet);
                }
 
-               //If there is a gap, check to see if the server sent us everything
-               if (firstseqnum != (sequencenumber+1))
+               /* If there is a gap, check to see if the server sent us everything. */
+               if (firstseqnum != (sequencenumber+1)) {
                        checkNumSlots(newslots.length);
+                       if (!machineSet.isEmpty())
+                               throw new Error("Missing record for machines: "+machineSet);
+               }
 
                commitNewMaxSize();
 
-               //commit new to slots
+               /* Commit new to slots. */
                for(Slot slot:newslots) {
                        buffer.putSlot(slot);
                }
@@ -250,8 +241,8 @@ search:
                table.put(key, entry);
        }
 
-       private void processEntry(LastMessage entry, SlotIndexer indexer) {
-               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
+       private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
+               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
        }
 
        private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
@@ -278,7 +269,8 @@ search:
                lastTableStatus = entry;
        }
 
-       private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean isput) {
+       private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+               machineSet.remove(machineid);
                Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
                if (lastmsgentry == null)
                        return;
@@ -294,7 +286,7 @@ search:
                }
 
                if (machineid == localmachineid) {
-                       if (lastmsgseqnum != seqnum && !isput)
+                       if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
                                throw new Error("Server Error: Mismatch on local machine sequence number");
                } else {
                        if (lastmsgseqnum > seqnum)
@@ -302,8 +294,8 @@ search:
                }
        }
 
-       private void processSlot(SlotIndexer indexer, Slot slot, boolean isput) {
-               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, isput);
+       private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
 
                for(Entry entry : slot.getEntries()) {
                        switch(entry.getType()) {
@@ -312,7 +304,7 @@ search:
                                break;
 
                        case Entry.TypeLastMessage:
-                               processEntry((LastMessage)entry, indexer);
+                               processEntry((LastMessage)entry, indexer, machineSet);
                                break;
 
                        case Entry.TypeRejectedMessage:
@@ -335,7 +327,7 @@ search:
                        Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
                        if (prevslot != null &&
                                        !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
-                               throw new Error("Server Error: Invalid HMAC Chain");
+                               throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
                }
        }
 }