Put checks for explicit termination of conversation to use in FlowPatternFinder.
[pingpong.git] / Code / Projects / SmartPlugDetector / src / main / java / edu / uci / iotproject / FlowPatternFinder.java
index da0d3eb53f4e57a55825b7a6b4fc27f421367760..af35f89bb8add4fe6d9b274fd6c9e83e8f5c15c0 100644 (file)
@@ -94,7 +94,6 @@ public class FlowPatternFinder {
 //            TODO: For now, just compare using one hostname and one list per FlowPattern
 //            List<String> hostnameList = mPattern.getHostnameList();
 //            int hostIndex = 0;
-            int patternLength = mPattern.getLength();
             while ((packet = mPcap.getNextPacketEx()) != null) {
                 // Let DnsMap handle DNS packets.
                 if (packet.get(DnsPacket.class) != null) {
@@ -103,15 +102,12 @@ public class FlowPatternFinder {
                     continue;
                 }
                 // For now, we only work support pattern search in TCP over IPv4.
-                IpV4Packet ipPacket = packet.get(IpV4Packet.class);
-                TcpPacket tcpPacket = packet.get(TcpPacket.class);
+                final IpV4Packet ipPacket = packet.get(IpV4Packet.class);
+                final TcpPacket tcpPacket = packet.get(TcpPacket.class);
                 if (ipPacket == null || tcpPacket == null) {
                     continue;
                 }
-                if (tcpPacket.getPayload() == null) {
-                    // We skip non-payload control packets as these are less predictable
-                    continue;
-                }
+
                 String srcAddress = ipPacket.getHeader().getSrcAddr().getHostAddress();
                 String dstAddress = ipPacket.getHeader().getDstAddr().getHostAddress();
                 int srcPort = tcpPacket.getHeader().getSrcPort().valueAsInt();
@@ -126,6 +122,7 @@ public class FlowPatternFinder {
                     // Packet not related to pattern, skip it.
                     continue;
                 }
+
                 // Conversations (connections/sessions) are identified by the four-tuple
                 // (clientIp, clientPort, serverIp, serverPort) (see Conversation Javadoc).
                 // Create "dummy" conversation for looking up an existing entry.
@@ -133,23 +130,41 @@ public class FlowPatternFinder {
                         new Conversation(dstAddress, dstPort, srcAddress, srcPort);
                 // Add the packet so that the "dummy" conversation can be immediately added to the map if no entry
                 // exists for the conversation that the current packet belongs to.
-                conversation.addPacket(packet, true);
+                if (tcpPacket.getHeader().getFin()) {
+                    // Record FIN packets.
+                    conversation.addFinPacket(packet);
+                }
+                if (tcpPacket.getPayload() != null) {
+                    // Record regular payload packets.
+                    conversation.addPacket(packet, true);
+                }
+                // Note: does not make sense to call attemptAcknowledgementOfFin here as the new packet has no FINs
+                // in its list, so if this packet is an ACK, it would not be added anyway.
+
+                // Need to retain a final reference to get access to the packet in the lambda below.
+                final PcapPacket finalPacket = packet;
                 // Add the new conversation to the map if an equal entry is not already present.
                 // If an existing entry is already present, the current packet is simply added to that conversation.
                 mConversations.merge(conversation, conversation, (existingEntry, toMerge) -> {
-                    // toMerge only has a single packet, which is the same as referred to by 'packet' variable, but need
-                    // this hack as 'packet' is not final and hence cannot be referred to in a lambda.
-                    existingEntry.addPacket(toMerge.getPackets().get(0), true);
+                    // toMerge may not have any payload packets if the current packet is a FIN packet.
+                    if (toMerge.getPackets().size() > 0) {
+                        existingEntry.addPacket(toMerge.getPackets().get(0), true);
+                    }
+                    if (toMerge.getFinAckPairs().size() > 0) {
+                        // Add the FIN packet to the existing entry.
+                        existingEntry.addFinPacket(toMerge.getFinAckPairs().get(0).getFinPacket());
+                    }
+                    if (finalPacket.get(TcpPacket.class).getHeader().getAck()) {
+                        existingEntry.attemptAcknowledgementOfFin(finalPacket);
+                    }
                     return existingEntry;
                 });
                 // Refresh reference to point to entry in map (in case packet was added to existing entry).
                 conversation = mConversations.get(conversation);
-                if (conversation.getPackets().size() == mPattern.getLength()) {
-//                if (conversation.getPackets().size() == mPattern.getLength(currentHostname)) {
-                    // Conversation reached a size that matches the expected size.
+                if (conversation.isGracefullyShutdown()) {
+                    // Conversation terminated gracefully, so we can now start analyzing it.
                     // Remove the Conversation from the map and start the analysis.
                     // Any future packets identified by the same four tuple will be tied to a new Conversation instance.
-                    // This might, for example, occur if the same conversation is reused for multiple events.
                     mConversations.remove(conversation);
                     // Create comparison task and send to executor service.
                     PatternComparisonTask<CompleteMatchPatternComparisonResult> comparisonTask =
@@ -160,6 +175,8 @@ public class FlowPatternFinder {
                 }
             }
         } catch (EOFException eofe) {
+            // TODO should check for leftover conversations in map here and fire tasks for those.
+            // TODO [cont'd] such tasks may be present if connections did not terminate gracefully or if there are longlived connections.
             System.out.println("[ findFlowPattern ] Finished processing entire PCAP stream!");
             System.out.println("[ findFlowPattern ] Now waiting for comparisons to finish...");
             // Wait for all comparisons to finish, then output their results to std.out.