Expose pending messages in queue stats in AsyncServerSocket
authorJun Li <albertli@fb.com>
Tue, 27 Oct 2015 00:40:17 +0000 (17:40 -0700)
committerfacebook-github-bot-4 <folly-bot@fb.com>
Tue, 27 Oct 2015 01:20:20 +0000 (18:20 -0700)
Summary: Expose pending messages in accept queue in AsyncServerSocket.

Set default accept message queue size to 1024.

Reviewed By: djwatson

Differential Revision: D2525161

fb-gh-sync-id: a69ea0ee77729e4a8300bde3e3c07840f2d5d3cb

folly/io/async/AsyncServerSocket.h
folly/io/async/test/AsyncSocketTest2.cpp

index 9b82ebd5597a0c903a627a8b4047581ee69d8c87..d423d5f35b68d2b49140791675f332c42eb87c4f 100644 (file)
@@ -200,7 +200,7 @@ class AsyncServerSocket : public DelayedDestruction
 
   static const uint32_t kDefaultMaxAcceptAtOnce = 30;
   static const uint32_t kDefaultCallbackAcceptAtOnce = 5;
-  static const uint32_t kDefaultMaxMessagesInQueue = 0;
+  static const uint32_t kDefaultMaxMessagesInQueue = 1024;
   /**
    * Create a new AsyncServerSocket with the specified EventBase.
    *
@@ -564,6 +564,23 @@ class AsyncServerSocket : public DelayedDestruction
     return numDroppedConnections_;
   }
 
+  /**
+   * Get the current number of unprocessed messages in NotificationQueue.
+   *
+   * This method must be invoked from the AsyncServerSocket's primary
+   * EventBase thread.  Use EventBase::runInEventBaseThread() to schedule the
+   * operation in the correct EventBase if your code is not in the server
+   * socket's primary EventBase.
+   */
+  int64_t getNumPendingMessagesInQueue() const {
+    assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+    int64_t numMsgs = 0;
+    for (const auto& callback : callbacks_) {
+      numMsgs += callback.consumer->getQueue()->size();
+    }
+    return numMsgs;
+  }
+
   /**
    * Set whether or not SO_KEEPALIVE should be enabled on the server socket
    * (and thus on all subsequently-accepted connections). By default, keepalive
index 73295ca3b23ba90d5b4f817ab2f86512ed2fb4d9..1a5ebeb84e1d79f9d5a18f62161e90b7a6cd1495 100644 (file)
@@ -2195,3 +2195,46 @@ TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
   ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
   ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
 }
+
+/**
+ * Test AsyncServerSocket::getNumPendingMessagesInQueue()
+ */
+TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
+  EventBase eventBase;
+
+  // Counter of how many connections have been accepted
+  int count = 0;
+
+  // Create a server socket
+  auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
+  serverSocket->bind(0);
+  serverSocket->listen(16);
+  folly::SocketAddress serverAddress;
+  serverSocket->getAddress(&serverAddress);
+
+  // Add a callback to accept connections
+  TestAcceptCallback acceptCallback;
+  acceptCallback.setConnectionAcceptedFn(
+      [&](int fd, const folly::SocketAddress& addr) {
+        count++;
+        CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
+
+        if (count == 4) {
+          // all messages are processed, remove accept callback
+          serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+        }
+      });
+  acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
+    serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+  });
+  serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
+  serverSocket->startAccepting();
+
+  // Connect to the server socket, 4 clients, there are 4 connections
+  auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
+  auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
+  auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
+  auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
+
+  eventBase.loop();
+}