Adding more devices into the experimental setup.
[pingpong.git] / Code / Projects / SmartPlugDetector / src / main / java / edu / uci / iotproject / TcpReassembler.java
index 0a66934ed5ecec0ed03070a2792a2122609088a9..8c91c82b75c033534205cc707a6989605b1c78b9 100644 (file)
@@ -1,20 +1,20 @@
 package edu.uci.iotproject;
 
+import org.pcap4j.core.PacketListener;
 import org.pcap4j.core.PcapPacket;
 import org.pcap4j.packet.IpV4Packet;
 import org.pcap4j.packet.TcpPacket;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 /**
- * TODO add class documentation.
+ * Reassembles TCP conversations (streams).
+ * <b>Note: current version only supports TCP over IPv4.</b>
  *
  * @author Janus Varmarken {@literal <jvarmark@uci.edu>}
  * @author Rahmadi Trimananda {@literal <rtrimana@uci.edu>}
  */
-public class TcpReassembler implements PcapPacketConsumer {
+public class TcpReassembler implements PacketListener {
 
     /**
      * Holds <em>open</em> {@link Conversation}s, i.e., {@code Conversation}s that have <em>not</em> been detected as
@@ -36,18 +36,31 @@ public class TcpReassembler implements PcapPacketConsumer {
     /**
      * Holds <em>terminated</em> {@link Conversation}s.
      */
-    private final Map<Conversation, Conversation> mTerminatedConversations = new HashMap<>();
+    private final List<Conversation> mTerminatedConversations = new ArrayList<>();
 
     @Override
-    public void consumePacket(PcapPacket pcapPacket) {
+    public void gotPacket(PcapPacket pcapPacket) {
+        IpV4Packet ipPacket = pcapPacket.get(IpV4Packet.class);
         TcpPacket tcpPacket = pcapPacket.get(TcpPacket.class);
-        if (tcpPacket == null) {
+        if (ipPacket == null || tcpPacket == null) {
             return;
         }
         // ... TODO?
         processPacket(pcapPacket);
     }
 
+    /**
+     * Get the reassembled TCP connections. Note that if this is called while packets are still being processed (by
+     * calls to {@link #gotPacket(PcapPacket)}), the behavior is undefined and the returned list may be inconsistent.
+     * @return The reassembled TCP connections.
+     */
+    public List<Conversation> getTcpConversations() {
+        ArrayList<Conversation> combined = new ArrayList<>();
+        combined.addAll(mTerminatedConversations);
+        combined.addAll(mOpenConversations.values());
+        return combined;
+    }
+
     private void processPacket(PcapPacket pcapPacket) {
         TcpPacket tcpPacket = pcapPacket.get(TcpPacket.class);
         // Handle client connection initiation attempts.
@@ -105,7 +118,7 @@ public class TcpReassembler implements PcapPacketConsumer {
                 // to establish a new conversation with the same four tuple as ongoingConv.
                 // Mark existing connection as terminated.
                 // TODO: is this 100% theoretically correct, e.g., if many connection attempts are made back to back? And RST packets?
-                mTerminatedConversations.put(ongoingConv, ongoingConv);
+                mTerminatedConversations.add(ongoingConv);
                 mOpenConversations.remove(ongoingConv);
             }
         }
@@ -138,15 +151,17 @@ public class TcpReassembler implements PcapPacketConsumer {
         if (!conv.isRetransmission(srvSynPacket) && !conv.addSynPacket(srvSynPacket)) {
             // For safety/debugging: if NOT a retransmission and add fails,
             // something has gone terribly wrong/invariant is broken.
-            throw new IllegalStateException("Attempt to add SYN ACK packet that was NOT a retransmission failed." +
+            throw new AssertionError("Attempt to add SYN ACK packet that was NOT a retransmission failed." +
                     Conversation.class.getSimpleName() + " invariant broken.");
         }
     }
 
     private void processRstPacket(PcapPacket rstPacket) {
         Conversation conv = getOngoingConversationOrCreateNew(rstPacket);
+        // Add RST packet to conversation.
+        conv.addRstPacket(rstPacket);
         // Move conversation to set of terminated conversations.
-        mTerminatedConversations.put(conv, conv);
+        mTerminatedConversations.add(conv);
         mOpenConversations.remove(conv, conv);
     }
 
@@ -165,7 +180,7 @@ public class TcpReassembler implements PcapPacketConsumer {
             conv.attemptAcknowledgementOfFin(ackPacket);
             if (conv.isGracefullyShutdown()) {
                 // Move conversation to set of terminated conversations.
-                mTerminatedConversations.put(conv, conv);
+                mTerminatedConversations.add(conv);
                 mOpenConversations.remove(conv);
             }
         }
@@ -209,7 +224,12 @@ public class TcpReassembler implements PcapPacketConsumer {
                 conv = Conversation.fromPcapPacket(pcapPacket, false);
             } else {
                 // TODO: can we do anything else but arbitrarily select who is designated as the server in this case?
-                conv = Conversation.fromPcapPacket(pcapPacket, false);
+                // We can check if the IP prefix matches a local IP when handling traffic observed inside the local
+                // network, but that obviously won't be a useful strategy for an observer at the WAN port.
+                String srcIp = pcapPacket.get(IpV4Packet.class).getHeader().getSrcAddr().getHostAddress();
+                // TODO: REPLACE THE ROUTER'S IP WITH A PARAMETER!!!
+                boolean clientIsSrc = srcIp.startsWith("10.0.1.") || srcIp.startsWith("192.168.1.") || srcIp.equals("128.195.205.105");
+                conv = Conversation.fromPcapPacket(pcapPacket, clientIsSrc);
             }
             mOpenConversations.put(conv, conv);
         }