+import argparse\r
+import ipaddress\r
+import socket\r
+import unicodecsv as csv\r
+\r
+from scapy.all import *\r
+\r
+\r
def full_duplex(p):
    """
    Compute a direction-independent session identifier for a packet.

    Intended as the session extractor for reassembling bidirectional sessions (streams). By default, Scapy only groups
    packets in one direction, i.e., a bidirectional session is split into one session with client-to-server packets
    and another with server-to-client packets. Here the identifying fields are sorted (by their string form) before
    being turned into the identifier, so swapping source and destination yields the same identifier.

    This is simplified session reassembly: TCP FIN/RST packets are not considered, and packets are assigned to a
    session based solely on the (src_ip, src_port, dst_ip, dst_port) four-tuple. If a TCP stream is closed and the
    client later happens to pick the same ephemeral port when contacting the same server again, the two DIFFERENT
    TCP streams end up under a single identifier.

    Code courtesy of: https://pen-testing.sans.org/blog/2017/10/13/scapy-full-duplex-stream-reassembly

    Ethernet is assumed to be the layer-2 wrapper for everything; any packet without an Ether layer is labelled
    "Other". That assumption holds for our TP-Link trace, but will not hold in general. See discussion at:
    https://gist.github.com/MarkBaggett/d8933453f431c111169158ce7f4e2222#file-scapy_helper-py

    :param p: A Scapy packet object.
    :return: Session identifier (a string) for the packet.
    """
    if 'Ether' not in p:
        # Everything that is not wrapped in Ethernet shares one catch-all session.
        return "Other"

    if 'IP' in p:
        ip = p[IP]
        if 'TCP' in p:
            fields = ["TCP", ip.src, p[TCP].sport, ip.dst, p[TCP].dport]
        elif 'UDP' in p:
            fields = ["UDP", ip.src, p[UDP].sport, ip.dst, p[UDP].dport]
        elif 'ICMP' in p:
            icmp = p[ICMP]
            fields = ["ICMP", ip.src, ip.dst, icmp.code, icmp.type, icmp.id]
        else:
            # Some other IP payload: fall back to addresses plus protocol number.
            fields = ["IP", ip.src, ip.dst, ip.proto]
    elif 'ARP' in p:
        arp = p[ARP]
        fields = ["ARP", arp.psrc, arp.pdst]
    else:
        # Non-IP, non-ARP Ethernet frame: identify it by its EtherType only.
        return p.sprintf("Ethernet type=%04xr,Ether.type%")

    # Sorting by string form mixes addresses and port numbers safely and removes the notion of direction.
    return str(sorted(fields, key=str))
+\r
+\r
def get_tls_app_data_pkts(session):
    """
    Extract the TLS Application Data packets from a (TCP) stream.

    :param session: The (TCP) stream; it must offer a filter(predicate) method (presumably a Scapy PacketList).
    :return: Whatever session.filter() yields for the packets that carry a TLS layer whose record type is 23,
             i.e., the TLS application data packets of the session in their original order.
    """
    def is_tls_app_data(pkt):
        # Record content type 23 denotes TLS application data.
        return TLS in pkt and pkt[TLS].type == 23

    return session.filter(is_tls_app_data)
+\r
+\r
def find_matches(pcap_file, device_ip, sig_duration):
    """
    Find all matches of [C-->S, S-->C] signatures in TLS conversations involving the device with IP=device_ip.

    Packet lengths are not considered, only directions and timing (packet lengths are assumed to be unavailable due
    to TLS padding).

    :param pcap_file: The pcap file that is the target of the signature matching.
    :param device_ip: IP of the device whose TLS sessions are to be examined for matches.
    :param sig_duration: Maximum duration between request and response packets.
    :return: A list of (request_packet, reply_packets) tuples, one per device-to-cloud TLS application data packet.
             reply_packets is the (possibly empty) list of reply packets that satisfy the signature match
             conditions, i.e., they are within sig_duration after the request packet and no other request packet
             lies between the request packet and the reply packet.
    """
    # The whole capture is loaded into memory at once. That is slow and memory hungry, and streaming readers exist,
    # but having random access to the packets keeps the detection logic below simple.
    capture = rdpcap(pcap_file)
    # Group packets into bidirectional sessions (streams).
    streams = capture.sessions(full_duplex)

    matches = []
    for stream in streams.values():
        app_data = get_tls_app_data_pkts(stream)
        if len(app_data) == 0:
            # No TLS application data in this session, so nothing to match.
            continue

        head = app_data[0]
        if IP not in head:
            # Only IPv4 traffic is considered.
            continue
        endpoints = (head[IP].src, head[IP].dst)
        if device_ip not in endpoints:
            # Conversation between other hosts; not relevant to us.
            continue
        if any(ipaddress.ip_address(addr).is_multicast for addr in endpoints):
            # Keep multicast traffic out of the results (TLS over multicast is not expected anyway).
            continue

        # Collect the potential matches of this TLS session.
        total = len(app_data)
        for pos, request_pkt in enumerate(app_data):
            if request_pkt[IP].src != device_ip:
                # A [C->S, S->C] signature starts with an outbound (device-to-cloud) packet. Cloud-to-device
                # packets are only of interest as replies, which are gathered below.
                continue

            # Walk the uninterrupted run of cloud-to-device packets that directly follows the request. The run ends
            # at the next device-to-cloud packet, because replies after that one belong to a later request.
            replies = []
            nxt = pos + 1
            while nxt < total:
                candidate = app_data[nxt]
                if candidate[IP].dst != device_ip:
                    break
                # The duration test does not terminate the scan: packet order is not always timestamp order, so a
                # later packet in the run may still fall within the signature duration.
                if candidate.time - request_pkt.time <= sig_duration:
                    replies.append(candidate)
                nxt += 1
            matches.append((request_pkt, replies))
    return matches
