/*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#pragma once
-#include <folly/MoveWrapper.h>
-#include <folly/io/IOBufQueue.h>
#include <folly/Memory.h>
+#include <folly/io/IOBufQueue.h>
#include <folly/io/async/AsyncUDPSocket.h>
#include <folly/io/async/EventBase.h>
* more than 1 packet will not work because they will end up with
* different event base to process.
*/
-class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback {
+class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
+ , public AsyncSocketBase {
public:
class Callback {
public:
/**
* Invoked when a new packet is received
*/
- virtual void onDataAvailable(const folly::SocketAddress& addr,
- std::unique_ptr<folly::IOBuf> buf,
- bool truncated) noexcept = 0;
+ virtual void onDataAvailable(
+ std::shared_ptr<AsyncUDPSocket> socket,
+ const folly::SocketAddress& addr,
+ std::unique_ptr<folly::IOBuf> buf,
+ bool truncated) noexcept = 0;
- virtual ~Callback() {}
+ virtual ~Callback() = default;
};
/**
nextListener_(0) {
}
- ~AsyncUDPServerSocket() {
+ ~AsyncUDPServerSocket() override {
if (socket_) {
close();
}
}
- void bind(const folly::SocketAddress& address) {
+ void bind(const folly::SocketAddress& addy) {
CHECK(!socket_);
- socket_ = folly::make_unique<AsyncUDPSocket>(evb_);
- socket_->bind(address);
+ socket_ = std::make_shared<AsyncUDPSocket>(evb_);
+ socket_->setReusePort(reusePort_);
+ socket_->bind(addy);
+ }
+
+ void setReusePort(bool reusePort) {
+ reusePort_ = reusePort;
}
folly::SocketAddress address() const {
return socket_->address();
}
+ void getAddress(SocketAddress* a) const override {
+ *a = address();
+ }
+
/**
* Add a listener to the round robin list
*/
void close() {
CHECK(socket_) << "Need to bind before closing";
+ socket_->close();
socket_.reset();
}
+ EventBase* getEventBase() const override {
+ return evb_;
+ }
+
private:
// AsyncUDPSocket::ReadCallback
- void getReadBuffer(void** buf, size_t* len) noexcept {
+ void getReadBuffer(void** buf, size_t* len) noexcept override {
std::tie(*buf, *len) = buf_.preallocate(packetSize_, packetSize_);
}
- void onDataAvailable(const folly::SocketAddress& clientAddress,
- size_t len,
- bool truncated) noexcept {
+ void onDataAvailable(
+ const folly::SocketAddress& clientAddress,
+ size_t len,
+ bool truncated) noexcept override {
buf_.postallocate(len);
auto data = buf_.split(len);
auto client = clientAddress;
auto callback = listeners_[nextListener_].second;
- auto mvp =
- folly::MoveWrapper<
- std::unique_ptr<folly::IOBuf>>(std::move(data));
+ auto socket = socket_;
// Schedule it in the listener's eventbase
// XXX: Speed this up
- std::function<void()> f = [client, callback, mvp, truncated] () mutable {
- callback->onDataAvailable(client, std::move(*mvp), truncated);
+ auto f = [
+ socket,
+ client,
+ callback,
+ data = std::move(data),
+ truncated
+ ]() mutable {
+ callback->onDataAvailable(socket, client, std::move(data), truncated);
};
- listeners_[nextListener_].first->runInEventBaseThread(f);
+ listeners_[nextListener_].first->runInEventBaseThread(std::move(f));
++nextListener_;
}
- void onReadError(const AsyncSocketException& ex) noexcept {
+ void onReadError(const AsyncSocketException& ex) noexcept override {
LOG(ERROR) << ex.what();
// Lets register to continue listening for packets
socket_->resumeRead(this);
}
- void onReadClosed() noexcept {
+ void onReadClosed() noexcept override {
for (auto& listener: listeners_) {
auto callback = listener.second;
EventBase* const evb_;
const size_t packetSize_;
- std::unique_ptr<AsyncUDPSocket> socket_;
+ std::shared_ptr<AsyncUDPSocket> socket_;
// List of listener to distribute packets among
typedef std::pair<EventBase*, Callback*> Listener;
// Temporary buffer for data
folly::IOBufQueue buf_;
+
+ bool reusePort_{false};
};
-} // Namespace
+} // namespace folly