From: James Sedgwick Date: Fri, 21 Nov 2014 20:59:49 +0000 (-0800) Subject: AsyncSocketHandler X-Git-Tag: v0.22.0~149 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=96e30d3178411fe9dd80d79003d472b91296e8c3;p=folly.git AsyncSocketHandler Summary: mostly copypasta from TAsyncTransportHandler Test Plan: compiles Reviewed By: davejwatson@fb.com Subscribers: trunkagent, fugalh, njormrod, folly-diffs@ FB internal diff: D1690973 Signature: t1:1690973:1416529528:4feb187a68ad5405662b9b0efb160edd253a2977 --- diff --git a/folly/experimental/wangle/channel/AsyncSocketHandler.h b/folly/experimental/wangle/channel/AsyncSocketHandler.h new file mode 100644 index 00000000..91277b68 --- /dev/null +++ b/folly/experimental/wangle/channel/AsyncSocketHandler.h @@ -0,0 +1,153 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace folly { namespace wangle { + +class AsyncSocketHandler + : public folly::wangle::BytesToBytesHandler, + public AsyncSocket::ReadCallback { + public: + explicit AsyncSocketHandler( + std::shared_ptr socket) + : socket_(std::move(socket)) {} + + AsyncSocketHandler(AsyncSocketHandler&&) = default; + + ~AsyncSocketHandler() { + if (socket_) { + detachReadCallback(); + } + } + + void attachReadCallback() { + socket_->setReadCB(socket_->good() ? this : nullptr); + } + + void detachReadCallback() { + if (socket_->getReadCallback() == this) { + socket_->setReadCB(nullptr); + } + } + + void attachEventBase(folly::EventBase* eventBase) { + if (eventBase && !socket_->getEventBase()) { + socket_->attachEventBase(eventBase); + } + } + + void detachEventBase() { + detachReadCallback(); + if (socket_->getEventBase()) { + socket_->detachEventBase(); + } + } + + void attachPipeline(Context* ctx) override { + CHECK(!ctx_); + ctx_ = ctx; + } + + folly::wangle::Future write( + Context* ctx, + std::unique_ptr buf) override { + if (UNLIKELY(!buf)) { + return folly::wangle::makeFuture(); + } + + if (!socket_->good()) { + VLOG(5) << "socket is closed in write()"; + return folly::wangle::makeFuture(AsyncSocketException( + AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN, + "socket is closed in write()")); + } + + auto cb = new WriteCallback(); + auto future = cb->promise_.getFuture(); + socket_->writeChain(cb, std::move(buf), ctx->getWriteFlags()); + return future; + }; + + folly::wangle::Future close(Context* ctx) { + if (socket_) { + detachReadCallback(); + socket_->closeNow(); + } + return folly::wangle::makeFuture(); + } + + // Must override to avoid warnings about hidden overloaded virtual due to + // AsyncSocket::ReadCallback::readEOF() + void readEOF(Context* ctx) override { + ctx->fireReadEOF(); + } + + void getReadBuffer(void** bufReturn, size_t* lenReturn) override { + const auto readBufferSettings = ctx_->getReadBufferSettings(); + const auto ret = bufQueue_.preallocate( + readBufferSettings.first, + readBufferSettings.second); + *bufReturn = ret.first; + *lenReturn = ret.second; + } + + void readDataAvailable(size_t len) noexcept override { + bufQueue_.postallocate(len); + ctx_->fireRead(bufQueue_); + } + + void readEOF() noexcept override { + ctx_->fireReadEOF(); + } + + void readErr(const AsyncSocketException& ex) + noexcept override { + ctx_->fireReadException(ex); + } + + private: + class WriteCallback : private AsyncSocket::WriteCallback { + void writeSuccess() noexcept override { + promise_.setValue(); + delete this; + } + + void writeErr(size_t bytesWritten, + const AsyncSocketException& ex) + noexcept override { + promise_.setException(ex); + delete this; + } + + private: + friend class AsyncSocketHandler; + folly::wangle::Promise promise_; + }; + + Context* ctx_{nullptr}; + folly::IOBufQueue bufQueue_; + std::shared_ptr socket_{nullptr}; +}; + +}} diff --git a/folly/experimental/wangle/channel/ChannelTest.cpp b/folly/experimental/wangle/channel/ChannelTest.cpp index 6b7ec897..2f155226 100644 --- a/folly/experimental/wangle/channel/ChannelTest.cpp +++ b/folly/experimental/wangle/channel/ChannelTest.cpp @@ -16,6 +16,8 @@ #include #include +#include +#include #include #include #include @@ -103,3 +105,13 @@ TEST(ChannelTest, PlzCompile2) { .finalize(); pipeline.read(42); } + +TEST(ChannelTest, HandlersCompile) { + EventBase eb; + auto socket = AsyncSocket::newSocket(&eb); + ChannelPipeline> pipeline; + pipeline + .addBack(AsyncSocketHandler(socket)) + .addBack(OutputBufferingHandler()) + .finalize(); +} diff --git a/folly/io/async/AsyncSocket.h b/folly/io/async/AsyncSocket.h index 33924b6d..f8eb8e55 100644 --- a/folly/io/async/AsyncSocket.h +++ b/folly/io/async/AsyncSocket.h @@ -106,7 +106,7 @@ class AsyncSocket : virtual public AsyncTransport { * * If getReadBuffer() throws an exception, returns a nullptr buffer, or * returns a 0 length, the ReadCallback will be uninstalled and its - * readError() method will be invoked. + * readErr() method will be invoked. * * getReadBuffer() is not allowed to change the transport state before it * returns. (For example, it should never uninstall the read callback, or @@ -144,11 +144,11 @@ class AsyncSocket : virtual public AsyncTransport { virtual void readEOF() noexcept = 0; /** - * readError() will be invoked if an error occurs reading from the + * readErr() will be invoked if an error occurs reading from the * transport. * * The read callback will be automatically uninstalled immediately before - * readError() is invoked. + * readErr() is invoked. * * @param ex An exception describing the error that occurred. */ @@ -174,7 +174,7 @@ class AsyncSocket : virtual public AsyncTransport { virtual void writeSuccess() noexcept = 0; /** - * writeError() will be invoked if an error occurs writing the data. + * writeErr() will be invoked if an error occurs writing the data. * * @param bytesWritten The number of bytes that were successfull * @param ex An exception describing the error that occurred.