X-Git-Url: http://plrg.eecs.uci.edu/git/?p=pingpong.git;a=blobdiff_plain;f=Code%2FProjects%2FPacketLevelSignatureExtractor%2Fsrc%2Fmain%2Fjava%2Fedu%2Fuci%2Fiotproject%2Fdetection%2Flayer2%2FLayer2ClusterMatcher.java;h=7b576be3d79debd87329944a8a6b2884e5e8c400;hp=5021c3119693bf84b5dc076e79cdf30e614ab815;hb=987ea910fed24a1f3f51ded41b6aa98c4e2618ae;hpb=17a06200889140f90c1e735d1307085c87c8fc41 diff --git a/Code/Projects/PacketLevelSignatureExtractor/src/main/java/edu/uci/iotproject/detection/layer2/Layer2ClusterMatcher.java b/Code/Projects/PacketLevelSignatureExtractor/src/main/java/edu/uci/iotproject/detection/layer2/Layer2ClusterMatcher.java index 5021c31..7b576be 100644 --- a/Code/Projects/PacketLevelSignatureExtractor/src/main/java/edu/uci/iotproject/detection/layer2/Layer2ClusterMatcher.java +++ b/Code/Projects/PacketLevelSignatureExtractor/src/main/java/edu/uci/iotproject/detection/layer2/Layer2ClusterMatcher.java @@ -1,16 +1,19 @@ package edu.uci.iotproject.detection.layer2; +import edu.uci.iotproject.analysis.TriggerTrafficExtractor; import edu.uci.iotproject.trafficreassembly.layer2.Layer2FlowReassembler; import edu.uci.iotproject.trafficreassembly.layer2.Layer2Flow; import edu.uci.iotproject.trafficreassembly.layer2.Layer2FlowReassemblerObserver; import edu.uci.iotproject.detection.AbstractClusterMatcher; import edu.uci.iotproject.trafficreassembly.layer2.Layer2FlowObserver; +import org.jetbrains.annotations.NotNull; import org.pcap4j.core.*; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Function; /** @@ -27,16 +30,37 @@ public class Layer2ClusterMatcher extends AbstractClusterMatcher implements Laye * of {@link #mCluster} and has so far matched {@code j} packets of that particular sequence. 
*/ private final Map mPerFlowSeqMatchers = new HashMap<>(); +// private final Map mPerFlowRangeMatcher = new HashMap<>(); + private final Map> mPerFlowRangeMatcher = new HashMap<>(); private final Function mFlowFilter; + /** + * Specifying range-based instead of conservative exact matching. + */ + private final boolean mRangeBased; + + /** + * Epsilon value used by the DBSCAN algorithm; it is used again for range-based matching here. + */ + private final double mEps; + + private int mInclusionTimeMillis; + + /** + * Keeping track of maximum number of skipped packets + */ + //private int mMaxSkippedPackets; + private List mMaxSkippedPackets; + /** * Create a new {@link Layer2ClusterMatcher} that attempts to find occurrences of {@code cluster}'s members. * @param cluster The sequence mutations that the new {@link Layer2ClusterMatcher} should search for. */ - public Layer2ClusterMatcher(List> cluster) { + public Layer2ClusterMatcher(List> cluster, int inclusionTimeMillis, + boolean isRangeBased, double eps) { // Consider all flows if no flow filter specified. - this(cluster, flow -> true); + this(cluster, flow -> true, inclusionTimeMillis, isRangeBased, eps); } /** @@ -48,14 +72,32 @@ public class Layer2ClusterMatcher extends AbstractClusterMatcher implements Laye * namely when the {@link Layer2FlowReassembler} notifies the {@link Layer2ClusterMatcher} about * the new flow. This functionality may for example come in handy when one only wants to search * for matches in the subset of flows that involves a specific (range of) MAC(s). + * @param inclusionTimeMillis Packet inclusion time limit for matching. + * @param isRangeBased The boolean that decides if it is range-based vs. strict matching. + * @param eps The epsilon value used in the DBSCAN algorithm. 
*/ - public Layer2ClusterMatcher(List> cluster, Function flowFilter) { - super(cluster); + public Layer2ClusterMatcher(List> cluster, Function flowFilter, + int inclusionTimeMillis, boolean isRangeBased, double eps) { + super(cluster, isRangeBased); mFlowFilter = flowFilter; + mRangeBased = isRangeBased; + mEps = eps; + mInclusionTimeMillis = + inclusionTimeMillis == 0 ? TriggerTrafficExtractor.INCLUSION_WINDOW_MILLIS : inclusionTimeMillis; + //mMaxSkippedPackets = 0; + mMaxSkippedPackets = new ArrayList<>(); } @Override public void onNewPacket(Layer2Flow flow, PcapPacket newPacket) { + if (mRangeBased) { + rangeBasedMatching(flow, newPacket); + } else { + conservativeMatching(flow, newPacket); + } + } + + private void conservativeMatching(Layer2Flow flow, PcapPacket newPacket) { if (mPerFlowSeqMatchers.get(flow) == null) { // If this is the first time we encounter this flow, we need to set up sequence matchers for it. // All sequences of the cluster have the same length, so we only need to compute the length of the nested @@ -65,7 +107,7 @@ public class Layer2ClusterMatcher extends AbstractClusterMatcher implements Laye Layer2SequenceMatcher[][] matchers = new Layer2SequenceMatcher[mCluster.size()][mCluster.get(0).size()]; // Prepare a "state 0" sequence matcher for each sequence variation in the cluster. for (int i = 0; i < matchers.length; i++) { - matchers[i][0] = new Layer2SequenceMatcher(mCluster.get(i)); + matchers[i][0] = new Layer2SequenceMatcher(mCluster.get(i), mInclusionTimeMillis); } // Associate the new sequence matcher table with the new flow mPerFlowSeqMatchers.put(flow, matchers); @@ -86,6 +128,8 @@ public class Layer2ClusterMatcher extends AbstractClusterMatcher implements Laye boolean matched = sm.matchPacket(newPacket); if (matched) { if (sm.getMatchedPacketsCount() == sm.getTargetSequencePacketCount()) { + // Update maximum skipped packets + updateMaxSkippedPackets(flow.getPackets(), sm.getMatchedPackets()); // Sequence matcher has a match. 
Report it to observers. mObservers.forEach(o -> o.onMatch(this, sm.getMatchedPackets())); // Remove the now terminated sequence matcher. @@ -105,13 +149,85 @@ public class Layer2ClusterMatcher extends AbstractClusterMatcher implements Laye // We always want to have a sequence matcher in state 0, regardless of if the one that advanced // from state zero completed its matching or if it replaced a different one in state 1 or not. if (sm.getMatchedPacketsCount() == 1) { - matchers[i][j] = new Layer2SequenceMatcher(sm.getTargetSequence()); + matchers[i][j] = new Layer2SequenceMatcher(sm.getTargetSequence(), mInclusionTimeMillis); } } } } } + // Update the maximum number of skipped packets + private void updateMaxSkippedPackets(List flowPackets, List matchedPackets) { + // Count number of skipped packets by looking into + // the difference of indices of two matched packets + for(int i = 1; i < matchedPackets.size(); ++i) { + int currIndex = flowPackets.indexOf(matchedPackets.get(i-1)); + int nextIndex = flowPackets.indexOf(matchedPackets.get(i)); + int skippedPackets = nextIndex - currIndex; +// if (mMaxSkippedPackets < skippedPackets) { +// mMaxSkippedPackets = skippedPackets; +// } + mMaxSkippedPackets.add(skippedPackets); + } + } + + private void rangeBasedMatching(Layer2Flow flow, PcapPacket newPacket) { + // TODO: For range-based matching, we need to create a new matcher every time we see the first element of + // the sequence (between lower and upper bounds). + if (mPerFlowRangeMatcher.get(flow) == null) { + // If this is the first time we encounter this flow, we need to set up a list of sequence matchers. + List listMatchers = new ArrayList<>(); + // Prepare a "state 0" sequence matcher. + Layer2RangeMatcher matcher = new Layer2RangeMatcher(mCluster.get(0), mCluster.get(1), + mInclusionTimeMillis, mEps); + listMatchers.add(matcher); + // Associate the new sequence matcher table with the new flow. 
+ mPerFlowRangeMatcher.put(flow, listMatchers); + } + // Fetch table that contains sequence matchers for this flow. + List listMatchers = mPerFlowRangeMatcher.get(flow); + // Add a new matcher if all matchers have already advanced to the next stage. + // We always need a new matcher to match from NO packets. + boolean addOneArray = true; + for(Layer2RangeMatcher matcher : listMatchers) { + if (matcher.getMatchedPacketsCount() == 0) { + addOneArray = false; + } + } + // Add the new matcher into the list + if (addOneArray) { + Layer2RangeMatcher newMatcher = new Layer2RangeMatcher(mCluster.get(0), mCluster.get(1), + mInclusionTimeMillis, mEps); + listMatchers.add(newMatcher); + } + // Present packet to the sequence matchers. + // Make a shallow copy of the list so that we can clean up the actual list when a matcher is terminated. + // Otherwise, we would get an exception for changing the list while iterating on it. + List listMatchersCopy = new ArrayList<>(listMatchers); + for(Layer2RangeMatcher matcher : listMatchersCopy) { + Layer2RangeMatcher sm = matcher; + // Check if no packets are matched yet or if there are matched packets, the next packet to be matched + // has to be later than the last matched packet. + // In most traces, a small amount of the packets appear out of order (with regards to their timestamp), + // which is why this check is required. + // Obviously it would not be needed if packets were guaranteed to be processed in timestamp + // order here. + if (sm.getMatchedPacketsCount() == 0 || + newPacket.getTimestamp().isAfter(sm.getLastPacket().getTimestamp())) { + boolean matched = sm.matchPacket(newPacket); + if (matched) { + if (sm.getMatchedPacketsCount() == sm.getTargetSequencePacketCount()) { + // Update maximum skipped packets + updateMaxSkippedPackets(flow.getPackets(), sm.getMatchedPackets()); + // Sequence matcher has a match. Report it to observers.
+ mObservers.forEach(o -> o.onMatch(this, sm.getMatchedPackets())); + // Terminate sequence matcher since matching is complete. + listMatchers.remove(matcher); + } + } + } + } + } @Override protected List> pruneCluster(List> cluster) { @@ -150,4 +266,14 @@ public class Layer2ClusterMatcher extends AbstractClusterMatcher implements Laye System.out.println(">>> IGNORING FLOW: " + newFlow + " <<<"); } } + + /** + * Return the maximum number of skipped packets. + */ +// public int getMaxSkippedPackets() { +// return mMaxSkippedPackets; +// } + public List getMaxSkippedPackets() { + return mMaxSkippedPackets; + } }