Separating pattern collection and analysis into 2 different threads.
authorrtrimana <rtrimana@uci.edu>
Wed, 2 May 2018 23:04:43 +0000 (16:04 -0700)
committerrtrimana <rtrimana@uci.edu>
Wed, 2 May 2018 23:04:43 +0000 (16:04 -0700)
Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/FlowPattern.java
Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/FlowPatternFinder.java
Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Main.java

index 7ec2a74..a083e49 100644 (file)
@@ -54,10 +54,18 @@ public class FlowPattern {
 
     /**
      * Get the the sequence of packet lengths that defines this {@code FlowPattern}.
-     * @return the the sequence of packet lengths that defines this {@code FlowPattern}.
+     * @return the sequence of packet lengths that defines this {@code FlowPattern}.
      */
     public List<Integer> getPacketOrder() {
         return flowPacketOrder;
     }
+    
+    /**
+     * Get the length of the List of {@code FlowPattern}.
+     * @return the length of the List of {@code FlowPattern}.
+     */
+    public int getLength() {
+        return flowPacketOrder.size();
+    }
 
 }
index cb5a0fd..aff4534 100644 (file)
@@ -14,38 +14,82 @@ import java.net.UnknownHostException;
 import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Provides functionality for searching for the presence of a {@link FlowPattern} in a PCAP trace.
+ * We use 2 threads:
+ *  1) The first thread (main thread) collects conversations from the PCAP stream and put them into our data structure.
+ *  2) The second thread (checker thread) checks the collected conversation.
  *
  * @author Janus Varmarken
