Move address caching logic from AsyncSSLSocket to AsyncSocket.
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/ExceptionWrapper.h>
20 #include <folly/Portability.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/Cursor.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/IOBufQueue.h>
25 #include <folly/portability/Fcntl.h>
26 #include <folly/portability/Sockets.h>
27 #include <folly/portability/SysUio.h>
28 #include <folly/portability/Unistd.h>
29
30 #include <boost/preprocessor/control/if.hpp>
31 #include <errno.h>
32 #include <limits.h>
33 #include <sys/types.h>
34 #include <thread>
35
36 using std::string;
37 using std::unique_ptr;
38
39 namespace fsp = folly::portability::sockets;
40
41 namespace folly {
42
43 static constexpr bool msgErrQueueSupported =
44 #ifdef MSG_ERRQUEUE
45     true;
46 #else
47     false;
48 #endif // MSG_ERRQUEUE
49
50 // static members initializers
51 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
52
53 const AsyncSocketException socketClosedLocallyEx(
54     AsyncSocketException::END_OF_FILE, "socket closed locally");
55 const AsyncSocketException socketShutdownForWritesEx(
56     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
57
58 // TODO: It might help performance to provide a version of BytesWriteRequest that
59 // users could derive from, so we can avoid the extra allocation for each call
60 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
61 // protocols are currently templatized for transports.
62 //
63 // We would need the version for external users where they provide the iovec
64 // storage space, and only our internal version would allocate it at the end of
65 // the WriteRequest.
66
67 /* The default WriteRequest implementation, used for write(), writev() and
68  * writeChain()
69  *
70  * A new BytesWriteRequest operation is allocated on the heap for all write
71  * operations that cannot be completed immediately.
72  */
73 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
74  public:
75   static BytesWriteRequest* newRequest(AsyncSocket* socket,
76                                        WriteCallback* callback,
77                                        const iovec* ops,
78                                        uint32_t opCount,
79                                        uint32_t partialWritten,
80                                        uint32_t bytesWritten,
81                                        unique_ptr<IOBuf>&& ioBuf,
82                                        WriteFlags flags) {
83     assert(opCount > 0);
84     // Since we put a variable size iovec array at the end
85     // of each BytesWriteRequest, we have to manually allocate the memory.
86     void* buf = malloc(sizeof(BytesWriteRequest) +
87                        (opCount * sizeof(struct iovec)));
88     if (buf == nullptr) {
89       throw std::bad_alloc();
90     }
91
92     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
93                                       partialWritten, bytesWritten,
94                                       std::move(ioBuf), flags);
95   }
96
97   void destroy() override {
98     this->~BytesWriteRequest();
99     free(this);
100   }
101
102   WriteResult performWrite() override {
103     WriteFlags writeFlags = flags_;
104     if (getNext() != nullptr) {
105       writeFlags |= WriteFlags::CORK;
106     }
107     auto writeResult = socket_->performWrite(
108         getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
109     bytesWritten_ = writeResult.writeReturn > 0 ? writeResult.writeReturn : 0;
110     return writeResult;
111   }
112
113   bool isComplete() override {
114     return opsWritten_ == getOpCount();
115   }
116
117   void consume() override {
118     // Advance opIndex_ forward by opsWritten_
119     opIndex_ += opsWritten_;
120     assert(opIndex_ < opCount_);
121
122     // If we've finished writing any IOBufs, release them
123     if (ioBuf_) {
124       for (uint32_t i = opsWritten_; i != 0; --i) {
125         assert(ioBuf_);
126         ioBuf_ = ioBuf_->pop();
127       }
128     }
129
130     // Move partialBytes_ forward into the current iovec buffer
131     struct iovec* currentOp = writeOps_ + opIndex_;
132     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
133     currentOp->iov_base =
134       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
135     currentOp->iov_len -= partialBytes_;
136
137     // Increment the totalBytesWritten_ count by bytesWritten_;
138     assert(bytesWritten_ >= 0);
139     totalBytesWritten_ += uint32_t(bytesWritten_);
140   }
141
142  private:
143   BytesWriteRequest(AsyncSocket* socket,
144                     WriteCallback* callback,
145                     const struct iovec* ops,
146                     uint32_t opCount,
147                     uint32_t partialBytes,
148                     uint32_t bytesWritten,
149                     unique_ptr<IOBuf>&& ioBuf,
150                     WriteFlags flags)
151     : AsyncSocket::WriteRequest(socket, callback)
152     , opCount_(opCount)
153     , opIndex_(0)
154     , flags_(flags)
155     , ioBuf_(std::move(ioBuf))
156     , opsWritten_(0)
157     , partialBytes_(partialBytes)
158     , bytesWritten_(bytesWritten) {
159     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
160   }
161
162   // private destructor, to ensure callers use destroy()
163   ~BytesWriteRequest() override = default;
164
165   const struct iovec* getOps() const {
166     assert(opCount_ > opIndex_);
167     return writeOps_ + opIndex_;
168   }
169
170   uint32_t getOpCount() const {
171     assert(opCount_ > opIndex_);
172     return opCount_ - opIndex_;
173   }
174
175   uint32_t opCount_;            ///< number of entries in writeOps_
176   uint32_t opIndex_;            ///< current index into writeOps_
177   WriteFlags flags_;            ///< set for WriteFlags
178   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
179
180   // for consume(), how much we wrote on the last write
181   uint32_t opsWritten_;         ///< complete ops written
182   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
183   ssize_t bytesWritten_;        ///< bytes written altogether
184
185   struct iovec writeOps_[];     ///< write operation(s) list
186 };
187
188 int AsyncSocket::SendMsgParamsCallback::getDefaultFlags(folly::WriteFlags flags)
189                                                                       noexcept {
190   int msg_flags = MSG_DONTWAIT;
191
192 #ifdef MSG_NOSIGNAL // Linux-only
193   msg_flags |= MSG_NOSIGNAL;
194 #ifdef MSG_MORE
195   if (isSet(flags, WriteFlags::CORK)) {
196     // MSG_MORE tells the kernel we have more data to send, so wait for us to
197     // give it the rest of the data rather than immediately sending a partial
198     // frame, even when TCP_NODELAY is enabled.
199     msg_flags |= MSG_MORE;
200   }
201 #endif // MSG_MORE
202 #endif // MSG_NOSIGNAL
203   if (isSet(flags, WriteFlags::EOR)) {
204     // marks that this is the last byte of a record (response)
205     msg_flags |= MSG_EOR;
206   }
207
208   return msg_flags;
209 }
210
211 namespace {
212 static AsyncSocket::SendMsgParamsCallback defaultSendMsgParamsCallback;
213 }
214
215 AsyncSocket::AsyncSocket()
216     : eventBase_(nullptr),
217       writeTimeout_(this, nullptr),
218       ioHandler_(this, nullptr),
219       immediateReadHandler_(this) {
220   VLOG(5) << "new AsyncSocket()";
221   init();
222 }
223
224 AsyncSocket::AsyncSocket(EventBase* evb)
225     : eventBase_(evb),
226       writeTimeout_(this, evb),
227       ioHandler_(this, evb),
228       immediateReadHandler_(this) {
229   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
230   init();
231 }
232
233 AsyncSocket::AsyncSocket(EventBase* evb,
234                            const folly::SocketAddress& address,
235                            uint32_t connectTimeout)
236   : AsyncSocket(evb) {
237   connect(nullptr, address, connectTimeout);
238 }
239
240 AsyncSocket::AsyncSocket(EventBase* evb,
241                            const std::string& ip,
242                            uint16_t port,
243                            uint32_t connectTimeout)
244   : AsyncSocket(evb) {
245   connect(nullptr, ip, port, connectTimeout);
246 }
247
248 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
249     : eventBase_(evb),
250       writeTimeout_(this, evb),
251       ioHandler_(this, evb, fd),
252       immediateReadHandler_(this) {
253   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
254           << fd << ")";
255   init();
256   fd_ = fd;
257   setCloseOnExec();
258   state_ = StateEnum::ESTABLISHED;
259 }
260
261 AsyncSocket::AsyncSocket(AsyncSocket::UniquePtr oldAsyncSocket)
262     : AsyncSocket(oldAsyncSocket->getEventBase(), oldAsyncSocket->detachFd()) {
263   preReceivedData_ = std::move(oldAsyncSocket->preReceivedData_);
264 }
265
266 // init() method, since constructor forwarding isn't supported in most
267 // compilers yet.
268 void AsyncSocket::init() {
269   if (eventBase_) {
270     eventBase_->dcheckIsInEventBaseThread();
271   }
272   shutdownFlags_ = 0;
273   state_ = StateEnum::UNINIT;
274   eventFlags_ = EventHandler::NONE;
275   fd_ = -1;
276   sendTimeout_ = 0;
277   maxReadsPerEvent_ = 16;
278   connectCallback_ = nullptr;
279   errMessageCallback_ = nullptr;
280   readCallback_ = nullptr;
281   writeReqHead_ = nullptr;
282   writeReqTail_ = nullptr;
283   shutdownSocketSet_ = nullptr;
284   appBytesWritten_ = 0;
285   appBytesReceived_ = 0;
286   sendMsgParamCallback_ = &defaultSendMsgParamsCallback;
287 }
288
289 AsyncSocket::~AsyncSocket() {
290   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
291           << ", evb=" << eventBase_ << ", fd=" << fd_
292           << ", state=" << state_ << ")";
293 }
294
295 void AsyncSocket::destroy() {
296   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
297           << ", fd=" << fd_ << ", state=" << state_;
298   // When destroy is called, close the socket immediately
299   closeNow();
300
301   // Then call DelayedDestruction::destroy() to take care of
302   // whether or not we need immediate or delayed destruction
303   DelayedDestruction::destroy();
304 }
305
306 int AsyncSocket::detachFd() {
307   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
308           << ", evb=" << eventBase_ << ", state=" << state_
309           << ", events=" << std::hex << eventFlags_ << ")";
310   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
311   // actually close the descriptor.
312   if (shutdownSocketSet_) {
313     shutdownSocketSet_->remove(fd_);
314   }
315   int fd = fd_;
316   fd_ = -1;
317   // Call closeNow() to invoke all pending callbacks with an error.
318   closeNow();
319   // Update the EventHandler to stop using this fd.
320   // This can only be done after closeNow() unregisters the handler.
321   ioHandler_.changeHandlerFD(-1);
322   return fd;
323 }
324
325 const folly::SocketAddress& AsyncSocket::anyAddress() {
326   static const folly::SocketAddress anyAddress =
327     folly::SocketAddress("0.0.0.0", 0);
328   return anyAddress;
329 }
330
331 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
332   if (shutdownSocketSet_ == newSS) {
333     return;
334   }
335   if (shutdownSocketSet_ && fd_ != -1) {
336     shutdownSocketSet_->remove(fd_);
337   }
338   shutdownSocketSet_ = newSS;
339   if (shutdownSocketSet_ && fd_ != -1) {
340     shutdownSocketSet_->add(fd_);
341   }
342 }
343
344 void AsyncSocket::setCloseOnExec() {
345   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
346   if (rv != 0) {
347     auto errnoCopy = errno;
348     throw AsyncSocketException(
349         AsyncSocketException::INTERNAL_ERROR,
350         withAddr("failed to set close-on-exec flag"),
351         errnoCopy);
352   }
353 }
354
355 void AsyncSocket::connect(ConnectCallback* callback,
356                            const folly::SocketAddress& address,
357                            int timeout,
358                            const OptionMap &options,
359                            const folly::SocketAddress& bindAddr) noexcept {
360   DestructorGuard dg(this);
361   eventBase_->dcheckIsInEventBaseThread();
362
363   addr_ = address;
364
365   // Make sure we're in the uninitialized state
366   if (state_ != StateEnum::UNINIT) {
367     return invalidState(callback);
368   }
369
370   connectTimeout_ = std::chrono::milliseconds(timeout);
371   connectStartTime_ = std::chrono::steady_clock::now();
372   // Make connect end time at least >= connectStartTime.
373   connectEndTime_ = connectStartTime_;
374
375   assert(fd_ == -1);
376   state_ = StateEnum::CONNECTING;
377   connectCallback_ = callback;
378
379   sockaddr_storage addrStorage;
380   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
381
382   try {
383     // Create the socket
384     // Technically the first parameter should actually be a protocol family
385     // constant (PF_xxx) rather than an address family (AF_xxx), but the
386     // distinction is mainly just historical.  In pretty much all
387     // implementations the PF_foo and AF_foo constants are identical.
388     fd_ = fsp::socket(address.getFamily(), SOCK_STREAM, 0);
389     if (fd_ < 0) {
390       auto errnoCopy = errno;
391       throw AsyncSocketException(
392           AsyncSocketException::INTERNAL_ERROR,
393           withAddr("failed to create socket"),
394           errnoCopy);
395     }
396     if (shutdownSocketSet_) {
397       shutdownSocketSet_->add(fd_);
398     }
399     ioHandler_.changeHandlerFD(fd_);
400
401     setCloseOnExec();
402
403     // Put the socket in non-blocking mode
404     int flags = fcntl(fd_, F_GETFL, 0);
405     if (flags == -1) {
406       auto errnoCopy = errno;
407       throw AsyncSocketException(
408           AsyncSocketException::INTERNAL_ERROR,
409           withAddr("failed to get socket flags"),
410           errnoCopy);
411     }
412     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
413     if (rv == -1) {
414       auto errnoCopy = errno;
415       throw AsyncSocketException(
416           AsyncSocketException::INTERNAL_ERROR,
417           withAddr("failed to put socket in non-blocking mode"),
418           errnoCopy);
419     }
420
421 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
422     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
423     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
424     if (rv == -1) {
425       auto errnoCopy = errno;
426       throw AsyncSocketException(
427           AsyncSocketException::INTERNAL_ERROR,
428           "failed to enable F_SETNOSIGPIPE on socket",
429           errnoCopy);
430     }
431 #endif
432
433     // By default, turn on TCP_NODELAY
434     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
435     // setNoDelay() will log an error message if it fails.
436     if (address.getFamily() != AF_UNIX) {
437       (void)setNoDelay(true);
438     }
439
440     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
441             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
442
443     // bind the socket
444     if (bindAddr != anyAddress()) {
445       int one = 1;
446       if (setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
447         auto errnoCopy = errno;
448         doClose();
449         throw AsyncSocketException(
450             AsyncSocketException::NOT_OPEN,
451             "failed to setsockopt prior to bind on " + bindAddr.describe(),
452             errnoCopy);
453       }
454
455       bindAddr.getAddress(&addrStorage);
456
457       if (bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
458         auto errnoCopy = errno;
459         doClose();
460         throw AsyncSocketException(
461             AsyncSocketException::NOT_OPEN,
462             "failed to bind to async socket: " + bindAddr.describe(),
463             errnoCopy);
464       }
465     }
466
467     // Apply the additional options if any.
468     for (const auto& opt: options) {
469       rv = opt.first.apply(fd_, opt.second);
470       if (rv != 0) {
471         auto errnoCopy = errno;
472         throw AsyncSocketException(
473             AsyncSocketException::INTERNAL_ERROR,
474             withAddr("failed to set socket option"),
475             errnoCopy);
476       }
477     }
478
479     // Perform the connect()
480     address.getAddress(&addrStorage);
481
482     if (tfoEnabled_) {
483       state_ = StateEnum::FAST_OPEN;
484       tfoAttempted_ = true;
485     } else {
486       if (socketConnect(saddr, addr_.getActualSize()) < 0) {
487         return;
488       }
489     }
490
491     // If we're still here the connect() succeeded immediately.
492     // Fall through to call the callback outside of this try...catch block
493   } catch (const AsyncSocketException& ex) {
494     return failConnect(__func__, ex);
495   } catch (const std::exception& ex) {
496     // shouldn't happen, but handle it just in case
497     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
498                << "): unexpected " << typeid(ex).name() << " exception: "
499                << ex.what();
500     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
501                             withAddr(string("unexpected exception: ") +
502                                      ex.what()));
503     return failConnect(__func__, tex);
504   }
505
506   // The connection succeeded immediately
507   // The read callback may not have been set yet, and no writes may be pending
508   // yet, so we don't have to register for any events at the moment.
509   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
510   assert(errMessageCallback_ == nullptr);
511   assert(readCallback_ == nullptr);
512   assert(writeReqHead_ == nullptr);
513   if (state_ != StateEnum::FAST_OPEN) {
514     state_ = StateEnum::ESTABLISHED;
515   }
516   invokeConnectSuccess();
517 }
518
519 int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) {
520 #if __linux__
521   if (noTransparentTls_) {
522     // Ignore return value, errors are ok
523     setsockopt(fd_, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
524   }
525   if (noTSocks_) {
526     VLOG(4) << "Disabling TSOCKS for fd " << fd_;
527     // Ignore return value, errors are ok
528     setsockopt(fd_, SOL_SOCKET, SO_NO_TSOCKS, nullptr, 0);
529   }
530 #endif
531   int rv = fsp::connect(fd_, saddr, len);
532   if (rv < 0) {
533     auto errnoCopy = errno;
534     if (errnoCopy == EINPROGRESS) {
535       scheduleConnectTimeout();
536       registerForConnectEvents();
537     } else {
538       throw AsyncSocketException(
539           AsyncSocketException::NOT_OPEN,
540           "connect failed (immediately)",
541           errnoCopy);
542     }
543   }
544   return rv;
545 }
546
547 void AsyncSocket::scheduleConnectTimeout() {
548   // Connection in progress.
549   auto timeout = connectTimeout_.count();
550   if (timeout > 0) {
551     // Start a timer in case the connection takes too long.
552     if (!writeTimeout_.scheduleTimeout(uint32_t(timeout))) {
553       throw AsyncSocketException(
554           AsyncSocketException::INTERNAL_ERROR,
555           withAddr("failed to schedule AsyncSocket connect timeout"));
556     }
557   }
558 }
559
560 void AsyncSocket::registerForConnectEvents() {
561   // Register for write events, so we'll
562   // be notified when the connection finishes/fails.
563   // Note that we don't register for a persistent event here.
564   assert(eventFlags_ == EventHandler::NONE);
565   eventFlags_ = EventHandler::WRITE;
566   if (!ioHandler_.registerHandler(eventFlags_)) {
567     throw AsyncSocketException(
568         AsyncSocketException::INTERNAL_ERROR,
569         withAddr("failed to register AsyncSocket connect handler"));
570   }
571 }
572
573 void AsyncSocket::connect(ConnectCallback* callback,
574                            const string& ip, uint16_t port,
575                            int timeout,
576                            const OptionMap &options) noexcept {
577   DestructorGuard dg(this);
578   try {
579     connectCallback_ = callback;
580     connect(callback, folly::SocketAddress(ip, port), timeout, options);
581   } catch (const std::exception& ex) {
582     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
583                             ex.what());
584     return failConnect(__func__, tex);
585   }
586 }
587
588 void AsyncSocket::cancelConnect() {
589   connectCallback_ = nullptr;
590   if (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN) {
591     closeNow();
592   }
593 }
594
595 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
596   sendTimeout_ = milliseconds;
597   if (eventBase_) {
598     eventBase_->dcheckIsInEventBaseThread();
599   }
600
601   // If we are currently pending on write requests, immediately update
602   // writeTimeout_ with the new value.
603   if ((eventFlags_ & EventHandler::WRITE) &&
604       (state_ != StateEnum::CONNECTING && state_ != StateEnum::FAST_OPEN)) {
605     assert(state_ == StateEnum::ESTABLISHED);
606     assert((shutdownFlags_ & SHUT_WRITE) == 0);
607     if (sendTimeout_ > 0) {
608       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
609         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
610             withAddr("failed to reschedule send timeout in setSendTimeout"));
611         return failWrite(__func__, ex);
612       }
613     } else {
614       writeTimeout_.cancelTimeout();
615     }
616   }
617 }
618
619 void AsyncSocket::setErrMessageCB(ErrMessageCallback* callback) {
620   VLOG(6) << "AsyncSocket::setErrMessageCB() this=" << this
621           << ", fd=" << fd_ << ", callback=" << callback
622           << ", state=" << state_;
623
624   // Short circuit if callback is the same as the existing errMessageCallback_.
625   if (callback == errMessageCallback_) {
626     return;
627   }
628
629   if (!msgErrQueueSupported) {
630       // Per-socket error message queue is not supported on this platform.
631       return invalidState(callback);
632   }
633
634   DestructorGuard dg(this);
635   eventBase_->dcheckIsInEventBaseThread();
636
637   if (callback == nullptr) {
638     // We should be able to reset the callback regardless of the
639     // socket state. It's important to have a reliable callback
640     // cancellation mechanism.
641     errMessageCallback_ = callback;
642     return;
643   }
644
645   switch ((StateEnum)state_) {
646     case StateEnum::CONNECTING:
647     case StateEnum::FAST_OPEN:
648     case StateEnum::ESTABLISHED: {
649       errMessageCallback_ = callback;
650       return;
651     }
652     case StateEnum::CLOSED:
653     case StateEnum::ERROR:
654       // We should never reach here.  SHUT_READ should always be set
655       // if we are in STATE_CLOSED or STATE_ERROR.
656       assert(false);
657       return invalidState(callback);
658     case StateEnum::UNINIT:
659       // We do not allow setReadCallback() to be called before we start
660       // connecting.
661       return invalidState(callback);
662   }
663
664   // We don't put a default case in the switch statement, so that the compiler
665   // will warn us to update the switch statement if a new state is added.
666   return invalidState(callback);
667 }
668
669 AsyncSocket::ErrMessageCallback* AsyncSocket::getErrMessageCallback() const {
670   return errMessageCallback_;
671 }
672
673 void AsyncSocket::setSendMsgParamCB(SendMsgParamsCallback* callback) {
674   sendMsgParamCallback_ = callback;
675 }
676
677 AsyncSocket::SendMsgParamsCallback* AsyncSocket::getSendMsgParamsCB() const {
678   return sendMsgParamCallback_;
679 }
680
681 void AsyncSocket::setReadCB(ReadCallback *callback) {
682   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
683           << ", callback=" << callback << ", state=" << state_;
684
685   // Short circuit if callback is the same as the existing readCallback_.
686   //
687   // Note that this is needed for proper functioning during some cleanup cases.
688   // During cleanup we allow setReadCallback(nullptr) to be called even if the
689   // read callback is already unset and we have been detached from an event
690   // base.  This check prevents us from asserting
691   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
692   if (callback == readCallback_) {
693     return;
694   }
695
696   /* We are removing a read callback */
697   if (callback == nullptr &&
698       immediateReadHandler_.isLoopCallbackScheduled()) {
699     immediateReadHandler_.cancelLoopCallback();
700   }
701
702   if (shutdownFlags_ & SHUT_READ) {
703     // Reads have already been shut down on this socket.
704     //
705     // Allow setReadCallback(nullptr) to be called in this case, but don't
706     // allow a new callback to be set.
707     //
708     // For example, setReadCallback(nullptr) can happen after an error if we
709     // invoke some other error callback before invoking readError().  The other
710     // error callback that is invoked first may go ahead and clear the read
711     // callback before we get a chance to invoke readError().
712     if (callback != nullptr) {
713       return invalidState(callback);
714     }
715     assert((eventFlags_ & EventHandler::READ) == 0);
716     readCallback_ = nullptr;
717     return;
718   }
719
720   DestructorGuard dg(this);
721   eventBase_->dcheckIsInEventBaseThread();
722
723   switch ((StateEnum)state_) {
724     case StateEnum::CONNECTING:
725     case StateEnum::FAST_OPEN:
726       // For convenience, we allow the read callback to be set while we are
727       // still connecting.  We just store the callback for now.  Once the
728       // connection completes we'll register for read events.
729       readCallback_ = callback;
730       return;
731     case StateEnum::ESTABLISHED:
732     {
733       readCallback_ = callback;
734       uint16_t oldFlags = eventFlags_;
735       if (readCallback_) {
736         eventFlags_ |= EventHandler::READ;
737       } else {
738         eventFlags_ &= ~EventHandler::READ;
739       }
740
741       // Update our registration if our flags have changed
742       if (eventFlags_ != oldFlags) {
743         // We intentionally ignore the return value here.
744         // updateEventRegistration() will move us into the error state if it
745         // fails, and we don't need to do anything else here afterwards.
746         (void)updateEventRegistration();
747       }
748
749       if (readCallback_) {
750         checkForImmediateRead();
751       }
752       return;
753     }
754     case StateEnum::CLOSED:
755     case StateEnum::ERROR:
756       // We should never reach here.  SHUT_READ should always be set
757       // if we are in STATE_CLOSED or STATE_ERROR.
758       assert(false);
759       return invalidState(callback);
760     case StateEnum::UNINIT:
761       // We do not allow setReadCallback() to be called before we start
762       // connecting.
763       return invalidState(callback);
764   }
765
766   // We don't put a default case in the switch statement, so that the compiler
767   // will warn us to update the switch statement if a new state is added.
768   return invalidState(callback);
769 }
770
771 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
772   return readCallback_;
773 }
774
775 void AsyncSocket::write(WriteCallback* callback,
776                          const void* buf, size_t bytes, WriteFlags flags) {
777   iovec op;
778   op.iov_base = const_cast<void*>(buf);
779   op.iov_len = bytes;
780   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
781 }
782
783 void AsyncSocket::writev(WriteCallback* callback,
784                           const iovec* vec,
785                           size_t count,
786                           WriteFlags flags) {
787   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
788 }
789
790 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
791                               WriteFlags flags) {
792   constexpr size_t kSmallSizeMax = 64;
793   size_t count = buf->countChainElements();
794   if (count <= kSmallSizeMax) {
795     // suppress "warning: variable length array 'vec' is used [-Wvla]"
796     FOLLY_PUSH_WARNING
797     FOLLY_GCC_DISABLE_WARNING("-Wvla")
798     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
799     FOLLY_POP_WARNING
800
801     writeChainImpl(callback, vec, count, std::move(buf), flags);
802   } else {
803     iovec* vec = new iovec[count];
804     writeChainImpl(callback, vec, count, std::move(buf), flags);
805     delete[] vec;
806   }
807 }
808
809 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
810     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
811   size_t veclen = buf->fillIov(vec, count);
812   writeImpl(callback, vec, veclen, std::move(buf), flags);
813 }
814
815 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
816                              size_t count, unique_ptr<IOBuf>&& buf,
817                              WriteFlags flags) {
818   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
819           << ", callback=" << callback << ", count=" << count
820           << ", state=" << state_;
821   DestructorGuard dg(this);
822   unique_ptr<IOBuf>ioBuf(std::move(buf));
823   eventBase_->dcheckIsInEventBaseThread();
824
825   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
826     // No new writes may be performed after the write side of the socket has
827     // been shutdown.
828     //
829     // We could just call callback->writeError() here to fail just this write.
830     // However, fail hard and use invalidState() to fail all outstanding
831     // callbacks and move the socket into the error state.  There's most likely
832     // a bug in the caller's code, so we abort everything rather than trying to
833     // proceed as best we can.
834     return invalidState(callback);
835   }
836
837   uint32_t countWritten = 0;
838   uint32_t partialWritten = 0;
839   ssize_t bytesWritten = 0;
840   bool mustRegister = false;
841   if ((state_ == StateEnum::ESTABLISHED || state_ == StateEnum::FAST_OPEN) &&
842       !connecting()) {
843     if (writeReqHead_ == nullptr) {
844       // If we are established and there are no other writes pending,
845       // we can attempt to perform the write immediately.
846       assert(writeReqTail_ == nullptr);
847       assert((eventFlags_ & EventHandler::WRITE) == 0);
848
849       auto writeResult = performWrite(
850           vec, uint32_t(count), flags, &countWritten, &partialWritten);
851       bytesWritten = writeResult.writeReturn;
852       if (bytesWritten < 0) {
853         auto errnoCopy = errno;
854         if (writeResult.exception) {
855           return failWrite(__func__, callback, 0, *writeResult.exception);
856         }
857         AsyncSocketException ex(
858             AsyncSocketException::INTERNAL_ERROR,
859             withAddr("writev failed"),
860             errnoCopy);
861         return failWrite(__func__, callback, 0, ex);
862       } else if (countWritten == count) {
863         // We successfully wrote everything.
864         // Invoke the callback and return.
865         if (callback) {
866           callback->writeSuccess();
867         }
868         return;
869       } else { // continue writing the next writeReq
870         if (bufferCallback_) {
871           bufferCallback_->onEgressBuffered();
872         }
873       }
874       if (!connecting()) {
875         // Writes might put the socket back into connecting state
876         // if TFO is enabled, and using TFO fails.
877         // This means that write timeouts would not be active, however
878         // connect timeouts would affect this stage.
879         mustRegister = true;
880       }
881     }
882   } else if (!connecting()) {
883     // Invalid state for writing
884     return invalidState(callback);
885   }
886
887   // Create a new WriteRequest to add to the queue
888   WriteRequest* req;
889   try {
890     req = BytesWriteRequest::newRequest(
891         this,
892         callback,
893         vec + countWritten,
894         uint32_t(count - countWritten),
895         partialWritten,
896         uint32_t(bytesWritten),
897         std::move(ioBuf),
898         flags);
899   } catch (const std::exception& ex) {
900     // we mainly expect to catch std::bad_alloc here
901     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
902         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
903     return failWrite(__func__, callback, size_t(bytesWritten), tex);
904   }
905   req->consume();
906   if (writeReqTail_ == nullptr) {
907     assert(writeReqHead_ == nullptr);
908     writeReqHead_ = writeReqTail_ = req;
909   } else {
910     writeReqTail_->append(req);
911     writeReqTail_ = req;
912   }
913
914   // Register for write events if are established and not currently
915   // waiting on write events
916   if (mustRegister) {
917     assert(state_ == StateEnum::ESTABLISHED);
918     assert((eventFlags_ & EventHandler::WRITE) == 0);
919     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
920       assert(state_ == StateEnum::ERROR);
921       return;
922     }
923     if (sendTimeout_ > 0) {
924       // Schedule a timeout to fire if the write takes too long.
925       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
926         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
927                                withAddr("failed to schedule send timeout"));
928         return failWrite(__func__, ex);
929       }
930     }
931   }
932 }
933
934 void AsyncSocket::writeRequest(WriteRequest* req) {
935   if (writeReqTail_ == nullptr) {
936     assert(writeReqHead_ == nullptr);
937     writeReqHead_ = writeReqTail_ = req;
938     req->start();
939   } else {
940     writeReqTail_->append(req);
941     writeReqTail_ = req;
942   }
943 }
944
945 void AsyncSocket::close() {
946   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
947           << ", state=" << state_ << ", shutdownFlags="
948           << std::hex << (int) shutdownFlags_;
949
950   // close() is only different from closeNow() when there are pending writes
951   // that need to drain before we can close.  In all other cases, just call
952   // closeNow().
953   //
954   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
955   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
956   // is still running.  (e.g., If there are multiple pending writes, and we
957   // call writeError() on the first one, it may call close().  In this case we
958   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
959   // writes will still be in the queue.)
960   //
961   // We only need to drain pending writes if we are still in STATE_CONNECTING
962   // or STATE_ESTABLISHED
963   if ((writeReqHead_ == nullptr) ||
964       !(state_ == StateEnum::CONNECTING ||
965       state_ == StateEnum::ESTABLISHED)) {
966     closeNow();
967     return;
968   }
969
970   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
971   // destroyed until close() returns.
972   DestructorGuard dg(this);
973   eventBase_->dcheckIsInEventBaseThread();
974
975   // Since there are write requests pending, we have to set the
976   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
977   // connect finishes and we finish writing these requests.
978   //
979   // Set SHUT_READ to indicate that reads are shut down, and set the
980   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
981   // pending writes complete.
982   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
983
984   // If a read callback is set, invoke readEOF() immediately to inform it that
985   // the socket has been closed and no more data can be read.
986   if (readCallback_) {
987     // Disable reads if they are enabled
988     if (!updateEventRegistration(0, EventHandler::READ)) {
989       // We're now in the error state; callbacks have been cleaned up
990       assert(state_ == StateEnum::ERROR);
991       assert(readCallback_ == nullptr);
992     } else {
993       ReadCallback* callback = readCallback_;
994       readCallback_ = nullptr;
995       callback->readEOF();
996     }
997   }
998 }
999
1000 void AsyncSocket::closeNow() {
1001   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
1002           << ", state=" << state_ << ", shutdownFlags="
1003           << std::hex << (int) shutdownFlags_;
1004   DestructorGuard dg(this);
1005   if (eventBase_) {
1006     eventBase_->dcheckIsInEventBaseThread();
1007   }
1008
1009   switch (state_) {
1010     case StateEnum::ESTABLISHED:
1011     case StateEnum::CONNECTING:
1012     case StateEnum::FAST_OPEN: {
1013       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1014       state_ = StateEnum::CLOSED;
1015
1016       // If the write timeout was set, cancel it.
1017       writeTimeout_.cancelTimeout();
1018
1019       // If we are registered for I/O events, unregister.
1020       if (eventFlags_ != EventHandler::NONE) {
1021         eventFlags_ = EventHandler::NONE;
1022         if (!updateEventRegistration()) {
1023           // We will have been moved into the error state.
1024           assert(state_ == StateEnum::ERROR);
1025           return;
1026         }
1027       }
1028
1029       if (immediateReadHandler_.isLoopCallbackScheduled()) {
1030         immediateReadHandler_.cancelLoopCallback();
1031       }
1032
1033       if (fd_ >= 0) {
1034         ioHandler_.changeHandlerFD(-1);
1035         doClose();
1036       }
1037
1038       invokeConnectErr(socketClosedLocallyEx);
1039
1040       failAllWrites(socketClosedLocallyEx);
1041
1042       if (readCallback_) {
1043         ReadCallback* callback = readCallback_;
1044         readCallback_ = nullptr;
1045         callback->readEOF();
1046       }
1047       return;
1048     }
1049     case StateEnum::CLOSED:
1050       // Do nothing.  It's possible that we are being called recursively
1051       // from inside a callback that we invoked inside another call to close()
1052       // that is still running.
1053       return;
1054     case StateEnum::ERROR:
1055       // Do nothing.  The error handling code has performed (or is performing)
1056       // cleanup.
1057       return;
1058     case StateEnum::UNINIT:
1059       assert(eventFlags_ == EventHandler::NONE);
1060       assert(connectCallback_ == nullptr);
1061       assert(readCallback_ == nullptr);
1062       assert(writeReqHead_ == nullptr);
1063       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1064       state_ = StateEnum::CLOSED;
1065       return;
1066   }
1067
1068   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
1069               << ") called in unknown state " << state_;
1070 }
1071
1072 void AsyncSocket::closeWithReset() {
1073   // Enable SO_LINGER, with the linger timeout set to 0.
1074   // This will trigger a TCP reset when we close the socket.
1075   if (fd_ >= 0) {
1076     struct linger optLinger = {1, 0};
1077     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
1078       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
1079               << "on " << fd_ << ": errno=" << errno;
1080     }
1081   }
1082
1083   // Then let closeNow() take care of the rest
1084   closeNow();
1085 }
1086
1087 void AsyncSocket::shutdownWrite() {
1088   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
1089           << ", state=" << state_ << ", shutdownFlags="
1090           << std::hex << (int) shutdownFlags_;
1091
1092   // If there are no pending writes, shutdownWrite() is identical to
1093   // shutdownWriteNow().
1094   if (writeReqHead_ == nullptr) {
1095     shutdownWriteNow();
1096     return;
1097   }
1098
1099   eventBase_->dcheckIsInEventBaseThread();
1100
1101   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
1102   // shutdown will be performed once all writes complete.
1103   shutdownFlags_ |= SHUT_WRITE_PENDING;
1104 }
1105
1106 void AsyncSocket::shutdownWriteNow() {
1107   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
1108           << ", fd=" << fd_ << ", state=" << state_
1109           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
1110
1111   if (shutdownFlags_ & SHUT_WRITE) {
1112     // Writes are already shutdown; nothing else to do.
1113     return;
1114   }
1115
1116   // If SHUT_READ is already set, just call closeNow() to completely
1117   // close the socket.  This can happen if close() was called with writes
1118   // pending, and then shutdownWriteNow() is called before all pending writes
1119   // complete.
1120   if (shutdownFlags_ & SHUT_READ) {
1121     closeNow();
1122     return;
1123   }
1124
1125   DestructorGuard dg(this);
1126   if (eventBase_) {
1127     eventBase_->dcheckIsInEventBaseThread();
1128   }
1129
1130   switch (static_cast<StateEnum>(state_)) {
1131     case StateEnum::ESTABLISHED:
1132     {
1133       shutdownFlags_ |= SHUT_WRITE;
1134
1135       // If the write timeout was set, cancel it.
1136       writeTimeout_.cancelTimeout();
1137
1138       // If we are registered for write events, unregister.
1139       if (!updateEventRegistration(0, EventHandler::WRITE)) {
1140         // We will have been moved into the error state.
1141         assert(state_ == StateEnum::ERROR);
1142         return;
1143       }
1144
1145       // Shutdown writes on the file descriptor
1146       shutdown(fd_, SHUT_WR);
1147
1148       // Immediately fail all write requests
1149       failAllWrites(socketShutdownForWritesEx);
1150       return;
1151     }
1152     case StateEnum::CONNECTING:
1153     {
1154       // Set the SHUT_WRITE_PENDING flag.
1155       // When the connection completes, it will check this flag,
1156       // shutdown the write half of the socket, and then set SHUT_WRITE.
1157       shutdownFlags_ |= SHUT_WRITE_PENDING;
1158
1159       // Immediately fail all write requests
1160       failAllWrites(socketShutdownForWritesEx);
1161       return;
1162     }
1163     case StateEnum::UNINIT:
1164       // Callers normally shouldn't call shutdownWriteNow() before the socket
1165       // even starts connecting.  Nonetheless, go ahead and set
1166       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
1167       // immediately shut down the write side of the socket.
1168       shutdownFlags_ |= SHUT_WRITE_PENDING;
1169       return;
1170     case StateEnum::FAST_OPEN:
1171       // In fast open state we haven't call connected yet, and if we shutdown
1172       // the writes, we will never try to call connect, so shut everything down
1173       shutdownFlags_ |= SHUT_WRITE;
1174       // Immediately fail all write requests
1175       failAllWrites(socketShutdownForWritesEx);
1176       return;
1177     case StateEnum::CLOSED:
1178     case StateEnum::ERROR:
1179       // We should never get here.  SHUT_WRITE should always be set
1180       // in STATE_CLOSED and STATE_ERROR.
1181       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
1182                  << ", fd=" << fd_ << ") in unexpected state " << state_
1183                  << " with SHUT_WRITE not set ("
1184                  << std::hex << (int) shutdownFlags_ << ")";
1185       assert(false);
1186       return;
1187   }
1188
1189   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
1190               << fd_ << ") called in unknown state " << state_;
1191 }
1192
1193 bool AsyncSocket::readable() const {
1194   if (fd_ == -1) {
1195     return false;
1196   }
1197   struct pollfd fds[1];
1198   fds[0].fd = fd_;
1199   fds[0].events = POLLIN;
1200   fds[0].revents = 0;
1201   int rc = poll(fds, 1, 0);
1202   return rc == 1;
1203 }
1204
1205 bool AsyncSocket::writable() const {
1206   if (fd_ == -1) {
1207     return false;
1208   }
1209   struct pollfd fds[1];
1210   fds[0].fd = fd_;
1211   fds[0].events = POLLOUT;
1212   fds[0].revents = 0;
1213   int rc = poll(fds, 1, 0);
1214   return rc == 1;
1215 }
1216
1217 bool AsyncSocket::isPending() const {
1218   return ioHandler_.isPending();
1219 }
1220
1221 bool AsyncSocket::hangup() const {
1222   if (fd_ == -1) {
1223     // sanity check, no one should ask for hangup if we are not connected.
1224     assert(false);
1225     return false;
1226   }
1227 #ifdef POLLRDHUP // Linux-only
1228   struct pollfd fds[1];
1229   fds[0].fd = fd_;
1230   fds[0].events = POLLRDHUP|POLLHUP;
1231   fds[0].revents = 0;
1232   poll(fds, 1, 0);
1233   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1234 #else
1235   return false;
1236 #endif
1237 }
1238
1239 bool AsyncSocket::good() const {
1240   return (
1241       (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN ||
1242        state_ == StateEnum::ESTABLISHED) &&
1243       (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1244 }
1245
1246 bool AsyncSocket::error() const {
1247   return (state_ == StateEnum::ERROR);
1248 }
1249
1250 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1251   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1252           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1253           << ", state=" << state_ << ", events="
1254           << std::hex << eventFlags_ << ")";
1255   assert(eventBase_ == nullptr);
1256   eventBase->dcheckIsInEventBaseThread();
1257
1258   eventBase_ = eventBase;
1259   ioHandler_.attachEventBase(eventBase);
1260   writeTimeout_.attachEventBase(eventBase);
1261   if (evbChangeCb_) {
1262     evbChangeCb_->evbAttached(this);
1263   }
1264 }
1265
1266 void AsyncSocket::detachEventBase() {
1267   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1268           << ", old evb=" << eventBase_ << ", state=" << state_
1269           << ", events=" << std::hex << eventFlags_ << ")";
1270   assert(eventBase_ != nullptr);
1271   eventBase_->dcheckIsInEventBaseThread();
1272
1273   eventBase_ = nullptr;
1274   ioHandler_.detachEventBase();
1275   writeTimeout_.detachEventBase();
1276   if (evbChangeCb_) {
1277     evbChangeCb_->evbDetached(this);
1278   }
1279 }
1280
1281 bool AsyncSocket::isDetachable() const {
1282   DCHECK(eventBase_ != nullptr);
1283   eventBase_->dcheckIsInEventBaseThread();
1284
1285   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1286 }
1287
1288 void AsyncSocket::cacheAddresses() {
1289   if (fd_ >= 0) {
1290     try {
1291       cacheLocalAddress();
1292       cachePeerAddress();
1293     } catch (const std::system_error& e) {
1294       if (e.code() != std::error_code(ENOTCONN, std::system_category())) {
1295         VLOG(1) << "Error caching addresses: " << e.code().value() << ", "
1296                 << e.code().message();
1297       }
1298     }
1299   }
1300 }
1301
1302 void AsyncSocket::cacheLocalAddress() const {
1303   if (!localAddr_.isInitialized()) {
1304     localAddr_.setFromLocalAddress(fd_);
1305   }
1306 }
1307
1308 void AsyncSocket::cachePeerAddress() const {
1309   if (!addr_.isInitialized()) {
1310     addr_.setFromPeerAddress(fd_);
1311   }
1312 }
1313
1314 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1315   cacheLocalAddress();
1316   *address = localAddr_;
1317 }
1318
1319 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1320   cachePeerAddress();
1321   *address = addr_;
1322 }
1323
1324 bool AsyncSocket::getTFOSucceded() const {
1325   return detail::tfo_succeeded(fd_);
1326 }
1327
1328 int AsyncSocket::setNoDelay(bool noDelay) {
1329   if (fd_ < 0) {
1330     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1331                << this << "(state=" << state_ << ")";
1332     return EINVAL;
1333
1334   }
1335
1336   int value = noDelay ? 1 : 0;
1337   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1338     int errnoCopy = errno;
1339     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1340             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1341             << strerror(errnoCopy);
1342     return errnoCopy;
1343   }
1344
1345   return 0;
1346 }
1347
1348 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1349
1350   #ifndef TCP_CONGESTION
1351   #define TCP_CONGESTION  13
1352   #endif
1353
1354   if (fd_ < 0) {
1355     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1356                << "socket " << this << "(state=" << state_ << ")";
1357     return EINVAL;
1358
1359   }
1360
1361   if (setsockopt(
1362           fd_,
1363           IPPROTO_TCP,
1364           TCP_CONGESTION,
1365           cname.c_str(),
1366           socklen_t(cname.length() + 1)) != 0) {
1367     int errnoCopy = errno;
1368     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1369             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1370             << strerror(errnoCopy);
1371     return errnoCopy;
1372   }
1373
1374   return 0;
1375 }
1376
1377 int AsyncSocket::setQuickAck(bool quickack) {
1378   (void)quickack;
1379   if (fd_ < 0) {
1380     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1381                << this << "(state=" << state_ << ")";
1382     return EINVAL;
1383
1384   }
1385
1386 #ifdef TCP_QUICKACK // Linux-only
1387   int value = quickack ? 1 : 0;
1388   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1389     int errnoCopy = errno;
1390     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1391             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1392             << strerror(errnoCopy);
1393     return errnoCopy;
1394   }
1395
1396   return 0;
1397 #else
1398   return ENOSYS;
1399 #endif
1400 }
1401
1402 int AsyncSocket::setSendBufSize(size_t bufsize) {
1403   if (fd_ < 0) {
1404     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1405                << this << "(state=" << state_ << ")";
1406     return EINVAL;
1407   }
1408
1409   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1410     int errnoCopy = errno;
1411     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1412             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1413             << strerror(errnoCopy);
1414     return errnoCopy;
1415   }
1416
1417   return 0;
1418 }
1419
1420 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1421   if (fd_ < 0) {
1422     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1423                << this << "(state=" << state_ << ")";
1424     return EINVAL;
1425   }
1426
1427   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1428     int errnoCopy = errno;
1429     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1430             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1431             << strerror(errnoCopy);
1432     return errnoCopy;
1433   }
1434
1435   return 0;
1436 }
1437
1438 int AsyncSocket::setTCPProfile(int profd) {
1439   if (fd_ < 0) {
1440     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1441                << this << "(state=" << state_ << ")";
1442     return EINVAL;
1443   }
1444
1445   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1446     int errnoCopy = errno;
1447     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1448             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1449             << strerror(errnoCopy);
1450     return errnoCopy;
1451   }
1452
1453   return 0;
1454 }
1455
1456 void AsyncSocket::ioReady(uint16_t events) noexcept {
1457   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd=" << fd_
1458           << ", events=" << std::hex << events << ", state=" << state_;
1459   DestructorGuard dg(this);
1460   assert(events & EventHandler::READ_WRITE);
1461   eventBase_->dcheckIsInEventBaseThread();
1462
1463   uint16_t relevantEvents = uint16_t(events & EventHandler::READ_WRITE);
1464   EventBase* originalEventBase = eventBase_;
1465   // If we got there it means that either EventHandler::READ or
1466   // EventHandler::WRITE is set. Any of these flags can
1467   // indicate that there are messages available in the socket
1468   // error message queue.
1469   handleErrMessages();
1470
1471   // Return now if handleErrMessages() detached us from our EventBase
1472   if (eventBase_ != originalEventBase) {
1473     return;
1474   }
1475
1476   if (relevantEvents == EventHandler::READ) {
1477     handleRead();
1478   } else if (relevantEvents == EventHandler::WRITE) {
1479     handleWrite();
1480   } else if (relevantEvents == EventHandler::READ_WRITE) {
1481     // If both read and write events are ready, process writes first.
1482     handleWrite();
1483
1484     // Return now if handleWrite() detached us from our EventBase
1485     if (eventBase_ != originalEventBase) {
1486       return;
1487     }
1488
1489     // Only call handleRead() if a read callback is still installed.
1490     // (It's possible that the read callback was uninstalled during
1491     // handleWrite().)
1492     if (readCallback_) {
1493       handleRead();
1494     }
1495   } else {
1496     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1497                << std::hex << events << "(this=" << this << ")";
1498     abort();
1499   }
1500 }
1501
1502 AsyncSocket::ReadResult
1503 AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) {
1504   VLOG(5) << "AsyncSocket::performRead() this=" << this << ", buf=" << *buf
1505           << ", buflen=" << *buflen;
1506
1507   if (preReceivedData_ && !preReceivedData_->empty()) {
1508     VLOG(5) << "AsyncSocket::performRead() this=" << this
1509             << ", reading pre-received data";
1510
1511     io::Cursor cursor(preReceivedData_.get());
1512     auto len = cursor.pullAtMost(*buf, *buflen);
1513
1514     IOBufQueue queue;
1515     queue.append(std::move(preReceivedData_));
1516     queue.trimStart(len);
1517     preReceivedData_ = queue.move();
1518
1519     appBytesReceived_ += len;
1520     return ReadResult(len);
1521   }
1522
1523   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT);
1524   if (bytes < 0) {
1525     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1526       // No more data to read right now.
1527       return ReadResult(READ_BLOCKING);
1528     } else {
1529       return ReadResult(READ_ERROR);
1530     }
1531   } else {
1532     appBytesReceived_ += bytes;
1533     return ReadResult(bytes);
1534   }
1535 }
1536
1537 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) {
1538   // no matter what, buffer should be preapared for non-ssl socket
1539   CHECK(readCallback_);
1540   readCallback_->getReadBuffer(buf, buflen);
1541 }
1542
1543 void AsyncSocket::handleErrMessages() noexcept {
1544   // This method has non-empty implementation only for platforms
1545   // supporting per-socket error queues.
1546   VLOG(5) << "AsyncSocket::handleErrMessages() this=" << this << ", fd=" << fd_
1547           << ", state=" << state_;
1548   if (errMessageCallback_ == nullptr) {
1549     VLOG(7) << "AsyncSocket::handleErrMessages(): "
1550             << "no callback installed - exiting.";
1551     return;
1552   }
1553
1554 #ifdef MSG_ERRQUEUE
1555   uint8_t ctrl[1024];
1556   unsigned char data;
1557   struct msghdr msg;
1558   iovec entry;
1559
1560   entry.iov_base = &data;
1561   entry.iov_len = sizeof(data);
1562   msg.msg_iov = &entry;
1563   msg.msg_iovlen = 1;
1564   msg.msg_name = nullptr;
1565   msg.msg_namelen = 0;
1566   msg.msg_control = ctrl;
1567   msg.msg_controllen = sizeof(ctrl);
1568   msg.msg_flags = 0;
1569
1570   int ret;
1571   while (true) {
1572     ret = recvmsg(fd_, &msg, MSG_ERRQUEUE);
1573     VLOG(5) << "AsyncSocket::handleErrMessages(): recvmsg returned " << ret;
1574
1575     if (ret < 0) {
1576       if (errno != EAGAIN) {
1577         auto errnoCopy = errno;
1578         LOG(ERROR) << "::recvmsg exited with code " << ret
1579                    << ", errno: " << errnoCopy;
1580         AsyncSocketException ex(
1581           AsyncSocketException::INTERNAL_ERROR,
1582           withAddr("recvmsg() failed"),
1583           errnoCopy);
1584         failErrMessageRead(__func__, ex);
1585       }
1586       return;
1587     }
1588
1589     for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
1590          cmsg != nullptr &&
1591            cmsg->cmsg_len != 0 &&
1592            errMessageCallback_ != nullptr;
1593          cmsg = CMSG_NXTHDR(&msg, cmsg)) {
1594       errMessageCallback_->errMessage(*cmsg);
1595     }
1596   }
1597 #endif //MSG_ERRQUEUE
1598 }
1599
1600 void AsyncSocket::handleRead() noexcept {
1601   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1602           << ", state=" << state_;
1603   assert(state_ == StateEnum::ESTABLISHED);
1604   assert((shutdownFlags_ & SHUT_READ) == 0);
1605   assert(readCallback_ != nullptr);
1606   assert(eventFlags_ & EventHandler::READ);
1607
1608   // Loop until:
1609   // - a read attempt would block
1610   // - readCallback_ is uninstalled
1611   // - the number of loop iterations exceeds the optional maximum
1612   // - this AsyncSocket is moved to another EventBase
1613   //
1614   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1615   // which is why need to check for it here.
1616   //
1617   // The last bullet point is slightly subtle.  readDataAvailable() may also
1618   // detach this socket from this EventBase.  However, before
1619   // readDataAvailable() returns another thread may pick it up, attach it to
1620   // a different EventBase, and install another readCallback_.  We need to
1621   // exit immediately after readDataAvailable() returns if the eventBase_ has
1622   // changed.  (The caller must perform some sort of locking to transfer the
1623   // AsyncSocket between threads properly.  This will be sufficient to ensure
1624   // that this thread sees the updated eventBase_ variable after
1625   // readDataAvailable() returns.)
1626   uint16_t numReads = 0;
1627   EventBase* originalEventBase = eventBase_;
1628   while (readCallback_ && eventBase_ == originalEventBase) {
1629     // Get the buffer to read into.
1630     void* buf = nullptr;
1631     size_t buflen = 0, offset = 0;
1632     try {
1633       prepareReadBuffer(&buf, &buflen);
1634       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1635     } catch (const AsyncSocketException& ex) {
1636       return failRead(__func__, ex);
1637     } catch (const std::exception& ex) {
1638       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1639                               string("ReadCallback::getReadBuffer() "
1640                                      "threw exception: ") +
1641                               ex.what());
1642       return failRead(__func__, tex);
1643     } catch (...) {
1644       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1645                              "ReadCallback::getReadBuffer() threw "
1646                              "non-exception type");
1647       return failRead(__func__, ex);
1648     }
1649     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1650       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1651                              "ReadCallback::getReadBuffer() returned "
1652                              "empty buffer");
1653       return failRead(__func__, ex);
1654     }
1655
1656     // Perform the read
1657     auto readResult = performRead(&buf, &buflen, &offset);
1658     auto bytesRead = readResult.readReturn;
1659     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1660             << bytesRead << " bytes";
1661     if (bytesRead > 0) {
1662       if (!isBufferMovable_) {
1663         readCallback_->readDataAvailable(size_t(bytesRead));
1664       } else {
1665         CHECK(kOpenSslModeMoveBufferOwnership);
1666         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1667                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1668                 << ", offset=" << offset;
1669         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1670         readBuf->trimStart(offset);
1671         readBuf->trimEnd(buflen - offset - bytesRead);
1672         readCallback_->readBufferAvailable(std::move(readBuf));
1673       }
1674
1675       // Fall through and continue around the loop if the read
1676       // completely filled the available buffer.
1677       // Note that readCallback_ may have been uninstalled or changed inside
1678       // readDataAvailable().
1679       if (size_t(bytesRead) < buflen) {
1680         return;
1681       }
1682     } else if (bytesRead == READ_BLOCKING) {
1683         // No more data to read right now.
1684         return;
1685     } else if (bytesRead == READ_ERROR) {
1686       readErr_ = READ_ERROR;
1687       if (readResult.exception) {
1688         return failRead(__func__, *readResult.exception);
1689       }
1690       auto errnoCopy = errno;
1691       AsyncSocketException ex(
1692           AsyncSocketException::INTERNAL_ERROR,
1693           withAddr("recv() failed"),
1694           errnoCopy);
1695       return failRead(__func__, ex);
1696     } else {
1697       assert(bytesRead == READ_EOF);
1698       readErr_ = READ_EOF;
1699       // EOF
1700       shutdownFlags_ |= SHUT_READ;
1701       if (!updateEventRegistration(0, EventHandler::READ)) {
1702         // we've already been moved into STATE_ERROR
1703         assert(state_ == StateEnum::ERROR);
1704         assert(readCallback_ == nullptr);
1705         return;
1706       }
1707
1708       ReadCallback* callback = readCallback_;
1709       readCallback_ = nullptr;
1710       callback->readEOF();
1711       return;
1712     }
1713     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1714       if (readCallback_ != nullptr) {
1715         // We might still have data in the socket.
1716         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1717         scheduleImmediateRead();
1718       }
1719       return;
1720     }
1721   }
1722 }
1723
1724 /**
1725  * This function attempts to write as much data as possible, until no more data
1726  * can be written.
1727  *
1728  * - If it sends all available data, it unregisters for write events, and stops
1729  *   the writeTimeout_.
1730  *
1731  * - If not all of the data can be sent immediately, it reschedules
1732  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1733  *   registered for write events.
1734  */
1735 void AsyncSocket::handleWrite() noexcept {
1736   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1737           << ", state=" << state_;
1738   DestructorGuard dg(this);
1739
1740   if (state_ == StateEnum::CONNECTING) {
1741     handleConnect();
1742     return;
1743   }
1744
1745   // Normal write
1746   assert(state_ == StateEnum::ESTABLISHED);
1747   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1748   assert(writeReqHead_ != nullptr);
1749
1750   // Loop until we run out of write requests,
1751   // or until this socket is moved to another EventBase.
1752   // (See the comment in handleRead() explaining how this can happen.)
1753   EventBase* originalEventBase = eventBase_;
1754   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1755     auto writeResult = writeReqHead_->performWrite();
1756     if (writeResult.writeReturn < 0) {
1757       if (writeResult.exception) {
1758         return failWrite(__func__, *writeResult.exception);
1759       }
1760       auto errnoCopy = errno;
1761       AsyncSocketException ex(
1762           AsyncSocketException::INTERNAL_ERROR,
1763           withAddr("writev() failed"),
1764           errnoCopy);
1765       return failWrite(__func__, ex);
1766     } else if (writeReqHead_->isComplete()) {
1767       // We finished this request
1768       WriteRequest* req = writeReqHead_;
1769       writeReqHead_ = req->getNext();
1770
1771       if (writeReqHead_ == nullptr) {
1772         writeReqTail_ = nullptr;
1773         // This is the last write request.
1774         // Unregister for write events and cancel the send timer
1775         // before we invoke the callback.  We have to update the state properly
1776         // before calling the callback, since it may want to detach us from
1777         // the EventBase.
1778         if (eventFlags_ & EventHandler::WRITE) {
1779           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1780             assert(state_ == StateEnum::ERROR);
1781             return;
1782           }
1783           // Stop the send timeout
1784           writeTimeout_.cancelTimeout();
1785         }
1786         assert(!writeTimeout_.isScheduled());
1787
1788         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1789         // we finish sending the last write request.
1790         //
1791         // We have to do this before invoking writeSuccess(), since
1792         // writeSuccess() may detach us from our EventBase.
1793         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1794           assert(connectCallback_ == nullptr);
1795           shutdownFlags_ |= SHUT_WRITE;
1796
1797           if (shutdownFlags_ & SHUT_READ) {
1798             // Reads have already been shutdown.  Fully close the socket and
1799             // move to STATE_CLOSED.
1800             //
1801             // Note: This code currently moves us to STATE_CLOSED even if
1802             // close() hasn't ever been called.  This can occur if we have
1803             // received EOF from the peer and shutdownWrite() has been called
1804             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1805             // case, until close() is actually called?  I can't think of a
1806             // reason why we would need to do so.  No other operations besides
1807             // calling close() or destroying the socket can be performed at
1808             // this point.
1809             assert(readCallback_ == nullptr);
1810             state_ = StateEnum::CLOSED;
1811             if (fd_ >= 0) {
1812               ioHandler_.changeHandlerFD(-1);
1813               doClose();
1814             }
1815           } else {
1816             // Reads are still enabled, so we are only doing a half-shutdown
1817             shutdown(fd_, SHUT_WR);
1818           }
1819         }
1820       }
1821
1822       // Invoke the callback
1823       WriteCallback* callback = req->getCallback();
1824       req->destroy();
1825       if (callback) {
1826         callback->writeSuccess();
1827       }
1828       // We'll continue around the loop, trying to write another request
1829     } else {
1830       // Partial write.
1831       if (bufferCallback_) {
1832         bufferCallback_->onEgressBuffered();
1833       }
1834       writeReqHead_->consume();
1835       // Stop after a partial write; it's highly likely that a subsequent write
1836       // attempt will just return EAGAIN.
1837       //
1838       // Ensure that we are registered for write events.
1839       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1840         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1841           assert(state_ == StateEnum::ERROR);
1842           return;
1843         }
1844       }
1845
1846       // Reschedule the send timeout, since we have made some write progress.
1847       if (sendTimeout_ > 0) {
1848         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1849           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1850               withAddr("failed to reschedule write timeout"));
1851           return failWrite(__func__, ex);
1852         }
1853       }
1854       return;
1855     }
1856   }
1857   if (!writeReqHead_ && bufferCallback_) {
1858     bufferCallback_->onEgressBufferCleared();
1859   }
1860 }
1861
1862 void AsyncSocket::checkForImmediateRead() noexcept {
1863   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1864   // (However, note that some subclasses do override this method.)
1865   //
1866   // Simply calling handleRead() here would be bad, as this would call
1867   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1868   // buffer even though no data may be available.  This would waste lots of
1869   // memory, since the buffer will sit around unused until the socket actually
1870   // becomes readable.
1871   //
1872   // Checking if the socket is readable now also seems like it would probably
1873   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1874   // would just waste an extra system call.  Even if it is readable, waiting to
1875   // find out from libevent on the next event loop doesn't seem that bad.
1876   //
1877   // The exception to this is if we have pre-received data. In that case there
1878   // is definitely data available immediately.
1879   if (preReceivedData_ && !preReceivedData_->empty()) {
1880     handleRead();
1881   }
1882 }
1883
1884 void AsyncSocket::handleInitialReadWrite() noexcept {
1885   // Our callers should already be holding a DestructorGuard, but grab
1886   // one here just to make sure, in case one of our calling code paths ever
1887   // changes.
1888   DestructorGuard dg(this);
1889   // If we have a readCallback_, make sure we enable read events.  We
1890   // may already be registered for reads if connectSuccess() set
1891   // the read calback.
1892   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1893     assert(state_ == StateEnum::ESTABLISHED);
1894     assert((shutdownFlags_ & SHUT_READ) == 0);
1895     if (!updateEventRegistration(EventHandler::READ, 0)) {
1896       assert(state_ == StateEnum::ERROR);
1897       return;
1898     }
1899     checkForImmediateRead();
1900   } else if (readCallback_ == nullptr) {
1901     // Unregister for read events.
1902     updateEventRegistration(0, EventHandler::READ);
1903   }
1904
1905   // If we have write requests pending, try to send them immediately.
1906   // Since we just finished accepting, there is a very good chance that we can
1907   // write without blocking.
1908   //
1909   // However, we only process them if EventHandler::WRITE is not already set,
1910   // which means that we're already blocked on a write attempt.  (This can
1911   // happen if connectSuccess() called write() before returning.)
1912   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1913     // Call handleWrite() to perform write processing.
1914     handleWrite();
1915   } else if (writeReqHead_ == nullptr) {
1916     // Unregister for write event.
1917     updateEventRegistration(0, EventHandler::WRITE);
1918   }
1919 }
1920
1921 void AsyncSocket::handleConnect() noexcept {
1922   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1923           << ", state=" << state_;
1924   assert(state_ == StateEnum::CONNECTING);
1925   // SHUT_WRITE can never be set while we are still connecting;
1926   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1927   // finishes
1928   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1929
1930   // In case we had a connect timeout, cancel the timeout
1931   writeTimeout_.cancelTimeout();
1932   // We don't use a persistent registration when waiting on a connect event,
1933   // so we have been automatically unregistered now.  Update eventFlags_ to
1934   // reflect reality.
1935   assert(eventFlags_ == EventHandler::WRITE);
1936   eventFlags_ = EventHandler::NONE;
1937
1938   // Call getsockopt() to check if the connect succeeded
1939   int error;
1940   socklen_t len = sizeof(error);
1941   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1942   if (rv != 0) {
1943     auto errnoCopy = errno;
1944     AsyncSocketException ex(
1945         AsyncSocketException::INTERNAL_ERROR,
1946         withAddr("error calling getsockopt() after connect"),
1947         errnoCopy);
1948     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1949                << fd_ << " host=" << addr_.describe()
1950                << ") exception:" << ex.what();
1951     return failConnect(__func__, ex);
1952   }
1953
1954   if (error != 0) {
1955     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1956                            "connect failed", error);
1957     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1958             << fd_ << " host=" << addr_.describe()
1959             << ") exception: " << ex.what();
1960     return failConnect(__func__, ex);
1961   }
1962
1963   // Move into STATE_ESTABLISHED
1964   state_ = StateEnum::ESTABLISHED;
1965
1966   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1967   // perform, immediately shutdown the write half of the socket.
1968   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1969     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1970     // are still connecting we just abort the connect rather than waiting for
1971     // it to complete.
1972     assert((shutdownFlags_ & SHUT_READ) == 0);
1973     shutdown(fd_, SHUT_WR);
1974     shutdownFlags_ |= SHUT_WRITE;
1975   }
1976
1977   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1978           << "successfully connected; state=" << state_;
1979
1980   // Remember the EventBase we are attached to, before we start invoking any
1981   // callbacks (since the callbacks may call detachEventBase()).
1982   EventBase* originalEventBase = eventBase_;
1983
1984   invokeConnectSuccess();
1985   // Note that the connect callback may have changed our state.
1986   // (set or unset the read callback, called write(), closed the socket, etc.)
1987   // The following code needs to handle these situations correctly.
1988   //
1989   // If the socket has been closed, readCallback_ and writeReqHead_ will
1990   // always be nullptr, so that will prevent us from trying to read or write.
1991   //
1992   // The main thing to check for is if eventBase_ is still originalEventBase.
1993   // If not, we have been detached from this event base, so we shouldn't
1994   // perform any more operations.
1995   if (eventBase_ != originalEventBase) {
1996     return;
1997   }
1998
1999   handleInitialReadWrite();
2000 }
2001
2002 void AsyncSocket::timeoutExpired() noexcept {
2003   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
2004           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
2005   DestructorGuard dg(this);
2006   eventBase_->dcheckIsInEventBaseThread();
2007
2008   if (state_ == StateEnum::CONNECTING) {
2009     // connect() timed out
2010     // Unregister for I/O events.
2011     if (connectCallback_) {
2012       AsyncSocketException ex(
2013           AsyncSocketException::TIMED_OUT,
2014           folly::sformat(
2015               "connect timed out after {}ms", connectTimeout_.count()));
2016       failConnect(__func__, ex);
2017     } else {
2018       // we faced a connect error without a connect callback, which could
2019       // happen due to TFO.
2020       AsyncSocketException ex(
2021           AsyncSocketException::TIMED_OUT, "write timed out during connection");
2022       failWrite(__func__, ex);
2023     }
2024   } else {
2025     // a normal write operation timed out
2026     AsyncSocketException ex(
2027         AsyncSocketException::TIMED_OUT,
2028         folly::sformat("write timed out after {}ms", sendTimeout_));
2029     failWrite(__func__, ex);
2030   }
2031 }
2032
2033 ssize_t AsyncSocket::tfoSendMsg(int fd, struct msghdr* msg, int msg_flags) {
2034   return detail::tfo_sendmsg(fd, msg, msg_flags);
2035 }
2036
2037 AsyncSocket::WriteResult
2038 AsyncSocket::sendSocketMessage(int fd, struct msghdr* msg, int msg_flags) {
2039   ssize_t totalWritten = 0;
2040   if (state_ == StateEnum::FAST_OPEN) {
2041     sockaddr_storage addr;
2042     auto len = addr_.getAddress(&addr);
2043     msg->msg_name = &addr;
2044     msg->msg_namelen = len;
2045     totalWritten = tfoSendMsg(fd_, msg, msg_flags);
2046     if (totalWritten >= 0) {
2047       tfoFinished_ = true;
2048       state_ = StateEnum::ESTABLISHED;
2049       // We schedule this asynchrously so that we don't end up
2050       // invoking initial read or write while a write is in progress.
2051       scheduleInitialReadWrite();
2052     } else if (errno == EINPROGRESS) {
2053       VLOG(4) << "TFO falling back to connecting";
2054       // A normal sendmsg doesn't return EINPROGRESS, however
2055       // TFO might fallback to connecting if there is no
2056       // cookie.
2057       state_ = StateEnum::CONNECTING;
2058       try {
2059         scheduleConnectTimeout();
2060         registerForConnectEvents();
2061       } catch (const AsyncSocketException& ex) {
2062         return WriteResult(
2063             WRITE_ERROR, std::make_unique<AsyncSocketException>(ex));
2064       }
2065       // Let's fake it that no bytes were written and return an errno.
2066       errno = EAGAIN;
2067       totalWritten = -1;
2068     } else if (errno == EOPNOTSUPP) {
2069       // Try falling back to connecting.
2070       VLOG(4) << "TFO not supported";
2071       state_ = StateEnum::CONNECTING;
2072       try {
2073         int ret = socketConnect((const sockaddr*)&addr, len);
2074         if (ret == 0) {
2075           // connect succeeded immediately
2076           // Treat this like no data was written.
2077           state_ = StateEnum::ESTABLISHED;
2078           scheduleInitialReadWrite();
2079         }
2080         // If there was no exception during connections,
2081         // we would return that no bytes were written.
2082         errno = EAGAIN;
2083         totalWritten = -1;
2084       } catch (const AsyncSocketException& ex) {
2085         return WriteResult(
2086             WRITE_ERROR, std::make_unique<AsyncSocketException>(ex));
2087       }
2088     } else if (errno == EAGAIN) {
2089       // Normally sendmsg would indicate that the write would block.
2090       // However in the fast open case, it would indicate that sendmsg
2091       // fell back to a connect. This is a return code from connect()
2092       // instead, and is an error condition indicating no fds available.
2093       return WriteResult(
2094           WRITE_ERROR,
2095           std::make_unique<AsyncSocketException>(
2096               AsyncSocketException::UNKNOWN, "No more free local ports"));
2097     }
2098   } else {
2099     totalWritten = ::sendmsg(fd, msg, msg_flags);
2100   }
2101   return WriteResult(totalWritten);
2102 }
2103
2104 AsyncSocket::WriteResult AsyncSocket::performWrite(
2105     const iovec* vec,
2106     uint32_t count,
2107     WriteFlags flags,
2108     uint32_t* countWritten,
2109     uint32_t* partialWritten) {
2110   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
2111   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
2112   // (since it may terminate the program if the main program doesn't explicitly
2113   // ignore it).
2114   struct msghdr msg;
2115   msg.msg_name = nullptr;
2116   msg.msg_namelen = 0;
2117   msg.msg_iov = const_cast<iovec *>(vec);
2118   msg.msg_iovlen = std::min<size_t>(count, kIovMax);
2119   msg.msg_flags = 0;
2120   msg.msg_controllen = sendMsgParamCallback_->getAncillaryDataSize(flags);
2121   CHECK_GE(AsyncSocket::SendMsgParamsCallback::maxAncillaryDataSize,
2122            msg.msg_controllen);
2123
2124   if (msg.msg_controllen != 0) {
2125     msg.msg_control = reinterpret_cast<char*>(alloca(msg.msg_controllen));
2126     sendMsgParamCallback_->getAncillaryData(flags, msg.msg_control);
2127   } else {
2128     msg.msg_control = nullptr;
2129   }
2130   int msg_flags = sendMsgParamCallback_->getFlags(flags);
2131
2132   auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
2133   auto totalWritten = writeResult.writeReturn;
2134   if (totalWritten < 0) {
2135     bool tryAgain = (errno == EAGAIN);
2136 #ifdef __APPLE__
2137     // Apple has a bug where doing a second write on a socket which we
2138     // have opened with TFO causes an ENOTCONN to be thrown. However the
2139     // socket is really connected, so treat ENOTCONN as a EAGAIN until
2140     // this bug is fixed.
2141     tryAgain |= (errno == ENOTCONN);
2142 #endif
2143     if (!writeResult.exception && tryAgain) {
2144       // TCP buffer is full; we can't write any more data right now.
2145       *countWritten = 0;
2146       *partialWritten = 0;
2147       return WriteResult(0);
2148     }
2149     // error
2150     *countWritten = 0;
2151     *partialWritten = 0;
2152     return writeResult;
2153   }
2154
2155   appBytesWritten_ += totalWritten;
2156
2157   uint32_t bytesWritten;
2158   uint32_t n;
2159   for (bytesWritten = uint32_t(totalWritten), n = 0; n < count; ++n) {
2160     const iovec* v = vec + n;
2161     if (v->iov_len > bytesWritten) {
2162       // Partial write finished in the middle of this iovec
2163       *countWritten = n;
2164       *partialWritten = bytesWritten;
2165       return WriteResult(totalWritten);
2166     }
2167
2168     bytesWritten -= uint32_t(v->iov_len);
2169   }
2170
2171   assert(bytesWritten == 0);
2172   *countWritten = n;
2173   *partialWritten = 0;
2174   return WriteResult(totalWritten);
2175 }
2176
2177 /**
2178  * Re-register the EventHandler after eventFlags_ has changed.
2179  *
2180  * If an error occurs, fail() is called to move the socket into the error state
2181  * and call all currently installed callbacks.  After an error, the
2182  * AsyncSocket is completely unregistered.
2183  *
2184  * @return Returns true on success, or false on error.
2185  */
2186 bool AsyncSocket::updateEventRegistration() {
2187   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
2188           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
2189           << ", events=" << std::hex << eventFlags_;
2190   eventBase_->dcheckIsInEventBaseThread();
2191   if (eventFlags_ == EventHandler::NONE) {
2192     ioHandler_.unregisterHandler();
2193     return true;
2194   }
2195
2196   // Always register for persistent events, so we don't have to re-register
2197   // after being called back.
2198   if (!ioHandler_.registerHandler(
2199           uint16_t(eventFlags_ | EventHandler::PERSIST))) {
2200     eventFlags_ = EventHandler::NONE; // we're not registered after error
2201     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
2202         withAddr("failed to update AsyncSocket event registration"));
2203     fail("updateEventRegistration", ex);
2204     return false;
2205   }
2206
2207   return true;
2208 }
2209
2210 bool AsyncSocket::updateEventRegistration(uint16_t enable,
2211                                            uint16_t disable) {
2212   uint16_t oldFlags = eventFlags_;
2213   eventFlags_ |= enable;
2214   eventFlags_ &= ~disable;
2215   if (eventFlags_ == oldFlags) {
2216     return true;
2217   } else {
2218     return updateEventRegistration();
2219   }
2220 }
2221
2222 void AsyncSocket::startFail() {
2223   // startFail() should only be called once
2224   assert(state_ != StateEnum::ERROR);
2225   assert(getDestructorGuardCount() > 0);
2226   state_ = StateEnum::ERROR;
2227   // Ensure that SHUT_READ and SHUT_WRITE are set,
2228   // so all future attempts to read or write will be rejected
2229   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
2230
2231   if (eventFlags_ != EventHandler::NONE) {
2232     eventFlags_ = EventHandler::NONE;
2233     ioHandler_.unregisterHandler();
2234   }
2235   writeTimeout_.cancelTimeout();
2236
2237   if (fd_ >= 0) {
2238     ioHandler_.changeHandlerFD(-1);
2239     doClose();
2240   }
2241 }
2242
2243 void AsyncSocket::invokeAllErrors(const AsyncSocketException& ex) {
2244   invokeConnectErr(ex);
2245   failAllWrites(ex);
2246
2247   if (readCallback_) {
2248     ReadCallback* callback = readCallback_;
2249     readCallback_ = nullptr;
2250     callback->readErr(ex);
2251   }
2252 }
2253
2254 void AsyncSocket::finishFail() {
2255   assert(state_ == StateEnum::ERROR);
2256   assert(getDestructorGuardCount() > 0);
2257
2258   AsyncSocketException ex(
2259       AsyncSocketException::INTERNAL_ERROR,
2260       withAddr("socket closing after error"));
2261   invokeAllErrors(ex);
2262 }
2263
2264 void AsyncSocket::finishFail(const AsyncSocketException& ex) {
2265   assert(state_ == StateEnum::ERROR);
2266   assert(getDestructorGuardCount() > 0);
2267   invokeAllErrors(ex);
2268 }
2269
2270 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
2271   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2272              << state_ << " host=" << addr_.describe()
2273              << "): failed in " << fn << "(): "
2274              << ex.what();
2275   startFail();
2276   finishFail();
2277 }
2278
2279 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
2280   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2281                << state_ << " host=" << addr_.describe()
2282                << "): failed while connecting in " << fn << "(): "
2283                << ex.what();
2284   startFail();
2285
2286   invokeConnectErr(ex);
2287   finishFail(ex);
2288 }
2289
2290 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
2291   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2292                << state_ << " host=" << addr_.describe()
2293                << "): failed while reading in " << fn << "(): "
2294                << ex.what();
2295   startFail();
2296
2297   if (readCallback_ != nullptr) {
2298     ReadCallback* callback = readCallback_;
2299     readCallback_ = nullptr;
2300     callback->readErr(ex);
2301   }
2302
2303   finishFail();
2304 }
2305
2306 void AsyncSocket::failErrMessageRead(const char* fn,
2307                                      const AsyncSocketException& ex) {
2308   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2309                << state_ << " host=" << addr_.describe()
2310                << "): failed while reading message in " << fn << "(): "
2311                << ex.what();
2312   startFail();
2313
2314   if (errMessageCallback_ != nullptr) {
2315     ErrMessageCallback* callback = errMessageCallback_;
2316     errMessageCallback_ = nullptr;
2317     callback->errMessageError(ex);
2318   }
2319
2320   finishFail();
2321 }
2322
2323 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
2324   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2325                << state_ << " host=" << addr_.describe()
2326                << "): failed while writing in " << fn << "(): "
2327                << ex.what();
2328   startFail();
2329
2330   // Only invoke the first write callback, since the error occurred while
2331   // writing this request.  Let any other pending write callbacks be invoked in
2332   // finishFail().
2333   if (writeReqHead_ != nullptr) {
2334     WriteRequest* req = writeReqHead_;
2335     writeReqHead_ = req->getNext();
2336     WriteCallback* callback = req->getCallback();
2337     uint32_t bytesWritten = req->getTotalBytesWritten();
2338     req->destroy();
2339     if (callback) {
2340       callback->writeErr(bytesWritten, ex);
2341     }
2342   }
2343
2344   finishFail();
2345 }
2346
2347 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
2348                              size_t bytesWritten,
2349                              const AsyncSocketException& ex) {
2350   // This version of failWrite() is used when the failure occurs before
2351   // we've added the callback to writeReqHead_.
2352   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2353              << state_ << " host=" << addr_.describe()
2354              <<"): failed while writing in " << fn << "(): "
2355              << ex.what();
2356   startFail();
2357
2358   if (callback != nullptr) {
2359     callback->writeErr(bytesWritten, ex);
2360   }
2361
2362   finishFail();
2363 }
2364
2365 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
2366   // Invoke writeError() on all write callbacks.
2367   // This is used when writes are forcibly shutdown with write requests
2368   // pending, or when an error occurs with writes pending.
2369   while (writeReqHead_ != nullptr) {
2370     WriteRequest* req = writeReqHead_;
2371     writeReqHead_ = req->getNext();
2372     WriteCallback* callback = req->getCallback();
2373     if (callback) {
2374       callback->writeErr(req->getTotalBytesWritten(), ex);
2375     }
2376     req->destroy();
2377   }
2378 }
2379
2380 void AsyncSocket::invalidState(ConnectCallback* callback) {
2381   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
2382           << "): connect() called in invalid state " << state_;
2383
2384   /*
2385    * The invalidState() methods don't use the normal failure mechanisms,
2386    * since we don't know what state we are in.  We don't want to call
2387    * startFail()/finishFail() recursively if we are already in the middle of
2388    * cleaning up.
2389    */
2390
2391   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
2392                          "connect() called with socket in invalid state");
2393   connectEndTime_ = std::chrono::steady_clock::now();
2394   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2395     if (callback) {
2396       callback->connectErr(ex);
2397     }
2398   } else {
2399     // We can't use failConnect() here since connectCallback_
2400     // may already be set to another callback.  Invoke this ConnectCallback
2401     // here; any other connectCallback_ will be invoked in finishFail()
2402     startFail();
2403     if (callback) {
2404       callback->connectErr(ex);
2405     }
2406     finishFail();
2407   }
2408 }
2409
2410 void AsyncSocket::invalidState(ErrMessageCallback* callback) {
2411   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2412           << "): setErrMessageCB(" << callback
2413           << ") called in invalid state " << state_;
2414
2415   AsyncSocketException ex(
2416       AsyncSocketException::NOT_OPEN,
2417       msgErrQueueSupported
2418       ? "setErrMessageCB() called with socket in invalid state"
2419       : "This platform does not support socket error message notifications");
2420   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2421     if (callback) {
2422       callback->errMessageError(ex);
2423     }
2424   } else {
2425     startFail();
2426     if (callback) {
2427       callback->errMessageError(ex);
2428     }
2429     finishFail();
2430   }
2431 }
2432
2433 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
2434   connectEndTime_ = std::chrono::steady_clock::now();
2435   if (connectCallback_) {
2436     ConnectCallback* callback = connectCallback_;
2437     connectCallback_ = nullptr;
2438     callback->connectErr(ex);
2439   }
2440 }
2441
2442 void AsyncSocket::invokeConnectSuccess() {
2443   connectEndTime_ = std::chrono::steady_clock::now();
2444   if (connectCallback_) {
2445     ConnectCallback* callback = connectCallback_;
2446     connectCallback_ = nullptr;
2447     callback->connectSuccess();
2448   }
2449 }
2450
2451 void AsyncSocket::invalidState(ReadCallback* callback) {
2452   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2453              << "): setReadCallback(" << callback
2454              << ") called in invalid state " << state_;
2455
2456   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2457                          "setReadCallback() called with socket in "
2458                          "invalid state");
2459   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2460     if (callback) {
2461       callback->readErr(ex);
2462     }
2463   } else {
2464     startFail();
2465     if (callback) {
2466       callback->readErr(ex);
2467     }
2468     finishFail();
2469   }
2470 }
2471
2472 void AsyncSocket::invalidState(WriteCallback* callback) {
2473   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2474              << "): write() called in invalid state " << state_;
2475
2476   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2477                          withAddr("write() called with socket in invalid state"));
2478   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2479     if (callback) {
2480       callback->writeErr(0, ex);
2481     }
2482   } else {
2483     startFail();
2484     if (callback) {
2485       callback->writeErr(0, ex);
2486     }
2487     finishFail();
2488   }
2489 }
2490
2491 void AsyncSocket::doClose() {
2492   if (fd_ == -1) return;
2493   if (shutdownSocketSet_) {
2494     shutdownSocketSet_->close(fd_);
2495   } else {
2496     ::close(fd_);
2497   }
2498   fd_ = -1;
2499 }
2500
2501 std::ostream& operator << (std::ostream& os,
2502                            const AsyncSocket::StateEnum& state) {
2503   os << static_cast<int>(state);
2504   return os;
2505 }
2506
2507 std::string AsyncSocket::withAddr(const std::string& s) {
2508   // Don't use addr_ directly because it may not be initialized
2509   // e.g. if constructed from fd
2510   folly::SocketAddress peer, local;
2511   try {
2512     getPeerAddress(&peer);
2513     getLocalAddress(&local);
2514   } catch (const std::exception&) {
2515     // ignore
2516   } catch (...) {
2517     // ignore
2518   }
2519   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2520 }
2521
2522 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2523   bufferCallback_ = cb;
2524 }
2525
2526 } // folly