Code for reassembling TCP streams. Not thoroughly tested, but seems to work for a...
authorJanus Varmarken <varmarken@gmail.com>
Fri, 13 Jul 2018 23:52:32 +0000 (16:52 -0700)
committerJanus Varmarken <varmarken@gmail.com>
Fri, 13 Jul 2018 23:52:32 +0000 (16:52 -0700)
Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Conversation.java
Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Main.java
Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapPacketConsumer.java [new file with mode: 0644]
Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapProcessingPipeline.java [new file with mode: 0644]
Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapReader.java [new file with mode: 0644]
Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/TcpReassembler.java [new file with mode: 0644]

index fdb4a7f..b9f3e07 100644 (file)
@@ -72,6 +72,29 @@ public class Conversation {
     private List<FinAckPair> mFinPackets;
     /* End instance properties */
 
+    /**
+     * Factory method for creating a {@code Conversation} from a {@link PcapPacket}.
+     * @param pcapPacket The {@code PcapPacket} that wraps a TCP segment for which a {@code Conversation} is to be initiated.
+     * @param clientIsSrc If {@code true}, the source address and source port found in the IP datagram and TCP segment
+     *                    wrapped in the {@code PcapPacket} are regarded as pertaining to the client, and the destination
+     *                    address and destination port are regarded as pertaining to the server---and vice versa if set
+     *                    to {@code false}.
+     * @return A {@code Conversation} initiated with ip:port for client and server according to the direction of the packet.
+     */
+    public static Conversation fromPcapPacket(PcapPacket pcapPacket, boolean clientIsSrc) {
+        IpV4Packet ipPacket = pcapPacket.get(IpV4Packet.class);
+        TcpPacket tcpPacket = pcapPacket.get(TcpPacket.class);
+        String clientIp = clientIsSrc ? ipPacket.getHeader().getSrcAddr().getHostAddress() :
+                ipPacket.getHeader().getDstAddr().getHostAddress();
+        String srvIp = clientIsSrc ? ipPacket.getHeader().getDstAddr().getHostAddress() :
+                ipPacket.getHeader().getSrcAddr().getHostAddress();
+        int clientPort = clientIsSrc ? tcpPacket.getHeader().getSrcPort().valueAsInt() :
+                tcpPacket.getHeader().getDstPort().valueAsInt();
+        int srvPort = clientIsSrc ? tcpPacket.getHeader().getDstPort().valueAsInt() :
+                tcpPacket.getHeader().getSrcPort().valueAsInt();
+        return new Conversation(clientIp, clientPort, srvIp, srvPort);
+    }
+
     /**
      * Constructs a new {@code Conversation}.
      * @param clientIp The IP of the host that is considered the client (i.e. the host that initiates the conversation)
@@ -213,6 +236,8 @@ public class Conversation {
     public void addFinPacket(PcapPacket finPacket) {
         // Precondition: verify that packet does indeed pertain to conversation.
         onAddPrecondition(finPacket);
+        // TODO: should call addSeqNumber here?
+        addSeqNumber(finPacket);
         mFinPackets.add(new FinAckPair(finPacket));
     }
 
@@ -327,7 +352,7 @@ public class Conversation {
      * @param packet The packet.
      * @return {@code true} if {@code packet} was determined to be a retransmission, {@code false} otherwise.
      */
-    private boolean isRetransmission(PcapPacket packet) {
+    public boolean isRetransmission(PcapPacket packet) {
         // Extract sequence number.
         int seqNo = packet.get(TcpPacket.class).getHeader().getSequenceNumber();
         switch (getDirection(packet)) {
index f3b8a00..94b7820 100644 (file)
@@ -37,38 +37,46 @@ public class Main {
 //        MacLayerFlowPatternFinder finder = new MacLayerFlowPatternFinder(handle, pattern);
 //        finder.findFlowPattern();
         // -------------------------------------------------------------------------------------------------------------
-
-        //final String fileName = args.length > 0 ? args[0] : "/home/rtrimana/pcap_processing/smart_home_traffic/Code/Projects/SmartPlugDetector/pcap/wlan1.local.dns.pcap";
-        final String fileName = args.length > 0 ? args[0] : "/scratch/June-2018/TPLink/wlan1/tplink.wlan1.local.pcap";
-        //final String fileName = args.length > 0 ? args[0] : "/scratch/June-2018/DLink/wlan1/dlink.wlan1.local.pcap";
-        final String trainingFileName = "./pcap/TP_LINK_LOCAL_ON_SUBSET.pcap";
-//        final String trainingFileName = "./pcap/TP_LINK_LOCAL_ON.pcap";
-//
-//        // ====== Debug code ======
-        PcapHandle handle;
-        PcapHandle trainingPcap;
-        try {
-            handle = Pcaps.openOffline(fileName, PcapHandle.TimestampPrecision.NANO);
-            trainingPcap = Pcaps.openOffline(trainingFileName, PcapHandle.TimestampPrecision.NANO);
-        } catch (PcapNativeException pne) {
-            handle = Pcaps.openOffline(fileName);
-            trainingPcap = Pcaps.openOffline(trainingFileName);
-        }
-//
-//        // TODO: The followings are the way to extract multiple hostnames and their associated packet lengths lists
-//        //List<String> list = new ArrayList<>();
-//        //list.add("events.tplinkra.com");
-//        //FlowPattern fp = new FlowPattern("TP_LINK_LOCAL_ON", list, trainingPcap);
-//        //List<String> list2 = new ArrayList<>();
-//        //list2.add("devs.tplinkcloud.com");
-//        //list2.add("events.tplinkra.com");
-//        //FlowPattern fp3 = new FlowPattern("TP_LINK_REMOTE_ON", list2, trainingPcap);
 //
-        FlowPattern fp = new FlowPattern("TP_LINK_LOCAL_ON", "events.tplinkra.com", trainingPcap);
-        //FlowPattern fp = new FlowPattern("DLINK_LOCAL_ON", "rfe-us-west-1.dch.dlink.com", trainingPcap);
-        FlowPatternFinder fpf = new FlowPatternFinder(handle, fp);
-        fpf.start();
-//
-//        // ========================
+//        //final String fileName = args.length > 0 ? args[0] : "/home/rtrimana/pcap_processing/smart_home_traffic/Code/Projects/SmartPlugDetector/pcap/wlan1.local.dns.pcap";
+//        final String fileName = args.length > 0 ? args[0] : "/scratch/June-2018/TPLink/wlan1/tplink.wlan1.local.pcap";
+//        //final String fileName = args.length > 0 ? args[0] : "/scratch/June-2018/DLink/wlan1/dlink.wlan1.local.pcap";
+//        final String trainingFileName = "./pcap/TP_LINK_LOCAL_ON_SUBSET.pcap";
+////        final String trainingFileName = "./pcap/TP_LINK_LOCAL_ON.pcap";
+////
+////        // ====== Debug code ======
+//        PcapHandle handle;
+//        PcapHandle trainingPcap;
+//        try {
+//            handle = Pcaps.openOffline(fileName, PcapHandle.TimestampPrecision.NANO);
+//            trainingPcap = Pcaps.openOffline(trainingFileName, PcapHandle.TimestampPrecision.NANO);
+//        } catch (PcapNativeException pne) {
+//            handle = Pcaps.openOffline(fileName);
+//            trainingPcap = Pcaps.openOffline(trainingFileName);
+//        }
+////
+////        // TODO: The followings are the way to extract multiple hostnames and their associated packet lengths lists
+////        //List<String> list = new ArrayList<>();
+////        //list.add("events.tplinkra.com");
+////        //FlowPattern fp = new FlowPattern("TP_LINK_LOCAL_ON", list, trainingPcap);
+////        //List<String> list2 = new ArrayList<>();
+////        //list2.add("devs.tplinkcloud.com");
+////        //list2.add("events.tplinkra.com");
+////        //FlowPattern fp3 = new FlowPattern("TP_LINK_REMOTE_ON", list2, trainingPcap);
+////
+//        FlowPattern fp = new FlowPattern("TP_LINK_LOCAL_ON", "events.tplinkra.com", trainingPcap);
+//        //FlowPattern fp = new FlowPattern("DLINK_LOCAL_ON", "rfe-us-west-1.dch.dlink.com", trainingPcap);
+//        FlowPatternFinder fpf = new FlowPatternFinder(handle, fp);
+//        fpf.start();
+////
+////        // ========================
+
+
+        PcapReader pcapReader = new PcapReader(args[0]);
+        PcapProcessingPipeline pipeline = new PcapProcessingPipeline(pcapReader);
+        TcpReassembler tcpReassembler = new TcpReassembler();
+        pipeline.addPcapPacketConsumer(tcpReassembler);
+        pipeline.executePipeline();
+        System.out.println("Pipeline terminated");
     }
 }
diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapPacketConsumer.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapPacketConsumer.java
new file mode 100644 (file)
index 0000000..fb78eb7
--- /dev/null
@@ -0,0 +1,14 @@
+package edu.uci.iotproject;
+
+import org.pcap4j.core.PcapPacket;
+
+/**
+ * TODO add class documentation.
+ *
+ * @author Janus Varmarken
+ */
+public interface PcapPacketConsumer {
+
+    void consumePacket(PcapPacket pcapPacket);
+
+}
diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapProcessingPipeline.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapProcessingPipeline.java
new file mode 100644 (file)
index 0000000..5b3830c
--- /dev/null
@@ -0,0 +1,37 @@
+package edu.uci.iotproject;
+
+import org.pcap4j.core.PcapPacket;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TODO add class documentation.
+ *
+ * @author Janus Varmarken {@literal <jvarmark@uci.edu>}
+ * @author Rahmadi Trimananda {@literal <rtrimana@uci.edu>}
+ */
+public class PcapProcessingPipeline {
+
+    private final PcapReader mPcapReader;
+    private final List<PcapPacketConsumer> mPacketConsumers;
+
+    public PcapProcessingPipeline(PcapReader pcapReader) {
+        mPcapReader = pcapReader;
+        mPacketConsumers = new ArrayList<>();
+    }
+
+    public void addPcapPacketConsumer(PcapPacketConsumer packetConsumer) {
+        mPacketConsumers.add(packetConsumer);
+    }
+
+    public void executePipeline() {
+        PcapPacket packet;
+        while ((packet = mPcapReader.readNextPacket()) != null) {
+            for (PcapPacketConsumer consumer : mPacketConsumers) {
+                consumer.consumePacket(packet);
+            }
+        }
+    }
+
+}
diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapReader.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapReader.java
new file mode 100644 (file)
index 0000000..c8106b2
--- /dev/null
@@ -0,0 +1,50 @@
+package edu.uci.iotproject;
+
+import org.pcap4j.core.*;
+
+import java.io.EOFException;
+import java.util.concurrent.TimeoutException;
+/**
+ * Opens and reads from a pcap file.
+ * This class is nothing but a simple wrapper around some functionality in {@link Pcaps} and {@link PcapHandle} which
+ * serves to simplify client code.
+ * Note that the file is read in offline mode, i.e., this class does not support live processing of packets.
+ *
+ * @author Janus Varmarken {@literal <jvarmark@uci.edu>}
+ * @author Rahmadi Trimananda {@literal <rtrimana@uci.edu>}
+ */
+public class PcapReader {
+
+    private final PcapHandle mHandle;
+
+    /**
+     * Create a new {@code PcapReader} that reads the file specified by the absolute path {@code fileName}.
+     * @param fileName The absolute path to the pcap file to be read.
+     * @throws PcapNativeException If the pcap file cannot be opened.
+     */
+    public PcapReader(String fileName) throws PcapNativeException {
+        PcapHandle handle;
+        try {
+            handle = Pcaps.openOffline(fileName, PcapHandle.TimestampPrecision.NANO);
+        } catch (PcapNativeException pne) {
+            handle = Pcaps.openOffline(fileName);
+        }
+        mHandle = handle;
+    }
+
+    /**
+     * Reads the next packet in the pcap file.
+     * @return The next packet in the pcap file, or {@code null} if all packets have been read.
+     */
+    public PcapPacket readNextPacket() {
+        try {
+            return mHandle.getNextPacketEx();
+        } catch (EOFException eofe) {
+            return null;
+        } catch (PcapNativeException|TimeoutException|NotOpenException e) {
+            // Wrap checked exceptions in unchecked exceptions to simplify client code.
+            throw new RuntimeException(e);
+        }
+    }
+
+}
diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/TcpReassembler.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/TcpReassembler.java
new file mode 100644 (file)
index 0000000..0a66934
--- /dev/null
@@ -0,0 +1,218 @@
+package edu.uci.iotproject;
+
+import org.pcap4j.core.PcapPacket;
+import org.pcap4j.packet.IpV4Packet;
+import org.pcap4j.packet.TcpPacket;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * TODO add class documentation.
+ *
+ * @author Janus Varmarken {@literal <jvarmark@uci.edu>}
+ * @author Rahmadi Trimananda {@literal <rtrimana@uci.edu>}
+ */
+public class TcpReassembler implements PcapPacketConsumer {
+
+    /**
+     * Holds <em>open</em> {@link Conversation}s, i.e., {@code Conversation}s that have <em>not</em> been detected as
+     * (gracefully) terminated based on the set of packets observed thus far.
+     * A {@link Conversation} is moved to {@link #mTerminatedConversations} if it can be determined that it is has
+     * terminated. Termination can be detected by a) observing two {@link FinAckPair}s, one in each direction, (graceful
+     * termination, see {@link Conversation#isGracefullyShutdown()}) or b) by observing a SYN packet that matches the
+     * four tuple of an existing {@code Conversation}, but which holds a <em>different</em> sequence number than the
+     * same-direction SYN packet recorded for the {@code Conversation}.
+     * <p>
+     * Note that due to limitations of the {@link Set} interface (specifically, there is no {@code get(T t)} method),
+     * we have to resort to a {@link Map} (in which keys map to themselves) to "mimic" a set with {@code get(T t)}
+     * functionality.
+     *
+     * @see <a href="https://stackoverflow.com/questions/7283338/getting-an-element-from-a-set">this question on StackOverflow.com</a>
+     */
+    private final Map<Conversation, Conversation> mOpenConversations = new HashMap<>();
+
+    /**
+     * Holds <em>terminated</em> {@link Conversation}s.
+     */
+    private final Map<Conversation, Conversation> mTerminatedConversations = new HashMap<>();
+
+    @Override
+    public void consumePacket(PcapPacket pcapPacket) {
+        TcpPacket tcpPacket = pcapPacket.get(TcpPacket.class);
+        if (tcpPacket == null) {
+            return;
+        }
+        // ... TODO?
+        processPacket(pcapPacket);
+    }
+
+    private void processPacket(PcapPacket pcapPacket) {
+        TcpPacket tcpPacket = pcapPacket.get(TcpPacket.class);
+        // Handle client connection initiation attempts.
+        if (tcpPacket.getHeader().getSyn() && !tcpPacket.getHeader().getAck()) {
+            // A segment with the SYN flag set, but no ACK flag indicates that a client is attempting to initiate a new
+            // connection.
+            processNewConnectionRequest(pcapPacket);
+            return;
+        }
+        // Handle server connection initiation acknowledgement
+        if (tcpPacket.getHeader().getSyn() && tcpPacket.getHeader().getAck()) {
+            // A segment with both the SYN and ACK flags set indicates that the server has accepted the client's request
+            // to initiate a new connection.
+            processNewConnectionAck(pcapPacket);
+            return;
+        }
+        // Handle resets
+        if (tcpPacket.getHeader().getRst()) {
+            processRstPacket(pcapPacket);
+            return;
+        }
+        // Handle FINs
+        if (tcpPacket.getHeader().getFin()) {
+            // Handle FIN packet.
+            processFinPacket(pcapPacket);
+        }
+        // Handle ACKs (currently only ACKs of FINS)
+        if (tcpPacket.getHeader().getAck()) {
+            processAck(pcapPacket);
+        }
+        // Handle packets that carry payload (application data).
+        if (tcpPacket.getPayload() != null) {
+            processPayloadPacket(pcapPacket);
+        }
+    }
+
+    private void processNewConnectionRequest(PcapPacket clientSynPacket) {
+        // A SYN w/o ACK always originates from the client.
+        Conversation conv = Conversation.fromPcapPacket(clientSynPacket, true);
+        conv.addSynPacket(clientSynPacket);
+        // Is there an ongoing conversation for the same four tuple (clientIp, clientPort, serverIp, serverPort) as
+        // found in the new SYN packet?
+        Conversation ongoingConv = mOpenConversations.get(conv);
+        if (ongoingConv != null) {
+            if (ongoingConv.isRetransmission(clientSynPacket)) {
+                // SYN retransmission detected, do nothing.
+                return;
+                // TODO: the way retransmission detection is implemented may cause a bug for connections where we have
+                // not recorded the initial SYN, but only the SYN ACK, as retransmission is determined by comparing the
+                // sequence numbers of initial SYNs -- and if no initial SYN is present for the Conversation, the new
+                // SYN will be interpreted as a retransmission. Possible fix: let isRentransmission ALWAYS return false
+                // when presented with a SYN packet when the Conversation already holds a SYN ACK packet?
+            } else {
+                // New SYN has different sequence number than SYN recorded for ongoingConv, so this must be an attempt
+                // to establish a new conversation with the same four tuple as ongoingConv.
+                // Mark existing connection as terminated.
+                // TODO: is this 100% theoretically correct, e.g., if many connection attempts are made back to back? And RST packets?
+                mTerminatedConversations.put(ongoingConv, ongoingConv);
+                mOpenConversations.remove(ongoingConv);
+            }
+        }
+        // Finally, update the map of open connections with the new connection.
+        mOpenConversations.put(conv, conv);
+    }
+
+
+    /*
+     * TODO a problem across the board for all processXPacket methods below:
+     * if we start the capture in the middle of a TCP connection, we will not have an entry for the conversation in the
+     * map as we have not seen the initial SYN packet.
+     * Two ways we can address this:
+     * a) Perform null-checks and ignore packets for which we have not seen SYN
+     *    + easy to get correct
+     *    - we discard data (issue for long-lived connections!)
+     * b) Add a corresponding conversation entry whenever we encounter a packet that does not map to a conversation
+     *    + we consider all data
+     *    - not immediately clear if this will introduce bugs (incorrectly mapping packets to wrong conversations?)
+     *
+     *  [[[ I went with option b) for now; see getOngoingConversationOrCreateNew(PcapPacket pcapPacket). ]]]
+     */
+
+    private void processNewConnectionAck(PcapPacket srvSynPacket) {
+        // Find the corresponding ongoing connection, if any (if we start the capture just *after* the initial SYN, no
+        // ongoing conversation entry will exist, so it must be created in that case).
+//        Conversation conv = mOpenConversations.get(Conversation.fromPcapPacket(srvSynPacket, false));
+        Conversation conv = getOngoingConversationOrCreateNew(srvSynPacket);
+        // Note: exploits &&'s short-circuit operation: only attempts to add non-retransmissions.
+        if (!conv.isRetransmission(srvSynPacket) && !conv.addSynPacket(srvSynPacket)) {
+            // For safety/debugging: if NOT a retransmission and add fails,
+            // something has gone terribly wrong/invariant is broken.
+            throw new IllegalStateException("Attempt to add SYN ACK packet that was NOT a retransmission failed." +
+                    Conversation.class.getSimpleName() + " invariant broken.");
+        }
+    }
+
+    private void processRstPacket(PcapPacket rstPacket) {
+        Conversation conv = getOngoingConversationOrCreateNew(rstPacket);
+        // Move conversation to set of terminated conversations.
+        mTerminatedConversations.put(conv, conv);
+        mOpenConversations.remove(conv, conv);
+    }
+
+    private void processFinPacket(PcapPacket finPacket) {
+//        getOngoingConversationForPacket(finPacket).addFinPacket(finPacket);
+        getOngoingConversationOrCreateNew(finPacket).addFinPacket(finPacket);
+    }
+
+    private void processAck(PcapPacket ackPacket) {
+//        getOngoingConversationForPacket(ackPacket).attemptAcknowledgementOfFin(ackPacket);
+        // Note that unlike the style for SYN, FIN, and payload packets, for "ACK only" packets, we want to avoid
+        // creating a new conversation.
+        Conversation conv = getOngoingConversationForPacket(ackPacket);
+        if (conv != null) {
+            // The ACK may be an ACK of a FIN, so attempt to mark the FIN as ack'ed.
+            conv.attemptAcknowledgementOfFin(ackPacket);
+            if (conv.isGracefullyShutdown()) {
+                // Move conversation to set of terminated conversations.
+                mTerminatedConversations.put(conv, conv);
+                mOpenConversations.remove(conv);
+            }
+        }
+        // Note: add (additional) processing of ACKs (that are not ACKs of FINs) as necessary here...
+    }
+
+    private void processPayloadPacket(PcapPacket pcapPacket) {
+//        getOngoingConversationForPacket(pcapPacket).addPacket(pcapPacket, true);
+        getOngoingConversationOrCreateNew(pcapPacket).addPacket(pcapPacket, true);
+    }
+
+    /**
+     * Locates an ongoing conversation (if any) that {@code pcapPacket} pertains to.
+     * @param pcapPacket The packet that is to be mapped to an ongoing {@code Conversation}.
+     * @return The {@code Conversation} matching {@code pcapPacket} or {@code null} if there is no match.
+     */
+    private Conversation getOngoingConversationForPacket(PcapPacket pcapPacket) {
+        // We cannot know if this is a client-to-server or server-to-client packet without trying both options...
+        Conversation conv = mOpenConversations.get(Conversation.fromPcapPacket(pcapPacket, true));
+        if (conv == null) {
+            conv = mOpenConversations.get(Conversation.fromPcapPacket(pcapPacket, false));
+        }
+        return conv;
+    }
+
+    /**
+     * Like {@link #getOngoingConversationForPacket(PcapPacket)}, but creates and inserts a new {@code Conversation}
+     * into {@link #mOpenConversations} if no open conversation is found (i.e., in the case that
+     * {@link #getOngoingConversationForPacket(PcapPacket)} returns {@code null}).
+     *
+     * @param pcapPacket The packet that is to be mapped to an ongoing {@code Conversation}.
+     * @return The existing, ongoing {@code Conversation} matching {@code pcapPacket} or the newly created one in case
+     *         no match was found.
+     */
+    private Conversation getOngoingConversationOrCreateNew(PcapPacket pcapPacket) {
+        Conversation conv = getOngoingConversationForPacket(pcapPacket);
+        if (conv == null) {
+            TcpPacket tcpPacket = pcapPacket.get(TcpPacket.class);
+            if (tcpPacket.getHeader().getSyn() && tcpPacket.getHeader().getAck()) {
+                // A SYN ACK packet always originates from the server (it is a reply to the initial SYN packet from the client)
+                conv = Conversation.fromPcapPacket(pcapPacket, false);
+            } else {
+                // TODO: can we do anything else but arbitrarily select who is designated as the server in this case?
+                conv = Conversation.fromPcapPacket(pcapPacket, false);
+            }
+            mOpenConversations.put(conv, conv);
+        }
+        return conv;
+    }
+}