Fix RequestContext held too long issue in EventBase
[folly.git] / folly / io / async / AsyncUDPServerSocket.h
index 9feb6d41f2ee45d486ba8dec58a0ce2531805fb2..d371fd4d98f3d4c049e6c399541ae99bf4ab8d7a 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,9 +16,8 @@
 
 #pragma once
 
-#include <folly/MoveWrapper.h>
-#include <folly/io/IOBufQueue.h>
 #include <folly/Memory.h>
+#include <folly/io/IOBufQueue.h>
 #include <folly/io/async/AsyncUDPSocket.h>
 #include <folly/io/async/EventBase.h>
 
@@ -36,7 +35,8 @@ namespace folly {
  *       more than 1 packet will not work because they will end up with
  *       different event base to process.
  */
-class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback {
+class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
+                           , public AsyncSocketBase {
  public:
   class Callback {
    public:
@@ -55,11 +55,13 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback {
     /**
      * Invoked when a new packet is received
      */
-    virtual void onDataAvailable(const folly::SocketAddress& addr,
-                                 std::unique_ptr<folly::IOBuf> buf,
-                                 bool truncated) noexcept = 0;
+    virtual void onDataAvailable(
+      std::shared_ptr<AsyncUDPSocket> socket,
+      const folly::SocketAddress& addr,
+      std::unique_ptr<folly::IOBuf> buf,
+      bool truncated) noexcept = 0;
 
-    virtual ~Callback() {}
+    virtual ~Callback() = default;
   };
 
   /**
@@ -75,17 +77,22 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback {
         nextListener_(0) {
   }
 
-  ~AsyncUDPServerSocket() {
+  ~AsyncUDPServerSocket() override {
     if (socket_) {
       close();
     }
   }
 
-  void bind(const folly::SocketAddress& address) {
+  void bind(const folly::SocketAddress& addy) {
     CHECK(!socket_);
 
-    socket_ = folly::make_unique<AsyncUDPSocket>(evb_);
-    socket_->bind(address);
+    socket_ = std::make_shared<AsyncUDPSocket>(evb_);
+    socket_->setReusePort(reusePort_);
+    socket_->bind(addy);
+  }
+
+  void setReusePort(bool reusePort) {
+    reusePort_ = reusePort;
   }
 
   folly::SocketAddress address() const {
@@ -93,6 +100,10 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback {
     return socket_->address();
   }
 
+  void getAddress(SocketAddress* a) const override {
+    *a = address();
+  }
+
   /**
    * Add a listener to the round robin list
    */
@@ -121,18 +132,24 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback {
 
   void close() {
     CHECK(socket_) << "Need to bind before closing";
+    socket_->close();
     socket_.reset();
   }
 
+  EventBase* getEventBase() const override {
+    return evb_;
+  }
+
  private:
   // AsyncUDPSocket::ReadCallback
-  void getReadBuffer(void** buf, size_t* len) noexcept {
+  void getReadBuffer(void** buf, size_t* len) noexcept override {
     std::tie(*buf, *len) = buf_.preallocate(packetSize_, packetSize_);
   }
 
-  void onDataAvailable(const folly::SocketAddress& clientAddress,
-                       size_t len,
-                       bool truncated) noexcept {
+  void onDataAvailable(
+      const folly::SocketAddress& clientAddress,
+      size_t len,
+      bool truncated) noexcept override {
     buf_.postallocate(len);
     auto data = buf_.split(len);
 
@@ -148,28 +165,32 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback {
 
     auto client = clientAddress;
     auto callback = listeners_[nextListener_].second;
-    auto mvp =
-        folly::MoveWrapper<
-            std::unique_ptr<folly::IOBuf>>(std::move(data));
+    auto socket = socket_;
 
     // Schedule it in the listener's eventbase
     // XXX: Speed this up
-    std::function<void()> f = [client, callback, mvp, truncated] () mutable {
-      callback->onDataAvailable(client, std::move(*mvp), truncated);
+    auto f = [
+      socket,
+      client,
+      callback,
+      data = std::move(data),
+      truncated
+    ]() mutable {
+      callback->onDataAvailable(socket, client, std::move(data), truncated);
     };
 
-    listeners_[nextListener_].first->runInEventBaseThread(f);
+    listeners_[nextListener_].first->runInEventBaseThread(std::move(f));
     ++nextListener_;
   }
 
-  void onReadError(const AsyncSocketException& ex) noexcept {
+  void onReadError(const AsyncSocketException& ex) noexcept override {
     LOG(ERROR) << ex.what();
 
     // Lets register to continue listening for packets
     socket_->resumeRead(this);
   }
 
-  void onReadClosed() noexcept {
+  void onReadClosed() noexcept override {
     for (auto& listener: listeners_) {
       auto callback = listener.second;
 
@@ -182,7 +203,7 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback {
   EventBase* const evb_;
   const size_t packetSize_;
 
-  std::unique_ptr<AsyncUDPSocket> socket_;
+  std::shared_ptr<AsyncUDPSocket> socket_;
 
   // List of listener to distribute packets among
   typedef std::pair<EventBase*, Callback*> Listener;
@@ -193,6 +214,8 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback {
 
   // Temporary buffer for data
   folly::IOBufQueue buf_;
+
+  bool reusePort_{false};
 };
 
-} // Namespace
+} // namespace folly