From: rtrimana Date: Tue, 9 Oct 2018 00:42:33 +0000 (-0700) Subject: Fixing conflict for SignatureDetector.java. X-Git-Url: http://plrg.eecs.uci.edu/git/?p=pingpong.git;a=commitdiff_plain;h=c97e080c97a2efce5db17e4e24e67db97fa05643;hp=1ed1d61f9b357b0f2e9aed36d3aac6f8cd68d1d3 Fixing conflict for SignatureDetector.java. --- diff --git a/Code/Projects/SmartPlugDetector/.idea/modules/SmartPlugDetector_main.iml b/Code/Projects/SmartPlugDetector/.idea/modules/SmartPlugDetector_main.iml index 740525e..15e9a30 100644 --- a/Code/Projects/SmartPlugDetector/.idea/modules/SmartPlugDetector_main.iml +++ b/Code/Projects/SmartPlugDetector/.idea/modules/SmartPlugDetector_main.iml @@ -5,7 +5,6 @@ - @@ -13,6 +12,7 @@ + diff --git a/Code/Projects/SmartPlugDetector/.idea/modules/SmartPlugDetector_test.iml b/Code/Projects/SmartPlugDetector/.idea/modules/SmartPlugDetector_test.iml index 8d4ae84..d8e4114 100644 --- a/Code/Projects/SmartPlugDetector/.idea/modules/SmartPlugDetector_test.iml +++ b/Code/Projects/SmartPlugDetector/.idea/modules/SmartPlugDetector_test.iml @@ -5,7 +5,6 @@ - @@ -14,6 +13,7 @@ + diff --git a/Code/Projects/SmartPlugDetector/build.gradle b/Code/Projects/SmartPlugDetector/build.gradle index bb07bce..f8f539c 100644 --- a/Code/Projects/SmartPlugDetector/build.gradle +++ b/Code/Projects/SmartPlugDetector/build.gradle @@ -26,4 +26,7 @@ dependencies { // Apache Commons Math for clustering compile 'org.apache.commons:commons-math3:3.6.1' + + // JGraphT: Java Graph library + compile 'org.jgrapht:jgrapht-core:1.2.0' } \ No newline at end of file diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/detection/ClusterMatcher.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/detection/ClusterMatcher.java new file mode 100644 index 0000000..279ceea --- /dev/null +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/detection/ClusterMatcher.java @@ -0,0 +1,341 @@ +package edu.uci.iotproject.detection; + +import 
edu.uci.iotproject.Conversation; +import edu.uci.iotproject.TcpReassembler; +import edu.uci.iotproject.analysis.TcpConversationUtils; +import edu.uci.iotproject.io.PcapHandleReader; +import edu.uci.iotproject.util.PrintUtils; +import org.pcap4j.core.*; + +import java.time.ZoneId; +import java.util.*; +import java.util.stream.Collectors; + +import static edu.uci.iotproject.util.PcapPacketUtils.*; + +/** + * Searches a traffic trace for sequences of packets "belong to" a given cluster (in other words, attempts to classify + * traffic as pertaining to a given cluster). + * + * @author Janus Varmarken {@literal } + * @author Rahmadi Trimananda {@literal } + */ +public class ClusterMatcher implements PacketListener { + + // Test client + public static void main(String[] args) throws PcapNativeException, NotOpenException { + +// String path = "/scratch/July-2018"; // Rahmadi + String path = "/Users/varmarken/temp/UCI IoT Project/experiments"; // Janus + final String inputPcapFile = path + "/2018-07/dlink/dlink.wlan1.local.pcap"; + final String signatureFile = path + "/2018-07/dlink/offSignature1.sig"; + + List> signature = PrintUtils.deserializeClustersFromFile(signatureFile); + ClusterMatcher clusterMatcher = new ClusterMatcher(signature, null, + (sig, match) -> System.out.println( + String.format("[ !!! SIGNATURE DETECTED AT %s !!! ]", + match.get(0).getTimestamp().atZone(ZoneId.of("America/Los_Angeles"))) + ) + ); + + PcapHandle handle; + try { + handle = Pcaps.openOffline(inputPcapFile, PcapHandle.TimestampPrecision.NANO); + } catch (PcapNativeException pne) { + handle = Pcaps.openOffline(inputPcapFile); + } + PcapHandleReader reader = new PcapHandleReader(handle, p -> true, clusterMatcher); + reader.readFromHandle(); + clusterMatcher.performDetection(); + } + + /** + * The cluster that describes the sequence of packets that this {@link ClusterMatcher} is trying to detect in the + * observed traffic. 
+ */ + private final List> mCluster; + + /** + * The ordered directions of packets in the sequences that make up {@link #mCluster}. + */ + private final Conversation.Direction[] mClusterMemberDirections; + + /** + * For reassembling the observed traffic into TCP connections. + */ + private final TcpReassembler mTcpReassembler = new TcpReassembler(); + + /** + * IP of the router's WAN port (if analyzed traffic is captured at the ISP's point of view). + */ + private final String mRouterWanIp; + + private final ClusterMatchObserver[] mObservers; + + /** + * Create a {@link ClusterMatcher}. + * @param cluster The cluster that traffic is matched against. + * @param routerWanIp The router's WAN IP if examining traffic captured at the ISP's point of view (used for + * determining the direction of packets). + * @param detectionObservers Client code that wants to get notified whenever the {@link ClusterMatcher} detects that + * (a subset of) the examined traffic is similar to the traffic that makes up + * {@code cluster}, i.e., when the examined traffic is classified as pertaining to + * {@code cluster}. + */ + public ClusterMatcher(List> cluster, String routerWanIp, ClusterMatchObserver... detectionObservers) { + mCluster = Collections.unmodifiableList(Objects.requireNonNull(cluster, "cluster cannot be null")); + mObservers = Objects.requireNonNull(detectionObservers, "detectionObservers cannot be null"); + if (mCluster.isEmpty() || mCluster.stream().anyMatch(inner -> inner.isEmpty())) { + throw new IllegalArgumentException("cluster is empty (or contains an empty inner List)"); + } + if (mObservers.length == 0) { + throw new IllegalArgumentException("no detectionObservers provided"); + } + mRouterWanIp = routerWanIp; + // Build the cluster members' direction sequence. + // Note: assumes that the provided cluster was captured within the local network (routerWanIp is set to null). 
+ mClusterMemberDirections = getPacketDirections(mCluster.get(0), null); + /* + * Enforce restriction on cluster members: all representatives must exhibit the same direction pattern and + * contain the same number of packets. Note that this is a somewhat heavy operation, so it may be disabled later + * on in favor of performance. However, it is only run once (at instantiation), so the overhead may be warranted + * in order to ensure correctness, especially during the development/debugging phase. + */ + if (mCluster.stream(). + anyMatch(inner -> !Arrays.equals(mClusterMemberDirections, getPacketDirections(inner, null)))) { + throw new IllegalArgumentException( + "cluster members must contain the same number of packets and exhibit the same packet direction " + + "pattern" + ); + } + } + + @Override + public void gotPacket(PcapPacket packet) { + // Present packet to TCP reassembler so that it can be mapped to a connection (if it is a TCP packet). + mTcpReassembler.gotPacket(packet); + } + + /** + * Get the cluster that describes the packet sequence that this {@link ClusterMatcher} is searching for. + * @return the cluster that describes the packet sequence that this {@link ClusterMatcher} is searching for. + */ + public List> getCluster() { + return mCluster; + } + + public void performDetection() { + /* + * Let's start out simple by building a version that only works for signatures that do not span across multiple + * TCP conversations... + */ + for (Conversation c : mTcpReassembler.getTcpConversations()) { + if (c.isTls() && c.getTlsApplicationDataPackets().isEmpty() || !c.isTls() && c.getPackets().isEmpty()) { + // Skip empty conversations. + continue; + } + for (List signatureSequence : mCluster) { + if (isTlsSequence(signatureSequence) != c.isTls()) { + // We consider it a mismatch if one is a TLS application data sequence and the other is not. + continue; + } + // Fetch set of packets to examine based on TLS or not. + List cPkts = c.isTls() ? 
c.getTlsApplicationDataPackets() : c.getPackets(); + /* + * Note: we embed the attempt to detect the signature sequence in a loop in order to capture those cases + * where the same signature sequence appears multiple times in one Conversation. + * + * Note: as the cluster can be made up of identical sequences, we must keep track of whether we detected + * a match and, if so, break the inner for-each loop in order to prevent raising an alarm for each + * cluster-member (prevent duplicate detections of the same event). However, a negative side-effect of + * this is that, in doing so, we will also skip searching for subsequent different cluster members in + * the current conversation if the current cluster member is a match. + * + * Note: since we expect all sequences that together make up the signature to exhibit the same direction + * pattern, we can simply pass the precomputed direction array for the signature sequence so that it + * won't have to be recomputed internally in each call to findSubsequenceInSequence(). + */ + Optional> match; + boolean matchFound = false; + while ((match = findSubsequenceInSequence(signatureSequence, cPkts, mClusterMemberDirections, null)). + isPresent()) { + matchFound = true; + List matchSeq = match.get(); + // Notify observers about the match. + Arrays.stream(mObservers).forEach(o -> o.onMatch(ClusterMatcher.this, matchSeq)); + /* + * Get the index in cPkts of the last packet in the sequence of packets that matches the searched + * signature sequence. + */ + int matchSeqEndIdx = cPkts.indexOf(matchSeq.get(matchSeq.size()-1)); + // We restart the search for the signature sequence immediately after that index, so truncate cPkts. 
+ cPkts = cPkts.stream().skip(matchSeqEndIdx + 1).collect(Collectors.toList()); + } + if (matchFound) { + // Break inner for-each loop in order to avoid duplicate detection of same event (see comment above) + break; + } + } + /* + * TODO: + * if no item in cluster matches, also perform a distance-based matching to cover those cases where we did + * not manage to capture every single mutation of the sequence during training. + * + * Need to compute average/centroid of cluster to do so...? Compute within-cluster variance, then check if + * distance between input conversation and cluster average/centroid is smaller than or equal to the computed + * variance? + */ + } + } + + /** + * Checks if {@code sequence} is a sequence of TLS packets. Note: the current implementation relies on inspection + * of the port numbers when deciding between TLS vs. non-TLS. Therefore, only the first packet of {@code sequence} + * is examined as it is assumed that all packets in {@code sequence} pertain to the same {@link Conversation} and + * hence share the same set of two src/dst port numbers (albeit possibly alternating between which one is the src + * and which one is the dst, as packets in {@code sequence} may be in alternating directions). + * @param sequence The sequence of packets for which it is to be determined if it is a sequence of TLS packets or + * non-TLS packets. + * @return {@code true} if {@code sequence} is a sequence of TLS packets, {@code false} otherwise. + */ + private boolean isTlsSequence(List sequence) { + // NOTE: Assumes ALL packets in sequence pertain to the same TCP connection! + PcapPacket firstPkt = sequence.get(0); + int srcPort = getSourcePort(firstPkt); + int dstPort = getDestinationPort(firstPkt); + return TcpConversationUtils.isTlsPort(srcPort) || TcpConversationUtils.isTlsPort(dstPort); + } + + /** + * Examine if a given sequence of packets ({@code sequence}) contains a given shorter sequence of packets + * ({@code subsequence}). 
Note: the current implementation actually searches for a substring as it does not allow + * for interleaving packets in {@code sequence} that are not in {@code subsequence}; for example, if + * {@code subsequence} consists of packet lengths [2, 3, 5] and {@code sequence} consists of packet lengths + * [2, 3, 4, 5], the result will be that there is no match (because of the interleaving 4). If we are to allow + * interleaving packets, we need a modified version of + * this. + * + * @param subsequence The sequence to search for. + * @param sequence The sequence to search. + * @param subsequenceDirections The directions of packets in {@code subsequence} such that for all {@code i}, + * {@code subsequenceDirections[i]} is the direction of the packet returned by + * {@code subsequence.get(i)}. May be set to {@code null}, in which this call will + * internally compute the packet directions. + * @param sequenceDirections The directions of packets in {@code sequence} such that for all {@code i}, + * {@code sequenceDirections[i]} is the direction of the packet returned by + * {@code sequence.get(i)}. May be set to {@code null}, in which this call will internally + * compute the packet directions. + * + * @return An {@link Optional} containing the part of {@code sequence} that matches {@code subsequence}, or an empty + * {@link Optional} if no part of {@code sequence} matches {@code subsequence}. + */ + private Optional> findSubsequenceInSequence(List subsequence, + List sequence, + Conversation.Direction[] subsequenceDirections, + Conversation.Direction[] sequenceDirections) { + if (sequence.size() < subsequence.size()) { + // If subsequence is longer, it cannot be contained in sequence. + return Optional.empty(); + } + if (isTlsSequence(subsequence) != isTlsSequence(sequence)) { + // We consider it a mismatch if one is a TLS application data sequence and the other is not. 
+ return Optional.empty(); + } + // If packet directions have not been precomputed by calling code, we need to construct them. + if (subsequenceDirections == null) { + subsequenceDirections = getPacketDirections(subsequence, mRouterWanIp); + } + if (sequenceDirections == null) { + sequenceDirections = getPacketDirections(sequence, mRouterWanIp); + } + int subseqIdx = 0; + int seqIdx = 0; + while (seqIdx < sequence.size()) { + PcapPacket subseqPkt = subsequence.get(subseqIdx); + PcapPacket seqPkt = sequence.get(seqIdx); + // We only have a match if packet lengths and directions match. + if (subseqPkt.getOriginalLength() == seqPkt.getOriginalLength() && + subsequenceDirections[subseqIdx] == sequenceDirections[seqIdx]) { + // A match; advance both indices to consider next packet in subsequence vs. next packet in sequence. + subseqIdx++; + seqIdx++; + if (subseqIdx == subsequence.size()) { + // We managed to match the entire subsequence in sequence. + // Return the sublist of sequence that matches subsequence. + /* + * TODO: + * ASSUMES THE BACKING LIST (i.e., 'sequence') IS _NOT_ STRUCTURALLY MODIFIED, hence may not work + * for live traces! + */ + return Optional.of(sequence.subList(seqIdx - subsequence.size(), seqIdx)); + } + } else { + // Mismatch. + if (subseqIdx > 0) { + /* + * If we managed to match parts of subsequence, we restart the search for subsequence in sequence at + * the index of sequence where the current mismatch occurred. I.e., we must reset subseqIdx, but + * leave seqIdx untouched. + */ + subseqIdx = 0; + } else { + /* + * First packet of subsequence didn't match packet at seqIdx of sequence, so we move forward in + * sequence, i.e., we continue the search for subsequence in sequence starting at index seqIdx+1 of + * sequence. 
+ */ + seqIdx++; + } + } + } + return Optional.empty(); + } + + /** + * Given a {@code List}, generate a {@code Conversation.Direction[]} such that each entry in the + * resulting {@code Conversation.Direction[]} specifies the direction of the {@link PcapPacket} at the corresponding + * index in the input list. + * @param packets The list of packets for which to construct a corresponding array of packet directions. + * @param routerWanIp The IP of the router's WAN port. This is used for determining the direction of packets when + * the traffic is captured just outside the local network (at the ISP side of the router). Set to + * {@code null} if {@code packets} stem from traffic captured within the local network. + * @return A {@code Conversation.Direction[]} specifying the direction of the {@link PcapPacket} at the + * corresponding index in {@code packets}. + */ + private static Conversation.Direction[] getPacketDirections(List packets, String routerWanIp) { + Conversation.Direction[] directions = new Conversation.Direction[packets.size()]; + for (int i = 0; i < packets.size(); i++) { + PcapPacket pkt = packets.get(i); + if (getSourceIp(pkt).equals(getDestinationIp(pkt))) { + // Sanity check: we shouldn't be processing loopback traffic + throw new AssertionError("loopback traffic detected"); + } + if (isSrcIpLocal(pkt) || getSourceIp(pkt).equals(routerWanIp)) { + directions[i] = Conversation.Direction.CLIENT_TO_SERVER; + } else if (isDstIpLocal(pkt) || getDestinationIp(pkt).equals(routerWanIp)) { + directions[i] = Conversation.Direction.SERVER_TO_CLIENT; + } else { + throw new IllegalArgumentException("no local IP or router WAN port IP found, can't detect direction"); + } + } + return directions; + } + + /** + * Interface used by client code to register for receiving a notification whenever the {@link ClusterMatcher} + * detects traffic that is similar to the traffic that makes up the cluster returned by + * {@link ClusterMatcher#getCluster()}. 
+ */ + interface ClusterMatchObserver { + /** + * Callback that is invoked whenever a sequence that is similar to a sequence associated with the cluster (i.e., + * a sequence is a member of the cluster) is detected in the traffic that the associated {@link ClusterMatcher} + * observes. + * @param clusterMatcher The {@link ClusterMatcher} that detected a match (classified traffic as pertaining to + * its associated cluster). + * @param match The traffic that was deemed to match the cluster associated with {@code clusterMatcher}. + */ + void onMatch(ClusterMatcher clusterMatcher, List match); + } + +} diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/detection/SignatureDetector.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/detection/SignatureDetector.java index 5ba189d..d7696f3 100644 --- a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/detection/SignatureDetector.java +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/detection/SignatureDetector.java @@ -1,41 +1,100 @@ package edu.uci.iotproject.detection; -import edu.uci.iotproject.Conversation; -import edu.uci.iotproject.TcpReassembler; -import edu.uci.iotproject.analysis.TcpConversationUtils; +import edu.uci.iotproject.analysis.TriggerTrafficExtractor; +import edu.uci.iotproject.analysis.UserAction; import edu.uci.iotproject.io.PcapHandleReader; import edu.uci.iotproject.util.PrintUtils; +import org.jgrapht.GraphPath; +import org.jgrapht.alg.shortestpath.DijkstraShortestPath; +import org.jgrapht.graph.DefaultWeightedEdge; +import org.jgrapht.graph.SimpleDirectedWeightedGraph; import org.pcap4j.core.*; +import java.time.Duration; import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.format.FormatStyle; import java.util.*; -import java.util.stream.Collectors; - -import static edu.uci.iotproject.util.PcapPacketUtils.*; +import java.util.function.Consumer; /** - * TODO add class documentation. 
+ * Detects an event signature that spans one or multiple TCP connections. * * @author Janus Varmarken {@literal } * @author Rahmadi Trimananda {@literal } */ -public class SignatureDetector implements PacketListener { +public class SignatureDetector implements PacketListener, ClusterMatcher.ClusterMatchObserver { // Test client public static void main(String[] args) throws PcapNativeException, NotOpenException { - -// String path = "/scratch/July-2018"; // Rahmadi + // String path = "/scratch/July-2018"; // Rahmadi String path = "/Users/varmarken/temp/UCI IoT Project/experiments"; // Janus + + // Kwikset Doorlock Sep 12 experiment + final String inputPcapFile = path + "/2018-08/kwikset-doorlock/kwikset3.wlan1.local.pcap"; + // Kwikset Doorlock PHONE signatures + final String onSignatureFile = path + "/2018-08/kwikset-doorlock/onSignature-Kwikset-Doorlock-phone.sig"; + final String offSignatureFile = path + "/2018-08/kwikset-doorlock/offSignature-Kwikset-Doorlock-phone.sig"; + + /* + // D-Link Plug experiment final String inputPcapFile = path + "/2018-07/dlink/dlink.wlan1.local.pcap"; - final String signatureFile = path + "/2018-07/dlink/offSignature1.sig"; + // D-Link Plug DEVICE signatures + final String onSignatureFile = path + "/2018-07/dlink/onSignature-DLink-Plug-device.sig"; + final String offSignatureFile = path + "/2018-07/dlink/offSignature-DLink-Plug-device.sig"; + // D-Link Plug PHONE signatures + final String onSignatureFile = path + "/2018-07/dlink/onSignature-DLink-Plug-phone.sig"; + final String offSignatureFile = path + "/2018-07/dlink/offSignature-DLink-Plug-phone.sig"; + */ + + /* + // D-Link Siren experiment + final String inputPcapFile = path + "/2018-08/dlink-siren/dlink-siren.wlan1.local.pcap"; + // D-Link Siren DEVICE signatures + final String onSignatureFile = path + "/2018-08/dlink-siren/onSignature-DLink-Siren-device.sig"; + final String offSignatureFile = path + "/2018-08/dlink-siren/offSignature-DLink-Siren-device.sig"; + // D-Link 
Siren PHONE signatures + final String onSignatureFile = path + "/2018-08/dlink-siren/onSignature-DLink-Siren-phone.sig"; + final String offSignatureFile = path + "/2018-08/dlink-siren/offSignature-DLink-Siren-phone.sig"; + */ + + List>> onSignature = PrintUtils.deserializeSignatureFromFile(onSignatureFile); + List>> offSignature = PrintUtils.deserializeSignatureFromFile(offSignatureFile); - List> signature = PrintUtils.deserializeClustersFromFile(signatureFile); - SignatureDetector signatureDetector = new SignatureDetector(signature, null, - (sig, match) -> System.out.println( - String.format("[ !!! SIGNATURE DETECTED AT %s !!! ]", - match.get(0).getTimestamp().atZone(ZoneId.of("America/Los_Angeles"))) - ) - ); + SignatureDetector onDetector = new SignatureDetector(onSignature, null); + SignatureDetector offDetector = new SignatureDetector(offSignature, null); + + final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.MEDIUM). + withLocale(Locale.US).withZone(ZoneId.of("America/Los_Angeles")); + + // Outputs information about a detected event to std.out + final Consumer outputter = ua -> { + String eventDescription; + switch (ua.getType()) { + case TOGGLE_ON: + eventDescription = "ON"; + break; + case TOGGLE_OFF: + eventDescription = "OFF"; + break; + default: + throw new AssertionError("unhandled event type"); + } + String output = String.format("[ !!! %s SIGNATURE DETECTED at %s !!! ]", + eventDescription, dateTimeFormatter.format(ua.getTimestamp())); + System.out.println(output); + }; + + // Let's create observers that construct a UserAction representing the detected event. 
+ final List detectedEvents = new ArrayList<>(); + onDetector.addObserver((searched, match) -> { + PcapPacket firstPkt = match.get(0).get(0); + detectedEvents.add(new UserAction(UserAction.Type.TOGGLE_ON, firstPkt.getTimestamp())); + }); + offDetector.addObserver((searched, match) -> { + PcapPacket firstPkt = match.get(0).get(0); + detectedEvents.add(new UserAction(UserAction.Type.TOGGLE_OFF, firstPkt.getTimestamp())); + }); PcapHandle handle; try { @@ -43,352 +102,218 @@ public class SignatureDetector implements PacketListener { } catch (PcapNativeException pne) { handle = Pcaps.openOffline(inputPcapFile); } - PcapHandleReader reader = new PcapHandleReader(handle, p -> true, signatureDetector); + PcapHandleReader reader = new PcapHandleReader(handle, p -> true, onDetector, offDetector); reader.readFromHandle(); - signatureDetector.performDetection(); + + // TODO: need a better way of triggering detection than this... + onDetector.mClusterMatchers.forEach(cm -> cm.performDetection()); + offDetector.mClusterMatchers.forEach(cm -> cm.performDetection()); + + // Sort the list of detected events by timestamp to make it easier to compare it line-by-line with the trigger + // times file. + Collections.sort(detectedEvents, Comparator.comparing(UserAction::getTimestamp)); + // Output the detected events + detectedEvents.forEach(outputter); } /** - * The signature that this {@link SignatureDetector} is trying to detect in the observed traffic. + * The signature that this {@link SignatureDetector} is searching for. */ - private final List> mSignature; + private final List>> mSignature; /** - * The directions of packets in the sequences that make up {@link #mSignature}. + * The {@link ClusterMatcher}s in charge of detecting each individual sequence of packets that together make up the + * the signature. */ - private final Conversation.Direction[] mSignatureDirections; + private final List mClusterMatchers; /** - * For reassembling the observed traffic into TCP connections. 
+ * For each {@code i} ({@code i >= 0 && i < pendingMatches.length}), {@code pendingMatches[i]} holds the matches + * found by the {@link ClusterMatcher} at {@code mClusterMatchers.get(i)} that have yet to be "consumed", i.e., + * have yet to be included in a signature detected by this {@link SignatureDetector} (a signature can be encompassed + * of multiple packet sequences occurring shortly after one another on multiple connections). */ - private final TcpReassembler mTcpReassembler = new TcpReassembler(); + private final List>[] pendingMatches; /** - * IP of the router's WAN port (if analyzed traffic is captured at the ISP's point of view). + * Maps a {@link ClusterMatcher} to its corresponding index in {@link #pendingMatches}. */ - private final String mRouterWanIp; + private final Map mClusterMatcherIds; - private final Observer[] mObservers; + private final List mObservers = new ArrayList<>(); - public SignatureDetector(List> signature, String routerWanIp, Observer... detectionObservers) { - mSignature = Collections.unmodifiableList(Objects.requireNonNull(signature, "signature cannot be null")); - mObservers = Objects.requireNonNull(detectionObservers, "detectionObservers cannot be null"); - if (mSignature.isEmpty() || mSignature.stream().anyMatch(inner -> inner.isEmpty())) { - throw new IllegalArgumentException("signature is empty (or contains empty inner List)"); + public SignatureDetector(List>> searchedSignature, String routerWanIp) { + // note: doesn't protect inner lists from changes :'( + mSignature = Collections.unmodifiableList(searchedSignature); + // Generate corresponding/appropriate ClusterMatchers based on the provided signature + List clusterMatchers = new ArrayList<>(); + for (List> cluster : mSignature) { + clusterMatchers.add(new ClusterMatcher(cluster, routerWanIp, this)); } - if (mObservers.length == 0) { - throw new IllegalArgumentException("no detectionObservers provided"); + mClusterMatchers = 
Collections.unmodifiableList(clusterMatchers); + + // < exploratory > + pendingMatches = new List[mClusterMatchers.size()]; + for (int i = 0; i < pendingMatches.length; i++) { + pendingMatches[i] = new ArrayList<>(); } - mRouterWanIp = routerWanIp; - // Build the signature's direction sequence. - // Note: assumes that the provided signature was captured within the local network (routerWanIp is set to null). - mSignatureDirections = getPacketDirections(mSignature.get(0), null); - /* - * Enforce restriction on cluster/signature members: all representatives must exhibit the same direction pattern - * and contain the same number of packets. Note that this is a somewhat heavy operation, so it may be disabled - * later on in favor of performance. However, it is only run once (at instantiation), so the overhead may be - * warranted in order to ensure correctness, especially during the development/debugging phase. - */ - if (mSignature.stream(). - anyMatch(inner -> !Arrays.equals(mSignatureDirections, getPacketDirections(inner, null)))) { - throw new IllegalArgumentException( - "signature members must contain the same number of packets and exhibit the same packet direction " + - "pattern" - ); + Map clusterMatcherIds = new HashMap<>(); + for (int i = 0; i < mClusterMatchers.size(); i++) { + clusterMatcherIds.put(mClusterMatchers.get(i), i); } + mClusterMatcherIds = Collections.unmodifiableMap(clusterMatcherIds); + } + + public void addObserver(SignatureDetectionObserver observer) { + mObservers.add(observer); + } + + public boolean removeObserver(SignatureDetectionObserver observer) { + return mObservers.remove(observer); } @Override public void gotPacket(PcapPacket packet) { - // Present packet to TCP reassembler so that it can be mapped to a connection (if it is a TCP packet). - mTcpReassembler.gotPacket(packet); + // simply delegate packet reception to all ClusterMatchers. 
+ mClusterMatchers.forEach(cm -> cm.gotPacket(packet)); } + @Override + public void onMatch(ClusterMatcher clusterMatcher, List match) { + // Add the match at the corresponding index + pendingMatches[mClusterMatcherIds.get(clusterMatcher)].add(match); + checkSignatureMatch(); + } -// public void performDetection() { -// // Let's start out simple by building a version that only works for signatures that do not span across multiple -// // TCP conversations... -// for (Conversation c : mTcpReassembler.getTcpConversations()) { -// for (List sequence : mSignature) { -// boolean matchFound = isSequenceInConversation(sequence, c); -// if (matchFound) { -// for (Observer obs : mObservers) { -// obs.onSequenceDetected(sequence, c); -// } -// // Found signature in current conversation, so break inner loop and continue with next conversation. -// // TODO: signature can be present more than once in Conversation... -// break; -// } -// } -// /* -// * TODO: -// * if no item in cluster matches, also perform a distance-based matching to cover those cases where we did -// * not manage to capture every single mutation of the sequence during training. -// * -// * Need to compute average/centroid of cluster to do so...? Compute within-cluster variance, then check if -// * distance between input conversation and cluster average/centroid is smaller than or equal to the computed -// * variance? -// */ -// } -// } - + private void checkSignatureMatch() { + // << Graph-based approach using Balint's idea. >> + // This implementation assumes that the packets in the inner lists (the sequences) are ordered by asc timestamp. - public void performDetection() { - /* - * Let's start out simple by building a version that only works for signatures that do not span across multiple - * TCP conversations... 
- */ - for (Conversation c : mTcpReassembler.getTcpConversations()) { - if (c.isTls() && c.getTlsApplicationDataPackets().isEmpty() || !c.isTls() && c.getPackets().isEmpty()) { - // Skip empty conversations. - continue; - } - for (List signatureSequence : mSignature) { - if (isTlsSequence(signatureSequence) != c.isTls()) { - // We consider it a mismatch if one is a TLS application data sequence and the other is not. - continue; + // There cannot be a signature match until each ClusterMatcher has found a match of its respective sequence. + if (Arrays.stream(pendingMatches).noneMatch(l -> l.isEmpty())) { + // Construct the DAG + final SimpleDirectedWeightedGraph graph = + new SimpleDirectedWeightedGraph<>(DefaultWeightedEdge.class); + // Add a vertex for each match found by all ClusterMatchers + // And maintain an array to keep track of what cluster matcher each vertex corresponds to + final List[] vertices = new List[pendingMatches.length]; + for (int i = 0; i < pendingMatches.length; i++) { + vertices[i] = new ArrayList<>(); + for (List sequence : pendingMatches[i]) { + Vertex v = new Vertex(sequence); + vertices[i].add(v); // retain reference for later when we are to add edges + graph.addVertex(v); // add to vertex to graph } - // Fetch set of packets to examine based on TLS or not. - List cPkts = c.isTls() ? c.getTlsApplicationDataPackets() : c.getPackets(); - /* - * Note: we embed the attempt to detect the signature sequence in a loop in order to capture those cases - * where the same signature sequence appears multiple times in one Conversation. - * - * Note: as the cluster can be made up of identical sequences, we must keep track of whether we detected - * a match and, if so, break the inner for-each loop in order to prevent raising an alarm for each - * cluster-member (prevent duplicate detections of the same event). 
However, a negative side-effect of - * this is that, in doing so, we will also skip searching for subsequent different cluster members in - * the current conversation if the current cluster member is a match. - * - * Note: since we expect all sequences that together make up the signature to exhibit the same direction - * pattern, we can simply pass the precomputed direction array for the signature sequence so that it - * won't have to be recomputed internally in each call to findSubsequenceInSequence(). - */ - Optional> match; - boolean matchFound = false; - while ((match = findSubsequenceInSequence(signatureSequence, cPkts, mSignatureDirections, null)). - isPresent()) { - matchFound = true; - List matchSeq = match.get(); - // Notify observers about the match. - Arrays.stream(mObservers).forEach(o -> o.onSignatureDetected(mSignature, matchSeq)); - /* - * Get the index in cPkts of the last packet in the sequence of packets that matches the searched - * signature sequence. - */ - int matchSeqEndIdx = cPkts.indexOf(matchSeq.get(matchSeq.size()-1)); - // We restart the search for the signature sequence immediately after that index, so truncate cPkts. - cPkts = cPkts.stream().skip(matchSeqEndIdx + 1).collect(Collectors.toList()); - } - if (matchFound) { - // Break inner for-each loop in order to avoid duplicate detection of same event (see comment above) - break; - } - - -// match.ifPresent(ps -> Arrays.stream(mObservers).forEach(o -> o.onSignatureDetected(mSignature, ps))); -// if (match.isPresent()) { -// /* -// * We found an element in the signature cluster that was present in conversation, so no need to scan -// * conversation for remaining members of signature cluster (in fact, we'd be getting duplicate -// * output in those cases where the cluster is made up of identical sequences if we did not stop the -// * search here). 
-// * -// * TODO: -// * How do we handle those cases where the conversation matches the signature more than once (for -// * example, the long-lived connections used for sending the trigger from the cloud)? -// */ -// break; -// } } - } - } - -// /** -// * Examine if a {@link Conversation} contains a given sequence of packets. Note: the current implementation actually -// * searches for a substring as it does not allow for interleaved packets in {@code c} that are not in -// * {@code sequence}; for example, if {@code sequence} consists of packet lengths [2, 3, 5] and {@code c} consists of -// * packet lengths [2, 3, 4, 5], the result will be {@code false}. If we are to allow interleaved packets, we need -// * a modified version of this. -// * @param sequence The sequence to look for. -// * @param c The {@link Conversation} to search for {@code sequence} in. -// * @return {@code true} if {@code c} contains {@code sequence}, {@code false} otherwise. -// */ -// private boolean isSequenceInConversation(List sequence, Conversation c) { -// // TODO add offset argument to allow looking for sequence starting later in Conversation. -// // The packets we match against differ depending on whether the signature is a TLS or non-TLS signature. -// boolean tlsSequence = isTlsSequence(sequence); -// if (tlsSequence && !c.isTls()) { -// // If we're looking for a TLS signature and this conversation does not appear to be a TLS conversation, we -// // are done. Note: this assumes that they do NOT start performing TLS on new ports that are not captured in -// // Conversation.isTls() -// return false; -// } -// // Based on TLS or non-TLS signature, fetch the corresponding list of packets to match against. -// List packets = tlsSequence ? c.getTlsApplicationDataPackets() : c.getPackets(); -// // If sequence is longer than the conversation, it can obviously not be contained in the conversation. 
-// if (packets.size() < sequence.size()) { -// return false; -// } -// /* -// * Generate packet direction array for c. We have already generated the packet direction array for sequence as -// * part of the constructor (mSignatureDirections). -// */ -// Conversation.Direction[] cDirections = getPacketDirections(packets, mRouterWanIp); -// int seqIdx = 0; -// int convIdx = 0; -// while (convIdx < packets.size()) { -// PcapPacket seqPkt = sequence.get(seqIdx); -// PcapPacket convPkt = packets.get(convIdx); -// // We only have a match if packet lengths and directions match. -// if (convPkt.getOriginalLength() == seqPkt.getOriginalLength() && -// mSignatureDirections[seqIdx] == cDirections[convIdx]) { -// // A match, advance both indices to consider next packet in sequence vs. next packet in conversation -// seqIdx++; -// convIdx++; -// if (seqIdx == sequence.size()) { -// // we managed to match the full sequence in the conversation. -// return true; -// } -// } else { -// // Mismatch. -// if (seqIdx > 0) { -// /* -// * If we managed to match parts of sequence, we restart the search for sequence in c at the index of -// * c where the current mismatch occurred. I.e., we must reset seqIdx, but leave convIdx untouched. -// */ -// seqIdx = 0; -// } else { -// /* -// * First packet of sequence didn't match packet at convIdx of conversation, so we move forward in -// * conversation, i.e., we continue the search for sequence in c starting at index convIdx+1 of c. -// */ -// convIdx++; -// } -// } -// } -// return false; -// } - - private boolean isTlsSequence(List sequence) { - // NOTE: Assumes ALL packets in sequence pertain to the same TCP connection! 
- PcapPacket firstPkt = sequence.get(0); - int srcPort = getSourcePort(firstPkt); - int dstPort = getDestinationPort(firstPkt); - return TcpConversationUtils.isTlsPort(srcPort) || TcpConversationUtils.isTlsPort(dstPort); - } - -// private List findeSequenceInConversation(List sequence, Conversation conv, int offset) { -// if (isTlsSequence(sequence) != conv.isTls()) { -// // We consider it a mismatch if one is a TLS Application Data sequence and the other is not. -// return null; -// } -// List convPackets = conv.isTls() ? conv.getTlsApplicationDataPackets() : conv.getPackets(); + // Add dummy source and sink vertices to facilitate search. + final Vertex source = new Vertex(null); + final Vertex sink = new Vertex(null); + graph.addVertex(source); + graph.addVertex(sink); + // The source is connected to all vertices that wrap the sequences detected by ClusterMatcher at index 0. + // Note: zero cost edges as this is just a dummy link to facilitate search from a common start node. + for (Vertex v : vertices[0]) { + DefaultWeightedEdge edge = graph.addEdge(source, v); + graph.setEdgeWeight(edge, 0.0); + } + // Similarly, all vertices that wrap the sequences detected by the last ClusterMatcher of the signature + // are connected to the sink node. + for (Vertex v : vertices[vertices.length-1]) { + DefaultWeightedEdge edge = graph.addEdge(v, sink); + graph.setEdgeWeight(edge, 0.0); + } + // Now link sequences detected by ClusterMatcher at index i to sequences detected by ClusterMatcher at index + // i+1 if they obey the timestamp constraint (i.e., that the latter is later in time than the former). 
+ for (int i = 0; i < vertices.length; i++) { + int j = i + 1; + if (j < vertices.length) { + for (Vertex iv : vertices[i]) { + PcapPacket ivLast = iv.sequence.get(iv.sequence.size()-1); + for (Vertex jv : vertices[j]) { + // Must be the FIRST packet (index 0) of the i+1'th sequence; mirrors min() in the alternative version below. + PcapPacket jvFirst = jv.sequence.get(0); + if (ivLast.getTimestamp().isBefore(jvFirst.getTimestamp())) { + DefaultWeightedEdge edge = graph.addEdge(iv, jv); + // The weight is the duration of the i'th sequence plus the duration between the i'th + // and i+1'th sequence. + Duration d = Duration. + between(iv.sequence.get(0).getTimestamp(), jvFirst.getTimestamp()); + // Unfortunately weights are double values, so must convert from long to double. + // TODO: need nano second precision? If so, use d.toNanos(). + // TODO: risk of overflow when converting from long to double..? + graph.setEdgeWeight(edge, Long.valueOf(d.toMillis()).doubleValue()); + } + // Alternative version if we cannot assume that sequences are ordered by timestamp: +// if (iv.sequence.stream().max(Comparator.comparing(PcapPacket::getTimestamp)).get() +// .getTimestamp().isBefore(jv.sequence.stream().min( +// Comparator.comparing(PcapPacket::getTimestamp)).get().getTimestamp())) { // -// } - - private Optional> findSubsequenceInSequence(List subsequence, - List sequence, - Conversation.Direction[] subsequenceDirections, - Conversation.Direction[] sequenceDirections) { - if (sequence.size() < subsequence.size()) { - // If subsequence is longer, it cannot be contained in sequence. - return Optional.empty(); - } - if (isTlsSequence(subsequence) != isTlsSequence(sequence)) { - // We consider it a mismatch if one is a TLS application data sequence and the other is not. - return Optional.empty(); - } - // If packet directions have not been precomputed by calling code, we need to construct them.
- if (subsequenceDirections == null) { - subsequenceDirections = getPacketDirections(subsequence, mRouterWanIp); - } - if (sequenceDirections == null) { - sequenceDirections = getPacketDirections(sequence, mRouterWanIp); - } - int subseqIdx = 0; - int seqIdx = 0; - while (seqIdx < sequence.size()) { - PcapPacket subseqPkt = subsequence.get(subseqIdx); - PcapPacket seqPkt = sequence.get(seqIdx); - // We only have a match if packet lengths and directions match. - if (subseqPkt.getOriginalLength() == seqPkt.getOriginalLength() && - subsequenceDirections[subseqIdx] == sequenceDirections[seqIdx]) { - // A match; advance both indices to consider next packet in subsequence vs. next packet in sequence. - subseqIdx++; - seqIdx++; - if (subseqIdx == subsequence.size()) { - // We managed to match the entire subsequence in sequence. - // Return the sublist of sequence that matches subsequence. - /* - * TODO: - * ASSUMES THE BACKING LIST (i.e., 'sequence') IS _NOT_ STRUCTURALLY MODIFIED, hence may not work - * for live traces! - */ - return Optional.of(sequence.subList(seqIdx - subsequence.size(), seqIdx)); +// } + } + } } - } else { - // Mismatch. - if (subseqIdx > 0) { - /* - * If we managed to match parts of subsequence, we restart the search for subsequence in sequence at - * the index of sequence where the current mismatch occurred. I.e., we must reset subseqIdx, but - * leave seqIdx untouched. - */ - subseqIdx = 0; - } else { - /* - * First packet of subsequence didn't match packet at seqIdx of sequence, so we move forward in - * sequence, i.e., we continue the search for subsequence in sequence starting at index seqIdx+1 of - * sequence. - */ - seqIdx++; + } + // Graph construction complete, run shortest-path to find a (potential) signature match. 
+ DijkstraShortestPath dijkstra = new DijkstraShortestPath<>(graph); + GraphPath shortestPath = dijkstra.getPath(source, sink); + if (shortestPath != null) { + // The total weight is the duration between the first packet of the first sequence and the last packet + // of the last sequence, so we simply have to compare the weight against the timeframe that we allow + // the signature to span. For now we just use the inclusion window we defined for training purposes. + // Note however, that we must convert back from double to long as the weight is stored as a double in + // JGraphT's API. + if (((long)shortestPath.getWeight()) < TriggerTrafficExtractor.INCLUSION_WINDOW_MILLIS) { + // There's a signature match! + // Extract the match from the vertices + List> signatureMatch = new ArrayList<>(); + for(Vertex v : shortestPath.getVertexList()) { + if (v == source || v == sink) { + // Skip the dummy source and sink nodes. + continue; + } + signatureMatch.add(v.sequence); + // As there is a one-to-one correspondence between vertices[] and pendingMatches[], we know that + // the sequence we've "consumed" for index i of the matched signature is also at index i in + // pendingMatches. We must remove it from pendingMatches so that we don't use it to construct + // another signature match in a later call. + pendingMatches[signatureMatch.size()-1].remove(v.sequence); + } + // Declare success: notify observers + mObservers.forEach(obs -> obs.onSignatureDetected(mSignature, + Collections.unmodifiableList(signatureMatch))); } } } - return Optional.empty(); } /** - * Given a {@code List}, generate a {@code Conversation.Direction[]} such that each entry in the - * resulting {@code Conversation.Direction[]} specifies the direction of the {@link PcapPacket} at the corresponding - * index in the input list. - * @param packets The list of packets for which to construct a corresponding array of packet directions. - * @param routerWanIp The IP of the router's WAN port. 
This is used for determining the direction of packets when - * the traffic is captured just outside the local network (at the ISP side of the router). Set to - * {@code null} if {@code packets} stem from traffic captured within the local network. - * @return A {@code Conversation.Direction[]} specifying the direction of the {@link PcapPacket} at the - * corresponding index in {@code packets}. + * Used for registering for notifications of signatures detected by a {@link SignatureDetector}. */ - private static Conversation.Direction[] getPacketDirections(List packets, String routerWanIp) { - Conversation.Direction[] directions = new Conversation.Direction[packets.size()]; - for (int i = 0; i < packets.size(); i++) { - PcapPacket pkt = packets.get(i); - if (getSourceIp(pkt).equals(getDestinationIp(pkt))) { - // Sanity check: we shouldn't be processing loopback traffic - throw new AssertionError("loopback traffic detected"); - } - if (isSrcIpLocal(pkt) || getSourceIp(pkt).equals(routerWanIp)) { - directions[i] = Conversation.Direction.CLIENT_TO_SERVER; - } else if (isDstIpLocal(pkt) || getDestinationIp(pkt).equals(routerWanIp)) { - directions[i] = Conversation.Direction.SERVER_TO_CLIENT; - } else { - throw new IllegalArgumentException("no local IP or router WAN port IP found, can't detect direction"); - } - } - return directions; - } - - interface Observer { -// /** -// * Callback that is invoked when a sequence associated with the signature/cluster (i.e., the sequence is a -// * member of the cluster that makes up the signature) is detected in a {@link Conversation}. -// * @param sequence The sequence that was detected in {@code conversation}. -// * @param conversation The {@link Conversation} that {@code sequence} was detected in. 
-// */ -// void onSequenceDetected(List sequence, Conversation conversation); + interface SignatureDetectionObserver { - void onSignatureDetected(List> signature, List match); + /** + * Invoked when the {@link SignatureDetector} detects the presence of a signature in the traffic that it's + * examining. + * @param searchedSignature The signature that the {@link SignatureDetector} reporting the match is searching + * for. + * @param matchingTraffic The actual traffic trace that matches the searched signature. + */ + void onSignatureDetected(List>> searchedSignature, + List> matchingTraffic); } + /** + * Encapsulates a {@code List} so as to allow the list to be used as a vertex in a graph while avoiding + * the expensive {@link AbstractList#equals(Object)} calls when adding vertices to the graph. + * Using this wrapper makes the incurred {@code equals(Object)} calls delegate to {@link Object#equals(Object)} + * instead of {@link AbstractList#equals(Object)}. The net effect is a faster implementation, but the graph will not + * recognize two lists that contain the same items--from a value and not reference point of view--as the same + * vertex. However, this is fine for our purposes -- in fact restricting it to reference equality seems more + * appropriate. + */ + private static class Vertex { + private final List sequence; + private Vertex(List wrappedSequence) { + sequence = wrappedSequence; + } + } }