Extract Conversation to separate file (i.e. it is no longer an inner class). Add...
[pingpong.git] / Code / Projects / SmartPlugDetector / src / main / java / edu / uci / iotproject / FlowPatternFinder.java
index 8c7d41e..9bbd99c 100644 (file)
@@ -7,91 +7,155 @@ import org.pcap4j.core.PcapPacket;
 import org.pcap4j.packet.IpV4Packet;
 import org.pcap4j.packet.Packet;
 import org.pcap4j.packet.TcpPacket;
+import org.pcap4j.packet.DnsPacket;
 
 import java.io.EOFException;
+import java.net.UnknownHostException;
 import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Provides functionality for searching for the presence of a {@link FlowPattern} in a PCAP trace.
+ * We use 2 threads:
+ *  1) The first thread (main thread) collects conversations from the PCAP stream and put them into our data structure.
+ *  2) The second thread (checker thread) checks the collected conversation.
  *
  * @author Janus Varmarken
+ * @author Rahmadi Trimananda
  */
 public class FlowPatternFinder {
 
-    private final Map<String, Set<String>> dnsMap;
-    private final Map<Conversation, List<PcapPacket>> connections = new HashMap<>();
+    /* Class properties */
+    private Map<Conversation, List<PcapPacket>> connections;
+    private Queue<Conversation> conversations;
+    private DnsMap dnsMap;
+    private PcapHandle pcap;
+    private FlowPattern pattern;
+    private AtomicBoolean isEoF;
+    
+    
+    /* Constructor */
+    public FlowPatternFinder(PcapHandle _pcap, FlowPattern _pattern) {
+
+        this.connections = new ConcurrentHashMap<Conversation, List<PcapPacket>>();
+        this.conversations = new ConcurrentLinkedQueue<Conversation>();
+        this.dnsMap = new DnsMap();
+        this.isEoF = new AtomicBoolean(false);
+
+        // Get input parameters
+        this.pcap = _pcap;
+        this.pattern = _pattern;
+    }
+    
+    
+    public void start() {
+    
+        // Spawn the main thread
+        Thread mainThread = new Thread(new Runnable() {
+            public void run() {
+                findFlowPattern();
+            }
+        });
+        mainThread.start();
 
-    public FlowPatternFinder(Map<String, Set<String>> dnsMap) {
-        this.dnsMap = Objects.requireNonNull(dnsMap);
+        // Spawn the checker thread
+        Thread checkerThread = new Thread(new Runnable() {
+            public void run() {
+                find();
+            }
+        });
+        checkerThread.start();
+
+        /* TODO: Join the threads if we want it to be blocking
+        try {
+            mainThread.join();
+            checkerThread.join();
+        } catch(InterruptedException ex) {
+            ex.printStackTrace();
+        }*/
+        System.out.println("[ start ] Main and checker threads started!");
     }
 
-    private static final Set<String> EMPTY_SET = Collections.unmodifiableSet(new HashSet<>());
 
-    // TODO clean up exceptions etc.
-    public void findFlowPattern(PcapHandle pcap, FlowPattern pattern)
-            throws PcapNativeException, NotOpenException, TimeoutException {
+    /**
+     * Find patterns based on the FlowPattern object (run by a thread)
+     */
+    private void findFlowPattern() {
+        int counter = 0;
         try {
             PcapPacket packet;
-
+            Set<Integer> seqNumberSet = new HashSet<Integer>();
+            int patternLength = pattern.getLength();
             while ((packet = pcap.getNextPacketEx()) != null) {
 
+                // Check if this is a valid DNS packet
+                dnsMap.validateAndAddNewEntry(packet);
                 // For now, we only work support pattern search in TCP over IPv4.
                 IpV4Packet ipPacket = packet.get(IpV4Packet.class);
                 TcpPacket tcpPacket = packet.get(TcpPacket.class);
-                if (ipPacket == null || tcpPacket == null) {
+                if (ipPacket == null || tcpPacket == null)
                     continue;
-                }
                 String srcAddress = ipPacket.getHeader().getSrcAddr().getHostAddress();
                 String dstAddress = ipPacket.getHeader().getDstAddr().getHostAddress();
                 int srcPort = tcpPacket.getHeader().getSrcPort().valueAsInt();
                 int dstPort = tcpPacket.getHeader().getDstPort().valueAsInt();
-                // Is this packet related to the pattern and coming from the cloud server?
-                boolean fromServer = dnsMap.getOrDefault(srcAddress, EMPTY_SET).contains(pattern.getHostname());
-                // Is this packet related to the pattern and going to the cloud server?
-                boolean fromClient = dnsMap.getOrDefault(dstAddress, EMPTY_SET).contains(pattern.getHostname());
-                if (!fromServer && !fromClient) {
-                    // Packet not related to pattern, skip it.
+                // Is this packet related to the pattern and coming to/from the cloud server?
+                boolean fromServer = dnsMap.isRelatedToCloudServer(srcAddress, pattern.getHostname());
+                boolean fromClient = dnsMap.isRelatedToCloudServer(dstAddress, pattern.getHostname());
+                if (!fromServer && !fromClient)  // Packet not related to pattern, skip it.
                     continue;
-                }
-                if (tcpPacket.getPayload() == null) {
-                    // We skip non-payload control packets as these are less predictable and should therefore not be
-                    // part of a signature (e.g. receiver can choose not to ACK immediately)
-                    continue;
-                }
-
+                if (tcpPacket.getPayload() == null) // We skip non-payload control packets as these are less predictable
+                    continue; 
                 // Identify conversations (connections/sessions) by the four-tuple (clientIp, clientPort, serverIp, serverPort).
                 // TODO: this is strictly not sufficient to differentiate one TCP session from another, but should suffice for now.
                 Conversation conversation = fromClient ? new Conversation(srcAddress, srcPort, dstAddress, dstPort) :
                         new Conversation(dstAddress, dstPort, srcAddress, srcPort);
-                List<PcapPacket> listWrappedPacket = new ArrayList<>();
-                listWrappedPacket.add(packet);
                 // Create new conversation entry, or append packet to existing.
-                connections.merge(conversation, listWrappedPacket, (v1, v2) -> {
-                    // TODO: in theory, this is insufficient to detect retransmissions due to TCP seq.no. rollover.
-                    // TODO: bad for performance, O(n) for each packet added to flow (n being length of the flow).
-                    boolean retransmission = v1.stream().anyMatch(p -> p.get(TcpPacket.class).getHeader().getSequenceNumber() == v2.get(0).get(TcpPacket.class).getHeader().getSequenceNumber());
-                    if (!retransmission) {
-                        // Do not add if retransmission -> avoid duplicate packets in flow
-                        v1.addAll(v2);
-                    }
-                    return v1;
-                });
+                List<PcapPacket> listPcapPacket = connections.get(conversation);
+                if (listPcapPacket == null) {
+                    listPcapPacket = new ArrayList<PcapPacket>();
+                    connections.put(conversation, listPcapPacket);
+                }
+                int seqNumber = packet.get(TcpPacket.class).getHeader().getSequenceNumber();
+                boolean retransmission = seqNumberSet.contains(seqNumber);
+                if (!retransmission) { // Do not add if retransmission -> avoid duplicate packets in flow
+                    listPcapPacket.add(packet);
+                    // End of conversation -> trigger thread to check
+                    if (listPcapPacket.size() == patternLength)
+                        conversations.add(conversation);
+                    seqNumberSet.add(seqNumber);
+                }
             }
         } catch (EOFException eofe) {
-            System.out.println("findFlowPattern: finished processing entire file");
-            find(pattern);
+            while (isEoF.compareAndSet(false, true) == false);  // Try to signal EoF!
+            System.out.println("[ findFlowPattern ] Finished processing entire PCAP stream!");
+        } catch (UnknownHostException |
+                 PcapNativeException  |
+                 NotOpenException     |
+                 TimeoutException ex) {
+            ex.printStackTrace();
         }
     }
+    
 
-    private void find(FlowPattern pattern) {
-        for (Conversation con : connections.keySet()) {
-            List<PcapPacket> packets = connections.get(con);
-            if (packets.size() != pattern.getPacketOrder().size()) {
-                // Not a complete match if different number of packets.
-                continue;
-            }
+    /**
+     * Checker to match collected patterns (run by a thread)
+     */
+    private void find() {
+
+        while (isEoF.get() == false) {  // Continue until EoF
+            // Get the object from the queue
+            while(conversations.peek() == null) {  // Wait until queue is not empty
+                if (isEoF.get() == true)    // Return if EoF
+                    return;
+            }            
+            Conversation conversation = conversations.poll();
+            // Get the object and remove it from the Map (housekeeping)
+            List<PcapPacket> packets = connections.remove(conversation);
             boolean completeMatch = true;
             for (int i = 0; i < packets.size(); i++) {
                 TcpPacket tcpPacket = packets.get(i).get(TcpPacket.class);
@@ -103,47 +167,14 @@ public class FlowPatternFinder {
             if (completeMatch) {
                 PcapPacket firstPacketInFlow = packets.get(0);
                 System.out.println(
-                        String.format("[ detected a complete match of pattern '%s' at %s]",
+                        String.format("[ find ] Detected a complete match of pattern '%s' at %s!",
                                 pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString()));
-            }
-        }
-    }
-
-    /**
-     * Immutable class used for identifying a conversation/connection/session/flow (packet's belonging to the same
-     * session between a client and a server).
-     */
-    private static class Conversation {
-
-        private final String clientIp;
-        private final int clientPort;
-        private final String serverIp;
-        private final int serverPort;
-
-        public Conversation(String clientIp, int clientPort, String serverIp, int serverPort) {
-            this.clientIp = clientIp;
-            this.clientPort = clientPort;
-            this.serverIp = serverIp;
-            this.serverPort = serverPort;
-        }
-
-
-        // =========================================================================================================
-        // We simply reuse equals and hashCode methods of String.class to be able to use this immutable class as a key
-        // in a Map.
-        @Override
-        public boolean equals(Object obj) {
-            return obj instanceof Conversation && this.toString().equals(obj.toString());
-        }
-        @Override
-        public int hashCode() {
-            return toString().hashCode();
-        }
-        // =========================================================================================================
-
-        @Override
-        public String toString() {
-            return String.format("%s:%d %s:%d", clientIp, clientPort, serverIp, serverPort);
+            } /*else {
+                PcapPacket firstPacketInFlow = packets.get(0);
+                System.out.println(
+                        String.format("[ detected a mismatch of pattern '%s' at %s]",
+                                pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString()));
+            }*/
         }
     }