2 * Copyright 2017-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/ExceptionWrapper.h>
19 #include <folly/Format.h>
20 #include <folly/Portability.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/Cursor.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/IOBufQueue.h>
25 #include <folly/portability/Fcntl.h>
26 #include <folly/portability/Sockets.h>
27 #include <folly/portability/SysUio.h>
28 #include <folly/portability/Unistd.h>
30 #include <boost/preprocessor/control/if.hpp>
33 #include <sys/types.h>
// Bring std::unique_ptr into scope; the rest of this file spells it
// unqualified.
37 using std::unique_ptr;
// Short alias for folly's portability socket wrappers (used below as
// fsp::socket() and fsp::connect()).
39 namespace fsp = folly::portability::sockets;
// Compile-time switch keyed on FOLLY_HAVE_MSG_ERRQUEUE. setErrMessageCB() and
// setZeroCopy() in this file branch on it.
// NOTE(review): the initializer lines for both preprocessor branches are not
// visible in this view.
43 static constexpr bool msgErrQueueSupported =
44 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
48 #endif // FOLLY_HAVE_MSG_ERRQUEUE
50 // static members initializers
51 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
// Pre-built END_OF_FILE exceptions describing a local close and a shutdown of
// the write side.
53 const AsyncSocketException socketClosedLocallyEx(
54 AsyncSocketException::END_OF_FILE, "socket closed locally");
55 const AsyncSocketException socketShutdownForWritesEx(
56 AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
58 // TODO: It might help performance to provide a version of BytesWriteRequest that
59 // users could derive from, so we can avoid the extra allocation for each call
60 // to write()/writev(). We could templatize TFramedAsyncChannel just like the
61 // protocols are currently templatized for transports.
63 // We would need the version for external users where they provide the iovec
64 // storage space, and only our internal version would allocate it at the end of
67 /* The default WriteRequest implementation, used for write(), writev() and
70 * A new BytesWriteRequest operation is allocated on the heap for all write
71 * operations that cannot be completed immediately.
// WriteRequest subclass that keeps its own copy of the caller's iovec array
// in a trailing flexible array member (writeOps_) together with the optional
// IOBuf backing that data. Instances are created through newRequest() and
// torn down through destroy(); the destructor is private.
73 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
// Factory: mallocs sizeof(BytesWriteRequest) plus room for opCount iovec
// entries and constructs the request in that storage with placement new.
// The visible failure path throws std::bad_alloc.
75 static BytesWriteRequest* newRequest(AsyncSocket* socket,
76 WriteCallback* callback,
79 uint32_t partialWritten,
80 uint32_t bytesWritten,
81 unique_ptr<IOBuf>&& ioBuf,
84 // Since we put a variable size iovec array at the end
85 // of each BytesWriteRequest, we have to manually allocate the memory.
86 void* buf = malloc(sizeof(BytesWriteRequest) +
87 (opCount * sizeof(struct iovec)));
89 throw std::bad_alloc();
92 return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
93 partialWritten, bytesWritten,
94 std::move(ioBuf), flags);
// Runs the destructor explicitly (the object was built with placement new).
// NOTE(review): the statement releasing the malloc'd storage is not visible
// in this view.
97 void destroy() override {
98 this->~BytesWriteRequest();
// Attempts to write the remaining iovecs through the owning socket.
102 WriteResult performWrite() override {
103 WriteFlags writeFlags = flags_;
// Another request is queued behind this one, so ask for corking.
104 if (getNext() != nullptr) {
105 writeFlags |= WriteFlags::CORK;
// Lets the socket strip the zero-copy flag when zero copy is not enabled.
108 socket_->adjustZeroCopyFlags(writeFlags);
110 auto writeResult = socket_->performWrite(
111 getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
// A negative return is recorded as zero bytes written.
112 bytesWritten_ = writeResult.writeReturn > 0 ? writeResult.writeReturn : 0;
// For zero-copy writes the buffer is registered with the socket, either by
// transferring ownership or by raw pointer.
// NOTE(review): the condition choosing between the two calls is not visible.
114 if (socket_->isZeroCopyRequest(writeFlags)) {
116 socket_->addZeroCopyBuf(std::move(ioBuf_));
118 socket_->addZeroCopyBuf(ioBuf_.get());
121 // this happens if at least one of the prev requests were sent
122 // with zero copy but not the last one
123 if (isComplete() && socket_->getZeroCopy() &&
124 socket_->containsZeroCopyBuf(ioBuf_.get())) {
125 socket_->setZeroCopyBuf(std::move(ioBuf_));
// True once the last write covered every iovec that was still outstanding.
132 bool isComplete() override {
133 return opsWritten_ == getOpCount();
// Accounts for a partial write: advances past fully written iovecs, trims the
// partially written one, and adds to the running byte total.
136 void consume() override {
137 // Advance opIndex_ forward by opsWritten_
138 opIndex_ += opsWritten_;
139 assert(opIndex_ < opCount_);
141 if (!socket_->isZeroCopyRequest(flags_)) {
142 // If we've finished writing any IOBufs, release them
144 for (uint32_t i = opsWritten_; i != 0; --i) {
146 ioBuf_ = ioBuf_->pop();
151 // Move partialBytes_ forward into the current iovec buffer
152 struct iovec* currentOp = writeOps_ + opIndex_;
153 assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
154 currentOp->iov_base =
155 reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
156 currentOp->iov_len -= partialBytes_;
158 // Increment the totalBytesWritten_ count by bytesWritten_;
159 assert(bytesWritten_ >= 0);
160 totalBytesWritten_ += uint32_t(bytesWritten_);
// Constructor used by newRequest(): stores the bookkeeping values, takes
// ownership of ioBuf, and copies opCount_ iovec entries from ops into the
// trailing writeOps_ array.
// NOTE(review): several parameter and initializer lines are not visible here.
164 BytesWriteRequest(AsyncSocket* socket,
165 WriteCallback* callback,
166 const struct iovec* ops,
168 uint32_t partialBytes,
169 uint32_t bytesWritten,
170 unique_ptr<IOBuf>&& ioBuf,
172 : AsyncSocket::WriteRequest(socket, callback)
176 , ioBuf_(std::move(ioBuf))
178 , partialBytes_(partialBytes)
179 , bytesWritten_(bytesWritten) {
180 memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
183 // private destructor, to ensure callers use destroy()
184 ~BytesWriteRequest() override = default;
// Pointer to the first iovec that has not been fully written yet.
186 const struct iovec* getOps() const {
187 assert(opCount_ > opIndex_);
188 return writeOps_ + opIndex_;
// Number of iovecs still outstanding (from opIndex_ to the end).
191 uint32_t getOpCount() const {
192 assert(opCount_ > opIndex_);
193 return opCount_ - opIndex_;
196 uint32_t opCount_; ///< number of entries in writeOps_
197 uint32_t opIndex_; ///< current index into writeOps_
198 WriteFlags flags_; ///< set for WriteFlags
199 unique_ptr<IOBuf> ioBuf_; ///< underlying IOBuf, or nullptr if N/A
201 // for consume(), how much we wrote on the last write
202 uint32_t opsWritten_; ///< complete ops written
203 uint32_t partialBytes_; ///< partial bytes of incomplete op written
204 ssize_t bytesWritten_; ///< bytes written altogether
206 struct iovec writeOps_[]; ///< write operation(s) list
// Builds the default msg_flags bitmask for a write from folly WriteFlags.
// Starts from MSG_DONTWAIT, adds MSG_NOSIGNAL where the platform defines it,
// maps CORK to MSG_MORE and EOR to MSG_EOR, and adds MSG_ZEROCOPY only when
// the caller asked for WRITE_MSG_ZEROCOPY and zeroCopyEnabled is true.
// The CORK handling sits between "#ifdef MSG_NOSIGNAL" and its #endif.
// NOTE(review): the return statement is not visible in this view.
209 int AsyncSocket::SendMsgParamsCallback::getDefaultFlags(
210 folly::WriteFlags flags,
211 bool zeroCopyEnabled) noexcept {
212 int msg_flags = MSG_DONTWAIT;
214 #ifdef MSG_NOSIGNAL // Linux-only
215 msg_flags |= MSG_NOSIGNAL;
217 if (isSet(flags, WriteFlags::CORK)) {
218 // MSG_MORE tells the kernel we have more data to send, so wait for us to
219 // give it the rest of the data rather than immediately sending a partial
220 // frame, even when TCP_NODELAY is enabled.
221 msg_flags |= MSG_MORE;
224 #endif // MSG_NOSIGNAL
225 if (isSet(flags, WriteFlags::EOR)) {
226 // marks that this is the last byte of a record (response)
227 msg_flags |= MSG_EOR;
230 if (zeroCopyEnabled && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY)) {
231 msg_flags |= MSG_ZEROCOPY;
// File-local default SendMsgParamsCallback instance; init() points
// sendMsgParamCallback_ at this object.
238 static AsyncSocket::SendMsgParamsCallback defaultSendMsgParamsCallback;
// Default constructor: no EventBase is attached, so the write timeout and the
// I/O handler are both created with a null event base.
241 AsyncSocket::AsyncSocket()
242 : eventBase_(nullptr),
243 writeTimeout_(this, nullptr),
244 ioHandler_(this, nullptr),
245 immediateReadHandler_(this) {
246 VLOG(5) << "new AsyncSocket()";
// Constructs a socket whose write timeout and I/O handler are created on the
// given EventBase.
// NOTE(review): the first member initializer line is not visible in this view.
250 AsyncSocket::AsyncSocket(EventBase* evb)
252 writeTimeout_(this, evb),
253 ioHandler_(this, evb),
254 immediateReadHandler_(this) {
255 VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
// Convenience constructor: immediately starts connecting to `address` with
// the given timeout and no connect callback (nullptr is passed).
// NOTE(review): the initializer/delegation line is not visible in this view.
259 AsyncSocket::AsyncSocket(EventBase* evb,
260 const folly::SocketAddress& address,
261 uint32_t connectTimeout)
263 connect(nullptr, address, connectTimeout);
// Convenience constructor: immediately starts connecting to ip:port with the
// given timeout and no connect callback (nullptr is passed).
// NOTE(review): the `port` parameter line and the initializer/delegation line
// are not visible in this view.
266 AsyncSocket::AsyncSocket(EventBase* evb,
267 const std::string& ip,
269 uint32_t connectTimeout)
271 connect(nullptr, ip, port, connectTimeout);
// Wraps an already-open descriptor: the I/O handler is created directly on
// `fd`, the zero-copy buffer id counter starts at zeroCopyBufId, and the
// socket is marked ESTABLISHED.
// NOTE(review): the lines between the log statement and the state assignment
// are not visible in this view.
274 AsyncSocket::AsyncSocket(EventBase* evb, int fd, uint32_t zeroCopyBufId)
275 : zeroCopyBufId_(zeroCopyBufId),
277 writeTimeout_(this, evb),
278 ioHandler_(this, evb, fd),
279 immediateReadHandler_(this) {
280 VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd=" << fd
281 << ", zeroCopyBufId=" << zeroCopyBufId << ")";
285 state_ = StateEnum::ESTABLISHED;
// Takes over another AsyncSocket: forwards its event base, its detached fd
// and its zero-copy buffer id, then moves its preReceivedData_ across.
// NOTE(review): the line naming the delegated-to constructor is not visible;
// presumably it is the (evb, fd, zeroCopyBufId) overload.
288 AsyncSocket::AsyncSocket(AsyncSocket::UniquePtr oldAsyncSocket)
290 oldAsyncSocket->getEventBase(),
291 oldAsyncSocket->detachFd(),
292 oldAsyncSocket->getZeroCopyBufId()) {
293 preReceivedData_ = std::move(oldAsyncSocket->preReceivedData_);
// Shared initialization: puts the socket in the UNINIT state with no events
// registered, clears all callbacks and the write queue, resets the shutdown
// socket set and byte counters, and installs the default send-msg-params
// callback.
296 // init() method, since constructor forwarding isn't supported in most
298 void AsyncSocket::init() {
// NOTE(review): any guard around this call (e.g. a null check on eventBase_)
// is not visible in this view.
300 eventBase_->dcheckIsInEventBaseThread();
303 state_ = StateEnum::UNINIT;
304 eventFlags_ = EventHandler::NONE;
307 maxReadsPerEvent_ = 16;
308 connectCallback_ = nullptr;
309 errMessageCallback_ = nullptr;
310 readCallback_ = nullptr;
311 writeReqHead_ = nullptr;
312 writeReqTail_ = nullptr;
313 wShutdownSocketSet_.reset();
314 appBytesWritten_ = 0;
315 appBytesReceived_ = 0;
316 sendMsgParamCallback_ = &defaultSendMsgParamsCallback;
// Destructor: the visible body only logs the object's address, event base,
// descriptor and state.
319 AsyncSocket::~AsyncSocket() {
320 VLOG(7) << "actual destruction of AsyncSocket(this=" << this
321 << ", evb=" << eventBase_ << ", fd=" << fd_
322 << ", state=" << state_ << ")";
// Logs the request, then hands off to DelayedDestruction::destroy(), which
// decides between immediate and delayed destruction.
// NOTE(review): the statement that closes the socket (announced by the
// comment below) is not visible in this view.
325 void AsyncSocket::destroy() {
326 VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
327 << ", fd=" << fd_ << ", state=" << state_;
328 // When destroy is called, close the socket immediately
331 // Then call DelayedDestruction::destroy() to take care of
332 // whether or not we need immediate or delayed destruction
333 DelayedDestruction::destroy();
// Gives up ownership of the descriptor: removes it from the shutdown socket
// set (if one is still alive) and finally points the I/O handler at -1.
// NOTE(review): the lines that save fd_, reset it to -1, call closeNow() and
// return the saved value are not visible in this view.
336 int AsyncSocket::detachFd() {
337 VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
338 << ", evb=" << eventBase_ << ", state=" << state_
339 << ", events=" << std::hex << eventFlags_ << ")";
340 // Extract the fd, and set fd_ to -1 first, so closeNow() won't
341 // actually close the descriptor.
342 if (const auto socketSet = wShutdownSocketSet_.lock()) {
343 socketSet->remove(fd_);
347 // Call closeNow() to invoke all pending callbacks with an error.
349 // Update the EventHandler to stop using this fd.
350 // This can only be done after closeNow() unregisters the handler.
351 ioHandler_.changeHandlerFD(-1);
// Holds a function-local static SocketAddress for "0.0.0.0" port 0.
// connect() compares bindAddr against anyAddress() to decide whether to bind.
// NOTE(review): the return statement is not visible in this view.
355 const folly::SocketAddress& AsyncSocket::anyAddress() {
356 static const folly::SocketAddress anyAddress =
357 folly::SocketAddress("0.0.0.0", 0);
// Switches this socket to a different ShutdownSocketSet. Both weak pointers
// are locked first and compared; when the socket has a valid fd it is removed
// from the old set before the new weak pointer is stored.
// NOTE(review): the body of the "same set" check and the statement that adds
// fd_ to the new set are not visible in this view.
361 void AsyncSocket::setShutdownSocketSet(
362 const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
363 const auto newSS = wNewSS.lock();
364 const auto shutdownSocketSet = wShutdownSocketSet_.lock();
366 if (newSS == shutdownSocketSet) {
370 if (shutdownSocketSet && fd_ != -1) {
371 shutdownSocketSet->remove(fd_);
374 if (newSS && fd_ != -1) {
378 wShutdownSocketSet_ = wNewSS;
// Sets FD_CLOEXEC on fd_ via fcntl(F_SETFD). The visible error path captures
// errno and throws an INTERNAL_ERROR AsyncSocketException.
// NOTE(review): the condition guarding the throw is not visible in this view.
381 void AsyncSocket::setCloseOnExec() {
382 int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
384 auto errnoCopy = errno;
385 throw AsyncSocketException(
386 AsyncSocketException::INTERNAL_ERROR,
387 withAddr("failed to set close-on-exec flag"),
// Starts a non-blocking stream connect to `address`.
//
// Sequence visible here: require the UNINIT state, record the timeout and
// start time, move to CONNECTING, create a SOCK_STREAM socket, register it
// with the shutdown socket set and the I/O handler, set O_NONBLOCK, enable
// TCP_NODELAY and the cached zero-copy setting for non-AF_UNIX addresses,
// optionally bind to `bindAddr`, apply `options`, then call socketConnect().
//
// The function is noexcept: exceptions thrown by the setup steps are caught
// and reported through failConnect(). If the connect completes immediately,
// invokeConnectSuccess() is called.
// NOTE(review): the `timeout` parameter line, the `try {` line and most of
// the error-check conditions are not visible in this view.
392 void AsyncSocket::connect(ConnectCallback* callback,
393 const folly::SocketAddress& address,
395 const OptionMap &options,
396 const folly::SocketAddress& bindAddr) noexcept {
397 DestructorGuard dg(this);
398 eventBase_->dcheckIsInEventBaseThread();
402 // Make sure we're in the uninitialized state
403 if (state_ != StateEnum::UNINIT) {
404 return invalidState(callback);
407 connectTimeout_ = std::chrono::milliseconds(timeout);
408 connectStartTime_ = std::chrono::steady_clock::now();
409 // Make connect end time at least >= connectStartTime.
410 connectEndTime_ = connectStartTime_;
413 state_ = StateEnum::CONNECTING;
414 connectCallback_ = callback;
// Scratch storage reused for both the bind address and the peer address.
416 sockaddr_storage addrStorage;
417 sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
421 // Technically the first parameter should actually be a protocol family
422 // constant (PF_xxx) rather than an address family (AF_xxx), but the
423 // distinction is mainly just historical. In pretty much all
424 // implementations the PF_foo and AF_foo constants are identical.
425 fd_ = fsp::socket(address.getFamily(), SOCK_STREAM, 0);
427 auto errnoCopy = errno;
428 throw AsyncSocketException(
429 AsyncSocketException::INTERNAL_ERROR,
430 withAddr("failed to create socket"),
433 if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
434 shutdownSocketSet->add(fd_);
436 ioHandler_.changeHandlerFD(fd_);
440 // Put the socket in non-blocking mode
441 int flags = fcntl(fd_, F_GETFL, 0);
443 auto errnoCopy = errno;
444 throw AsyncSocketException(
445 AsyncSocketException::INTERNAL_ERROR,
446 withAddr("failed to get socket flags"),
449 int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
451 auto errnoCopy = errno;
452 throw AsyncSocketException(
453 AsyncSocketException::INTERNAL_ERROR,
454 withAddr("failed to put socket in non-blocking mode"),
458 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
459 // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
460 rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
462 auto errnoCopy = errno;
463 throw AsyncSocketException(
464 AsyncSocketException::INTERNAL_ERROR,
465 "failed to enable F_SETNOSIGPIPE on socket",
470 // By default, turn on TCP_NODELAY
471 // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
472 // setNoDelay() will log an error message if it fails.
473 // Also set the cached zeroCopyVal_ since it cannot be set earlier if the fd
475 if (address.getFamily() != AF_UNIX) {
476 (void)setNoDelay(true);
477 setZeroCopy(zeroCopyVal_);
480 VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
481 << ", fd=" << fd_ << ", host=" << address.describe().c_str();
// Bind only when the caller supplied something other than the wildcard
// address; SO_REUSEADDR is set first. Failures here throw NOT_OPEN.
484 if (bindAddr != anyAddress()) {
486 if (setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
487 auto errnoCopy = errno;
489 throw AsyncSocketException(
490 AsyncSocketException::NOT_OPEN,
491 "failed to setsockopt prior to bind on " + bindAddr.describe(),
495 bindAddr.getAddress(&addrStorage);
497 if (bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
498 auto errnoCopy = errno;
500 throw AsyncSocketException(
501 AsyncSocketException::NOT_OPEN,
502 "failed to bind to async socket: " + bindAddr.describe(),
507 // Apply the additional options if any.
508 for (const auto& opt: options) {
509 rv = opt.first.apply(fd_, opt.second);
511 auto errnoCopy = errno;
512 throw AsyncSocketException(
513 AsyncSocketException::INTERNAL_ERROR,
514 withAddr("failed to set socket option"),
519 // Perform the connect()
520 address.getAddress(&addrStorage);
// NOTE(review): the condition selecting the FAST_OPEN path (and the else
// branch introducing the socketConnect() call) is not visible in this view.
523 state_ = StateEnum::FAST_OPEN;
524 tfoAttempted_ = true;
526 if (socketConnect(saddr, addr_.getActualSize()) < 0) {
531 // If we're still here the connect() succeeded immediately.
532 // Fall through to call the callback outside of this try...catch block
533 } catch (const AsyncSocketException& ex) {
534 return failConnect(__func__, ex);
535 } catch (const std::exception& ex) {
536 // shouldn't happen, but handle it just in case
537 VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
538 << "): unexpected " << typeid(ex).name() << " exception: "
540 AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
541 withAddr(string("unexpected exception: ") +
543 return failConnect(__func__, tex);
546 // The connection succeeded immediately
547 // The read callback may not have been set yet, and no writes may be pending
548 // yet, so we don't have to register for any events at the moment.
549 VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
550 assert(errMessageCallback_ == nullptr);
551 assert(readCallback_ == nullptr);
552 assert(writeReqHead_ == nullptr);
// A socket left in FAST_OPEN keeps that state; otherwise it is now
// established.
553 if (state_ != StateEnum::FAST_OPEN) {
554 state_ = StateEnum::ESTABLISHED;
556 invokeConnectSuccess();
// Issues the actual connect() on fd_. Before connecting it makes best-effort
// setsockopt calls (SO_NO_TRANSPARENT_TLS, SO_NO_TSOCKS) whose results are
// deliberately ignored. When connect() reports EINPROGRESS, the connect
// timeout is scheduled and the socket registers for connect events; other
// errors on the visible path throw a NOT_OPEN AsyncSocketException.
// NOTE(review): the preprocessor guards, the condition for the TSOCKS branch,
// the check on rv, and the return statements are not visible in this view.
559 int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) {
561 if (noTransparentTls_) {
562 // Ignore return value, errors are ok
563 setsockopt(fd_, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
566 VLOG(4) << "Disabling TSOCKS for fd " << fd_;
567 // Ignore return value, errors are ok
568 setsockopt(fd_, SOL_SOCKET, SO_NO_TSOCKS, nullptr, 0);
571 int rv = fsp::connect(fd_, saddr, len);
573 auto errnoCopy = errno;
574 if (errnoCopy == EINPROGRESS) {
575 scheduleConnectTimeout();
576 registerForConnectEvents();
578 throw AsyncSocketException(
579 AsyncSocketException::NOT_OPEN,
580 "connect failed (immediately)",
// Arms writeTimeout_ with the connect timeout (connectTimeout_ converted to a
// 32-bit count). Throws an INTERNAL_ERROR AsyncSocketException if the timer
// cannot be scheduled.
// NOTE(review): any guard on the timeout value (e.g. skipping when it is
// zero) is not visible in this view.
587 void AsyncSocket::scheduleConnectTimeout() {
588 // Connection in progress.
589 auto timeout = connectTimeout_.count();
591 // Start a timer in case the connection takes too long.
592 if (!writeTimeout_.scheduleTimeout(uint32_t(timeout))) {
593 throw AsyncSocketException(
594 AsyncSocketException::INTERNAL_ERROR,
595 withAddr("failed to schedule AsyncSocket connect timeout"));
// Registers the I/O handler for WRITE events so the socket is notified when
// the pending connect completes or fails. Expects that no events are
// currently registered; throws an INTERNAL_ERROR AsyncSocketException if the
// registration fails.
600 void AsyncSocket::registerForConnectEvents() {
601 // Register for write events, so we'll
602 // be notified when the connection finishes/fails.
603 // Note that we don't register for a persistent event here.
604 assert(eventFlags_ == EventHandler::NONE);
605 eventFlags_ = EventHandler::WRITE;
606 if (!ioHandler_.registerHandler(eventFlags_)) {
607 throw AsyncSocketException(
608 AsyncSocketException::INTERNAL_ERROR,
609 withAddr("failed to register AsyncSocket connect handler"));
// Overload taking an IP string and port: builds a folly::SocketAddress and
// forwards to the address-based connect(). A std::exception raised while
// doing so is converted into an INTERNAL_ERROR AsyncSocketException and
// reported through failConnect(), keeping this function noexcept.
// NOTE(review): the `timeout` parameter line, the `try {` line and the
// message argument of `tex` are not visible in this view.
613 void AsyncSocket::connect(ConnectCallback* callback,
614 const string& ip, uint16_t port,
616 const OptionMap &options) noexcept {
617 DestructorGuard dg(this);
// Stored up front so failConnect() has a callback to notify if constructing
// the address throws.
619 connectCallback_ = callback;
620 connect(callback, folly::SocketAddress(ip, port), timeout, options);
621 } catch (const std::exception& ex) {
622 AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
624 return failConnect(__func__, tex);
// Drops the connect callback so it will not be invoked, then takes further
// action only if a connect is still in flight (CONNECTING or FAST_OPEN).
// NOTE(review): the action taken inside the if-block is not visible in this
// view.
628 void AsyncSocket::cancelConnect() {
629 connectCallback_ = nullptr;
630 if (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN) {
// Updates the send timeout (milliseconds). If the socket is currently waiting
// on WRITE events outside of a connect, the running write timer is
// rescheduled with the new value, or cancelled when the new value is zero.
// A failure to reschedule is reported through failWrite().
// NOTE(review): any guard around the dcheck call is not visible in this view.
635 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
636 sendTimeout_ = milliseconds;
638 eventBase_->dcheckIsInEventBaseThread();
641 // If we are currently pending on write requests, immediately update
642 // writeTimeout_ with the new value.
643 if ((eventFlags_ & EventHandler::WRITE) &&
644 (state_ != StateEnum::CONNECTING && state_ != StateEnum::FAST_OPEN)) {
645 assert(state_ == StateEnum::ESTABLISHED);
646 assert((shutdownFlags_ & SHUT_WRITE) == 0);
647 if (sendTimeout_ > 0) {
648 if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
649 AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
650 withAddr("failed to reschedule send timeout in setSendTimeout"));
651 return failWrite(__func__, ex);
654 writeTimeout_.cancelTimeout();
// Installs or clears the callback for socket error-queue messages.
//
// A non-null callback is refused on AF_UNIX sockets (an error is logged) and
// on platforms where msgErrQueueSupported is false (invalidState()). Clearing
// the callback with nullptr is always allowed. A new callback is accepted
// only in the CONNECTING, FAST_OPEN and ESTABLISHED states; every other state
// ends in invalidState().
// NOTE(review): several return/break statements and part of the log message
// are not visible in this view.
659 void AsyncSocket::setErrMessageCB(ErrMessageCallback* callback) {
660 VLOG(6) << "AsyncSocket::setErrMessageCB() this=" << this
661 << ", fd=" << fd_ << ", callback=" << callback
662 << ", state=" << state_;
664 // In the latest stable kernel 4.14.3 as of 2017-12-04, unix domain
665 // socket does not support MSG_ERRQUEUE. So recvmsg(MSG_ERRQUEUE)
666 // will read application data from unix domain socket as error
667 // message, which breaks the message flow in application. Feel free
668 // to remove the next code block if MSG_ERRQUEUE is added for unix
669 // domain socket in the future.
670 if (callback != nullptr) {
672 if (localAddr_.getFamily() == AF_UNIX) {
673 LOG(ERROR) << "Failed to set ErrMessageCallback=" << callback
674 << " for Unix Doamin Socket where MSG_ERRQUEUE is unsupported,"
680 // Short circuit if callback is the same as the existing errMessageCallback_.
681 if (callback == errMessageCallback_) {
685 if (!msgErrQueueSupported) {
686 // Per-socket error message queue is not supported on this platform.
687 return invalidState(callback);
690 DestructorGuard dg(this);
691 eventBase_->dcheckIsInEventBaseThread();
693 if (callback == nullptr) {
694 // We should be able to reset the callback regardless of the
695 // socket state. It's important to have a reliable callback
696 // cancellation mechanism.
697 errMessageCallback_ = callback;
701 switch ((StateEnum)state_) {
702 case StateEnum::CONNECTING:
703 case StateEnum::FAST_OPEN:
704 case StateEnum::ESTABLISHED: {
705 errMessageCallback_ = callback;
708 case StateEnum::CLOSED:
709 case StateEnum::ERROR:
710 // We should never reach here. SHUT_READ should always be set
711 // if we are in STATE_CLOSED or STATE_ERROR.
713 return invalidState(callback);
714 case StateEnum::UNINIT:
715 // We do not allow setErrMessageCB() to be called before we start
717 return invalidState(callback);
720 // We don't put a default case in the switch statement, so that the compiler
721 // will warn us to update the switch statement if a new state is added.
722 return invalidState(callback);
// Returns the currently installed error-message callback (may be nullptr).
725 AsyncSocket::ErrMessageCallback* AsyncSocket::getErrMessageCallback() const {
726 return errMessageCallback_;
// Replaces the callback used to obtain send-message parameters. The pointer
// is stored as-is; no null check is performed here.
729 void AsyncSocket::setSendMsgParamCB(SendMsgParamsCallback* callback) {
730 sendMsgParamCallback_ = callback;
// Returns the currently installed send-message-parameters callback.
733 AsyncSocket::SendMsgParamsCallback* AsyncSocket::getSendMsgParamsCB() const {
734 return sendMsgParamCallback_;
// Installs or clears the read callback.
//
// Setting the same callback again is a no-op. Clearing the callback also
// cancels a pending immediate-read loop callback. Once reads are shut down
// only nullptr is accepted. While connecting the callback is just stored;
// when established the READ event registration is updated to match and
// checkForImmediateRead() is called. Any other state ends in invalidState().
// NOTE(review): several return statements and the condition choosing between
// setting and clearing the READ flag are not visible in this view.
737 void AsyncSocket::setReadCB(ReadCallback *callback) {
738 VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
739 << ", callback=" << callback << ", state=" << state_;
741 // Short circuit if callback is the same as the existing readCallback_.
743 // Note that this is needed for proper functioning during some cleanup cases.
744 // During cleanup we allow setReadCallback(nullptr) to be called even if the
745 // read callback is already unset and we have been detached from an event
746 // base. This check prevents us from asserting
747 // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
748 if (callback == readCallback_) {
752 /* We are removing a read callback */
753 if (callback == nullptr &&
754 immediateReadHandler_.isLoopCallbackScheduled()) {
755 immediateReadHandler_.cancelLoopCallback();
758 if (shutdownFlags_ & SHUT_READ) {
759 // Reads have already been shut down on this socket.
761 // Allow setReadCallback(nullptr) to be called in this case, but don't
762 // allow a new callback to be set.
764 // For example, setReadCallback(nullptr) can happen after an error if we
765 // invoke some other error callback before invoking readError(). The other
766 // error callback that is invoked first may go ahead and clear the read
767 // callback before we get a chance to invoke readError().
768 if (callback != nullptr) {
769 return invalidState(callback);
771 assert((eventFlags_ & EventHandler::READ) == 0);
772 readCallback_ = nullptr;
776 DestructorGuard dg(this);
777 eventBase_->dcheckIsInEventBaseThread();
779 switch ((StateEnum)state_) {
780 case StateEnum::CONNECTING:
781 case StateEnum::FAST_OPEN:
782 // For convenience, we allow the read callback to be set while we are
783 // still connecting. We just store the callback for now. Once the
784 // connection completes we'll register for read events.
785 readCallback_ = callback;
787 case StateEnum::ESTABLISHED:
789 readCallback_ = callback;
// Remember the previous flags so the registration is only touched on change.
790 uint16_t oldFlags = eventFlags_;
792 eventFlags_ |= EventHandler::READ;
794 eventFlags_ &= ~EventHandler::READ;
797 // Update our registration if our flags have changed
798 if (eventFlags_ != oldFlags) {
799 // We intentionally ignore the return value here.
800 // updateEventRegistration() will move us into the error state if it
801 // fails, and we don't need to do anything else here afterwards.
802 (void)updateEventRegistration();
806 checkForImmediateRead();
810 case StateEnum::CLOSED:
811 case StateEnum::ERROR:
812 // We should never reach here. SHUT_READ should always be set
813 // if we are in STATE_CLOSED or STATE_ERROR.
815 return invalidState(callback);
816 case StateEnum::UNINIT:
817 // We do not allow setReadCallback() to be called before we start
819 return invalidState(callback);
822 // We don't put a default case in the switch statement, so that the compiler
823 // will warn us to update the switch statement if a new state is added.
824 return invalidState(callback);
// Returns the currently installed read callback (may be nullptr).
827 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
828 return readCallback_;
// Enables or disables zero-copy sends on this socket.
//
// When msgErrQueueSupported is true the requested value is cached in
// zeroCopyVal_ (connect() re-applies that cached value after creating the
// socket) and SO_ZEROCOPY is set on fd_ via setsockopt. The code then falls
// back to getsockopt(SO_ZEROCOPY) to read the flag the socket already carries,
// and stores the resulting value in zeroCopyEnabled_.
// NOTE(review): the conditions separating these steps, the preprocessor
// guards and every return statement are not visible in this view, so the
// exact control flow and return value cannot be confirmed from here.
831 bool AsyncSocket::setZeroCopy(bool enable) {
832 if (msgErrQueueSupported) {
833 zeroCopyVal_ = enable;
839 int val = enable ? 1 : 0;
840 int ret = setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
842 // if enable == false, set zeroCopyEnabled_ = false regardless
843 // if SO_ZEROCOPY is set or not
845 zeroCopyEnabled_ = enable;
849 /* if the setsockopt failed, try to see if the socket inherited the flag
850 * since we cannot set SO_ZEROCOPY on a socket s = accept
854 socklen_t optlen = sizeof(val);
855 ret = getsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, &optlen);
858 enable = val ? true : false;
863 zeroCopyEnabled_ = enable;
// True only when zero copy is enabled on this socket AND the given flags
// request WRITE_MSG_ZEROCOPY.
872 bool AsyncSocket::isZeroCopyRequest(WriteFlags flags) {
873 return (zeroCopyEnabled_ && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY));
// Strips WRITE_MSG_ZEROCOPY from `flags` (modified in place) when zero copy
// is not enabled on this socket; otherwise leaves the flags untouched.
876 void AsyncSocket::adjustZeroCopyFlags(folly::WriteFlags& flags) {
877 if (!zeroCopyEnabled_) {
878 flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY);
// Registers a zero-copy buffer under a freshly allocated id and takes
// ownership of it: the id maps to the raw pointer, and the per-buffer info
// entry stores the unique_ptr. CHECK-fails if that entry already owns a
// buffer.
// NOTE(review): the line(s) between the two map updates and the CHECK (likely
// a reference-count update) are not visible in this view.
882 void AsyncSocket::addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
883 uint32_t id = getNextZeroCopyBufId();
884 folly::IOBuf* ptr = buf.get();
886 idZeroCopyBufPtrMap_[id] = ptr;
887 auto& p = idZeroCopyBufInfoMap_[ptr];
889 CHECK(p.buf_.get() == nullptr);
890 p.buf_ = std::move(buf);
// Registers one more in-flight zero-copy send for a buffer this socket does
// not own yet: allocates a new id for the raw pointer and bumps the buffer's
// use count. Ownership is not transferred by this overload.
893 void AsyncSocket::addZeroCopyBuf(folly::IOBuf* ptr) {
894 uint32_t id = getNextZeroCopyBufId();
895 idZeroCopyBufPtrMap_[id] = ptr;
897 idZeroCopyBufInfoMap_[ptr].count_++;
// Marks the zero-copy send identified by `id` as finished. Decrements the use
// count of the buffer the id refers to and erases the buffer's info entry
// when the count reaches zero; the id mapping is always erased.
// CHECK-fails if the id or its buffer is unknown.
900 void AsyncSocket::releaseZeroCopyBuf(uint32_t id) {
901 auto iter = idZeroCopyBufPtrMap_.find(id);
902 CHECK(iter != idZeroCopyBufPtrMap_.end());
903 auto ptr = iter->second;
904 auto iter1 = idZeroCopyBufInfoMap_.find(ptr);
905 CHECK(iter1 != idZeroCopyBufInfoMap_.end());
906 if (0 == --iter1->second.count_) {
907 idZeroCopyBufInfoMap_.erase(iter1);
910 idZeroCopyBufPtrMap_.erase(iter);
// Hands ownership of `buf` to the info entry keyed by its raw pointer.
// CHECK-fails if that entry already owns a buffer. Note that operator[]
// creates the entry if it does not exist yet.
913 void AsyncSocket::setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
914 folly::IOBuf* ptr = buf.get();
915 auto& p = idZeroCopyBufInfoMap_[ptr];
916 CHECK(p.buf_.get() == nullptr);
918 p.buf_ = std::move(buf);
// True if `ptr` currently has an entry in the zero-copy buffer info map.
921 bool AsyncSocket::containsZeroCopyBuf(folly::IOBuf* ptr) {
922 return (idZeroCopyBufInfoMap_.find(ptr) != idZeroCopyBufInfoMap_.end());
// Decides whether a control message is a zero-copy completion notification.
// With FOLLY_HAVE_MSG_ERRQUEUE defined and zero copy enabled, it looks at
// IP_RECVERR / IPV6_RECVERR messages and tests that the extended error has
// ee_errno == 0 and origin SO_EE_ORIGIN_ZEROCOPY.
// NOTE(review): the return statements (including the fallback when the
// feature macro is undefined) are not fully visible in this view.
925 bool AsyncSocket::isZeroCopyMsg(const cmsghdr& cmsg) const {
926 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
927 if (zeroCopyEnabled_ &&
928 ((cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
929 (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR))) {
930 const struct sock_extended_err* serr =
931 reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
933 (serr->ee_errno == 0) && (serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY));
// Handles a zero-copy completion notification. The extended error carries an
// inclusive id range [ee_info, ee_data]; every id in that range is released.
// If the kernel reports that it had to copy the data
// (SO_EE_CODE_ZEROCOPY_COPIED), zero copy is switched off for this socket.
// NOTE(review): `i` is a uint32_t compared with `i <= hi`, so the loop cannot
// terminate if hi == UINT32_MAX, and a wrapped range (lo > hi) releases
// nothing — confirm neither can occur.
939 void AsyncSocket::processZeroCopyMsg(const cmsghdr& cmsg) {
940 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
941 const struct sock_extended_err* serr =
942 reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
943 uint32_t hi = serr->ee_data;
944 uint32_t lo = serr->ee_info;
945 // disable zero copy if the buffer was actually copied
946 if ((serr->ee_code & SO_EE_CODE_ZEROCOPY_COPIED) && zeroCopyEnabled_) {
947 VLOG(2) << "AsyncSocket::processZeroCopyMsg(): setting "
948 << "zeroCopyEnabled_ = false due to SO_EE_CODE_ZEROCOPY_COPIED "
950 zeroCopyEnabled_ = false;
953 for (uint32_t i = lo; i <= hi; i++) {
954 releaseZeroCopyBuf(i);
// Writes a single contiguous buffer: wraps it in a one-element iovec and
// forwards to writeImpl() with no backing IOBuf.
// NOTE(review): the iovec declaration and the iov_len assignment are not
// visible in this view.
959 void AsyncSocket::write(WriteCallback* callback,
960 const void* buf, size_t bytes, WriteFlags flags) {
// const_cast is needed because iovec::iov_base is a non-const void*.
962 op.iov_base = const_cast<void*>(buf);
964 writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
// Scatter-gather write: forwards the caller's iovec array to writeImpl()
// with no backing IOBuf.
// NOTE(review): the remaining parameter lines are not visible in this view.
967 void AsyncSocket::writev(WriteCallback* callback,
971 writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
// Writes an IOBuf chain. The zero-copy flag is first adjusted to what the
// socket supports. Chains of up to kSmallSizeMax (64) elements use a
// stack-allocated iovec array (a VLA where FOLLY_HAVE_VLA allows, otherwise a
// fixed 64-entry array); longer chains use a heap-allocated array.
// NOTE(review): the heap path uses a raw new[]; the matching delete[] is not
// visible in this view — confirm it exists, and consider an owning wrapper so
// the array is released even if writeChainImpl() throws.
974 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
976 adjustZeroCopyFlags(flags);
978 constexpr size_t kSmallSizeMax = 64;
979 size_t count = buf->countChainElements();
980 if (count <= kSmallSizeMax) {
981 // suppress "warning: variable length array 'vec' is used [-Wvla]"
983 FOLLY_GCC_DISABLE_WARNING("-Wvla")
984 iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
987 writeChainImpl(callback, vec, count, std::move(buf), flags);
989 iovec* vec = new iovec[count];
990 writeChainImpl(callback, vec, count, std::move(buf), flags);
// Fills the caller-provided iovec array from the IOBuf chain and forwards to
// writeImpl(), passing the number of entries fillIov() actually produced
// rather than `count`. Ownership of `buf` moves on to writeImpl().
995 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
996 size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
997 size_t veclen = buf->fillIov(vec, count);
998 writeImpl(callback, vec, veclen, std::move(buf), flags);
// Common implementation behind write(), writev() and writeChain().
//
// Flow visible here:
//  1. Refuse new writes once the write side is shut down or a shutdown is
//     pending (invalidState()).
//  2. If the socket is ESTABLISHED/FAST_OPEN and nothing is queued, try the
//     write immediately. A negative result fails the write; a complete write
//     invokes writeSuccess(); a partial write falls through to queuing.
//  3. Otherwise, unless the socket is still connecting, the state is invalid.
//  4. Queue a BytesWriteRequest for whatever is left, and when required
//     register for WRITE events and arm the send timeout.
// NOTE(review): the `flags` parameter line, part of the first state
// condition, several return statements and the newRequest() argument lines
// are not visible in this view.
1001 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
1002 size_t count, unique_ptr<IOBuf>&& buf,
1004 VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
1005 << ", callback=" << callback << ", count=" << count
1006 << ", state=" << state_;
1007 DestructorGuard dg(this);
// Take ownership right away so the buffer is released on every return path.
1008 unique_ptr<IOBuf>ioBuf(std::move(buf));
1009 eventBase_->dcheckIsInEventBaseThread();
1011 if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
1012 // No new writes may be performed after the write side of the socket has
1015 // We could just call callback->writeError() here to fail just this write.
1016 // However, fail hard and use invalidState() to fail all outstanding
1017 // callbacks and move the socket into the error state. There's most likely
1018 // a bug in the caller's code, so we abort everything rather than trying to
1019 // proceed as best we can.
1020 return invalidState(callback);
1023 uint32_t countWritten = 0;
1024 uint32_t partialWritten = 0;
1025 ssize_t bytesWritten = 0;
1026 bool mustRegister = false;
1027 if ((state_ == StateEnum::ESTABLISHED || state_ == StateEnum::FAST_OPEN) &&
1029 if (writeReqHead_ == nullptr) {
1030 // If we are established and there are no other writes pending,
1031 // we can attempt to perform the write immediately.
1032 assert(writeReqTail_ == nullptr);
1033 assert((eventFlags_ & EventHandler::WRITE) == 0);
1035 auto writeResult = performWrite(
1036 vec, uint32_t(count), flags, &countWritten, &partialWritten);
1037 bytesWritten = writeResult.writeReturn;
1038 if (bytesWritten < 0) {
// Capture errno before any further call can overwrite it.
1039 auto errnoCopy = errno;
1040 if (writeResult.exception) {
1041 return failWrite(__func__, callback, 0, *writeResult.exception);
1043 AsyncSocketException ex(
1044 AsyncSocketException::INTERNAL_ERROR,
1045 withAddr("writev failed"),
1047 return failWrite(__func__, callback, 0, ex);
1048 } else if (countWritten == count) {
1049 // done, add the whole buffer
1050 if (countWritten && isZeroCopyRequest(flags)) {
1051 addZeroCopyBuf(std::move(ioBuf));
1053 // We successfully wrote everything.
1054 // Invoke the callback and return.
1056 callback->writeSuccess();
1059 } else { // continue writing the next writeReq
1061 if (bytesWritten && isZeroCopyRequest(flags)) {
1062 addZeroCopyBuf(ioBuf.get());
1064 if (bufferCallback_) {
1065 bufferCallback_->onEgressBuffered();
1068 if (!connecting()) {
1069 // Writes might put the socket back into connecting state
1070 // if TFO is enabled, and using TFO fails.
1071 // This means that write timeouts would not be active, however
1072 // connect timeouts would affect this stage.
1073 mustRegister = true;
1076 } else if (!connecting()) {
1077 // Invalid state for writing
1078 return invalidState(callback);
1081 // Create a new WriteRequest to add to the queue
1084 req = BytesWriteRequest::newRequest(
1088 uint32_t(count - countWritten),
1090 uint32_t(bytesWritten),
1093 } catch (const std::exception& ex) {
1094 // we mainly expect to catch std::bad_alloc here
1095 AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
1096 withAddr(string("failed to append new WriteRequest: ") + ex.what()));
1097 return failWrite(__func__, callback, size_t(bytesWritten), tex);
// Append the new request to the tail of the singly linked write queue.
1100 if (writeReqTail_ == nullptr) {
1101 assert(writeReqHead_ == nullptr);
1102 writeReqHead_ = writeReqTail_ = req;
1104 writeReqTail_->append(req);
1105 writeReqTail_ = req;
1108 // Register for write events if we are established and not currently
1109 // waiting on write events
1111 assert(state_ == StateEnum::ESTABLISHED);
1112 assert((eventFlags_ & EventHandler::WRITE) == 0);
1113 if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1114 assert(state_ == StateEnum::ERROR);
1117 if (sendTimeout_ > 0) {
1118 // Schedule a timeout to fire if the write takes too long.
1119 if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1120 AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1121 withAddr("failed to schedule send timeout"));
1122 return failWrite(__func__, ex);
// Queues `req` at the tail of the linked list of pending write requests
// delimited by writeReqHead_ and writeReqTail_.
1128 void AsyncSocket::writeRequest(WriteRequest* req) {
// Empty queue: the new request becomes both head and tail.
1129 if (writeReqTail_ == nullptr) {
1130 assert(writeReqHead_ == nullptr);
1131 writeReqHead_ = writeReqTail_ = req;
// Non-empty queue: link the request after the current tail, then advance the
// tail pointer to it.
1134 writeReqTail_->append(req);
1135 writeReqTail_ = req;
// Graceful close. When write requests are still queued and the socket is
// CONNECTING or ESTABLISHED, this marks reads as shut down, flags the write
// side as shutdown-pending so the queued data can drain, and immediately
// delivers readEOF() to any installed read callback.
1139 void AsyncSocket::close() {
1140 VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
1141 << ", state=" << state_ << ", shutdownFlags="
1142 << std::hex << (int) shutdownFlags_;
1144 // close() is only different from closeNow() when there are pending writes
1145 // that need to drain before we can close. In all other cases, just call
1148 // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
1149 // STATE_ERROR if close() is invoked while a previous closeNow() or failure
1150 // is still running. (e.g., If there are multiple pending writes, and we
1151 // call writeError() on the first one, it may call close(). In this case we
1152 // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
1153 // writes will still be in the queue.)
1155 // We only need to drain pending writes if we are still in STATE_CONNECTING
1156 // or STATE_ESTABLISHED
// Nothing to drain: either the write queue is empty or the state is neither
// CONNECTING nor ESTABLISHED.
1157 if ((writeReqHead_ == nullptr) ||
1158 !(state_ == StateEnum::CONNECTING ||
1159 state_ == StateEnum::ESTABLISHED)) {
1164 // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
1165 // destroyed until close() returns.
1166 DestructorGuard dg(this);
1167 eventBase_->dcheckIsInEventBaseThread();
1169 // Since there are write requests pending, we have to set the
1170 // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
1171 // connect finishes and we finish writing these requests.
1173 // Set SHUT_READ to indicate that reads are shut down, and set the
1174 // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
1175 // pending writes complete.
1176 shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
1178 // If a read callback is set, invoke readEOF() immediately to inform it that
1179 // the socket has been closed and no more data can be read.
1180 if (readCallback_) {
1181 // Disable reads if they are enabled
1182 if (!updateEventRegistration(0, EventHandler::READ)) {
1183 // We're now in the error state; callbacks have been cleaned up
1184 assert(state_ == StateEnum::ERROR);
1185 assert(readCallback_ == nullptr);
// Clear the member before invoking the callback, so readEOF() runs with no
// read callback installed on this socket.
1187 ReadCallback* callback = readCallback_;
1188 readCallback_ = nullptr;
1189 callback->readEOF();
// Immediate close: does not wait for queued writes to drain.
//
// ESTABLISHED / CONNECTING / FAST_OPEN: set SHUT_READ|SHUT_WRITE, move to
// CLOSED, cancel the write timeout, drop any I/O event registration, cancel a
// scheduled immediate-read loop callback, detach the I/O handler from the fd,
// then fail the connect callback and every queued write with
// socketClosedLocallyEx and deliver readEOF() to the read callback.
// CLOSED / ERROR: nothing to do.
// UNINIT: only mark both directions shut down and move to CLOSED.
// Any other state value is reported through LOG(DFATAL).
1194 void AsyncSocket::closeNow() {
1195 VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
1196 << ", state=" << state_ << ", shutdownFlags="
1197 << std::hex << (int) shutdownFlags_;
// Keep this object alive for the duration of the call; callbacks are invoked
// below.
1198 DestructorGuard dg(this);
1200 eventBase_->dcheckIsInEventBaseThread();
1204 case StateEnum::ESTABLISHED:
1205 case StateEnum::CONNECTING:
1206 case StateEnum::FAST_OPEN: {
// State is updated before any callback is invoked further down.
1207 shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1208 state_ = StateEnum::CLOSED;
1210 // If the write timeout was set, cancel it.
1211 writeTimeout_.cancelTimeout();
1213 // If we are registered for I/O events, unregister.
1214 if (eventFlags_ != EventHandler::NONE) {
1215 eventFlags_ = EventHandler::NONE;
1216 if (!updateEventRegistration()) {
1217 // We will have been moved into the error state.
1218 assert(state_ == StateEnum::ERROR);
// Drop a pending immediate-read loop callback, if one is scheduled.
1223 if (immediateReadHandler_.isLoopCallbackScheduled()) {
1224 immediateReadHandler_.cancelLoopCallback();
// Point the I/O handler at an invalid descriptor (-1).
1228 ioHandler_.changeHandlerFD(-1);
// Notify, in order: the connect callback, all queued writes, then the read
// callback.
1232 invokeConnectErr(socketClosedLocallyEx);
1234 failAllWrites(socketClosedLocallyEx);
1236 if (readCallback_) {
1237 ReadCallback* callback = readCallback_;
1238 readCallback_ = nullptr;
1239 callback->readEOF();
1243 case StateEnum::CLOSED:
1244 // Do nothing. It's possible that we are being called recursively
1245 // from inside a callback that we invoked inside another call to close()
1246 // that is still running.
1248 case StateEnum::ERROR:
1249 // Do nothing. The error handling code has performed (or is performing)
1252 case StateEnum::UNINIT:
// Nothing has been set up yet, so there is nothing to tear down; the asserts
// document that expectation.
1253 assert(eventFlags_ == EventHandler::NONE);
1254 assert(connectCallback_ == nullptr);
1255 assert(readCallback_ == nullptr);
1256 assert(writeReqHead_ == nullptr);
1257 shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1258 state_ = StateEnum::CLOSED;
// Reached only for a state value not handled above.
1262 LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
1263 << ") called in unknown state " << state_;
// Closes the socket so that the peer sees a TCP reset: SO_LINGER is enabled
// with a zero timeout before the close is carried out. A failure to set the
// option is logged at VLOG(2).
1266 void AsyncSocket::closeWithReset() {
1267 // Enable SO_LINGER, with the linger timeout set to 0.
1268 // This will trigger a TCP reset when we close the socket.
// {1, 0}: lingering switched on, timeout of zero (see the comment above).
1270 struct linger optLinger = {1, 0};
1271 if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
1272 VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
1273 << "on " << fd_ << ": errno=" << errno;
1277 // Then let closeNow() take care of the rest
// Half-close for writing that lets queued writes finish first. With an empty
// write queue this behaves like shutdownWriteNow(); otherwise it only records
// SHUT_WRITE_PENDING so the shutdown happens after the queue drains.
1281 void AsyncSocket::shutdownWrite() {
1282 VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
1283 << ", state=" << state_ << ", shutdownFlags="
1284 << std::hex << (int) shutdownFlags_;
1286 // If there are no pending writes, shutdownWrite() is identical to
1287 // shutdownWriteNow().
1288 if (writeReqHead_ == nullptr) {
1293 eventBase_->dcheckIsInEventBaseThread();
1295 // There are pending writes. Set SHUT_WRITE_PENDING so that the actual
1296 // shutdown will be performed once all writes complete.
1297 shutdownFlags_ |= SHUT_WRITE_PENDING;
// Shuts down the write side immediately; queued write requests are failed
// with socketShutdownForWritesEx instead of being sent.
//
// ESTABLISHED: set SHUT_WRITE, cancel the write timeout, unregister for write
//   events, shutdown(fd_, SHUT_WR), fail all writes.
// CONNECTING:  set SHUT_WRITE_PENDING (acted on when the connect completes)
//   and fail all writes.
// UNINIT:      set SHUT_WRITE_PENDING only.
// FAST_OPEN:   set SHUT_WRITE and fail all writes.
// CLOSED / ERROR: unexpected here; logged at VLOG(4).
// Any other state value is reported through LOG(DFATAL).
1300 void AsyncSocket::shutdownWriteNow() {
1301 VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
1302 << ", fd=" << fd_ << ", state=" << state_
1303 << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
1305 if (shutdownFlags_ & SHUT_WRITE) {
1306 // Writes are already shutdown; nothing else to do.
1310 // If SHUT_READ is already set, just call closeNow() to completely
1311 // close the socket. This can happen if close() was called with writes
1312 // pending, and then shutdownWriteNow() is called before all pending writes
1314 if (shutdownFlags_ & SHUT_READ) {
// Keep this object alive while write callbacks are failed below.
1319 DestructorGuard dg(this);
1321 eventBase_->dcheckIsInEventBaseThread();
1324 switch (static_cast<StateEnum>(state_)) {
1325 case StateEnum::ESTABLISHED:
1327 shutdownFlags_ |= SHUT_WRITE;
1329 // If the write timeout was set, cancel it.
1330 writeTimeout_.cancelTimeout();
1332 // If we are registered for write events, unregister.
1333 if (!updateEventRegistration(0, EventHandler::WRITE)) {
1334 // We will have been moved into the error state.
1335 assert(state_ == StateEnum::ERROR);
1339 // Shutdown writes on the file descriptor
// NOTE(review): the return value of shutdown() is not checked here.
1340 shutdown(fd_, SHUT_WR);
1342 // Immediately fail all write requests
1343 failAllWrites(socketShutdownForWritesEx);
1346 case StateEnum::CONNECTING:
1348 // Set the SHUT_WRITE_PENDING flag.
1349 // When the connection completes, it will check this flag,
1350 // shutdown the write half of the socket, and then set SHUT_WRITE.
1351 shutdownFlags_ |= SHUT_WRITE_PENDING;
1353 // Immediately fail all write requests
1354 failAllWrites(socketShutdownForWritesEx);
1357 case StateEnum::UNINIT:
1358 // Callers normally shouldn't call shutdownWriteNow() before the socket
1359 // even starts connecting. Nonetheless, go ahead and set
1360 // SHUT_WRITE_PENDING. Once the socket eventually connects it will
1361 // immediately shut down the write side of the socket.
1362 shutdownFlags_ |= SHUT_WRITE_PENDING;
1364 case StateEnum::FAST_OPEN:
1365 // In fast open state we haven't called connect yet, and if we shut down
1366 // the writes, we will never try to call connect, so shut everything down
1367 shutdownFlags_ |= SHUT_WRITE;
1368 // Immediately fail all write requests
1369 failAllWrites(socketShutdownForWritesEx);
1371 case StateEnum::CLOSED:
1372 case StateEnum::ERROR:
1373 // We should never get here. SHUT_WRITE should always be set
1374 // in STATE_CLOSED and STATE_ERROR.
1375 VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
1376 << ", fd=" << fd_ << ") in unexpected state " << state_
1377 << " with SHUT_WRITE not set ("
1378 << std::hex << (int) shutdownFlags_ << ")";
// Reached only for a state value not handled above.
1383 LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
1384 << fd_ << ") called in unknown state " << state_;
// Probes the socket for readability without blocking: a single pollfd entry
// is polled for POLLIN with a timeout of 0.
1387 bool AsyncSocket::readable() const {
1391 struct pollfd fds[1];
1393 fds[0].events = POLLIN;
// Timeout 0: poll() returns immediately.
1395 int rc = poll(fds, 1, 0);
// Probes the socket for writability without blocking: a single pollfd entry
// is polled for POLLOUT with a timeout of 0.
1399 bool AsyncSocket::writable() const {
1403 struct pollfd fds[1];
1405 fds[0].events = POLLOUT;
// Timeout 0: poll() returns immediately.
1407 int rc = poll(fds, 1, 0);
// Forwards to the I/O handler: returns ioHandler_.isPending().
1411 bool AsyncSocket::isPending() const {
1412 return ioHandler_.isPending();
// Reports whether a hangup has been signalled on the socket. On platforms
// that define POLLRDHUP the result is true when the polled revents contain
// POLLRDHUP or POLLHUP.
1415 bool AsyncSocket::hangup() const {
1417 // sanity check, no one should ask for hangup if we are not connected.
1421 #ifdef POLLRDHUP // Linux-only
1422 struct pollfd fds[1];
1424 fds[0].events = POLLRDHUP|POLLHUP;
1427 return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
// Health check combining three conditions: the state is CONNECTING, FAST_OPEN
// or ESTABLISHED; no shutdown flag is set; and an EventBase is attached.
1433 bool AsyncSocket::good() const {
1435 (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN ||
1436 state_ == StateEnum::ESTABLISHED) &&
1437 (shutdownFlags_ == 0) && (eventBase_ != nullptr));
// Returns true exactly when the socket is in StateEnum::ERROR.
1440 bool AsyncSocket::error() const {
1441 return (state_ == StateEnum::ERROR);
// Attaches this socket to `eventBase`. The socket must currently have no
// EventBase (asserted). The new EventBase is recorded, handed to the I/O
// handler and the write timeout, the event registration is refreshed, and
// evbChangeCb_ is told about the attachment.
1444 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1445 VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1446 << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1447 << ", state=" << state_ << ", events="
1448 << std::hex << eventFlags_ << ")";
1449 assert(eventBase_ == nullptr);
// The thread check is made against the EventBase being attached.
1450 eventBase->dcheckIsInEventBaseThread();
1452 eventBase_ = eventBase;
1453 ioHandler_.attachEventBase(eventBase);
// Refresh the handler registration now that an EventBase is available.
1455 updateEventRegistration();
1457 writeTimeout_.attachEventBase(eventBase);
1459 evbChangeCb_->evbAttached(this);
// Detaches this socket from its current EventBase. An EventBase must be
// attached (asserted). eventBase_ is cleared, the I/O handler is unregistered
// and detached, the write timeout is detached, and evbChangeCb_ is told about
// the detachment.
1463 void AsyncSocket::detachEventBase() {
1464 VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1465 << ", old evb=" << eventBase_ << ", state=" << state_
1466 << ", events=" << std::hex << eventFlags_ << ")";
1467 assert(eventBase_ != nullptr);
1468 eventBase_->dcheckIsInEventBaseThread();
// eventBase_ is cleared before the handler and timeout are detached.
1470 eventBase_ = nullptr;
1472 ioHandler_.unregisterHandler();
1474 ioHandler_.detachEventBase();
1475 writeTimeout_.detachEventBase();
1477 evbChangeCb_->evbDetached(this);
// Returns true when no write timeout is currently scheduled. An EventBase
// must be attached when this is called (DCHECK).
1481 bool AsyncSocket::isDetachable() const {
1482 DCHECK(eventBase_ != nullptr);
1483 eventBase_->dcheckIsInEventBaseThread();
1485 return !writeTimeout_.isScheduled();
// Eagerly fills the address cache. A std::system_error raised while doing so
// is caught here; it is logged at VLOG(1) unless its code is ENOTCONN.
1488 void AsyncSocket::cacheAddresses() {
1491 cacheLocalAddress();
1493 } catch (const std::system_error& e) {
// ENOTCONN is deliberately not logged.
1494 if (e.code() != std::error_code(ENOTCONN, std::system_category())) {
1495 VLOG(1) << "Error caching addresses: " << e.code().value() << ", "
1496 << e.code().message();
// Lazily fills localAddr_ from the file descriptor; does nothing when
// localAddr_ is already initialized.
// NOTE(review): this const method writes localAddr_, so the member is
// presumably declared mutable; confirm in the header.
1502 void AsyncSocket::cacheLocalAddress() const {
1503 if (!localAddr_.isInitialized()) {
1504 localAddr_.setFromLocalAddress(fd_);
// Lazily fills addr_ with the peer address obtained from the file descriptor;
// does nothing when addr_ is already initialized.
// NOTE(review): this const method writes addr_, so the member is presumably
// declared mutable; confirm in the header.
1508 void AsyncSocket::cachePeerAddress() const {
1509 if (!addr_.isInitialized()) {
1510 addr_.setFromPeerAddress(fd_);
// Returns true while idZeroCopyBufPtrMap_ still holds at least one entry.
1514 bool AsyncSocket::isZeroCopyWriteInProgress() const noexcept {
1515 eventBase_->dcheckIsInEventBaseThread();
1516 return (!idZeroCopyBufPtrMap_.empty());
// Copies the local address into `*address`, populating the cached value first
// if needed. `address` is dereferenced unconditionally and must not be null.
1519 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1520 cacheLocalAddress();
1521 *address = localAddr_;
// Output-parameter accessor for the peer address.
// NOTE(review): presumably mirrors getLocalAddress() using the cached addr_;
// confirm against the function body.
1524 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
// Forwards to detail::tfo_succeeded() for this socket's file descriptor.
1529 bool AsyncSocket::getTFOSucceded() const {
1530 return detail::tfo_succeeded(fd_);
// Sets (noDelay == true) or clears (noDelay == false) the TCP_NODELAY option
// on the socket. A setsockopt() failure is logged at VLOG(2) together with
// the strerror() text.
// NOTE(review): the return statements are not visible here; presumably 0 on
// success and an errno value on failure. Confirm.
1533 int AsyncSocket::setNoDelay(bool noDelay) {
1535 VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1536 << this << "(state=" << state_ << ")";
1541 int value = noDelay ? 1 : 0;
1542 if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
// Capture errno before logging so later calls cannot overwrite it.
1543 int errnoCopy = errno;
1544 VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1545 << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1546 << strerror(errnoCopy);
// Selects the TCP congestion-control algorithm named by `cname` through the
// TCP_CONGESTION socket option. The option length passed is
// cname.length() + 1. A failure is logged at VLOG(2) together with the
// strerror() text.
// NOTE(review): the return statements are not visible here; presumably 0 on
// success and an errno value on failure. Confirm.
1553 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
// Fallback definition for headers that do not provide TCP_CONGESTION.
1555 #ifndef TCP_CONGESTION
1556 #define TCP_CONGESTION 13
1560 VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1561 << "socket " << this << "(state=" << state_ << ")";
1571 socklen_t(cname.length() + 1)) != 0) {
1572 int errnoCopy = errno;
1573 VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1574 << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1575 << strerror(errnoCopy);
// Sets (quickack == true) or clears the TCP_QUICKACK option. The setsockopt()
// call is compiled only where TCP_QUICKACK is defined. A failure is logged at
// VLOG(2) together with the strerror() text.
// NOTE(review): the return statements and the non-TCP_QUICKACK branch are not
// visible here. Confirm the return contract.
1582 int AsyncSocket::setQuickAck(bool quickack) {
1585 VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1586 << this << "(state=" << state_ << ")";
1591 #ifdef TCP_QUICKACK // Linux-only
1592 int value = quickack ? 1 : 0;
1593 if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1594 int errnoCopy = errno;
1595 VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1596 << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1597 << strerror(errnoCopy);
// Requests a send-buffer size of `bufsize` through the SO_SNDBUF socket
// option. A failure is logged at VLOG(2) together with the strerror() text.
// NOTE(review): SO_SNDBUF is documented as taking an int, but a size_t is
// passed with sizeof(size_t). Where size_t is wider than int the kernel may
// read only part of the value (the wrong part on big-endian targets).
// Consider converting to int before the call; confirm.
// NOTE(review): the return statements are not visible here; presumably 0 on
// success and an errno value on failure. Confirm.
1607 int AsyncSocket::setSendBufSize(size_t bufsize) {
1609 VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1610 << this << "(state=" << state_ << ")";
1614 if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1615 int errnoCopy = errno;
1616 VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1617 << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1618 << strerror(errnoCopy);
// Requests a receive-buffer size of `bufsize` through the SO_RCVBUF socket
// option. A failure is logged at VLOG(2) together with the strerror() text.
// NOTE(review): SO_RCVBUF is documented as taking an int, but a size_t is
// passed with sizeof(size_t). Where size_t is wider than int the kernel may
// read only part of the value (the wrong part on big-endian targets).
// Consider converting to int before the call; confirm.
// NOTE(review): the return statements are not visible here; presumably 0 on
// success and an errno value on failure. Confirm.
1625 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1627 VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1628 << this << "(state=" << state_ << ")";
1632 if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1633 int errnoCopy = errno;
1634 VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1635 << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1636 << strerror(errnoCopy);
// Applies `profd` to the socket through the SO_SET_NAMESPACE option at the
// SOL_SOCKET level. A failure is logged at VLOG(2) together with the
// strerror() text.
// NOTE(review): SO_SET_NAMESPACE is not a standard POSIX option and the
// meaning of `profd` is not visible here; confirm against its definition.
// NOTE(review): the return statements are not visible here; presumably 0 on
// success and an errno value on failure. Confirm.
1643 int AsyncSocket::setTCPProfile(int profd) {
1645 VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1646 << this << "(state=" << state_ << ")";
1650 if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1651 int errnoCopy = errno;
1652 VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1653 << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1654 << strerror(errnoCopy);
// I/O readiness entry point. `events` is a bitmask that must contain READ
// and/or WRITE (asserted). Error-queue messages are handled first; if any
// were handled, the read/write work is skipped. Otherwise the masked events
// select read handling, write handling, or (when both are set) write handling
// followed by read handling. The function stops early whenever a callee has
// moved the socket to a different EventBase.
// NOTE(review): the log messages below say "ioRead()" although this function
// is ioReady().
1661 void AsyncSocket::ioReady(uint16_t events) noexcept {
1662 VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd=" << fd_
1663 << ", events=" << std::hex << events << ", state=" << state_;
1664 DestructorGuard dg(this);
1665 assert(events & EventHandler::READ_WRITE);
1666 eventBase_->dcheckIsInEventBaseThread();
// Keep only the READ/WRITE bits, and remember the EventBase so a detach
// performed by a callee can be detected afterwards.
1668 uint16_t relevantEvents = uint16_t(events & EventHandler::READ_WRITE);
1669 EventBase* originalEventBase = eventBase_;
1670 // If we got here it means that either EventHandler::READ or
1671 // EventHandler::WRITE is set. Any of these flags can
1672 // indicate that there are messages available in the socket
1673 // error message queue.
1674 // Return if we handle any error messages - this is to avoid
1675 // unnecessary read/write calls
1676 if (handleErrMessages()) {
1680 // Return now if handleErrMessages() detached us from our EventBase
1681 if (eventBase_ != originalEventBase) {
1685 if (relevantEvents == EventHandler::READ) {
1687 } else if (relevantEvents == EventHandler::WRITE) {
1689 } else if (relevantEvents == EventHandler::READ_WRITE) {
1690 // If both read and write events are ready, process writes first.
1693 // Return now if handleWrite() detached us from our EventBase
1694 if (eventBase_ != originalEventBase) {
1698 // Only call handleRead() if a read callback is still installed.
1699 // (It's possible that the read callback was uninstalled during
1701 if (readCallback_) {
1705 VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1706 << std::hex << events << "(this=" << this << ")";
// Performs one read into the caller's buffer.
//
// buf / buflen: pointers to the destination buffer and its capacity.
// The third parameter (offset) is unused in this implementation.
//
// If pre-received data is buffered, up to *buflen bytes are served from it
// and the consumed bytes are trimmed off; no recv() is issued in that case.
// Otherwise a non-blocking recv() (MSG_DONTWAIT) is performed.
//
// Returns a ReadResult holding the byte count, READ_BLOCKING when recv()
// reports EAGAIN/EWOULDBLOCK, or READ_ERROR for other recv() failures.
// appBytesReceived_ is increased by the number of bytes delivered.
1711 AsyncSocket::ReadResult
1712 AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) {
1713 VLOG(5) << "AsyncSocket::performRead() this=" << this << ", buf=" << *buf
1714 << ", buflen=" << *buflen;
1716 if (preReceivedData_ && !preReceivedData_->empty()) {
1717 VLOG(5) << "AsyncSocket::performRead() this=" << this
1718 << ", reading pre-received data";
// Copy out at most *buflen bytes from the front of the buffered chain.
1720 io::Cursor cursor(preReceivedData_.get());
1721 auto len = cursor.pullAtMost(*buf, *buflen);
// Drop the bytes just delivered and keep the remainder as the new
// pre-received data.
1724 queue.append(std::move(preReceivedData_));
1725 queue.trimStart(len);
1726 preReceivedData_ = queue.move();
1728 appBytesReceived_ += len;
1729 return ReadResult(len);
1732 ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT);
1734 if (errno == EAGAIN || errno == EWOULDBLOCK) {
1735 // No more data to read right now.
1736 return ReadResult(READ_BLOCKING);
1738 return ReadResult(READ_ERROR);
1741 appBytesReceived_ += bytes;
1742 return ReadResult(bytes);
// Obtains the destination buffer for the next read from the read callback.
// On return `*buf` / `*buflen` hold whatever getReadBuffer() supplied. A read
// callback must be installed (CHECK).
1746 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) {
1747 // no matter what, buffer should be prepared for non-ssl socket
1748 CHECK(readCallback_);
1749 readCallback_->getReadBuffer(buf, buflen);
// Drains the socket's error message queue (recvmsg with MSG_ERRQUEUE) where
// FOLLY_HAVE_MSG_ERRQUEUE is defined. Each control message is either treated
// as a zero-copy completion notification or passed to errMessageCallback_.
// When neither an error-message callback nor an outstanding zero-copy buffer
// exists, the function exits early. A recvmsg() failure other than EAGAIN is
// logged and reported through failErrMessageRead().
// NOTE(review): the returned size_t is not visible here; callers use it as
// "some messages were handled". Confirm the exact meaning.
1752 size_t AsyncSocket::handleErrMessages() noexcept {
1753 // This method has non-empty implementation only for platforms
1754 // supporting per-socket error queues.
1755 VLOG(5) << "AsyncSocket::handleErrMessages() this=" << this << ", fd=" << fd_
1756 << ", state=" << state_;
1757 if (errMessageCallback_ == nullptr && idZeroCopyBufPtrMap_.empty()) {
1758 VLOG(7) << "AsyncSocket::handleErrMessages(): "
1759 << "no callback installed - exiting.";
1763 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
// Set up the msghdr: one iovec for the payload plus a control buffer for the
// ancillary data; no peer name is requested.
1769 entry.iov_base = &data;
1770 entry.iov_len = sizeof(data);
1771 msg.msg_iov = &entry;
1773 msg.msg_name = nullptr;
1774 msg.msg_namelen = 0;
1775 msg.msg_control = ctrl;
1776 msg.msg_controllen = sizeof(ctrl);
1782 ret = recvmsg(fd_, &msg, MSG_ERRQUEUE);
1783 VLOG(5) << "AsyncSocket::handleErrMessages(): recvmsg returned " << ret;
// EAGAIN is not treated as an error here.
1786 if (errno != EAGAIN) {
1787 auto errnoCopy = errno;
1788 LOG(ERROR) << "::recvmsg exited with code " << ret
1789 << ", errno: " << errnoCopy;
1790 AsyncSocketException ex(
1791 AsyncSocketException::INTERNAL_ERROR,
1792 withAddr("recvmsg() failed"),
1794 failErrMessageRead(__func__, ex);
// Walk every control message attached to this recvmsg() result.
1800 for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
1801 cmsg != nullptr && cmsg->cmsg_len != 0;
1802 cmsg = CMSG_NXTHDR(&msg, cmsg)) {
1804 if (isZeroCopyMsg(*cmsg)) {
1805 processZeroCopyMsg(*cmsg);
1807 if (errMessageCallback_) {
1808 errMessageCallback_->errMessage(*cmsg);
1815 #endif // FOLLY_HAVE_MSG_ERRQUEUE
// Gives handleErrMessages() a chance to process pending zero-copy
// notifications, then reports whether idZeroCopyBufPtrMap_ is empty.
1818 bool AsyncSocket::processZeroCopyWriteInProgress() noexcept {
1819 eventBase_->dcheckIsInEventBaseThread();
// Fast path: the map is already empty.
1820 if (idZeroCopyBufPtrMap_.empty()) {
1824 handleErrMessages();
1826 return idZeroCopyBufPtrMap_.empty();
// Read-side event handler. It repeatedly asks the read callback for a buffer,
// reads into it with performRead(), and delivers the data through
// readDataAvailable() (or readBufferAvailable() when isBufferMovable_ is
// set). Read errors and exceptions thrown while obtaining the buffer are
// reported through failRead(). On EOF, reads are shut down and readEOF() is
// delivered. Preconditions are asserted: ESTABLISHED state, reads not shut
// down, a read callback installed, and READ registered in eventFlags_.
1829 void AsyncSocket::handleRead() noexcept {
1830 VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1831 << ", state=" << state_;
1832 assert(state_ == StateEnum::ESTABLISHED);
1833 assert((shutdownFlags_ & SHUT_READ) == 0);
1834 assert(readCallback_ != nullptr);
1835 assert(eventFlags_ & EventHandler::READ);
1838 // - a read attempt would block
1839 // - readCallback_ is uninstalled
1840 // - the number of loop iterations exceeds the optional maximum
1841 // - this AsyncSocket is moved to another EventBase
1843 // When we invoke readDataAvailable() it may uninstall the readCallback_,
1844 // which is why need to check for it here.
1846 // The last bullet point is slightly subtle. readDataAvailable() may also
1847 // detach this socket from this EventBase. However, before
1848 // readDataAvailable() returns another thread may pick it up, attach it to
1849 // a different EventBase, and install another readCallback_. We need to
1850 // exit immediately after readDataAvailable() returns if the eventBase_ has
1851 // changed. (The caller must perform some sort of locking to transfer the
1852 // AsyncSocket between threads properly. This will be sufficient to ensure
1853 // that this thread sees the updated eventBase_ variable after
1854 // readDataAvailable() returns.)
// numReads counts loop iterations for the maxReadsPerEvent_ limit.
1855 uint16_t numReads = 0;
1856 EventBase* originalEventBase = eventBase_;
1857 while (readCallback_ && eventBase_ == originalEventBase) {
1858 // Get the buffer to read into.
1859 void* buf = nullptr;
1860 size_t buflen = 0, offset = 0;
1862 prepareReadBuffer(&buf, &buflen);
1863 VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
// Exceptions from the callback's buffer provider are converted into read
// failures rather than propagated (this function is noexcept).
1864 } catch (const AsyncSocketException& ex) {
1865 return failRead(__func__, ex);
1866 } catch (const std::exception& ex) {
1867 AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1868 string("ReadCallback::getReadBuffer() "
1869 "threw exception: ") +
1871 return failRead(__func__, tex);
1873 AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1874 "ReadCallback::getReadBuffer() threw "
1875 "non-exception type");
1876 return failRead(__func__, ex);
// A null or zero-length buffer is rejected unless the movable-buffer mode is
// in use.
1878 if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1879 AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1880 "ReadCallback::getReadBuffer() returned "
1882 return failRead(__func__, ex);
1886 auto readResult = performRead(&buf, &buflen, &offset);
1887 auto bytesRead = readResult.readReturn;
1888 VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1889 << bytesRead << " bytes";
1890 if (bytesRead > 0) {
1891 if (!isBufferMovable_) {
1892 readCallback_->readDataAvailable(size_t(bytesRead));
1894 CHECK(kOpenSslModeMoveBufferOwnership);
1895 VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1896 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1897 << ", offset=" << offset;
// Wrap the raw buffer in an IOBuf that takes ownership of it, then trim it so
// it covers exactly the bytesRead bytes starting at `offset`.
1898 auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1899 readBuf->trimStart(offset);
1900 readBuf->trimEnd(buflen - offset - bytesRead);
1901 readCallback_->readBufferAvailable(std::move(readBuf));
1904 // Fall through and continue around the loop if the read
1905 // completely filled the available buffer.
1906 // Note that readCallback_ may have been uninstalled or changed inside
1907 // readDataAvailable().
1908 if (size_t(bytesRead) < buflen) {
1911 } else if (bytesRead == READ_BLOCKING) {
1912 // No more data to read right now.
1914 } else if (bytesRead == READ_ERROR) {
1915 readErr_ = READ_ERROR;
// Prefer the exception carried in the read result; otherwise build one for
// the recv() failure.
1916 if (readResult.exception) {
1917 return failRead(__func__, *readResult.exception);
1919 auto errnoCopy = errno;
1920 AsyncSocketException ex(
1921 AsyncSocketException::INTERNAL_ERROR,
1922 withAddr("recv() failed"),
1924 return failRead(__func__, ex);
1926 assert(bytesRead == READ_EOF);
1927 readErr_ = READ_EOF;
// EOF: mark reads as shut down and stop listening for read events before
// notifying the callback.
1929 shutdownFlags_ |= SHUT_READ;
1930 if (!updateEventRegistration(0, EventHandler::READ)) {
1931 // we've already been moved into STATE_ERROR
1932 assert(state_ == StateEnum::ERROR);
1933 assert(readCallback_ == nullptr);
1937 ReadCallback* callback = readCallback_;
1938 readCallback_ = nullptr;
1939 callback->readEOF();
// Honor the optional per-event read limit (a value of 0 disables it).
1942 if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1943 if (readCallback_ != nullptr) {
1944 // We might still have data in the socket.
1945 // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1946 scheduleImmediateRead();
1954 * This function attempts to write as much data as possible, until no more data
1957 * - If it sends all available data, it unregisters for write events, and stops
1958 * the writeTimeout_.
1960 * - If not all of the data can be sent immediately, it reschedules
1961 * writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1962 * registered for write events.
// Write-side event handler. It works through the queue of pending write
// requests from the head. A completed request is unlinked and its callback
// notified. When the last request completes, write events and the send
// timeout are cancelled and a pending write shutdown (SHUT_WRITE_PENDING) is
// carried out. After a partial write, write-event registration is ensured and
// the send timeout is rescheduled. Write failures go through failWrite().
1964 void AsyncSocket::handleWrite() noexcept {
1965 VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1966 << ", state=" << state_;
1967 DestructorGuard dg(this);
// A write event that arrives while still CONNECTING is handled separately.
1969 if (state_ == StateEnum::CONNECTING) {
1975 assert(state_ == StateEnum::ESTABLISHED);
1976 assert((shutdownFlags_ & SHUT_WRITE) == 0);
1977 assert(writeReqHead_ != nullptr);
1979 // Loop until we run out of write requests,
1980 // or until this socket is moved to another EventBase.
1981 // (See the comment in handleRead() explaining how this can happen.)
1982 EventBase* originalEventBase = eventBase_;
1983 while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1984 auto writeResult = writeReqHead_->performWrite();
1985 if (writeResult.writeReturn < 0) {
// Prefer the exception carried in the write result; otherwise build one for
// the writev() failure.
1986 if (writeResult.exception) {
1987 return failWrite(__func__, *writeResult.exception);
1989 auto errnoCopy = errno;
1990 AsyncSocketException ex(
1991 AsyncSocketException::INTERNAL_ERROR,
1992 withAddr("writev() failed"),
1994 return failWrite(__func__, ex);
1995 } else if (writeReqHead_->isComplete()) {
1996 // We finished this request
// Unlink the finished request from the front of the queue.
1997 WriteRequest* req = writeReqHead_;
1998 writeReqHead_ = req->getNext();
2000 if (writeReqHead_ == nullptr) {
2001 writeReqTail_ = nullptr;
2002 // This is the last write request.
2003 // Unregister for write events and cancel the send timer
2004 // before we invoke the callback. We have to update the state properly
2005 // before calling the callback, since it may want to detach us from
2007 if (eventFlags_ & EventHandler::WRITE) {
2008 if (!updateEventRegistration(0, EventHandler::WRITE)) {
2009 assert(state_ == StateEnum::ERROR);
2012 // Stop the send timeout
2013 writeTimeout_.cancelTimeout();
2015 assert(!writeTimeout_.isScheduled());
2017 // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
2018 // we finish sending the last write request.
2020 // We have to do this before invoking writeSuccess(), since
2021 // writeSuccess() may detach us from our EventBase.
2022 if (shutdownFlags_ & SHUT_WRITE_PENDING) {
2023 assert(connectCallback_ == nullptr);
2024 shutdownFlags_ |= SHUT_WRITE;
2026 if (shutdownFlags_ & SHUT_READ) {
2027 // Reads have already been shutdown. Fully close the socket and
2028 // move to STATE_CLOSED.
2030 // Note: This code currently moves us to STATE_CLOSED even if
2031 // close() hasn't ever been called. This can occur if we have
2032 // received EOF from the peer and shutdownWrite() has been called
2033 // locally. Should we bother staying in STATE_ESTABLISHED in this
2034 // case, until close() is actually called? I can't think of a
2035 // reason why we would need to do so. No other operations besides
2036 // calling close() or destroying the socket can be performed at
2038 assert(readCallback_ == nullptr);
2039 state_ = StateEnum::CLOSED;
2041 ioHandler_.changeHandlerFD(-1);
2045 // Reads are still enabled, so we are only doing a half-shutdown
2046 shutdown(fd_, SHUT_WR);
2051 // Invoke the callback
2052 WriteCallback* callback = req->getCallback();
2055 callback->writeSuccess();
2057 // We'll continue around the loop, trying to write another request
// Partial write: data remains buffered for this request.
2060 if (bufferCallback_) {
2061 bufferCallback_->onEgressBuffered();
2063 writeReqHead_->consume();
2064 // Stop after a partial write; it's highly likely that a subsequent write
2065 // attempt will just return EAGAIN.
2067 // Ensure that we are registered for write events.
2068 if ((eventFlags_ & EventHandler::WRITE) == 0) {
2069 if (!updateEventRegistration(EventHandler::WRITE, 0)) {
2070 assert(state_ == StateEnum::ERROR);
2075 // Reschedule the send timeout, since we have made some write progress.
2076 if (sendTimeout_ > 0) {
2077 if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
2078 AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
2079 withAddr("failed to reschedule write timeout"));
2080 return failWrite(__func__, ex);
// The queue is now empty: tell the buffer callback that nothing remains
// buffered for egress.
2086 if (!writeReqHead_ && bufferCallback_) {
2087 bufferCallback_->onEgressBufferCleared();
// Hook for attempting a read right away instead of waiting for the next
// readiness notification. This base implementation acts only when
// pre-received data is buffered; the rationale is given in the comments
// below.
2091 void AsyncSocket::checkForImmediateRead() noexcept {
2092 // We currently don't attempt to perform optimistic reads in AsyncSocket.
2093 // (However, note that some subclasses do override this method.)
2095 // Simply calling handleRead() here would be bad, as this would call
2096 // readCallback_->getReadBuffer(), forcing the callback to allocate a read
2097 // buffer even though no data may be available. This would waste lots of
2098 // memory, since the buffer will sit around unused until the socket actually
2099 // becomes readable.
2101 // Checking if the socket is readable now also seems like it would probably
2102 // be a pessimism. In most cases it probably wouldn't be readable, and we
2103 // would just waste an extra system call. Even if it is readable, waiting to
2104 // find out from libevent on the next event loop doesn't seem that bad.
2106 // The exception to this is if we have pre-received data. In that case there
2107 // is definitely data available immediately.
2108 if (preReceivedData_ && !preReceivedData_->empty()) {
// Brings event registration in line with the current callbacks and write
// queue. Read events are enabled (followed by checkForImmediateRead()) when a
// read callback exists but READ is not yet registered, and disabled when no
// read callback exists. Pending writes are attempted when WRITE is not
// already registered, and write events are disabled when the write queue is
// empty.
2113 void AsyncSocket::handleInitialReadWrite() noexcept {
2114 // Our callers should already be holding a DestructorGuard, but grab
2115 // one here just to make sure, in case one of our calling code paths ever
2117 DestructorGuard dg(this);
2118 // If we have a readCallback_, make sure we enable read events. We
2119 // may already be registered for reads if connectSuccess() set
2120 // the read callback.
2121 if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
2122 assert(state_ == StateEnum::ESTABLISHED);
2123 assert((shutdownFlags_ & SHUT_READ) == 0);
2124 if (!updateEventRegistration(EventHandler::READ, 0)) {
2125 assert(state_ == StateEnum::ERROR);
2128 checkForImmediateRead();
2129 } else if (readCallback_ == nullptr) {
2130 // Unregister for read events.
2131 updateEventRegistration(0, EventHandler::READ);
2134 // If we have write requests pending, try to send them immediately.
2135 // Since we just finished accepting, there is a very good chance that we can
2136 // write without blocking.
2138 // However, we only process them if EventHandler::WRITE is not already set,
2139 // which means that we're already blocked on a write attempt. (This can
2140 // happen if connectSuccess() called write() before returning.)
2141 if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
2142 // Call handleWrite() to perform write processing.
2144 } else if (writeReqHead_ == nullptr) {
2145 // Unregister for write event.
2146 updateEventRegistration(0, EventHandler::WRITE);
// Completes a non-blocking connect. Must be called in the CONNECTING state
// (asserted). The connect timeout is cancelled, the one-shot WRITE
// registration is cleared, and SO_ERROR is queried to learn the outcome.
// A getsockopt() failure or a non-zero socket error is reported through
// failConnect(). On success the state becomes ESTABLISHED, a pending write
// shutdown with nothing queued is applied at once, the connect callback is
// notified, and, unless that callback moved the socket to another EventBase,
// handleInitialReadWrite() runs.
2150 void AsyncSocket::handleConnect() noexcept {
2151 VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
2152 << ", state=" << state_;
2153 assert(state_ == StateEnum::CONNECTING);
2154 // SHUT_WRITE can never be set while we are still connecting;
2155 // SHUT_WRITE_PENDING may be set, but we only set SHUT_WRITE once the connect
2157 assert((shutdownFlags_ & SHUT_WRITE) == 0);
2159 // In case we had a connect timeout, cancel the timeout
2160 writeTimeout_.cancelTimeout();
2161 // We don't use a persistent registration when waiting on a connect event,
2162 // so we have been automatically unregistered now. Update eventFlags_ to
2164 assert(eventFlags_ == EventHandler::WRITE);
2165 eventFlags_ = EventHandler::NONE;
2167 // Call getsockopt() to check if the connect succeeded
2169 socklen_t len = sizeof(error);
2170 int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
// Failure path 1: the getsockopt() call itself failed.
2172 auto errnoCopy = errno;
2173 AsyncSocketException ex(
2174 AsyncSocketException::INTERNAL_ERROR,
2175 withAddr("error calling getsockopt() after connect"),
2177 VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
2178 << fd_ << " host=" << addr_.describe()
2179 << ") exception:" << ex.what();
2180 return failConnect(__func__, ex);
// Failure path 2: the connect attempt failed; `error` carries the socket
// error obtained through SO_ERROR.
2184 AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2185 "connect failed", error);
2186 VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
2187 << fd_ << " host=" << addr_.describe()
2188 << ") exception: " << ex.what();
2189 return failConnect(__func__, ex);
2192 // Move into STATE_ESTABLISHED
2193 state_ = StateEnum::ESTABLISHED;
2195 // If SHUT_WRITE_PENDING is set and we don't have any write requests to
2196 // perform, immediately shutdown the write half of the socket.
2197 if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
2198 // SHUT_READ shouldn't be set. If close() is called on the socket while we
2199 // are still connecting we just abort the connect rather than waiting for
2201 assert((shutdownFlags_ & SHUT_READ) == 0);
2202 shutdown(fd_, SHUT_WR);
2203 shutdownFlags_ |= SHUT_WRITE;
2206 VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
2207 << "successfully connected; state=" << state_;
2209 // Remember the EventBase we are attached to, before we start invoking any
2210 // callbacks (since the callbacks may call detachEventBase()).
2211 EventBase* originalEventBase = eventBase_;
2213 invokeConnectSuccess();
2214 // Note that the connect callback may have changed our state.
2215 // (set or unset the read callback, called write(), closed the socket, etc.)
2216 // The following code needs to handle these situations correctly.
2218 // If the socket has been closed, readCallback_ and writeReqHead_ will
2219 // always be nullptr, so that will prevent us from trying to read or write.
2221 // The main thing to check for is if eventBase_ is still originalEventBase.
2222 // If not, we have been detached from this event base, so we shouldn't
2223 // perform any more operations.
2224 if (eventBase_ != originalEventBase) {
2228 handleInitialReadWrite();
// Handles expiry of the socket's timeout. The branches below distinguish a
// timeout hit while state_ is CONNECTING (reported through failConnect() when
// a connect callback is installed, otherwise through failWrite()) from a
// timeout of an ordinary write (reported through failWrite()).
2231 void AsyncSocket::timeoutExpired() noexcept {
2232 VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
2233 << "state=" << state_ << ", events=" << std::hex << eventFlags_;
// NOTE(review): presumably keeps this object alive while the fail*() paths
// below invoke user callbacks -- confirm against DestructorGuard's contract.
2234 DestructorGuard dg(this);
2235 eventBase_->dcheckIsInEventBaseThread();
2237 if (state_ == StateEnum::CONNECTING) {
2238 // connect() timed out
2239 // Unregister for I/O events.
2240 if (connectCallback_) {
// The message embeds connectTimeout_.count(), labelled as milliseconds.
2241 AsyncSocketException ex(
2242 AsyncSocketException::TIMED_OUT,
2244 "connect timed out after {}ms", connectTimeout_.count()));
2245 failConnect(__func__, ex);
2247 // we faced a connect error without a connect callback, which could
2248 // happen due to TFO.
2249 AsyncSocketException ex(
2250 AsyncSocketException::TIMED_OUT, "write timed out during connection");
2251 failWrite(__func__, ex);
2254 // a normal write operation timed out
// The message embeds sendTimeout_, labelled as milliseconds.
2255 AsyncSocketException ex(
2256 AsyncSocketException::TIMED_OUT,
2257 folly::sformat("write timed out after {}ms", sendTimeout_));
2258 failWrite(__func__, ex);
// Thin wrapper that forwards its three arguments unchanged to
// detail::tfo_sendmsg() and returns that call's result.
2262 ssize_t AsyncSocket::tfoSendMsg(int fd, struct msghdr* msg, int msg_flags) {
2263 return detail::tfo_sendmsg(fd, msg, msg_flags);
// Sends msg on the socket and wraps the outcome in a WriteResult.
//
// While state_ is FAST_OPEN the address obtained from addr_ is attached to
// msg and the data goes out through tfoSendMsg(); the errno values
// EINPROGRESS, EOPNOTSUPP and EAGAIN get dedicated handling on that path (see
// the inline comments). The message is otherwise handed to ::sendmsg().
2266 AsyncSocket::WriteResult
2267 AsyncSocket::sendSocketMessage(int fd, struct msghdr* msg, int msg_flags) {
2268 ssize_t totalWritten = 0;
2269 if (state_ == StateEnum::FAST_OPEN) {
// Point msg at the address obtained from addr_.
2270 sockaddr_storage addr;
2271 auto len = addr_.getAddress(&addr);
2272 msg->msg_name = &addr;
2273 msg->msg_namelen = len;
// NOTE(review): this call passes the member fd_, whereas the plain sendmsg()
// call near the end passes the fd parameter -- confirm the two always match.
2274 totalWritten = tfoSendMsg(fd_, msg, msg_flags);
2275 if (totalWritten >= 0) {
// The fast-open send was accepted: record that and move to ESTABLISHED.
2276 tfoFinished_ = true;
2277 state_ = StateEnum::ESTABLISHED;
2278 // We schedule this asynchronously so that we don't end up
2279 // invoking initial read or write while a write is in progress.
2280 scheduleInitialReadWrite();
2281 } else if (errno == EINPROGRESS) {
2282 VLOG(4) << "TFO falling back to connecting";
2283 // A normal sendmsg doesn't return EINPROGRESS, however
2284 // TFO might fallback to connecting if there is no
2286 state_ = StateEnum::CONNECTING;
2288 scheduleConnectTimeout();
2289 registerForConnectEvents();
// An AsyncSocketException raised while arming the connect is copied into the
// WRITE_ERROR result built below.
2290 } catch (const AsyncSocketException& ex) {
2292 WRITE_ERROR, std::make_unique<AsyncSocketException>(ex));
2294 // Let's fake it that no bytes were written and return an errno.
2297 } else if (errno == EOPNOTSUPP) {
2298 // Try falling back to connecting.
2299 VLOG(4) << "TFO not supported";
2300 state_ = StateEnum::CONNECTING;
// Reuses the addr/len pair filled in at the top of the fast-open branch.
2302 int ret = socketConnect((const sockaddr*)&addr, len);
2304 // connect succeeded immediately
2305 // Treat this like no data was written.
2306 state_ = StateEnum::ESTABLISHED;
2307 scheduleInitialReadWrite();
2309 // If there was no exception during connections,
2310 // we would return that no bytes were written.
2313 } catch (const AsyncSocketException& ex) {
2315 WRITE_ERROR, std::make_unique<AsyncSocketException>(ex));
2317 } else if (errno == EAGAIN) {
2318 // Normally sendmsg would indicate that the write would block.
2319 // However in the fast open case, it would indicate that sendmsg
2320 // fell back to a connect. This is a return code from connect()
2321 // instead, and is an error condition indicating no fds available.
2324 std::make_unique<AsyncSocketException>(
2325 AsyncSocketException::UNKNOWN, "No more free local ports"));
// Plain sendmsg() on the supplied fd; its result is returned below.
2328 totalWritten = ::sendmsg(fd, msg, msg_flags);
2330 return WriteResult(totalWritten);
// Writes the supplied iovec array with a single sendSocketMessage() call and
// reports progress through the out-parameters.
//
// On the paths visible below, *partialWritten receives the number of bytes
// consumed from the iovec in which the write stopped (0 when nothing was
// written or the write covered every iovec), and the returned WriteResult
// carries the byte count reported by the send.
2333 AsyncSocket::WriteResult AsyncSocket::performWrite(
2337 uint32_t* countWritten,
2338 uint32_t* partialWritten) {
2339 // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
2340 // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
2341 // (since it may terminate the program if the main program doesn't explicitly
2344 msg.msg_name = nullptr;
2345 msg.msg_namelen = 0;
2346 msg.msg_iov = const_cast<iovec *>(vec);
// Cap the number of iovecs passed in one call at kIovMax.
2347 msg.msg_iovlen = std::min<size_t>(count, kIovMax);
// Ancillary (control) data size comes from sendMsgParamCallback_; the CHECK_GE
// enforces maxAncillaryDataSize as its upper bound.
2349 msg.msg_controllen = sendMsgParamCallback_->getAncillaryDataSize(flags);
2350 CHECK_GE(AsyncSocket::SendMsgParamsCallback::maxAncillaryDataSize,
2351 msg.msg_controllen);
2353 if (msg.msg_controllen != 0) {
// The control buffer is obtained with alloca() and filled in by the callback.
2354 msg.msg_control = reinterpret_cast<char*>(alloca(msg.msg_controllen));
2355 sendMsgParamCallback_->getAncillaryData(flags, msg.msg_control);
2357 msg.msg_control = nullptr;
2359 int msg_flags = sendMsgParamCallback_->getFlags(flags, zeroCopyEnabled_);
2361 auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
2362 auto totalWritten = writeResult.writeReturn;
2363 if (totalWritten < 0) {
// EAGAIN is treated as "retry later" rather than as a failure.
2364 bool tryAgain = (errno == EAGAIN);
2366 // Apple has a bug where doing a second write on a socket which we
2367 // have opened with TFO causes an ENOTCONN to be thrown. However the
2368 // socket is really connected, so treat ENOTCONN as an EAGAIN until
2369 // this bug is fixed.
2370 tryAgain |= (errno == ENOTCONN);
2373 // workaround for running with zerocopy enabled but without a proper
2374 // memlock value - see ulimit -l
2375 if (zeroCopyEnabled_ && (errno == ENOBUFS)) {
// Zero-copy is switched off from here on.
2377 zeroCopyEnabled_ = false;
// A retryable errno only counts when sendSocketMessage() attached no exception.
2380 if (!writeResult.exception && tryAgain) {
2381 // TCP buffer is full; we can't write any more data right now.
2383 *partialWritten = 0;
2384 return WriteResult(0);
2388 *partialWritten = 0;
2392 appBytesWritten_ += totalWritten;
2394 uint32_t bytesWritten;
// Walk the iovecs, subtracting each fully written one from the remaining byte
// count, to find the iovec in which the write stopped.
// NOTE(review): totalWritten is narrowed to uint32_t here -- confirm a single
// send can never report more than that range.
2396 for (bytesWritten = uint32_t(totalWritten), n = 0; n < count; ++n) {
2397 const iovec* v = vec + n;
2398 if (v->iov_len > bytesWritten) {
2399 // Partial write finished in the middle of this iovec
2401 *partialWritten = bytesWritten;
2402 return WriteResult(totalWritten);
2405 bytesWritten -= uint32_t(v->iov_len);
// Every iovec was consumed, so no bytes may be left unaccounted for.
2408 assert(bytesWritten == 0);
2410 *partialWritten = 0;
2411 return WriteResult(totalWritten);
2415 * Re-register the EventHandler after eventFlags_ has changed.
2417 * If an error occurs, fail() is called to move the socket into the error state
2418 * and call all currently installed callbacks. After an error, the
2419 * AsyncSocket is completely unregistered.
2421 * @return Returns true on success, or false on error.
2423 bool AsyncSocket::updateEventRegistration() {
2424 VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
2425 << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
2426 << ", events=" << std::hex << eventFlags_;
2427 eventBase_->dcheckIsInEventBaseThread();
// With no events of interest, the handler is unregistered instead of being
// re-registered.
2428 if (eventFlags_ == EventHandler::NONE) {
2429 ioHandler_.unregisterHandler();
2433 // Always register for persistent events, so we don't have to re-register
2434 // after being called back.
2435 if (!ioHandler_.registerHandler(
2436 uint16_t(eventFlags_ | EventHandler::PERSIST))) {
2437 eventFlags_ = EventHandler::NONE; // we're not registered after error
// Registration failed: report it as an INTERNAL_ERROR through fail().
2438 AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
2439 withAddr("failed to update AsyncSocket event registration"));
2440 fail("updateEventRegistration", ex);
// Overload that sets the bits in `enable`, clears the bits in `disable`, and
// then delegates to the no-argument updateEventRegistration(). The flags are
// compared with their previous value first; presumably an unchanged value
// short-circuits without re-registering -- confirm against the full body.
2447 bool AsyncSocket::updateEventRegistration(uint16_t enable,
2449 uint16_t oldFlags = eventFlags_;
2450 eventFlags_ |= enable;
// Applied after the OR above, so a bit present in both masks ends up cleared.
2451 eventFlags_ &= ~disable;
2452 if (eventFlags_ == oldFlags) {
2455 return updateEventRegistration();
// Begins failure handling: moves the socket to StateEnum::ERROR, marks both
// directions as shut down, drops any event registration and cancels the write
// timeout. The asserts require that the socket is not already in ERROR and
// that at least one destructor guard is held by the caller.
2459 void AsyncSocket::startFail() {
2460 // startFail() should only be called once
2461 assert(state_ != StateEnum::ERROR);
2462 assert(getDestructorGuardCount() > 0);
2463 state_ = StateEnum::ERROR;
2464 // Ensure that SHUT_READ and SHUT_WRITE are set,
2465 // so all future attempts to read or write will be rejected
2466 shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
// Only touch the handler when something is actually registered.
2468 if (eventFlags_ != EventHandler::NONE) {
2469 eventFlags_ = EventHandler::NONE;
2470 ioHandler_.unregisterHandler();
2472 writeTimeout_.cancelTimeout();
// NOTE(review): presumably -1 detaches the handler from any descriptor --
// confirm against EventHandler::changeHandlerFD().
2475 ioHandler_.changeHandlerFD(-1);
// Delivers `ex` to the installed callbacks: the connect callback via
// invokeConnectErr(), and the read callback via readErr().
2480 void AsyncSocket::invokeAllErrors(const AsyncSocketException& ex) {
2481 invokeConnectErr(ex);
2484 if (readCallback_) {
// readCallback_ is cleared first, so the member is already null while
// readErr() runs.
2485 ReadCallback* callback = readCallback_;
2486 readCallback_ = nullptr;
2487 callback->readErr(ex);
// Completes failure handling using a generic INTERNAL_ERROR exception
// ("socket closing after error", decorated by withAddr()) that is passed to
// invokeAllErrors(). Requires state_ == ERROR and a held destructor guard.
2491 void AsyncSocket::finishFail() {
2492 assert(state_ == StateEnum::ERROR);
2493 assert(getDestructorGuardCount() > 0);
2495 AsyncSocketException ex(
2496 AsyncSocketException::INTERNAL_ERROR,
2497 withAddr("socket closing after error"));
2498 invokeAllErrors(ex);
// Completes failure handling by passing the caller-supplied exception `ex` to
// invokeAllErrors(). Requires state_ == ERROR and a held destructor guard.
2501 void AsyncSocket::finishFail(const AsyncSocketException& ex) {
2502 assert(state_ == StateEnum::ERROR);
2503 assert(getDestructorGuardCount() > 0);
2504 invokeAllErrors(ex);
// Generic failure entry point. `fn` is the name of the function reporting the
// failure and is used in the log message; `ex` describes the error.
2507 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
// Log the socket identity (this, fd, state, host) and the reporting function.
2508 VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2509 << state_ << " host=" << addr_.describe()
2510 << "): failed in " << fn << "(): "
// Reports a failure that happened while connecting. `fn` names the reporting
// function for the log message; `ex` is delivered to the connect callback
// through invokeConnectErr().
2516 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
2517 VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2518 << state_ << " host=" << addr_.describe()
2519 << "): failed while connecting in " << fn << "(): "
2523 invokeConnectErr(ex);
// Reports a failure that happened while reading. `fn` names the reporting
// function for the log message; `ex` is delivered to the read callback, if one
// is installed, through readErr().
2527 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
2528 VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2529 << state_ << " host=" << addr_.describe()
2530 << "): failed while reading in " << fn << "(): "
2534 if (readCallback_ != nullptr) {
// readCallback_ is cleared first, so the member is already null while
// readErr() runs.
2535 ReadCallback* callback = readCallback_;
2536 readCallback_ = nullptr;
2537 callback->readErr(ex);
// Reports a failure that happened while reading an error-queue message. `fn`
// names the reporting function for the log message; `ex` is delivered to the
// error-message callback, if one is installed, through errMessageError().
2543 void AsyncSocket::failErrMessageRead(const char* fn,
2544 const AsyncSocketException& ex) {
2545 VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2546 << state_ << " host=" << addr_.describe()
2547 << "): failed while reading message in " << fn << "(): "
2551 if (errMessageCallback_ != nullptr) {
// errMessageCallback_ is cleared first, so the member is already null while
// errMessageError() runs.
2552 ErrMessageCallback* callback = errMessageCallback_;
2553 errMessageCallback_ = nullptr;
2554 callback->errMessageError(ex);
// Reports a failure that happened while writing the request at the head of
// the write queue. `fn` names the reporting function for the log message; the
// head request's callback receives `ex` together with the number of bytes that
// request had already written.
2560 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
2561 VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2562 << state_ << " host=" << addr_.describe()
2563 << "): failed while writing in " << fn << "(): "
2567 // Only invoke the first write callback, since the error occurred while
2568 // writing this request. Let any other pending write callbacks be invoked in
2570 if (writeReqHead_ != nullptr) {
// Unlink the head request and capture its callback and progress before the
// callback is notified.
2571 WriteRequest* req = writeReqHead_;
2572 writeReqHead_ = req->getNext();
2573 WriteCallback* callback = req->getCallback();
2574 uint32_t bytesWritten = req->getTotalBytesWritten();
2577 callback->writeErr(bytesWritten, ex);
// Reports a write failure for a request that was never queued.
//
// fn           - name of the reporting function, used in the log message.
// callback     - callback to notify; may be nullptr, in which case nothing is
//                notified here.
// bytesWritten - byte count forwarded to callback->writeErr().
// ex           - exception forwarded to callback->writeErr().
2584 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
2585 size_t bytesWritten,
2586 const AsyncSocketException& ex) {
2587 // This version of failWrite() is used when the failure occurs before
2588 // we've added the callback to writeReqHead_.
2589 VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2590 << state_ << " host=" << addr_.describe()
2591 <<"): failed while writing in " << fn << "(): "
2595 if (callback != nullptr) {
2596 callback->writeErr(bytesWritten, ex);
// Drains the pending write queue, notifying each request's callback of `ex`
// along with the number of bytes that request had written.
2602 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
2603 // Invoke writeErr() on all write callbacks.
2604 // This is used when writes are forcibly shutdown with write requests
2605 // pending, or when an error occurs with writes pending.
2606 while (writeReqHead_ != nullptr) {
// Advance the head before the callback is notified, so the queue no longer
// starts at this request while writeErr() runs.
2607 WriteRequest* req = writeReqHead_;
2608 writeReqHead_ = req->getNext();
2609 WriteCallback* callback = req->getCallback();
2611 callback->writeErr(req->getTotalBytesWritten(), ex);
// Handles connect() being called while the socket is in a state that does not
// allow it. `callback` is the ConnectCallback supplied to that connect() call;
// it is notified with an ALREADY_OPEN exception in both branches of the state
// check below.
2617 void AsyncSocket::invalidState(ConnectCallback* callback) {
2618 VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
2619 << "): connect() called in invalid state " << state_;
2622 * The invalidState() methods don't use the normal failure mechanisms,
2623 * since we don't know what state we are in. We don't want to call
2624 * startFail()/finishFail() recursively if we are already in the middle of
2628 AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
2629 "connect() called with socket in invalid state");
// The connect attempt ends here, so its end time is recorded now.
2630 connectEndTime_ = std::chrono::steady_clock::now();
2631 if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2633 callback->connectErr(ex);
2636 // We can't use failConnect() here since connectCallback_
2637 // may already be set to another callback. Invoke this ConnectCallback
2638 // here; any other connectCallback_ will be invoked in finishFail()
2641 callback->connectErr(ex);
// Handles setErrMessageCB() being called when it cannot be honoured.
// `callback` is notified with a NOT_OPEN exception in both branches of the
// state check below.
2647 void AsyncSocket::invalidState(ErrMessageCallback* callback) {
2648 VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2649 << "): setErrMessageCB(" << callback
2650 << ") called in invalid state " << state_;
// The message depends on msgErrQueueSupported: an invalid-state message when
// the platform supports error-queue messages, otherwise an "unsupported
// platform" message.
2652 AsyncSocketException ex(
2653 AsyncSocketException::NOT_OPEN,
2654 msgErrQueueSupported
2655 ? "setErrMessageCB() called with socket in invalid state"
2656 : "This platform does not support socket error message notifications");
2657 if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2659 callback->errMessageError(ex);
2664 callback->errMessageError(ex);
// Records the connect end time and, if a connect callback is installed,
// notifies it of `ex` through connectErr().
2670 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
2671 connectEndTime_ = std::chrono::steady_clock::now();
2672 if (connectCallback_) {
// connectCallback_ is cleared first, so the member is already null while
// connectErr() runs.
2673 ConnectCallback* callback = connectCallback_;
2674 connectCallback_ = nullptr;
2675 callback->connectErr(ex);
// Records the connect end time and, if a connect callback is installed,
// notifies it through connectSuccess().
2679 void AsyncSocket::invokeConnectSuccess() {
2680 connectEndTime_ = std::chrono::steady_clock::now();
2681 if (connectCallback_) {
// connectCallback_ is cleared first, so the member is already null while
// connectSuccess() runs.
2682 ConnectCallback* callback = connectCallback_;
2683 connectCallback_ = nullptr;
2684 callback->connectSuccess();
// Handles setReadCallback() being called while the socket is in a state that
// does not allow it. `callback` is notified with a NOT_OPEN exception through
// readErr() in both branches of the state check below.
2688 void AsyncSocket::invalidState(ReadCallback* callback) {
2689 VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2690 << "): setReadCallback(" << callback
2691 << ") called in invalid state " << state_;
2693 AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2694 "setReadCallback() called with socket in "
2696 if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2698 callback->readErr(ex);
2703 callback->readErr(ex);
// Handles write() being called while the socket is in a state that does not
// allow it. `callback` is notified with a NOT_OPEN exception and a byte count
// of 0 through writeErr() in both branches of the state check below.
2709 void AsyncSocket::invalidState(WriteCallback* callback) {
2710 VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2711 << "): write() called in invalid state " << state_;
// withAddr() appends the peer/local address description to the message.
2713 AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2714 withAddr("write() called with socket in invalid state"));
2715 if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2717 callback->writeErr(0, ex);
2722 callback->writeErr(0, ex);
// Closes the socket's file descriptor. When wShutdownSocketSet_.lock() yields
// a non-null object, the close of fd_ is routed through that object.
2728 void AsyncSocket::doClose() {
// NOTE(review): wShutdownSocketSet_ is presumably a weak reference that may
// have expired -- confirm its declaration in the header.
2732 if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
2733 shutdownSocketSet->close(fd_);
// Stream insertion for AsyncSocket::StateEnum: writes the enumerator's value
// converted to int (not a symbolic name) to `os`.
2740 std::ostream& operator << (std::ostream& os,
2741 const AsyncSocket::StateEnum& state) {
2742 os << static_cast<int>(state);
// Returns `s` with " (peer=<peer>, local=<local>)" appended, where the two
// addresses are queried through getPeerAddress() and getLocalAddress() and
// rendered with describe().
2746 std::string AsyncSocket::withAddr(const std::string& s) {
2747 // Don't use addr_ directly because it may not be initialized
2748 // e.g. if constructed from fd
2749 folly::SocketAddress peer, local;
2751 getPeerAddress(&peer);
2752 getLocalAddress(&local);
// A std::exception raised by the address lookups is caught here rather than
// propagated to the caller.
2753 } catch (const std::exception&) {
2758 return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
// Stores `cb` in bufferCallback_, replacing whatever was stored before.
2761 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2762 bufferCallback_ = cb;
2765 } // namespace folly