Extract Conversation to separate file (i.e. it is no longer an inner class). Add...
[pingpong.git] / Code / Projects / SmartPlugDetector / src / main / java / edu / uci / iotproject / FlowPatternFinder.java
1 package edu.uci.iotproject;
2
3 import org.pcap4j.core.NotOpenException;
4 import org.pcap4j.core.PcapHandle;
5 import org.pcap4j.core.PcapNativeException;
6 import org.pcap4j.core.PcapPacket;
7 import org.pcap4j.packet.IpV4Packet;
8 import org.pcap4j.packet.Packet;
9 import org.pcap4j.packet.TcpPacket;
10 import org.pcap4j.packet.DnsPacket;
11
12 import java.io.EOFException;
13 import java.net.UnknownHostException;
14 import java.time.Instant;
15 import java.util.*;
16 import java.util.concurrent.TimeoutException;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentLinkedQueue;
19 import java.util.concurrent.atomic.AtomicBoolean;
20
21 /**
22  * Provides functionality for searching for the presence of a {@link FlowPattern} in a PCAP trace.
23  * We use 2 threads:
24  *  1) The first thread (main thread) collects conversations from the PCAP stream and put them into our data structure.
25  *  2) The second thread (checker thread) checks the collected conversation.
26  *
27  * @author Janus Varmarken
28  * @author Rahmadi Trimananda
29  */
30 public class FlowPatternFinder {
31
32     /* Class properties */
33     private Map<Conversation, List<PcapPacket>> connections;
34     private Queue<Conversation> conversations;
35     private DnsMap dnsMap;
36     private PcapHandle pcap;
37     private FlowPattern pattern;
38     private AtomicBoolean isEoF;
39     
40     
41     /* Constructor */
42     public FlowPatternFinder(PcapHandle _pcap, FlowPattern _pattern) {
43
44         this.connections = new ConcurrentHashMap<Conversation, List<PcapPacket>>();
45         this.conversations = new ConcurrentLinkedQueue<Conversation>();
46         this.dnsMap = new DnsMap();
47         this.isEoF = new AtomicBoolean(false);
48
49         // Get input parameters
50         this.pcap = _pcap;
51         this.pattern = _pattern;
52     }
53     
54     
55     public void start() {
56     
57         // Spawn the main thread
58         Thread mainThread = new Thread(new Runnable() {
59             public void run() {
60                 findFlowPattern();
61             }
62         });
63         mainThread.start();
64
65         // Spawn the checker thread
66         Thread checkerThread = new Thread(new Runnable() {
67             public void run() {
68                 find();
69             }
70         });
71         checkerThread.start();
72
73         /* TODO: Join the threads if we want it to be blocking
74         try {
75             mainThread.join();
76             checkerThread.join();
77         } catch(InterruptedException ex) {
78             ex.printStackTrace();
79         }*/
80         System.out.println("[ start ] Main and checker threads started!");
81     }
82
83
84     /**
85      * Find patterns based on the FlowPattern object (run by a thread)
86      */
87     private void findFlowPattern() {
88         int counter = 0;
89         try {
90             PcapPacket packet;
91             Set<Integer> seqNumberSet = new HashSet<Integer>();
92             int patternLength = pattern.getLength();
93             while ((packet = pcap.getNextPacketEx()) != null) {
94
95                 // Check if this is a valid DNS packet
96                 dnsMap.validateAndAddNewEntry(packet);
97                 // For now, we only work support pattern search in TCP over IPv4.
98                 IpV4Packet ipPacket = packet.get(IpV4Packet.class);
99                 TcpPacket tcpPacket = packet.get(TcpPacket.class);
100                 if (ipPacket == null || tcpPacket == null)
101                     continue;
102                 String srcAddress = ipPacket.getHeader().getSrcAddr().getHostAddress();
103                 String dstAddress = ipPacket.getHeader().getDstAddr().getHostAddress();
104                 int srcPort = tcpPacket.getHeader().getSrcPort().valueAsInt();
105                 int dstPort = tcpPacket.getHeader().getDstPort().valueAsInt();
106                 // Is this packet related to the pattern and coming to/from the cloud server?
107                 boolean fromServer = dnsMap.isRelatedToCloudServer(srcAddress, pattern.getHostname());
108                 boolean fromClient = dnsMap.isRelatedToCloudServer(dstAddress, pattern.getHostname());
109                 if (!fromServer && !fromClient)  // Packet not related to pattern, skip it.
110                     continue;
111                 if (tcpPacket.getPayload() == null) // We skip non-payload control packets as these are less predictable
112                     continue; 
113                 // Identify conversations (connections/sessions) by the four-tuple (clientIp, clientPort, serverIp, serverPort).
114                 // TODO: this is strictly not sufficient to differentiate one TCP session from another, but should suffice for now.
115                 Conversation conversation = fromClient ? new Conversation(srcAddress, srcPort, dstAddress, dstPort) :
116                         new Conversation(dstAddress, dstPort, srcAddress, srcPort);
117                 // Create new conversation entry, or append packet to existing.
118                 List<PcapPacket> listPcapPacket = connections.get(conversation);
119                 if (listPcapPacket == null) {
120                     listPcapPacket = new ArrayList<PcapPacket>();
121                     connections.put(conversation, listPcapPacket);
122                 }
123                 int seqNumber = packet.get(TcpPacket.class).getHeader().getSequenceNumber();
124                 boolean retransmission = seqNumberSet.contains(seqNumber);
125                 if (!retransmission) { // Do not add if retransmission -> avoid duplicate packets in flow
126                     listPcapPacket.add(packet);
127                     // End of conversation -> trigger thread to check
128                     if (listPcapPacket.size() == patternLength)
129                         conversations.add(conversation);
130                     seqNumberSet.add(seqNumber);
131                 }
132             }
133         } catch (EOFException eofe) {
134             while (isEoF.compareAndSet(false, true) == false);  // Try to signal EoF!
135             System.out.println("[ findFlowPattern ] Finished processing entire PCAP stream!");
136         } catch (UnknownHostException |
137                  PcapNativeException  |
138                  NotOpenException     |
139                  TimeoutException ex) {
140             ex.printStackTrace();
141         }
142     }
143     
144
145     /**
146      * Checker to match collected patterns (run by a thread)
147      */
148     private void find() {
149
150         while (isEoF.get() == false) {  // Continue until EoF
151             // Get the object from the queue
152             while(conversations.peek() == null) {  // Wait until queue is not empty
153                 if (isEoF.get() == true)    // Return if EoF
154                     return;
155             }            
156             Conversation conversation = conversations.poll();
157             // Get the object and remove it from the Map (housekeeping)
158             List<PcapPacket> packets = connections.remove(conversation);
159             boolean completeMatch = true;
160             for (int i = 0; i < packets.size(); i++) {
161                 TcpPacket tcpPacket = packets.get(i).get(TcpPacket.class);
162                 if (tcpPacket.getPayload().length() != pattern.getPacketOrder().get(i)) {
163                     completeMatch = false;
164                     break;
165                 }
166             }
167             if (completeMatch) {
168                 PcapPacket firstPacketInFlow = packets.get(0);
169                 System.out.println(
170                         String.format("[ find ] Detected a complete match of pattern '%s' at %s!",
171                                 pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString()));
172             } /*else {
173                 PcapPacket firstPacketInFlow = packets.get(0);
174                 System.out.println(
175                         String.format("[ detected a mismatch of pattern '%s' at %s]",
176                                 pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString()));
177             }*/
178         }
179     }
180
181 }