1 package edu.uci.iotproject;
3 import org.pcap4j.core.NotOpenException;
4 import org.pcap4j.core.PcapHandle;
5 import org.pcap4j.core.PcapNativeException;
6 import org.pcap4j.core.PcapPacket;
7 import org.pcap4j.packet.IpV4Packet;
8 import org.pcap4j.packet.Packet;
9 import org.pcap4j.packet.TcpPacket;
10 import org.pcap4j.packet.DnsPacket;
12 import java.io.EOFException;
13 import java.net.UnknownHostException;
14 import java.time.Instant;
16 import java.util.concurrent.TimeoutException;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentLinkedQueue;
19 import java.util.concurrent.atomic.AtomicBoolean;
22 * Provides functionality for searching for the presence of a {@link FlowPattern} in a PCAP trace.
24 * 1) The first thread (main thread) collects conversations from the PCAP stream and puts them into our data structure.
25 * 2) The second thread (checker thread) checks the collected conversations.
27 * @author Janus Varmarken
28 * @author Rahmadi Trimananda
30 public class FlowPatternFinder {
32 /* Class properties */
// Packets collected so far for each conversation. findFlowPattern() appends to these lists
// and find() removes an entry once the conversation has been taken off the queue.
33 private Map<Conversation, List<PcapPacket>> connections;
// Conversations whose packet list has reached the pattern length; filled by
// findFlowPattern() and drained by find().
34 private Queue<Conversation> conversations;
// Receives every packet read from the trace and answers whether an address is related
// to the pattern's hostname.
35 private DnsMap dnsMap;
// Handle that findFlowPattern() reads packets from.
36 private PcapHandle pcap;
// The pattern being searched for (hostname, length, expected per-packet payload lengths, id).
37 private FlowPattern pattern;
// Set to true by findFlowPattern() when reading the trace ends with an EOFException;
// polled by find() to decide when to stop.
38 private AtomicBoolean isEoF;
// Creates a finder for the given pattern.
//   _pcap    - handle of the PCAP trace to read packets from
//   _pattern - the flow pattern to search for
// The shared collections are created as concurrent implementations (ConcurrentHashMap,
// ConcurrentLinkedQueue) and the end-of-file flag starts out false.
// NOTE(review): the assignment of _pcap to the pcap field and the closing brace of this
// constructor are not visible in this excerpt - confirm they exist in the full file.
42 public FlowPatternFinder(PcapHandle _pcap, FlowPattern _pattern) {
44 this.connections = new ConcurrentHashMap<Conversation, List<PcapPacket>>();
45 this.conversations = new ConcurrentLinkedQueue<Conversation>();
46 this.dnsMap = new DnsMap();
47 this.isEoF = new AtomicBoolean(false);
49 // Get input parameters
51 this.pattern = _pattern;
// Creates the two worker threads described in the class comment and reports that they
// were started.
// NOTE(review): the enclosing method header, the bodies of both Runnables and the call
// that starts mainThread are not visible in this excerpt - presumably the main thread
// runs findFlowPattern() and the checker thread runs find(); confirm in the full file.
57 // Spawn the main thread
58 Thread mainThread = new Thread(new Runnable() {
65 // Spawn the checker thread
66 Thread checkerThread = new Thread(new Runnable() {
71 checkerThread.start();
// Joining is left as a TODO, so the threads are not awaited here.
73 /* TODO: Join the threads if we want it to be blocking
77 } catch(InterruptedException ex) {
80 System.out.println("[ start ] Main and checker threads started!");
85 * Find patterns based on the FlowPattern object (run by a thread)
// Reads packets from the PCAP handle and groups the relevant ones by conversation.
// Every packet is first offered to dnsMap. Only packets that carry both an IPv4 and a TCP
// layer, have a TCP payload, and whose source or destination address dnsMap relates to
// the pattern's hostname are considered further. Such packets are appended to the
// per-conversation list in `connections`; when a list holds exactly pattern.getLength()
// packets the conversation is put on the `conversations` queue for the checker.
// An EOFException from the read loop sets isEoF; the other caught exceptions are printed.
// NOTE(review): several lines of this method (the try opener, the declaration of `packet`,
// the statements guarded by the single-line ifs and the closing braces) are not visible
// in this excerpt.
87 private void findFlowPattern() {
// NOTE(review): this set is created once for the whole run and is keyed by sequence
// number only, so a packet whose sequence number was already seen on a different
// connection is also treated as a retransmission - confirm that is intended.
91 Set<Integer> seqNumberSet = new HashSet<Integer>();
92 int patternLength = pattern.getLength();
93 while ((packet = pcap.getNextPacketEx()) != null) {
95 // Check if this is a valid DNS packet
96 dnsMap.validateAndAddNewEntry(packet);
97 // For now, we only support pattern search in TCP over IPv4.
98 IpV4Packet ipPacket = packet.get(IpV4Packet.class);
99 TcpPacket tcpPacket = packet.get(TcpPacket.class);
// The guarded statement is not visible here; presumably the packet is skipped.
100 if (ipPacket == null || tcpPacket == null)
102 String srcAddress = ipPacket.getHeader().getSrcAddr().getHostAddress();
103 String dstAddress = ipPacket.getHeader().getDstAddr().getHostAddress();
104 int srcPort = tcpPacket.getHeader().getSrcPort().valueAsInt();
105 int dstPort = tcpPacket.getHeader().getDstPort().valueAsInt();
106 // Is this packet related to the pattern and coming to/from the cloud server?
// fromServer: the source address is related to the hostname;
// fromClient: the destination address is related to the hostname.
107 boolean fromServer = dnsMap.isRelatedToCloudServer(srcAddress, pattern.getHostname());
108 boolean fromClient = dnsMap.isRelatedToCloudServer(dstAddress, pattern.getHostname());
109 if (!fromServer && !fromClient) // Packet not related to pattern, skip it.
111 if (tcpPacket.getPayload() == null) // We skip non-payload control packets as these are less predictable
113 // Identify conversations (connections/sessions) by the four-tuple (clientIp, clientPort, serverIp, serverPort).
114 // TODO: this is strictly not sufficient to differentiate one TCP session from another, but should suffice for now.
// The endpoints are ordered so that the non-server side always comes first, making both
// directions of a connection map to the same Conversation arguments.
115 Conversation conversation = fromClient ? new Conversation(srcAddress, srcPort, dstAddress, dstPort) :
116 new Conversation(dstAddress, dstPort, srcAddress, srcPort);
117 // Create new conversation entry, or append packet to existing.
118 List<PcapPacket> listPcapPacket = connections.get(conversation);
119 if (listPcapPacket == null) {
120 listPcapPacket = new ArrayList<PcapPacket>();
121 connections.put(conversation, listPcapPacket);
123 int seqNumber = packet.get(TcpPacket.class).getHeader().getSequenceNumber();
124 boolean retransmission = seqNumberSet.contains(seqNumber);
125 if (!retransmission) { // Do not add if retransmission -> avoid duplicate packets in flow
126 listPcapPacket.add(packet);
127 // End of conversation -> trigger thread to check
128 if (listPcapPacket.size() == patternLength)
129 conversations.add(conversation);
130 seqNumberSet.add(seqNumber);
// NOTE(review): compareAndSet(false, true) returns false only when the flag is already
// true, in which case the loop below would spin forever; a plain isEoF.set(true) would
// express the same intent without the loop.
133 } catch (EOFException eofe) {
134 while (isEoF.compareAndSet(false, true) == false); // Try to signal EoF!
135 System.out.println("[ findFlowPattern ] Finished processing entire PCAP stream!");
// NOTE(review): in the visible code isEoF is set only in the EOFException handler, so
// after one of the exceptions below (or a null packet ending the loop) find() may keep
// waiting - confirm against the full file.
136 } catch (UnknownHostException |
137 PcapNativeException |
139 TimeoutException ex) {
140 ex.printStackTrace();
146 * Checker to match collected patterns (run by a thread)
148 private void find() {
150 while (isEoF.get() == false) { // Continue until EoF
151 // Get the object from the queue
152 while(conversations.peek() == null) { // Wait until queue is not empty
153 if (isEoF.get() == true) // Return if EoF
156 Conversation conversation = conversations.poll();
157 // Get the object and remove it from the Map (housekeeping)
158 List<PcapPacket> packets = connections.remove(conversation);
159 boolean completeMatch = true;
160 for (int i = 0; i < packets.size(); i++) {
161 TcpPacket tcpPacket = packets.get(i).get(TcpPacket.class);
162 if (tcpPacket.getPayload().length() != pattern.getPacketOrder().get(i)) {
163 completeMatch = false;
168 PcapPacket firstPacketInFlow = packets.get(0);
170 String.format("[ find ] Detected a complete match of pattern '%s' at %s!",
171 pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString()));
173 PcapPacket firstPacketInFlow = packets.get(0);
175 String.format("[ detected a mismatch of pattern '%s' at %s]",
176 pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString()));