Adding pre-processing for training set---we take packet lengths directly from a train...
[pingpong.git] / Code / Projects / SmartPlugDetector / src / main / java / edu / uci / iotproject / FlowPatternFinder.java
package edu.uci.iotproject;

import org.pcap4j.core.NotOpenException;
import org.pcap4j.core.PcapHandle;
import org.pcap4j.core.PcapNativeException;
import org.pcap4j.core.PcapPacket;
import org.pcap4j.packet.IpV4Packet;
import org.pcap4j.packet.Packet;
import org.pcap4j.packet.TcpPacket;
import org.pcap4j.packet.DnsPacket;

import java.io.EOFException;
import java.net.UnknownHostException;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

21 /**
22  * Provides functionality for searching for the presence of a {@link FlowPattern} in a PCAP trace.
23  * We use 2 threads:
24  *  1) The first thread (main thread) collects conversations from the PCAP stream and put them into our data structure.
25  *  2) The second thread (checker thread) checks the collected conversation.
26  *
27  * @author Janus Varmarken
28  * @author Rahmadi Trimananda
29  */
30 public class FlowPatternFinder {
31
32     /* Class properties */
33     private Map<Conversation, List<PcapPacket>> connections;
34     private Queue<Conversation> conversations;
35     private DnsMap dnsMap;
36     private PcapHandle pcap;
37     private FlowPattern pattern;
38     private AtomicBoolean isEoF;
39    
40     
41     /* Constructor */
42     public FlowPatternFinder(PcapHandle _pcap, FlowPattern _pattern) {
43
44         this.connections = new ConcurrentHashMap<Conversation, List<PcapPacket>>();
45         this.conversations = new ConcurrentLinkedQueue<Conversation>();
46         this.dnsMap = new DnsMap();
47         this.isEoF = new AtomicBoolean(false);
48
49         // Get input parameters
50         this.pcap = _pcap;
51         this.pattern = _pattern;
52     }
53     
54     
55     public void start() {
56     
57         // Spawn the main thread
58         Thread mainThread = new Thread(new Runnable() {
59             public void run() {
60                 findFlowPattern();
61             }
62         });
63         mainThread.start();
64
65         // Spawn the checker thread
66         Thread checkerThread = new Thread(new Runnable() {
67             public void run() {
68                 find();
69             }
70         });
71         checkerThread.start();
72
73         /* TODO: Join the threads if we want it to be blocking
74         try {
75             mainThread.join();
76             checkerThread.join();
77         } catch(InterruptedException ex) {
78             ex.printStackTrace();
79         }*/
80         System.out.println("[ start ] Main and checker threads started!");
81     }
82
83
84     /**
85      * Find patterns based on the FlowPattern object (run by a thread)
86      */
87     private void findFlowPattern() {
88         int counter = 0;
89         try {
90             PcapPacket packet;
91             Set<Integer> seqNumberSet = new HashSet<Integer>();
92             int patternLength = pattern.getLength();
93             while ((packet = pcap.getNextPacketEx()) != null) {
94
95                 // Check if this is a valid DNS packet
96                 dnsMap.validateAndAddNewEntry(packet);
97                 // For now, we only work support pattern search in TCP over IPv4.
98                 IpV4Packet ipPacket = packet.get(IpV4Packet.class);
99                 TcpPacket tcpPacket = packet.get(TcpPacket.class);
100                 if (ipPacket == null || tcpPacket == null)
101                     continue;
102                 String srcAddress = ipPacket.getHeader().getSrcAddr().getHostAddress();
103                 String dstAddress = ipPacket.getHeader().getDstAddr().getHostAddress();
104                 int srcPort = tcpPacket.getHeader().getSrcPort().valueAsInt();
105                 int dstPort = tcpPacket.getHeader().getDstPort().valueAsInt();
106                 // Is this packet related to the pattern and coming to/from the cloud server?
107                 boolean fromServer = dnsMap.isRelatedToCloudServer(srcAddress, pattern.getHostname());
108                 boolean fromClient = dnsMap.isRelatedToCloudServer(dstAddress, pattern.getHostname());
109                 if (!fromServer && !fromClient)  // Packet not related to pattern, skip it.
110                     continue;
111                 if (tcpPacket.getPayload() == null) // We skip non-payload control packets as these are less predictable
112                     continue; 
113                 // Identify conversations (connections/sessions) by the four-tuple (clientIp, clientPort, serverIp, serverPort).
114                 // TODO: this is strictly not sufficient to differentiate one TCP session from another, but should suffice for now.
115                 Conversation conversation = fromClient ? new Conversation(srcAddress, srcPort, dstAddress, dstPort) :
116                         new Conversation(dstAddress, dstPort, srcAddress, srcPort);
117                 // Create new conversation entry, or append packet to existing.
118                 List<PcapPacket> listPcapPacket = connections.get(conversation);
119                 if (listPcapPacket == null) {
120                     listPcapPacket = new ArrayList<PcapPacket>();
121                     connections.put(conversation, listPcapPacket);
122                 }
123                 int seqNumber = packet.get(TcpPacket.class).getHeader().getSequenceNumber();
124                 boolean retransmission = seqNumberSet.contains(seqNumber);
125                 if (!retransmission) { // Do not add if retransmission -> avoid duplicate packets in flow
126                     listPcapPacket.add(packet);
127                     // End of conversation -> trigger thread to check
128                     if (listPcapPacket.size() == patternLength)
129                         conversations.add(conversation);
130                     seqNumberSet.add(seqNumber);
131                 }
132             }
133         } catch (EOFException eofe) {
134             while (isEoF.compareAndSet(false, true) == false);  // Try to signal EoF!
135             System.out.println("[ findFlowPattern ] Finished processing entire PCAP stream!");
136         } catch (UnknownHostException |
137                  PcapNativeException  |
138                  NotOpenException     |
139                  TimeoutException ex) {
140             ex.printStackTrace();
141         }
142     }
143     
144
145     /**
146      * Checker to match collected patterns (run by a thread)
147      */
148     private void find() {
149
150         while (isEoF.get() == false) {  // Continue until EoF
151             // Get the object from the queue
152             while(conversations.peek() == null) {  // Wait until queue is not empty
153                 if (isEoF.get() == true)    // Return if EoF
154                     return;
155             }            
156             Conversation conversation = conversations.poll();
157             // Get the object and remove it from the Map (housekeeping)
158             List<PcapPacket> packets = connections.remove(conversation);
159             boolean completeMatch = true;
160             for (int i = 0; i < packets.size(); i++) {
161                 TcpPacket tcpPacket = packets.get(i).get(TcpPacket.class);
162                 if (tcpPacket.getPayload().length() != pattern.getPacketOrder().get(i)) {
163                     completeMatch = false;
164                     break;
165                 }
166             }
167             if (completeMatch) {
168                 PcapPacket firstPacketInFlow = packets.get(0);
169                 System.out.println(
170                         String.format("[ find ] Detected a complete match of pattern '%s' at %s!",
171                                 pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString()));
172             } /*else {
173                 PcapPacket firstPacketInFlow = packets.get(0);
174                 System.out.println(
175                         String.format("[ detected a mismatch of pattern '%s' at %s]",
176                                 pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString()));
177             }*/
178         }
179     }
180
181
182     /**
183      * Immutable class used for identifying a conversation/connection/session/flow (packet's belonging to the same
184      * session between a client and a server).
185      */
186     private static class Conversation {
187
188         private final String clientIp;
189         private final int clientPort;
190         private final String serverIp;
191         private final int serverPort;
192
193         public Conversation(String clientIp, int clientPort, String serverIp, int serverPort) {
194             this.clientIp = clientIp;
195             this.clientPort = clientPort;
196             this.serverIp = serverIp;
197             this.serverPort = serverPort;
198         }
199
200
201         // =========================================================================================================
202         // We simply reuse equals and hashCode methods of String.class to be able to use this immutable class as a key
203         // in a Map.
204         @Override
205         public boolean equals(Object obj) {
206             return obj instanceof Conversation && this.toString().equals(obj.toString());
207         }
208
209         @Override
210         public int hashCode() {
211             return toString().hashCode();
212         }
213         // =========================================================================================================
214
215         @Override
216         public String toString() {
217             return String.format("%s:%d %s:%d", clientIp, clientPort, serverIp, serverPort);
218         }
219     }
220 }