Add connection event callback to AsyncServerSocket
[folly.git] / folly / io / async / AsyncServerSocket.cpp
index 75e0da30ad45a8cfc01adf373c8dae438294bc27..b2b2d24be7510999634bf48aabdd529bfb050439 100644 (file)
@@ -91,6 +91,10 @@ void AsyncServerSocket::RemoteAcceptor::messageAvailable(
   switch (msg.type) {
     case MessageType::MSG_NEW_CONN:
     {
+      if (connectionEventCallback_) {
+        connectionEventCallback_->onConnectionDequeuedByAcceptCallback(
+            msg.fd, msg.address);
+      }
       callback_->connectionAccepted(msg.fd, msg.address);
       break;
     }
@@ -515,7 +519,7 @@ void AsyncServerSocket::addAcceptCallback(AcceptCallback *callback,
   // callback more efficiently without having to use a notification queue.
   RemoteAcceptor* acceptor = nullptr;
   try {
-    acceptor = new RemoteAcceptor(callback);
+    acceptor = new RemoteAcceptor(callback, connectionEventCallback_);
     acceptor->start(eventBase, maxAtOnce, maxNumMsgsInQueue_);
   } catch (...) {
     callbacks_.pop_back();
@@ -722,6 +726,10 @@ void AsyncServerSocket::handlerReady(
 
     address.setFromSockaddr(saddr, addrLen);
 
+    if (clientSocket >= 0 && connectionEventCallback_) {
+      connectionEventCallback_->onConnectionAccepted(clientSocket, address);
+    }
+
     std::chrono::time_point<std::chrono::steady_clock> nowMs =
       std::chrono::steady_clock::now();
     auto timeSinceLastAccept = std::max<int64_t>(
@@ -737,6 +745,10 @@ void AsyncServerSocket::handlerReady(
         ++numDroppedConnections_;
         if (clientSocket >= 0) {
           closeNoInt(clientSocket);
+          if (connectionEventCallback_) {
+            connectionEventCallback_->onConnectionDropped(clientSocket,
+                                                          address);
+          }
         }
         continue;
       }
@@ -760,6 +772,9 @@ void AsyncServerSocket::handlerReady(
       } else {
         dispatchError("accept() failed", errno);
       }
+      if (connectionEventCallback_) {
+        connectionEventCallback_->onConnectionAcceptError(errno);
+      }
       return;
     }
 
@@ -769,6 +784,9 @@ void AsyncServerSocket::handlerReady(
       closeNoInt(clientSocket);
       dispatchError("failed to set accepted socket to non-blocking mode",
                     errno);
+      if (connectionEventCallback_) {
+        connectionEventCallback_->onConnectionDropped(clientSocket, address);
+      }
       return;
     }
 #endif
@@ -795,6 +813,7 @@ void AsyncServerSocket::dispatchSocket(int socket,
     return;
   }
 
+  const SocketAddress addr(address);
   // Create a message to send over the notification queue
   QueueMessage msg;
   msg.type = MessageType::MSG_NEW_CONN;
@@ -804,9 +823,13 @@ void AsyncServerSocket::dispatchSocket(int socket,
   // Loop until we find a free queue to write to
   while (true) {
     if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
+      if (connectionEventCallback_) {
+        connectionEventCallback_->onConnectionEnqueuedForAcceptCallback(socket,
+                                                                        addr);
+      }
       // Success! return.
       return;
-   }
+    }
 
     // We couldn't add to queue.  Fall through to below
 
@@ -831,6 +854,9 @@ void AsyncServerSocket::dispatchSocket(int socket,
       LOG(ERROR) << "failed to dispatch newly accepted socket:"
                  << " all accept callback queues are full";
       closeNoInt(socket);
+      if (connectionEventCallback_) {
+        connectionEventCallback_->onConnectionDropped(socket, addr);
+      }
       return;
     }
 
@@ -886,6 +912,9 @@ void AsyncServerSocket::enterBackoff() {
       // since we won't be able to re-enable ourselves later.
       LOG(ERROR) << "failed to allocate AsyncServerSocket backoff"
                  << " timer; unable to temporarly pause accepting";
+      if (connectionEventCallback_) {
+        connectionEventCallback_->onBackoffError();
+      }
       return;
     }
   }
@@ -903,6 +932,9 @@ void AsyncServerSocket::enterBackoff() {
   if (!backoffTimeout_->scheduleTimeout(timeoutMS)) {
     LOG(ERROR) << "failed to schedule AsyncServerSocket backoff timer;"
                << "unable to temporarly pause accepting";
+    if (connectionEventCallback_) {
+      connectionEventCallback_->onBackoffError();
+    }
     return;
   }
 
@@ -912,6 +944,9 @@ void AsyncServerSocket::enterBackoff() {
   for (auto& handler : sockets_) {
     handler.unregisterHandler();
   }
+  if (connectionEventCallback_) {
+    connectionEventCallback_->onBackoffStarted();
+  }
 }
 
 void AsyncServerSocket::backoffTimeoutExpired() {
@@ -924,6 +959,9 @@ void AsyncServerSocket::backoffTimeoutExpired() {
 
   // If all of the callbacks were removed, we shouldn't re-enable accepts
   if (callbacks_.empty()) {
+    if (connectionEventCallback_) {
+      connectionEventCallback_->onBackoffEnded();
+    }
     return;
   }
 
@@ -942,6 +980,9 @@ void AsyncServerSocket::backoffTimeoutExpired() {
       abort();
     }
   }
+  if (connectionEventCallback_) {
+    connectionEventCallback_->onBackoffEnded();
+  }
 }