+\r
+\r
def get_pkt_key(pkt):
    """
    Build a dictionary key for a packet from its src, dst and time attributes.

    :param pkt: A Scapy packet (any object exposing src, dst and time attributes works).
    :return: A string of the form 'src=<src> dst=<dst> timestamp=<time>' usable as a dictionary key.
    """
    source, destination, timestamp = pkt.src, pkt.dst, pkt.time
    return 'src={} dst={} timestamp={}'.format(source, destination, timestamp)
+\r
+\r
def build_pkt_number_dict(pcap_file):
    """
    Create a dictionary mapping packets to their packet number in pcap_file.

    The keys are generated by passing each packet to get_pkt_key(pkt). Packet numbers are 1-based: the first packet
    in the file gets number 1.

    :param pcap_file: The pcap file for which a packet number dictionary is desired.
    :return: A dictionary mapping packet keys (obtainable from get_pkt_key(pkt)) to the packet's packet number.
    :raises ValueError: If two packets in pcap_file produce the same key, in which case a key can no longer be
                        resolved to a single packet number.
    """
    pkt_numbers = {}
    # The capture is read exactly once; every key is checked for uniqueness as it is inserted, which guarantees
    # that each packet's key maps back to its own number without re-reading the file.
    for pkt_num, pkt in enumerate(rdpcap(pcap_file), start=1):
        key = get_pkt_key(pkt)
        if key in pkt_numbers:
            # An explicit exception (rather than an assert) keeps this check alive when Python runs with -O.
            raise ValueError(
                f'Packets {pkt_numbers[key]} and {pkt_num} in {pcap_file} share the key "{key}"; '
                'packet keys must be unique.'
            )
        pkt_numbers[key] = pkt_num
    return pkt_numbers
+\r
+\r
def add_pkt_numbers_to_matches(pcap_file, matches):
    """
    Hacky way to augment the matches with packet numbers.

    Packets are looked up by the key produced by get_pkt_key(pkt), so this assumes the same device does not send or
    receive more than one packet at a given timestamp.

    :param pcap_file: The pcap file where the matches were found in.
    :param matches: The matches, as a list of (request_packet, reply_packets) tuples.
    :return: matches augmented with packet numbers; each packet is converted to a (pkt, pkt_number) tuple.
    """
    numbers_by_key = build_pkt_number_dict(pcap_file)

    def with_number(pkt):
        # Pair a packet with its packet number in pcap_file.
        return (pkt, numbers_by_key[get_pkt_key(pkt)])

    return [
        (with_number(request), [with_number(reply) for reply in reply_pkts])
        for request, reply_pkts in matches
    ]
+\r
+\r
def write_matches_to_csv(matches, csv_filename):
    """
    Output matches to a .csv file, one row per request packet.

    Each row holds the request packet number, the '; '-separated reply packet numbers, the number of reply packets,
    and the two ip:port endpoints of the TLS conversation (taken from the request packet).

    :param matches: A list of matches w/ packet numbers, as returned by add_pkt_numbers_to_matches(pcap_file, matches).
    :param csv_filename: Path to the .csv file where the output is to be written.
    :return: None.
    """
    col_request = 'request_pkt'
    col_replies = 'reply_pkts'
    col_reply_count = 'number_of_reply_pkts'
    col_conversation = 'tls_conversation_between'
    header = [col_request, col_replies, col_reply_count, col_conversation]

    # Binary mode: the csv module in use here is unicodecsv, which writes encoded bytes.
    with open(csv_filename, 'wb') as out_file:
        writer = csv.DictWriter(out_file, fieldnames=header)
        writer.writeheader()
        for (request_pkt, request_pkt_num), numbered_replies in matches:
            reply_numbers = [reply_num for _, reply_num in numbered_replies]
            sender = request_pkt[IP].src + ":" + str(request_pkt[TCP].sport)
            receiver = request_pkt[IP].dst + ":" + str(request_pkt[TCP].dport)
            writer.writerow({
                col_request: request_pkt_num,
                col_replies: '; '.join(map(str, reply_numbers)),
                col_reply_count: len(reply_numbers),
                col_conversation: sender + ' and ' + receiver,
            })
+\r
+\r
if __name__ == '__main__':
    # Command-line entry point: parse the arguments, run the detection, and write the results to a CSV file.
    parser = argparse.ArgumentParser(
        description=(
            'Perform detection on padded TLS traffic; '
            'i.e., the detection is entirely based on timing information and packet directions. '
            'NOTE: THIS CODE IS SIMPLIFIED AND ONLY WORKS FOR SIMPLE [Client-to-Server, Server-to-Client] TWO '
            'PACKET SIGNATURES.'
        )
    )
    parser.add_argument('pcap_file', help='Full path to the target pcap file (detection target trace).')
    parser.add_argument('device_ip', help='Perform detection on TLS flows from this device (identified by IP) only.')
    parser.add_argument(
        'signature_duration',
        type=float,
        help=(
            'Duration of the signature '
            '(max time between request and reply packet for the two packets to be considered a match). '
            'Unit: seconds (floating point number expected).'
        ),
    )
    parser.add_argument('output_csv', help='Filename of CSV file where results are to be written.')
    args = parser.parse_args()

    # The TLS layer has to be loaded before the capture is read so that TLS records are dissected.
    load_layer('tls')

    matches = find_matches(args.pcap_file, args.device_ip, args.signature_duration)
    matches = add_pkt_numbers_to_matches(args.pcap_file, matches)
    write_matches_to_csv(matches, args.output_csv)
+\r