Reorganize code by creating a package for code that reassembles traffic flows at...
[pingpong.git] / Code / Projects / SmartPlugDetector / src / main / java / edu / uci / iotproject / detection / ClusterMatcher.java
index 027717e656f8612f271451af2fa2aa88e8a63008..1ff2c0215b4eef77e7befe5d2afd0e60844bc6e8 100644 (file)
@@ -1,7 +1,7 @@
 package edu.uci.iotproject.detection;
 
-import edu.uci.iotproject.Conversation;
-import edu.uci.iotproject.TcpReassembler;
+import edu.uci.iotproject.trafficreassembly.layer3.Conversation;
+import edu.uci.iotproject.trafficreassembly.layer3.TcpReassembler;
 import edu.uci.iotproject.analysis.TcpConversationUtils;
 import edu.uci.iotproject.io.PcapHandleReader;
 import edu.uci.iotproject.util.PrintUtils;
@@ -14,7 +14,8 @@ import java.util.stream.Collectors;
 import static edu.uci.iotproject.util.PcapPacketUtils.*;
 
 /**
- * TODO add class documentation.
+ * Searches a traffic trace for sequences of packets "belong to" a given cluster (in other words, attempts to classify
+ * traffic as pertaining to a given cluster).
  *
  * @author Janus Varmarken {@literal <jvarmark@uci.edu>}
  * @author Rahmadi Trimananda {@literal <rtrimana@uci.edu>}
@@ -29,7 +30,7 @@ public class ClusterMatcher implements PacketListener {
         final String inputPcapFile = path + "/2018-07/dlink/dlink.wlan1.local.pcap";
         final String signatureFile = path + "/2018-07/dlink/offSignature1.sig";
 
-        List<List<PcapPacket>> signature = PrintUtils.serializeClustersFromFile(signatureFile);
+        List<List<PcapPacket>> signature = PrintUtils.deserializeClustersFromFile(signatureFile);
         ClusterMatcher clusterMatcher = new ClusterMatcher(signature, null,
                 (sig, match) -> System.out.println(
                         String.format("[ !!! SIGNATURE DETECTED AT %s !!! ]",
@@ -49,14 +50,15 @@ public class ClusterMatcher implements PacketListener {
     }
 
     /**
-     * The signature that this {@link ClusterMatcher} is trying to detect in the observed traffic.
+     * The cluster that describes the sequence of packets that this {@link ClusterMatcher} is trying to detect in the
+     * observed traffic.
      */
-    private final List<List<PcapPacket>> mSignature;
+    private final List<List<PcapPacket>> mCluster;
 
     /**
-     * The directions of packets in the sequences that make up {@link #mSignature}.
+     * The ordered directions of packets in the sequences that make up {@link #mCluster}.
      */
-    private final Conversation.Direction[] mSignatureDirections;
+    private final Conversation.Direction[] mClusterMemberDirections;
 
     /**
      * For reassembling the observed traffic into TCP connections.
@@ -68,34 +70,48 @@ public class ClusterMatcher implements PacketListener {
      */
     private final String mRouterWanIp;
 
-    private final Observer[] mObservers;
+    private final ClusterMatchObserver[] mObservers;
 
