io/RecordIO-inl.h \
io/TypedIOBuf.h \
io/ShutdownSocketSet.h \
+ io/async/AsyncPipe.h \
io/async/AsyncTimeout.h \
io/async/AsyncTransport.h \
io/async/AsyncUDPServerSocket.h \
io/IOBufQueue.cpp \
io/RecordIO.cpp \
io/ShutdownSocketSet.cpp \
+ io/async/AsyncPipe.cpp \
io/async/AsyncTimeout.cpp \
io/async/AsyncUDPSocket.cpp \
io/async/AsyncServerSocket.cpp \
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/io/async/AsyncPipe.h>
+
+#include <folly/FileUtil.h>
+#include <folly/io/async/AsyncSocketException.h>
+
+using std::string;
+using std::unique_ptr;
+using folly::IOBuf;
+using folly::IOBufQueue;
+
+namespace folly {
+
+AsyncPipeReader::~AsyncPipeReader() {
+ close();
+}
+
+void AsyncPipeReader::failRead(const AsyncSocketException& ex) {
+ VLOG(5) << "AsyncPipeReader(this=" << this << ", fd=" << fd_ <<
+ "): failed while reading: " << ex.what();
+
+ DCHECK(readCallback_ != nullptr);
+ AsyncReader::ReadCallback* callback = readCallback_;
+ readCallback_ = nullptr;
+ callback->readErr(ex);
+ close();
+}
+
+void AsyncPipeReader::close() {
+ unregisterHandler();
+ if (fd_ >= 0) {
+ changeHandlerFD(-1);
+
+ if (closeCb_) {
+ closeCb_(fd_);
+ } else {
+ ::close(fd_);
+ }
+ fd_ = -1;
+ }
+}
+
+void AsyncPipeReader::handlerReady(uint16_t events) noexcept {
+ DestructorGuard dg(this);
+ CHECK(events & EventHandler::READ);
+
+ VLOG(5) << "AsyncPipeReader::handlerReady() this=" << this << ", fd=" << fd_;
+ assert(readCallback_ != nullptr);
+
+ uint16_t numReads = 0;
+ while (readCallback_) {
+ // Get the buffer to read into.
+ void* buf = nullptr;
+ size_t buflen = 0;
+ try {
+ readCallback_->getReadBuffer(&buf, &buflen);
+ } catch (const std::exception& ex) {
+ AsyncSocketException aex(AsyncSocketException::BAD_ARGS,
+ string("ReadCallback::getReadBuffer() "
+ "threw exception: ") + ex.what());
+ failRead(aex);
+ return;
+ } catch (...) {
+ AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
+ string("ReadCallback::getReadBuffer() "
+ "threw non-exception type"));
+ failRead(ex);
+ return;
+ }
+ if (buf == nullptr || buflen == 0) {
+ AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
+ string("ReadCallback::getReadBuffer() "
+ "returned empty buffer"));
+ failRead(ex);
+ return;
+ }
+
+ // Perform the read
+ ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen);
+ if (bytesRead > 0) {
+ readCallback_->readDataAvailable(bytesRead);
+ // Fall through and continue around the loop if the read
+ // completely filled the available buffer.
+ // Note that readCallback_ may have been uninstalled or changed inside
+ // readDataAvailable().
+ if (static_cast<size_t>(bytesRead) < buflen) {
+ return;
+ }
+ } else if (bytesRead < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ // No more data to read right now.
+ return;
+ } else if (bytesRead < 0) {
+ AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
+ "read failed", errno);
+ failRead(ex);
+ return;
+ } else {
+ assert(bytesRead == 0);
+ // EOF
+
+ unregisterHandler();
+ AsyncReader::ReadCallback* callback = readCallback_;
+ readCallback_ = nullptr;
+ callback->readEOF();
+ return;
+ }
+ // Max reads per loop?
+ }
+}
+
+
+void AsyncPipeWriter::write(unique_ptr<folly::IOBuf> buf,
+ AsyncWriter::WriteCallback* callback) {
+ if (closed()) {
+ if (callback) {
+ AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
+ "attempt to write to closed pipe");
+ callback->writeErr(0, ex);
+ }
+ return;
+ }
+ bool wasEmpty = (queue_.empty());
+ folly::IOBufQueue iobq;
+ iobq.append(std::move(buf));
+ std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*> p(
+ std::move(iobq), callback);
+ queue_.emplace_back(std::move(p));
+ if (wasEmpty) {
+ handleWrite();
+ } else {
+ CHECK(!queue_.empty());
+ CHECK(isHandlerRegistered());
+ }
+}
+
+void AsyncPipeWriter::writeChain(folly::AsyncWriter::WriteCallback* callback,
+ std::unique_ptr<folly::IOBuf>&& buf,
+ WriteFlags) {
+ write(std::move(buf), callback);
+}
+
+void AsyncPipeWriter::closeOnEmpty() {
+ VLOG(5) << "close on empty";
+ if (queue_.empty()) {
+ closeNow();
+ } else {
+ closeOnEmpty_ = true;
+ CHECK(isHandlerRegistered());
+ }
+}
+
+void AsyncPipeWriter::closeNow() {
+ VLOG(5) << "close now";
+ if (!queue_.empty()) {
+ failAllWrites(AsyncSocketException(AsyncSocketException::NOT_OPEN,
+ "closed with pending writes"));
+ }
+ if (fd_ >= 0) {
+ unregisterHandler();
+ changeHandlerFD(-1);
+ if (closeCb_) {
+ closeCb_(fd_);
+ } else {
+ close(fd_);
+ }
+ fd_ = -1;
+ }
+}
+
+void AsyncPipeWriter::failAllWrites(const AsyncSocketException& ex) {
+ DestructorGuard dg(this);
+ while (!queue_.empty()) {
+ // the first entry of the queue could have had a partial write, but needs to
+ // be tracked.
+ if (queue_.front().second) {
+ queue_.front().second->writeErr(0, ex);
+ }
+ queue_.pop_front();
+ }
+}
+
+
+void AsyncPipeWriter::handlerReady(uint16_t events) noexcept {
+ CHECK(events & EventHandler::WRITE);
+
+ handleWrite();
+}
+
+void AsyncPipeWriter::handleWrite() {
+ DestructorGuard dg(this);
+ assert(!queue_.empty());
+ do {
+ auto& front = queue_.front();
+ folly::IOBufQueue &curQueue = front.first;
+ DCHECK(!curQueue.empty());
+ // someday, support writev. The logic for partial writes is a bit complex
+ const IOBuf* head = curQueue.front();
+ CHECK(head->length());
+ ssize_t rc = folly::writeNoInt(fd_, head->data(), head->length());
+ if (rc < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ // pipe is full
+ VLOG(5) << "write blocked";
+ registerHandler(EventHandler::WRITE);
+ return;
+ } else {
+ failAllWrites(AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
+ "write failed", errno));
+ closeNow();
+ return;
+ }
+ } else if (rc == 0) {
+ registerHandler(EventHandler::WRITE);
+ return;
+ }
+ curQueue.trimStart(rc);
+ if (curQueue.empty()) {
+ auto cb = front.second;
+ queue_.pop_front();
+ if (cb) {
+ cb->writeSuccess();
+ }
+ } else {
+ VLOG(5) << "partial write blocked";
+ }
+ } while (!queue_.empty());
+
+ if (closeOnEmpty_) {
+ closeNow();
+ } else {
+ unregisterHandler();
+ }
+}
+
+} // folly
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/io/async/AsyncTransport.h>
+#include <folly/io/async/EventHandler.h>
+#include <folly/io/async/DelayedDestruction.h>
+#include <folly/io/IOBufQueue.h>
+
+#include <list>
+#include <system_error>
+
+namespace folly {
+
+class AsyncSocketException;
+
+/**
+ * Read from a pipe in an async manner.
+ */
+class AsyncPipeReader : public EventHandler,
+ public AsyncReader,
+ public DelayedDestruction {
+ public:
+ typedef std::unique_ptr<AsyncPipeReader,
+ folly::DelayedDestruction::Destructor> UniquePtr;
+
+ AsyncPipeReader(folly::EventBase* eventBase, int pipeFd)
+ : EventHandler(eventBase, pipeFd),
+ fd_(pipeFd) {}
+
+ /**
+ * Set the read callback and automatically install/uninstall the handler
+ * for events.
+ */
+ void setReadCB(AsyncReader::ReadCallback* callback) override {
+ if (callback == readCallback_) {
+ return;
+ }
+ readCallback_ = callback;
+ if (readCallback_ && !isHandlerRegistered()) {
+ registerHandler(EventHandler::READ | EventHandler::PERSIST);
+ } else if (!readCallback_ && isHandlerRegistered()) {
+ unregisterHandler();
+ }
+ }
+
+ /**
+ * Get the read callback
+ */
+ AsyncReader::ReadCallback* getReadCallback() const override {
+ return readCallback_;
+ }
+
+ /**
+ * Set a special hook to close the socket (otherwise, will call close())
+ */
+ void setCloseCallback(std::function<void(int)> closeCb) {
+ closeCb_ = closeCb;
+ }
+
+ private:
+ ~AsyncPipeReader();
+
+ void handlerReady(uint16_t events) noexcept override;
+ void failRead(const AsyncSocketException& ex);
+ void close();
+
+ int fd_;
+ AsyncReader::ReadCallback* readCallback_{nullptr};
+ std::function<void(int)> closeCb_;
+};
+
+/**
+ * Write to a pipe in an async manner.
+ */
+class AsyncPipeWriter : public EventHandler,
+ public AsyncWriter,
+ public DelayedDestruction {
+ public:
+ typedef std::unique_ptr<AsyncPipeWriter,
+ folly::DelayedDestruction::Destructor> UniquePtr;
+
+ AsyncPipeWriter(folly::EventBase* eventBase, int pipeFd)
+ : EventHandler(eventBase, pipeFd),
+ fd_(pipeFd) {}
+
+ /**
+ * Asynchronously write the given iobuf to this pipe, and invoke the callback
+ * on success/error.
+ */
+ void write(std::unique_ptr<folly::IOBuf> iob,
+ AsyncWriter::WriteCallback* wcb = nullptr);
+
+ /**
+ * Set a special hook to close the socket (otherwise, will call close())
+ */
+ void setCloseCallback(std::function<void(int)> closeCb) {
+ closeCb_ = closeCb;
+ }
+
+ /**
+ * Returns true if the pipe is closed
+ */
+ bool closed() const {
+ return (fd_ < 0 || closeOnEmpty_);
+ }
+
+ /**
+ * Notify the pipe to close as soon as all pending writes complete
+ */
+ void closeOnEmpty();
+
+ /**
+ * Close the pipe immediately, and fail all pending writes
+ */
+ void closeNow();
+
+ /**
+ * Return true if there are currently writes pending (eg: the pipe is blocked
+ * for writing)
+ */
+ bool hasPendingWrites() const {
+ return !queue_.empty();
+ }
+
+ // AsyncWriter methods
+ void write(folly::AsyncWriter::WriteCallback* callback, const void* buf,
+ size_t bytes, WriteFlags flags = WriteFlags::NONE) override {
+ writeChain(callback, IOBuf::wrapBuffer(buf, bytes), flags);
+ }
+ void writev(folly::AsyncWriter::WriteCallback*, const iovec*,
+ size_t, WriteFlags = WriteFlags::NONE) override {
+ throw std::runtime_error("writev is not supported. Please use writeChain.");
+ }
+ void writeChain(folly::AsyncWriter::WriteCallback* callback,
+ std::unique_ptr<folly::IOBuf>&& buf,
+ WriteFlags flags = WriteFlags::NONE) override;
+
+ private:
+ void handlerReady(uint16_t events) noexcept override;
+ void handleWrite();
+ void failAllWrites(const AsyncSocketException& ex);
+
+ int fd_;
+ std::list<std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*>> queue_;
+ bool closeOnEmpty_{false};
+ std::function<void(int)> closeCb_;
+
+ ~AsyncPipeWriter() {
+ closeNow();
+ }
+};
+
+} // folly
virtual ~AsyncTransport() = default;
};
-// Transitional intermediate interface. This is deprecated.
-// Wrapper around folly::AsyncTransport, that includes read/write callbacks
-class AsyncTransportWrapper : virtual public AsyncTransport {
+class AsyncReader {
public:
- typedef std::unique_ptr<AsyncTransportWrapper, Destructor> UniquePtr;
-
class ReadCallback {
public:
virtual ~ReadCallback() = default;
virtual void readErr(const AsyncSocketException& ex) noexcept = 0;
};
+ // Read methods that aren't part of AsyncTransport.
+ virtual void setReadCB(ReadCallback* callback) = 0;
+ virtual ReadCallback* getReadCallback() const = 0;
+
+ protected:
+ virtual ~AsyncReader() = default;
+};
+
+class AsyncWriter {
+ public:
class WriteCallback {
public:
virtual ~WriteCallback() = default;
const AsyncSocketException& ex) noexcept = 0;
};
- // Read/write methods that aren't part of AsyncTransport
- virtual void setReadCB(ReadCallback* callback) = 0;
- virtual ReadCallback* getReadCallback() const = 0;
-
+ // Write methods that aren't part of AsyncTransport
virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
WriteFlags flags = WriteFlags::NONE) = 0;
virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
virtual void writeChain(WriteCallback* callback,
std::unique_ptr<IOBuf>&& buf,
WriteFlags flags = WriteFlags::NONE) = 0;
+
+ protected:
+ virtual ~AsyncWriter() = default;
+};
+
+// Transitional intermediate interface. This is deprecated.
+// Wrapper around folly::AsyncTransport, that includes read/write callbacks
+class AsyncTransportWrapper : virtual public AsyncTransport,
+ virtual public AsyncReader,
+ virtual public AsyncWriter {
+ public:
+ using UniquePtr = std::unique_ptr<AsyncTransportWrapper, Destructor>;
+
+ // Alias for inherited members from AsyncReader and AsyncWriter
+ // to keep compatibility.
+ using ReadCallback = AsyncReader::ReadCallback;
+ using WriteCallback = AsyncWriter::WriteCallback;
+ virtual void setReadCB(ReadCallback* callback) override = 0;
+ virtual ReadCallback* getReadCallback() const override = 0;
+ virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
+ WriteFlags flags = WriteFlags::NONE) override = 0;
+ virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
+ WriteFlags flags = WriteFlags::NONE) override = 0;
+ virtual void writeChain(WriteCallback* callback,
+ std::unique_ptr<IOBuf>&& buf,
+ WriteFlags flags = WriteFlags::NONE) override = 0;
};
} // folly
### AsyncPipe
-TODO: not currently open souce
-
Async reads/writes to a unix pipe, to send data between processes.
Why don't you just use AsyncSocket for now?
notify of overload, such as timeouts, or CPU usage. For sync
systems, you are almost always limited by the number of threads.
For more details see [No Time for
- Asynchrony](https://www.usenix.org/legacy/event/hotos09/tech/full_papers/aguilera/aguilera.pdf)
\ No newline at end of file
+ Asynchrony](https://www.usenix.org/legacy/event/hotos09/tech/full_papers/aguilera/aguilera.pdf)
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/io/async/AsyncPipe.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/Memory.h>
+#include <gtest/gtest.h>
+
+#include <fcntl.h>
+
+using namespace testing;
+
+namespace {
+
+class TestReadCallback : public folly::AsyncReader::ReadCallback {
+ public:
+ void readDataAvailable(size_t len) noexcept override {
+ readBuffer_.postallocate(len);
+ }
+
+ void getReadBuffer(void** bufReturn, size_t* lenReturn) noexcept override {
+ auto res = readBuffer_.preallocate(4000, 65000);
+ *bufReturn = res.first;
+ *lenReturn = res.second;
+ }
+
+ void readEOF() noexcept override {}
+
+ void readErr(const folly::AsyncSocketException&) noexcept override {
+ error_ = true;
+ }
+
+ std::string getData() {
+ auto buf = readBuffer_.move();
+ buf->coalesce();
+ return std::string((char *)buf->data(), buf->length());
+ }
+
+ folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()};
+ bool error_{false};
+};
+
+class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
+ public:
+ void writeSuccess() noexcept override { writes_++; }
+
+ void writeErr(size_t, const folly::AsyncSocketException&) noexcept override {
+ error_ = true;
+ }
+
+ uint32_t writes_{0};
+ bool error_{false};
+};
+
+class AsyncPipeTest: public Test {
+ public:
+ void SetUp() override {
+ int rc = pipe(pipeFds_);
+ EXPECT_EQ(rc, 0);
+
+ EXPECT_EQ(::fcntl(pipeFds_[0], F_SETFL, O_NONBLOCK), 0);
+ EXPECT_EQ(::fcntl(pipeFds_[1], F_SETFL, O_NONBLOCK), 0);
+ reader_ = folly::make_unique<folly::AsyncPipeReader,
+ folly::DelayedDestruction::Destructor>(
+ &eventBase_, pipeFds_[0]);
+ writer_ = folly::make_unique<folly::AsyncPipeWriter,
+ folly::DelayedDestruction::Destructor>(
+ &eventBase_, pipeFds_[1]);
+ }
+
+ protected:
+ folly::EventBase eventBase_;
+ int pipeFds_[2];
+ folly::AsyncPipeReader::UniquePtr reader_;
+ folly::AsyncPipeWriter::UniquePtr writer_;
+ TestReadCallback readCallback_;
+ TestWriteCallback writeCallback_;
+};
+
+std::unique_ptr<folly::IOBuf> getBuf(const std::string& data) {
+ auto buf = folly::IOBuf::copyBuffer(data.c_str(), data.length());
+ return buf;
+}
+
+} // anonymous namespace
+
+
+TEST_F(AsyncPipeTest, simple) {
+ reader_->setReadCB(&readCallback_);
+ writer_->write(getBuf("hello"), &writeCallback_);
+ writer_->closeOnEmpty();
+ eventBase_.loop();
+ EXPECT_EQ(readCallback_.getData(), "hello");
+ EXPECT_FALSE(readCallback_.error_);
+ EXPECT_EQ(writeCallback_.writes_, 1);
+ EXPECT_FALSE(writeCallback_.error_);
+}
+
+TEST_F(AsyncPipeTest, blocked_writes) {
+ uint32_t writeAttempts = 0;
+ do {
+ ++writeAttempts;
+ writer_->write(getBuf("hello"), &writeCallback_);
+ } while (writeCallback_.writes_ == writeAttempts);
+ // there is one blocked write
+ writer_->closeOnEmpty();
+
+ reader_->setReadCB(&readCallback_);
+
+ eventBase_.loop();
+ std::string expected;
+ for (uint32_t i = 0; i < writeAttempts; i++) {
+ expected += "hello";
+ }
+ EXPECT_EQ(readCallback_.getData(), expected);
+ EXPECT_FALSE(readCallback_.error_);
+ EXPECT_EQ(writeCallback_.writes_, writeAttempts);
+ EXPECT_FALSE(writeCallback_.error_);
+}
+
+TEST_F(AsyncPipeTest, writeOnClose) {
+ reader_->setReadCB(&readCallback_);
+ writer_->write(getBuf("hello"), &writeCallback_);
+ writer_->closeOnEmpty();
+ writer_->write(getBuf("hello"), &writeCallback_);
+ eventBase_.loop();
+ EXPECT_EQ(readCallback_.getData(), "hello");
+ EXPECT_FALSE(readCallback_.error_);
+ EXPECT_EQ(writeCallback_.writes_, 1);
+ EXPECT_TRUE(writeCallback_.error_);
+}