package edu.uci.iotproject;
+import edu.uci.iotproject.comparison.ComparisonFunctions;
+import edu.uci.iotproject.comparison.CompleteMatchPatternComparisonResult;
+import edu.uci.iotproject.comparison.PatternComparisonTask;
+import edu.uci.iotproject.trafficreassembly.layer3.Conversation;
import org.pcap4j.core.NotOpenException;
import org.pcap4j.core.PcapHandle;
import org.pcap4j.core.PcapNativeException;
import org.pcap4j.core.PcapPacket;
+import org.pcap4j.packet.DnsPacket;
import org.pcap4j.packet.IpV4Packet;
-import org.pcap4j.packet.Packet;
import org.pcap4j.packet.TcpPacket;
-import org.pcap4j.packet.DnsPacket;
-import java.io.EOFException;
+import java.io.*;
import java.net.UnknownHostException;
-import java.time.Instant;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.util.*;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.*;
+
/**
- * Provides functionality for searching for the presence of a {@link FlowPattern} in a PCAP trace.
- * We use 2 threads:
- * 1) The first thread (main thread) collects conversations from the PCAP stream and put them into our data structure.
- * 2) The second thread (checker thread) checks the collected conversation.
+ * <p>Provides functionality for searching for the presence of a {@link FlowPattern} in a PCAP trace.</p>
*
- * @author Janus Varmarken
- * @author Rahmadi Trimananda
+ * <p>
+ * The (entire) PCAP trace is traversed and parsed on one thread (specifically, the thread that calls
+ * {@link #findFlowPattern()}). This thread builds a {@link DnsMap} using the DNS packets present in the trace and uses
+ * that {@code DnsMap} to reassemble {@link Conversation}s that <em>potentially</em> match the provided
+ * {@link FlowPattern} (in that one end/party of said conversations matches the hostname(s) specified by the given
+ * {@code FlowPattern}).
+ * These potential matches are then examined on background worker thread(s) to determine if they are indeed a (complete)
+ * match of the provided {@code FlowPattern}.
+ * </p>
+ *
+ * @author Janus Varmarken {@literal <jvarmark@uci.edu>}
+ * @author Rahmadi Trimananda {@literal <rtrimana@uci.edu>}
*/
public class FlowPatternFinder {
- /* Class properties */
- private Map<Conversation, List<PcapPacket>> connections;
- private Queue<Conversation> conversations;
- private DnsMap dnsMap;
- private PcapHandle pcap;
- private FlowPattern pattern;
- private AtomicBoolean isEoF;
-
-
- /* Constructor */
- public FlowPatternFinder(PcapHandle _pcap, FlowPattern _pattern) {
-
- this.connections = new ConcurrentHashMap<Conversation, List<PcapPacket>>();
- this.conversations = new ConcurrentLinkedQueue<Conversation>();
- this.dnsMap = new DnsMap();
- this.isEoF = new AtomicBoolean(false);
-
- // Get input parameters
- this.pcap = _pcap;
- this.pattern = _pattern;
+ /* Begin class properties */
+ /**
+ * {@link ExecutorService} responsible for parallelizing pattern searches.
+ * Declared as static to allow for reuse of threads across different instances of {@code FlowPatternFinder} and to
+ * avoid the overhead of initializing a new thread pool for each {@code FlowPatternFinder} instance.
+ */
+ private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
+ /* End class properties */
+
+ /* Begin instance properties */
+ /**
+ * Holds a set of {@link Conversation}s that <em>potentially</em> match {@link #mPattern} since each individual
+ * {@code Conversation} is communication with the hostname identified by {@code mPattern.getHostname()}.
+ * Note that due to limitations of the {@link Set} interface (specifically, there is no {@code get(T t)} method),
+ * we have to resort to a {@link Map} (in which keys map to themselves) to "mimic" a set with {@code get(T t)}
+ * functionality.
+ *
+ * @see <a href="https://stackoverflow.com/questions/7283338/getting-an-element-from-a-set">this question on StackOverflow.com</a>
+ */
+ private final Map<Conversation, Conversation> mConversations;
+
+ /**
+ * Holds a list of trigger times.
+ */
+ private final List<Long> mTriggerTimes;
+ private static int triggerListCounter;
+
+ private final DnsMap mDnsMap;
+ private final PcapHandle mPcap;
+ private final FlowPattern mPattern;
+ private final ConversationPair mConvPair;
+ private final String FILE = "./devices/tplink_switch/datapoints.csv";
+ //private final String REF_FILE = "./devices/dlink_switch/dlink-june-26-2018.timestamps";
+ private final String REF_FILE = "./devices/tplink_switch/tplink-june-14-2018.timestamps";
+ //private final String REF_FILE = "./devices/tplink_switch/tplink-feb-13-2018.timestamps";
+ // Router time is in CET and we use PST for the trigger times
+ // Difference is 7 hours x 3600 x 1000ms = 25,200,000ms
+ private final long TIME_OFFSET = 25200000;
+
+ private final List<Future<CompleteMatchPatternComparisonResult>> mPendingComparisons = new ArrayList<>();
+ /* End instance properties */
+
+ /**
+ * Constructs a new {@code FlowPatternFinder}.
+ * @param pcap an <em>open</em> {@link PcapHandle} that provides access to the trace that is to be examined.
+ * @param pattern the {@link FlowPattern} to search for.
+ */
+ public FlowPatternFinder(PcapHandle pcap, FlowPattern pattern) {
+ this.mConversations = new HashMap<>();
+ this.mTriggerTimes = readTriggerTimes(REF_FILE);
+ triggerListCounter = 0;
+ this.mDnsMap = new DnsMap();
+ this.mPcap = Objects.requireNonNull(pcap,
+ String.format("Argument of type '%s' cannot be null", PcapHandle.class.getSimpleName()));
+ this.mPattern = Objects.requireNonNull(pattern,
+ String.format("Argument of type '%s' cannot be null", FlowPattern.class.getSimpleName()));
+ this.mConvPair = new ConversationPair(FILE, ConversationPair.Direction.DEVICE_TO_SERVER);
}
-
-
- public void start() {
-
- // Spawn the main thread
- Thread mainThread = new Thread(new Runnable() {
- public void run() {
- findFlowPattern();
- }
- });
- mainThread.start();
- // Spawn the checker thread
- Thread checkerThread = new Thread(new Runnable() {
- public void run() {
- find();
- }
- });
- checkerThread.start();
- /* TODO: Join the threads if we want it to be blocking
+ private List<Long> readTriggerTimes(String refFileName) {
+
+ List<Long> listTriggerTimes = new ArrayList<>();
try {
- mainThread.join();
- checkerThread.join();
- } catch(InterruptedException ex) {
- ex.printStackTrace();
- }*/
- System.out.println("[ start ] Main and checker threads started!");
+ File file = new File(refFileName);
+ BufferedReader br = new BufferedReader(new FileReader(file));
+ String s;
+ while ((s = br.readLine()) != null) {
+ listTriggerTimes.add(timeToMillis(s, false));
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ System.out.println("List has: " + listTriggerTimes.size());
+
+ return listTriggerTimes;
}
+ /**
+ * Starts the pattern search.
+ */
+ public void start() {
+
+ //findFlowPattern();
+ findSignatureBasedOnTimestamp();
+ }
/**
* Find patterns based on the FlowPattern object (run by a thread)
*/
private void findFlowPattern() {
- int counter = 0;
try {
PcapPacket packet;
- Set<Integer> seqNumberSet = new HashSet<Integer>();
- int patternLength = pattern.getLength();
- while ((packet = pcap.getNextPacketEx()) != null) {
-
- // Check if this is a valid DNS packet
- dnsMap.validateAndAddNewEntry(packet);
+// TODO: The new comparison method is pending
+// TODO: For now, just compare using one hostname and one list per FlowPattern
+// List<String> hostnameList = mPattern.getHostnameList();
+// int hostIndex = 0;
+ while ((packet = mPcap.getNextPacketEx()) != null) {
+ // Let DnsMap handle DNS packets.
+ if (packet.get(DnsPacket.class) != null) {
+ // Check if this is a valid DNS packet
+ mDnsMap.validateAndAddNewEntry(packet);
+ continue;
+ }
// For now, we only work support pattern search in TCP over IPv4.
- IpV4Packet ipPacket = packet.get(IpV4Packet.class);
- TcpPacket tcpPacket = packet.get(TcpPacket.class);
- if (ipPacket == null || tcpPacket == null)
+ final IpV4Packet ipPacket = packet.get(IpV4Packet.class);
+ final TcpPacket tcpPacket = packet.get(TcpPacket.class);
+ if (ipPacket == null || tcpPacket == null) {
continue;
+ }
+
String srcAddress = ipPacket.getHeader().getSrcAddr().getHostAddress();
String dstAddress = ipPacket.getHeader().getDstAddr().getHostAddress();
int srcPort = tcpPacket.getHeader().getSrcPort().valueAsInt();
int dstPort = tcpPacket.getHeader().getDstPort().valueAsInt();
- // Is this packet related to the pattern and coming to/from the cloud server?
- boolean fromServer = dnsMap.isRelatedToCloudServer(srcAddress, pattern.getHostname());
- boolean fromClient = dnsMap.isRelatedToCloudServer(dstAddress, pattern.getHostname());
- if (!fromServer && !fromClient) // Packet not related to pattern, skip it.
+ // Is this packet related to the pattern; i.e. is it going to (or coming from) the cloud server?
+ boolean fromServer = mDnsMap.isRelatedToCloudServer(srcAddress, mPattern.getHostname());
+ boolean fromClient = mDnsMap.isRelatedToCloudServer(dstAddress, mPattern.getHostname());
+// String currentHostname = hostnameList.get(hostIndex);
+// boolean fromServer = mDnsMap.isRelatedToCloudServer(srcAddress, currentHostname);
+// boolean fromClient = mDnsMap.isRelatedToCloudServer(dstAddress, currentHostname);
+ if (!fromServer && !fromClient) {
+ // Packet not related to pattern, skip it.
continue;
- if (tcpPacket.getPayload() == null) // We skip non-payload control packets as these are less predictable
- continue;
- // Identify conversations (connections/sessions) by the four-tuple (clientIp, clientPort, serverIp, serverPort).
- // TODO: this is strictly not sufficient to differentiate one TCP session from another, but should suffice for now.
+ }
+
+ // Conversations (connections/sessions) are identified by the four-tuple
+ // (clientIp, clientPort, serverIp, serverPort) (see Conversation Javadoc).
+ // Create "dummy" conversation for looking up an existing entry.
Conversation conversation = fromClient ? new Conversation(srcAddress, srcPort, dstAddress, dstPort) :
new Conversation(dstAddress, dstPort, srcAddress, srcPort);
- // Create new conversation entry, or append packet to existing.
- List<PcapPacket> listPcapPacket = connections.get(conversation);
- if (listPcapPacket == null) {
- listPcapPacket = new ArrayList<PcapPacket>();
- connections.put(conversation, listPcapPacket);
+ // Add the packet so that the "dummy" conversation can be immediately added to the map if no entry
+ // exists for the conversation that the current packet belongs to.
+ if (tcpPacket.getHeader().getFin()) {
+ // Record FIN packets.
+ conversation.addFinPacket(packet);
+ }
+ if (tcpPacket.getPayload() != null) {
+ // Record regular payload packets.
+ conversation.addPacket(packet, true);
}
- int seqNumber = packet.get(TcpPacket.class).getHeader().getSequenceNumber();
- boolean retransmission = seqNumberSet.contains(seqNumber);
- if (!retransmission) { // Do not add if retransmission -> avoid duplicate packets in flow
- listPcapPacket.add(packet);
- // End of conversation -> trigger thread to check
- if (listPcapPacket.size() == patternLength)
- conversations.add(conversation);
- seqNumberSet.add(seqNumber);
+ // Note: does not make sense to call attemptAcknowledgementOfFin here as the new packet has no FINs
+ // in its list, so if this packet is an ACK, it would not be added anyway.
+ // Need to retain a final reference to get access to the packet in the lambda below.
+ final PcapPacket finalPacket = packet;
+ // Add the new conversation to the map if an equal entry is not already present.
+ // If an existing entry is already present, the current packet is simply added to that conversation.
+ mConversations.merge(conversation, conversation, (existingEntry, toMerge) -> {
+ // toMerge may not have any payload packets if the current packet is a FIN packet.
+ if (toMerge.getPackets().size() > 0) {
+ existingEntry.addPacket(toMerge.getPackets().get(0), true);
+ }
+ if (toMerge.getFinAckPairs().size() > 0) {
+ // Add the FIN packet to the existing entry.
+ existingEntry.addFinPacket(toMerge.getFinAckPairs().get(0).getFinPacket());
+ }
+ if (finalPacket.get(TcpPacket.class).getHeader().getAck()) {
+ existingEntry.attemptAcknowledgementOfFin(finalPacket);
+ }
+ return existingEntry;
+ });
+ // Refresh reference to point to entry in map (in case packet was added to existing entry).
+ conversation = mConversations.get(conversation);
+ if (conversation.isGracefullyShutdown()) {
+ // Conversation terminated gracefully, so we can now start analyzing it.
+ // Remove the Conversation from the map and start the analysis.
+ // Any future packets identified by the same four tuple will be tied to a new Conversation instance.
+ mConversations.remove(conversation);
+ // Create comparison task and send to executor service.
+ PatternComparisonTask<CompleteMatchPatternComparisonResult> comparisonTask =
+ new PatternComparisonTask<>(conversation, mPattern, ComparisonFunctions.SUB_SEQUENCE_COMPLETE_MATCH);
+ mPendingComparisons.add(EXECUTOR_SERVICE.submit(comparisonTask));
+ // Increment hostIndex to find the next
+
}
}
} catch (EOFException eofe) {
- while (isEoF.compareAndSet(false, true) == false); // Try to signal EoF!
+ // TODO should check for leftover conversations in map here and fire tasks for those.
+ // TODO [cont'd] such tasks may be present if connections did not terminate gracefully or if there are longlived connections.
System.out.println("[ findFlowPattern ] Finished processing entire PCAP stream!");
+ System.out.println("[ findFlowPattern ] Now waiting for comparisons to finish...");
+ // Wait for all comparisons to finish, then output their results to std.out.
+ for(Future<CompleteMatchPatternComparisonResult> comparisonTask : mPendingComparisons) {
+ try {
+ // Blocks until result is ready.
+ CompleteMatchPatternComparisonResult comparisonResult = comparisonTask.get();
+ if (comparisonResult.getResult()) {
+ System.out.println(comparisonResult.getTextualDescription());
+ }
+ } catch (InterruptedException|ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
} catch (UnknownHostException |
PcapNativeException |
NotOpenException |
ex.printStackTrace();
}
}
-
/**
- * Checker to match collected patterns (run by a thread)
+ * Find patterns based on the FlowPattern object (run by a thread)
*/
- private void find() {
-
- while (isEoF.get() == false) { // Continue until EoF
- // Get the object from the queue
- while(conversations.peek() == null) { // Wait until queue is not empty
- if (isEoF.get() == true) // Return if EoF
- return;
- }
- Conversation conversation = conversations.poll();
- // Get the object and remove it from the Map (housekeeping)
- List<PcapPacket> packets = connections.remove(conversation);
- boolean completeMatch = true;
- for (int i = 0; i < packets.size(); i++) {
- TcpPacket tcpPacket = packets.get(i).get(TcpPacket.class);
- if (tcpPacket.getPayload().length() != pattern.getPacketOrder().get(i)) {
- completeMatch = false;
- break;
+ private void findSignatureBasedOnTimestamp() {
+ try {
+ PcapPacket packet;
+// TODO: The new comparison method is pending
+// TODO: For now, just compare using one hostname and one list per FlowPattern
+ while ((packet = mPcap.getNextPacketEx()) != null) {
+ // Let DnsMap handle DNS packets.
+ if (packet.get(DnsPacket.class) != null) {
+ // Check if this is a valid DNS packet
+ mDnsMap.validateAndAddNewEntry(packet);
+ continue;
+ }
+ // For now, we only work support pattern search in TCP over IPv4.
+ final IpV4Packet ipPacket = packet.get(IpV4Packet.class);
+ final TcpPacket tcpPacket = packet.get(TcpPacket.class);
+ if (ipPacket == null || tcpPacket == null) {
+ continue;
+ }
+
+ String srcAddress = ipPacket.getHeader().getSrcAddr().getHostAddress();
+ String dstAddress = ipPacket.getHeader().getDstAddr().getHostAddress();
+ int srcPort = tcpPacket.getHeader().getSrcPort().valueAsInt();
+ int dstPort = tcpPacket.getHeader().getDstPort().valueAsInt();
+ //System.out.println("Timestamp packet: " + packet.getTimestamp());
+ // Is this packet related to the pattern; i.e. is it going to (or coming from) the cloud server?
+ boolean fromServer = mDnsMap.isRelatedToCloudServer(srcAddress, mPattern.getHostname());
+ boolean fromClient = mDnsMap.isRelatedToCloudServer(dstAddress, mPattern.getHostname());
+ if (!fromServer && !fromClient) {
+ // Packet not related to pattern, skip it.
+ continue;
+ }
+ // Record the conversation pairs
+ if (tcpPacket.getPayload() != null && checkTimeStamp(packet)) {
+ //if (tcpPacket.getPayload() != null) {
+ mConvPair.writeConversationPair(packet, fromClient, fromServer);
}
}
- if (completeMatch) {
- PcapPacket firstPacketInFlow = packets.get(0);
- System.out.println(
- String.format("[ find ] Detected a complete match of pattern '%s' at %s!",
- pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString()));
- } /*else {
- PcapPacket firstPacketInFlow = packets.get(0);
- System.out.println(
- String.format("[ detected a mismatch of pattern '%s' at %s]",
- pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString()));
- }*/
+ } catch (EOFException eofe) {
+ triggerListCounter = 0;
+ mConvPair.close();
+ System.out.println("[ findFlowPattern ] ConversationPair writer closed!");
+ System.out.println("[ findFlowPattern ] Frequencies of data points:");
+ mConvPair.printListFrequency();
+ } catch (UnknownHostException |
+ PcapNativeException |
+ NotOpenException |
+ TimeoutException ex) {
+ ex.printStackTrace();
}
}
+ private boolean checkTimeStamp(PcapPacket packet) {
- /**
- * Immutable class used for identifying a conversation/connection/session/flow (packet's belonging to the same
- * session between a client and a server).
- */
- private static class Conversation {
-
- private final String clientIp;
- private final int clientPort;
- private final String serverIp;
- private final int serverPort;
-
- public Conversation(String clientIp, int clientPort, String serverIp, int serverPort) {
- this.clientIp = clientIp;
- this.clientPort = clientPort;
- this.serverIp = serverIp;
- this.serverPort = serverPort;
- }
+ // Extract time from the packet's timestamp
+ String timeStamp = packet.getTimestamp().toString();
+ String timeString = timeStamp.substring(timeStamp.indexOf("T") + 1, timeStamp.indexOf("."));
+ // Timestamps are in CET (ahead of PST) so it should be deducted by TIME_OFFSET
+ long time = timeToMillis(timeString, true) - TIME_OFFSET;
+ //long time = timeToMillis(timeString, true);
+ //System.out.println("Gets here: " + time + " trigger time: " + mTriggerTimes.get(triggerListCounter));
- // =========================================================================================================
- // We simply reuse equals and hashCode methods of String.class to be able to use this immutable class as a key
- // in a Map.
- @Override
- public boolean equals(Object obj) {
- return obj instanceof Conversation && this.toString().equals(obj.toString());
+ // We accept packets that are at most 3 seconds away from the trigger time
+ if ((mTriggerTimes.get(triggerListCounter) <= time) &&
+ (time <= mTriggerTimes.get(triggerListCounter) + 3000)) {
+ //System.out.println("Gets here 1: " + timeString + " index: " + triggerListCounter);
+ return true;
+ } else {
+ // Handle the case that the timestamp is > 3000, but < next timestamp
+ // in the list. We ignore these packets.
+ if (time < mTriggerTimes.get(triggerListCounter)) {
+ // Timestamp is smaller than trigger, ignore!
+ //System.out.println("Gets here 2: " + timeString + " index: " + triggerListCounter);
+ return false;
+ } else { // Timestamp is greater than trigger, increment!
+ triggerListCounter = triggerListCounter + 1;
+ //System.out.println("Gets here 3: " + timeString + " index: " + triggerListCounter);
+ //return false;
+ return checkTimeStamp(packet);
+ }
}
- @Override
- public int hashCode() {
- return toString().hashCode();
- }
- // =========================================================================================================
+ //System.out.println("Timestamp: " + timeToMillis(time, true));
+ //String time2 = "21:38:08";
+ //System.out.println("Timestamp: " + timeToMillis(time2, true));
+ }
+
+ /**
+ * A private function that returns time in milliseconds.
+ * @param time The time in the form of String.
+ * @param is24Hr If true, then this is in 24-hour format.
+ */
+ private long timeToMillis(String time, boolean is24Hr) {
- @Override
- public String toString() {
- return String.format("%s:%d %s:%d", clientIp, clientPort, serverIp, serverPort);
+ String format = null;
+ if (is24Hr) {
+ format = "hh:mm:ss";
+ } else { // 12 Hr format
+ format = "hh:mm:ss aa";
+ }
+ DateFormat sdf = new SimpleDateFormat(format);
+ Date date = null;
+ try {
+ date = sdf.parse(time);
+ } catch(Exception e) {
+ e.printStackTrace();
}
+ if (date == null)
+ return 0;
+ return date.getTime();
}
+
}