+ * @author Rahmadi Trimananda
  */
 public class FlowPatternFinder {
 
     /* Class properties */
-    private final Map<Conversation, List<PcapPacket>> connections = new HashMap<>();
+    private Map<Conversation, List<PcapPacket>> connections;
+    private Queue<Conversation> conversations;
     private DnsMap dnsMap;
+    private PcapHandle pcap;
+    private FlowPattern pattern;
+    private AtomicBoolean isEoF;
     
     
     /* Constructor */
-    public FlowPatternFinder() {
+    public FlowPatternFinder(PcapHandle _pcap, FlowPattern _pattern) {
+
+        this.connections = new ConcurrentHashMap<Conversation, List<PcapPacket>>();
+        this.conversations = new ConcurrentLinkedQueue<Conversation>();
         this.dnsMap = new DnsMap();
+        this.isEoF = new AtomicBoolean(false);
+
+        // Get input parameters
+        this.pcap = _pcap;
+        this.pattern = _pattern;
+    }
+    
+    
+    public void start() {
+    
+        // Spawn the main thread
+        Thread mainThread = new Thread(new Runnable() {
+            public void run() {
+                findFlowPattern();
+            }
+        });
+        mainThread.start();
+
+        // Spawn the checker thread
+        Thread checkerThread = new Thread(new Runnable() {
+            public void run() {
+                find();
+            }
+        });
+        checkerThread.start();
+
+        /* TODO: Join the threads if we want it to be blocking
+        try {
+            mainThread.join();
+            checkerThread.join();
+        } catch(InterruptedException ex) {
+            ex.printStackTrace();
+        }*/
+        System.out.println("[ start ] Main and checker threads started!");
     }
 
 
     /**
-     * Find pattern based on the FlowPattern object
-     *
-     * @param   pcap        PCAP file handler
-     * @param   pattern     FlowPattern class object as a comparator
+     * Find patterns based on the FlowPattern object (run by a thread)
      */
-    // TODO clean up exceptions etc.
-    public void findFlowPattern(PcapHandle pcap, FlowPattern pattern)
-            throws PcapNativeException, NotOpenException, TimeoutException {
+    private void findFlowPattern() {
         int counter = 0;
         try {
             PcapPacket packet;
             Set<Integer> seqNumberSet = new HashSet<Integer>();
+            int patternLength = pattern.getLength();
             while ((packet = pcap.getNextPacketEx()) != null) {
 
                 // Check if this is a valid DNS packet
@@ -53,62 +97,65 @@ public class FlowPatternFinder {
                 // For now, we only work support pattern search in TCP over IPv4.
                 IpV4Packet ipPacket = packet.get(IpV4Packet.class);
                 TcpPacket tcpPacket = packet.get(TcpPacket.class);
-                if (ipPacket == null || tcpPacket == null) {
+                if (ipPacket == null || tcpPacket == null)
                     continue;
-                }
                 String srcAddress = ipPacket.getHeader().getSrcAddr().getHostAddress();
                 String dstAddress = ipPacket.getHeader().getDstAddr().getHostAddress();
                 int srcPort = tcpPacket.getHeader().getSrcPort().valueAsInt();
                 int dstPort = tcpPacket.getHeader().getDstPort().valueAsInt();
-                // Is this packet related to the pattern and coming from the cloud server?
+                // Is this packet related to the pattern and coming to/from the cloud server?
                 boolean fromServer = dnsMap.isRelatedToCloudServer(srcAddress, pattern.getHostname());
-                // Is this packet related to the pattern and going to the cloud server?
                 boolean fromClient = dnsMap.isRelatedToCloudServer(dstAddress, pattern.getHostname());
-                if (!fromServer && !fromClient) {
-                    // Packet not related to pattern, skip it.
-                    continue;
-                }
-                if (tcpPacket.getPayload() == null) {
-                    // We skip non-payload control packets as these are less predictable and should therefore not be
-                    // part of a signature (e.g. receiver can choose not to ACK immediately)
+                if (!fromServer && !fromClient)  // Packet not related to pattern, skip it.
                     continue;
-                }
-
+                if (tcpPacket.getPayload() == null) // We skip non-payload control packets as these are less predictable
+                    continue; 
                 // Identify conversations (connections/sessions) by the four-tuple (clientIp, clientPort, serverIp, serverPort).
                 // TODO: this is strictly not sufficient to differentiate one TCP session from another, but should suffice for now.
                 Conversation conversation = fromClient ? new Conversation(srcAddress, srcPort, dstAddress, dstPort) :
                         new Conversation(dstAddress, dstPort, srcAddress, srcPort);
-                List<PcapPacket> listWrappedPacket = new ArrayList<>();               
-                listWrappedPacket.add(packet);
                 // Create new conversation entry, or append packet to existing.
-                connections.merge(conversation, listWrappedPacket, (v1, v2) -> {
-                    int seqNumber = v2.get(0).get(TcpPacket.class).getHeader().getSequenceNumber();
-                    boolean retransmission = seqNumberSet.contains(seqNumber);
-                    if (!retransmission) {
-                        // Do not add if retransmission -> avoid duplicate packets in flow
-                        v1.addAll(v2);
-                        seqNumberSet.add(seqNumber);
-                    }
-                    return v1;
-                });
+                List<PcapPacket> listPcapPacket = connections.get(conversation);
+                if (listPcapPacket == null) {
+                    listPcapPacket = new ArrayList<PcapPacket>();
+                    connections.put(conversation, listPcapPacket);
+                }
+                int seqNumber = packet.get(TcpPacket.class).getHeader().getSequenceNumber();
+                boolean retransmission = seqNumberSet.contains(seqNumber);
+                if (!retransmission) { // Do not add if retransmission -> avoid duplicate packets in flow
+                    listPcapPacket.add(packet);
+                    // End of conversation -> trigger thread to check
+                    if (listPcapPacket.size() == patternLength)
+                        conversations.add(conversation);
+                    seqNumberSet.add(seqNumber);
+                }
             }
         } catch (EOFException eofe) {
-            System.out.println("findFlowPattern: finished processing entire file");
-            find(pattern);
-        } catch (UnknownHostException ex) {
-            System.out.println(); 
+            while (isEoF.compareAndSet(false, true) == false);  // Try to signal EoF!
+            System.out.println("[ findFlowPattern ] Finished processing entire PCAP stream!");
+        } catch (UnknownHostException |
+                 PcapNativeException  |
+                 NotOpenException     |
+                 TimeoutException ex) {
             ex.printStackTrace();
         }
     }
+    
 
-
-    private void find(FlowPattern pattern) {
-        for (Conversation con : connections.keySet()) {
-            List<PcapPacket> packets = connections.get(con);
-            if (packets.size() != pattern.getPacketOrder().size()) {
-                // Not a complete match if different number of packets.
-                continue;
-            }
+    /**
+     * Checker to match collected patterns (run by a thread)
+     */
+    private void find() {
+
+        while (isEoF.get() == false) {  // Continue until EoF
+            // Get the object from the queue
+            while(conversations.peek() == null) {  // Wait until queue is not empty
+                if (isEoF.get() == true)    // Return if EoF
+                    return;
+            }            
+            Conversation conversation = conversations.poll();
+            // Get the object and remove it from the Map (housekeeping)
+            List<PcapPacket> packets = connections.remove(conversation);
             boolean completeMatch = true;
             for (int i = 0; i < packets.size(); i++) {
                 TcpPacket tcpPacket = packets.get(i).get(TcpPacket.class);
@@ -120,9 +167,14 @@ public class FlowPatternFinder {
             if (completeMatch) {
                 PcapPacket firstPacketInFlow = packets.get(0);
                 System.out.println(
-                        String.format("[ detected a complete match of pattern '%s' at %s]",
+                        String.format("[ find ] Detected a complete match of pattern '%s' at %s!",
                                 pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString()));
-            }
+            } /*else {
+                PcapPacket firstPacketInFlow = packets.get(0);
+                System.out.println(
+                        String.format("[ detected a mismatch of pattern '%s' at %s]",
+                                pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString()));
+            }*/
         }
     }
 
@@ -153,6 +205,7 @@ public class FlowPatternFinder {
         public boolean equals(Object obj) {
             return obj instanceof Conversation && this.toString().equals(obj.toString());
         }
+
         @Override
         public int hashCode() {
             return toString().hashCode();
index d2867df..e5a685f 100644 (file)
@@ -35,8 +35,8 @@ public class Main {
         } catch (PcapNativeException pne) {
             handle = Pcaps.openOffline(fileName);
         }
-        FlowPatternFinder fpf = new FlowPatternFinder();
-        fpf.findFlowPattern(handle, FlowPattern.TP_LINK_LOCAL_ON);
+        FlowPatternFinder fpf = new FlowPatternFinder(handle, FlowPattern.TP_LINK_LOCAL_ON);
+        fpf.start();
 
         // ========================
     }