-    public ClusterMatcher(List<List<PcapPacket>> signature, String routerWanIp, Observer... detectionObservers) {
-        mSignature = Collections.unmodifiableList(Objects.requireNonNull(signature, "signature cannot be null"));
-        mObservers = Objects.requireNonNull(detectionObservers, "detectionObservers cannot be null");
-        if (mSignature.isEmpty() || mSignature.stream().anyMatch(inner -> inner.isEmpty())) {
-            throw new IllegalArgumentException("signature is empty (or contains empty inner List)");
+    /**
+     * Create a {@link ClusterMatcher}.
+     * @param cluster The cluster that traffic is matched against.
+     * @param routerWanIp The router's WAN IP if examining traffic captured at the ISP's point of view (used for
+     *                    determining the direction of packets).
+     * @param detectionObservers Client code that wants to get notified whenever the {@link ClusterMatcher} detects that
+     *                          (a subset of) the examined traffic is similar to the traffic that makes up
+     *                          {@code cluster}, i.e., when the examined traffic is classified as pertaining to
+     *                          {@code cluster}.
+     */
+    public ClusterMatcher(List<List<PcapPacket>> cluster, String routerWanIp, ClusterMatchObserver... detectionObservers) {
+        // ===================== PRECONDITION SECTION =====================
+        cluster = Objects.requireNonNull(cluster, "cluster cannot be null");
+        if (cluster.isEmpty() || cluster.stream().anyMatch(inner -> inner.isEmpty())) {
+            throw new IllegalArgumentException("cluster is empty (or contains an empty inner List)");
         }
+        mObservers = Objects.requireNonNull(detectionObservers, "detectionObservers cannot be null");
         if (mObservers.length == 0) {
             throw new IllegalArgumentException("no detectionObservers provided");
         }
-        mRouterWanIp = routerWanIp;
-        // Build the signature's direction sequence.
-        // Note: assumes that the provided signature was captured within the local network (routerWanIp is set to null).
-        mSignatureDirections = getPacketDirections(mSignature.get(0), null);
+        // Build the cluster members' direction sequence.
+        // Note: assumes that the provided cluster was captured within the local network (routerWanIp is set to null).
+        mClusterMemberDirections = getPacketDirections(cluster.get(0), null);
         /*
-         * Enforce restriction on cluster/signature members: all representatives must exhibit the same direction pattern
-         * and contain the same number of packets. Note that this is a somewhat heavy operation, so it may be disabled
-         * later on in favor of performance. However, it is only run once (at instantiation), so the overhead may be
-         * warranted in order to ensure correctness, especially during the development/debugging phase.
+         * Enforce restriction on cluster members: all representatives must exhibit the same direction pattern and
+         * contain the same number of packets. Note that this is a somewhat heavy operation, so it may be disabled later
+         * on in favor of performance. However, it is only run once (at instantiation), so the overhead may be warranted
+         * in order to ensure correctness, especially during the development/debugging phase.
          */
-        if (mSignature.stream().
-                anyMatch(inner -> !Arrays.equals(mSignatureDirections, getPacketDirections(inner, null)))) {
+        if (cluster.stream().
+                anyMatch(inner -> !Arrays.equals(mClusterMemberDirections, getPacketDirections(inner, null)))) {
             throw new IllegalArgumentException(
-                    "signature members must contain the same number of packets and exhibit the same packet direction " +
+                    "cluster members must contain the same number of packets and exhibit the same packet direction " +
                             "pattern"
             );
         }
+        // ================================================================
+        // Prune the provided cluster.
+        mCluster = pruneCluster(cluster);
+        mRouterWanIp = routerWanIp;
     }
 
     @Override
@@ -104,34 +120,13 @@ public class ClusterMatcher implements PacketListener {
         mTcpReassembler.gotPacket(packet);
     }
 
-
-//    public void performDetection() {
-//        // Let's start out simple by building a version that only works for signatures that do not span across multiple
-//        // TCP conversations...
-//        for (Conversation c : mTcpReassembler.getTcpConversations()) {
-//            for (List<PcapPacket> sequence : mSignature) {
-//                boolean matchFound = isSequenceInConversation(sequence, c);
-//                if (matchFound) {
-//                    for (Observer obs : mObservers) {
-//                        obs.onSequenceDetected(sequence, c);
-//                    }
-//                    // Found signature in current conversation, so break inner loop and continue with next conversation.
-//                    // TODO: signature can be present more than once in Conversation...
-//                    break;
-//                }
-//            }
-//            /*
-//             * TODO:
-//             * if no item in cluster matches, also perform a distance-based matching to cover those cases where we did
-//             * not manage to capture every single mutation of the sequence during training.
-//             *
-//             * Need to compute average/centroid of cluster to do so...? Compute within-cluster variance, then check if
-//             * distance between input conversation and cluster average/centroid is smaller than or equal to the computed
-//             * variance?
-//             */
-//        }
-//    }
-
+    /**
+     * Get the cluster that describes the packet sequence that this {@link ClusterMatcher} is searching for.
+     * @return the cluster that describes the packet sequence that this {@link ClusterMatcher} is searching for.
+     */
+    public List<List<PcapPacket>> getCluster() {
+        return mCluster;
+    }
 
     public void performDetection() {
         /*
@@ -143,7 +138,7 @@ public class ClusterMatcher implements PacketListener {
                 // Skip empty conversations.
                 continue;
             }
-            for (List<PcapPacket> signatureSequence : mSignature) {
+            for (List<PcapPacket> signatureSequence : mCluster) {
                 if (isTlsSequence(signatureSequence) != c.isTls()) {
                     // We consider it a mismatch if one is a TLS application data sequence and the other is not.
                     continue;
@@ -154,24 +149,16 @@ public class ClusterMatcher implements PacketListener {
                  * Note: we embed the attempt to detect the signature sequence in a loop in order to capture those cases
                  * where the same signature sequence appears multiple times in one Conversation.
                  *
-                 * Note: as the cluster can be made up of identical sequences, we must keep track of whether we detected
-                 * a match and, if so, break the inner for-each loop in order to prevent raising an alarm for each
-                 * cluster-member (prevent duplicate detections of the same event). However, a negative side-effect of
-                 * this is that, in doing so, we will also skip searching for subsequent different cluster members in
-                 * the current conversation if the current cluster member is a match.
-                 *
                  * Note: since we expect all sequences that together make up the signature to exhibit the same direction
                  * pattern, we can simply pass the precomputed direction array for the signature sequence so that it
                  * won't have to be recomputed internally in each call to findSubsequenceInSequence().
                  */
                 Optional<List<PcapPacket>> match;
-                boolean matchFound = false;
-                while ((match = findSubsequenceInSequence(signatureSequence, cPkts, mSignatureDirections, null)).
+                while ((match = findSubsequenceInSequence(signatureSequence, cPkts, mClusterMemberDirections, null)).
                         isPresent()) {
-                    matchFound = true;
                     List<PcapPacket> matchSeq = match.get();
                     // Notify observers about the match.
-                    Arrays.stream(mObservers).forEach(o -> o.onSignatureDetected(mSignature, matchSeq));
+                    Arrays.stream(mObservers).forEach(o -> o.onMatch(ClusterMatcher.this, matchSeq));
                     /*
                      * Get the index in cPkts of the last packet in the sequence of packets that matches the searched
                      * signature sequence.
@@ -180,96 +167,29 @@ public class ClusterMatcher implements PacketListener {
                     // We restart the search for the signature sequence immediately after that index, so truncate cPkts.
                     cPkts = cPkts.stream().skip(matchSeqEndIdx + 1).collect(Collectors.toList());
                 }
-                if (matchFound) {
-                    // Break inner for-each loop in order to avoid duplicate detection of same event (see comment above)
-                    break;
-                }
-
-
-//                match.ifPresent(ps -> Arrays.stream(mObservers).forEach(o -> o.onSignatureDetected(mSignature, ps)));
-//                if (match.isPresent()) {
-//                    /*
-//                     * We found an element in the signature cluster that was present in conversation, so no need to scan
-//                     * conversation for remaining members of signature cluster (in fact, we'd be getting duplicate
-//                     * output in those cases where the cluster is made up of identical sequences if we did not stop the
-//                     * search here).
-//                     *
-//                     * TODO:
-//                     * How do we handle those cases where the conversation matches the signature more than once (for
-//                     * example, the long-lived connections used for sending the trigger from the cloud)?
-//                     */
-//                    break;
-//                }
             }
+            /*
+             * TODO:
+             * if no item in cluster matches, also perform a distance-based matching to cover those cases where we did
+             * not manage to capture every single mutation of the sequence during training.
+             *
+             * Need to compute average/centroid of cluster to do so...? Compute within-cluster variance, then check if
+             * distance between input conversation and cluster average/centroid is smaller than or equal to the computed
+             * variance?
+             */
         }
     }
 
-//    /**
-//     * Examine if a {@link Conversation} contains a given sequence of packets. Note: the current implementation actually
-//     * searches for a substring as it does not allow for interleaved packets in {@code c} that are not in
-//     * {@code sequence}; for example, if {@code sequence} consists of packet lengths [2, 3, 5] and {@code c} consists of
-//     * packet lengths [2, 3, 4, 5], the result will be {@code false}. If we are to allow interleaved packets, we need
-//     * a modified version of <a href="https://stackoverflow.com/a/20545604/1214974">this</a>.
-//     * @param sequence The sequence to look for.
-//     * @param c The {@link Conversation} to search for {@code sequence} in.
-//     * @return {@code true} if {@code c} contains {@code sequence}, {@code false} otherwise.
-//     */
-//    private boolean isSequenceInConversation(List<PcapPacket> sequence, Conversation c) {
-//        // TODO add offset argument to allow looking for sequence starting later in Conversation.
-//        // The packets we match against differ depending on whether the signature is a TLS or non-TLS signature.
-//        boolean tlsSequence = isTlsSequence(sequence);
-//        if (tlsSequence && !c.isTls()) {
-//            // If we're looking for a TLS signature and this conversation does not appear to be a TLS conversation, we
-//            // are done. Note: this assumes that they do NOT start performing TLS on new ports that are not captured in
-//            // Conversation.isTls()
-//            return false;
-//        }
-//        // Based on TLS or non-TLS signature, fetch the corresponding list of packets to match against.
-//        List<PcapPacket> packets = tlsSequence ? c.getTlsApplicationDataPackets() : c.getPackets();
-//        // If sequence is longer than the conversation, it can obviously not be contained in the conversation.
-//        if (packets.size() < sequence.size()) {
-//            return false;
-//        }
-//        /*
-//         * Generate packet direction array for c. We have already generated the packet direction array for sequence as
-//         * part of the constructor (mSignatureDirections).
-//         */
-//        Conversation.Direction[] cDirections = getPacketDirections(packets, mRouterWanIp);
-//        int seqIdx = 0;
-//        int convIdx = 0;
-//        while (convIdx < packets.size()) {
-//            PcapPacket seqPkt = sequence.get(seqIdx);
-//            PcapPacket convPkt = packets.get(convIdx);
-//            // We only have a match if packet lengths and directions match.
-//            if (convPkt.getOriginalLength() == seqPkt.getOriginalLength() &&
-//                    mSignatureDirections[seqIdx] == cDirections[convIdx]) {
-//                // A match, advance both indices to consider next packet in sequence vs. next packet in conversation
-//                seqIdx++;
-//                convIdx++;
-//                if (seqIdx == sequence.size()) {
-//                    // we managed to match the full sequence in the conversation.
-//                    return true;
-//                }
-//            } else {
-//                // Mismatch.
-//                if (seqIdx > 0) {
-//                    /*
-//                     * If we managed to match parts of sequence, we restart the search for sequence in c at the index of
-//                     * c where the current mismatch occurred. I.e., we must reset seqIdx, but leave convIdx untouched.
-//                     */
-//                    seqIdx = 0;
-//                } else {
-//                    /*
-//                     * First packet of sequence didn't match packet at convIdx of conversation, so we move forward in
-//                     * conversation, i.e., we continue the search for sequence in c starting at index convIdx+1 of c.
-//                     */
-//                    convIdx++;
-//                }
-//            }
-//        }
-//        return false;
-//    }
-
+    /**
+     * Checks if {@code sequence} is a sequence of TLS packets. Note: the current implementation relies on inspection
+     * of the port numbers when deciding between TLS vs. non-TLS. Therefore, only the first packet of {@code sequence}
+     * is examined as it is assumed that all packets in {@code sequence} pertain to the same {@link Conversation} and
+     * hence share the same set of two src/dst port numbers (albeit possibly alternating between which one is the src
+     * and which one is the dst, as packets in {@code sequence} may be in alternating directions).
+     * @param sequence The sequence of packets for which it is to be determined if it is a sequence of TLS packets or
+     *                 non-TLS packets.
+     * @return {@code true} if {@code sequence} is a sequence of TLS packets, {@code false} otherwise.
+     */
     private boolean isTlsSequence(List<PcapPacket> sequence) {
         // NOTE: Assumes ALL packets in sequence pertain to the same TCP connection!
         PcapPacket firstPkt = sequence.get(0);
@@ -278,15 +198,29 @@ public class ClusterMatcher implements PacketListener {
         return TcpConversationUtils.isTlsPort(srcPort) || TcpConversationUtils.isTlsPort(dstPort);
     }
 
-//    private List<PcapPacket> findeSequenceInConversation(List<PcapPacket> sequence, Conversation conv, int offset) {
-//        if (isTlsSequence(sequence) != conv.isTls()) {
-//            // We consider it a mismatch if one is a TLS Application Data sequence and the other is not.
-//            return null;
-//        }
-//        List<PcapPacket> convPackets = conv.isTls() ? conv.getTlsApplicationDataPackets() : conv.getPackets();
-//
-//    }
-
+    /**
+     * Examine if a given sequence of packets ({@code sequence}) contains a given shorter sequence of packets
+     * ({@code subsequence}). Note: the current implementation actually searches for a substring as it does not allow
+     * for interleaving packets in {@code sequence} that are not in {@code subsequence}; for example, if
+     * {@code subsequence} consists of packet lengths [2, 3, 5] and {@code sequence} consists of  packet lengths
+     * [2, 3, 4, 5], the result will be that there is no match (because of the interleaving 4). If we are to allow
+     * interleaving packets, we need a modified version of
+     * <a href="https://stackoverflow.com/a/20545604/1214974">this</a>.
+     *
+     * @param subsequence The sequence to search for.
+     * @param sequence The sequence to search.
+     * @param subsequenceDirections The directions of packets in {@code subsequence} such that for all {@code i},
+     *                              {@code subsequenceDirections[i]} is the direction of the packet returned by
+     *                              {@code subsequence.get(i)}. May be set to {@code null}, in which this call will
+     *                              internally compute the packet directions.
+     * @param sequenceDirections The directions of packets in {@code sequence} such that for all {@code i},
+     *                           {@code sequenceDirections[i]} is the direction of the packet returned by
+     *                           {@code sequence.get(i)}. May be set to {@code null}, in which this call will internally
+     *                           compute the packet directions.
+     *
+     * @return An {@link Optional} containing the part of {@code sequence} that matches {@code subsequence}, or an empty
+     *         {@link Optional} if no part of {@code sequence} matches {@code subsequence}.
+     */
     private Optional<List<PcapPacket>> findSubsequenceInSequence(List<PcapPacket> subsequence,
                                                                  List<PcapPacket> sequence,
                                                                  Conversation.Direction[] subsequenceDirections,
@@ -349,6 +283,35 @@ public class ClusterMatcher implements PacketListener {
         return Optional.empty();
     }
 
+    /**
+     * Given a cluster, produces a pruned version of that cluster. In the pruned version, there are no duplicate cluster
+     * members. Two cluster members are considered identical if their packets lengths and packet directions are
+     * identical. The resulting pruned cluster is unmodifiable (this applies to both the outermost list as well as the
+     * nested lists) in order to preserve its integrity when exposed to external code (e.g., through
+     * {@link #getCluster()}).
+     *
+     * @param cluster A cluster to prune.
+     * @return The resulting pruned cluster.
+     */
+    private final List<List<PcapPacket>> pruneCluster(List<List<PcapPacket>> cluster) {
+        List<List<PcapPacket>> prunedCluster = new ArrayList<>();
+        for (List<PcapPacket> originalClusterSeq : cluster) {
+            boolean alreadyPresent = false;
+            for (List<PcapPacket> prunedClusterSeq : prunedCluster) {
+                Optional<List<PcapPacket>> duplicate = findSubsequenceInSequence(originalClusterSeq, prunedClusterSeq,
+                        mClusterMemberDirections, mClusterMemberDirections);
+                if (duplicate.isPresent()) {
+                    alreadyPresent = true;
+                    break;
+                }
+            }
+            if (!alreadyPresent) {
+                prunedCluster.add(Collections.unmodifiableList(originalClusterSeq));
+            }
+        }
+        return Collections.unmodifiableList(prunedCluster);
+    }
+
     /**
      * Given a {@code List<PcapPacket>}, generate a {@code Conversation.Direction[]} such that each entry in the
      * resulting {@code Conversation.Direction[]} specifies the direction of the {@link PcapPacket} at the corresponding
@@ -379,16 +342,21 @@ public class ClusterMatcher implements PacketListener {
         return directions;
     }
 
-    interface Observer {
-//        /**
-//         * Callback that is invoked when a sequence associated with the signature/cluster (i.e., the sequence is a
-//         * member of the cluster that makes up the signature) is detected in a {@link Conversation}.
-//         * @param sequence The sequence that was detected in {@code conversation}.
-//         * @param conversation The {@link Conversation} that {@code sequence} was detected in.
-//         */
-//        void onSequenceDetected(List<PcapPacket> sequence, Conversation conversation);
-
-        void onSignatureDetected(List<List<PcapPacket>> signature, List<PcapPacket> match);
+    /**
+     * Interface used by client code to register for receiving a notification whenever the {@link ClusterMatcher}
+     * detects traffic that is similar to the traffic that makes up the cluster returned by
+     * {@link ClusterMatcher#getCluster()}.
+     */
+    interface ClusterMatchObserver {
+        /**
+         * Callback that is invoked whenever a sequence that is similar to a sequence associated with the cluster (i.e.,
+         * a sequence is a member of the cluster) is detected in the traffic that the associated {@link ClusterMatcher}
+         * observes.
+         * @param clusterMatcher The {@link ClusterMatcher} that detected a match (classified traffic as pertaining to
+         *                       its associated cluster).
+         * @param match The traffic that was deemed to match the cluster associated with {@code clusterMatcher}.
+         */
+        void onMatch(ClusterMatcher clusterMatcher, List<PcapPacket> match);
     }
 
 }