8a9a37e72a80fc03ecae6cf66bc3074bbb2dd38a
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/ExceptionWrapper.h>
20 #include <folly/SocketAddress.h>
21 #include <folly/io/IOBuf.h>
22 #include <folly/Portability.h>
23 #include <folly/portability/Fcntl.h>
24 #include <folly/portability/Sockets.h>
25 #include <folly/portability/SysUio.h>
26 #include <folly/portability/Unistd.h>
27
28 #include <errno.h>
29 #include <limits.h>
30 #include <thread>
31 #include <sys/types.h>
32 #include <boost/preprocessor/control/if.hpp>
33
34 using std::string;
35 using std::unique_ptr;
36
37 namespace fsp = folly::portability::sockets;
38
39 namespace folly {
40
41 // static members initializers
42 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
43
44 const AsyncSocketException socketClosedLocallyEx(
45     AsyncSocketException::END_OF_FILE, "socket closed locally");
46 const AsyncSocketException socketShutdownForWritesEx(
47     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
48
49 // TODO: It might help performance to provide a version of BytesWriteRequest that
50 // users could derive from, so we can avoid the extra allocation for each call
51 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
52 // protocols are currently templatized for transports.
53 //
54 // We would need the version for external users where they provide the iovec
55 // storage space, and only our internal version would allocate it at the end of
56 // the WriteRequest.
57
58 /* The default WriteRequest implementation, used for write(), writev() and
59  * writeChain()
60  *
61  * A new BytesWriteRequest operation is allocated on the heap for all write
62  * operations that cannot be completed immediately.
63  */
64 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
65  public:
66   static BytesWriteRequest* newRequest(AsyncSocket* socket,
67                                        WriteCallback* callback,
68                                        const iovec* ops,
69                                        uint32_t opCount,
70                                        uint32_t partialWritten,
71                                        uint32_t bytesWritten,
72                                        unique_ptr<IOBuf>&& ioBuf,
73                                        WriteFlags flags) {
74     assert(opCount > 0);
75     // Since we put a variable size iovec array at the end
76     // of each BytesWriteRequest, we have to manually allocate the memory.
77     void* buf = malloc(sizeof(BytesWriteRequest) +
78                        (opCount * sizeof(struct iovec)));
79     if (buf == nullptr) {
80       throw std::bad_alloc();
81     }
82
83     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
84                                       partialWritten, bytesWritten,
85                                       std::move(ioBuf), flags);
86   }
87
88   void destroy() override {
89     this->~BytesWriteRequest();
90     free(this);
91   }
92
93   WriteResult performWrite() override {
94     WriteFlags writeFlags = flags_;
95     if (getNext() != nullptr) {
96       writeFlags |= WriteFlags::CORK;
97     }
98     auto writeResult = socket_->performWrite(
99         getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
100     bytesWritten_ = writeResult.writeReturn > 0 ? writeResult.writeReturn : 0;
101     return writeResult;
102   }
103
104   bool isComplete() override {
105     return opsWritten_ == getOpCount();
106   }
107
108   void consume() override {
109     // Advance opIndex_ forward by opsWritten_
110     opIndex_ += opsWritten_;
111     assert(opIndex_ < opCount_);
112
113     // If we've finished writing any IOBufs, release them
114     if (ioBuf_) {
115       for (uint32_t i = opsWritten_; i != 0; --i) {
116         assert(ioBuf_);
117         ioBuf_ = ioBuf_->pop();
118       }
119     }
120
121     // Move partialBytes_ forward into the current iovec buffer
122     struct iovec* currentOp = writeOps_ + opIndex_;
123     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
124     currentOp->iov_base =
125       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
126     currentOp->iov_len -= partialBytes_;
127
128     // Increment the totalBytesWritten_ count by bytesWritten_;
129     assert(bytesWritten_ >= 0);
130     totalBytesWritten_ += uint32_t(bytesWritten_);
131   }
132
133  private:
134   BytesWriteRequest(AsyncSocket* socket,
135                     WriteCallback* callback,
136                     const struct iovec* ops,
137                     uint32_t opCount,
138                     uint32_t partialBytes,
139                     uint32_t bytesWritten,
140                     unique_ptr<IOBuf>&& ioBuf,
141                     WriteFlags flags)
142     : AsyncSocket::WriteRequest(socket, callback)
143     , opCount_(opCount)
144     , opIndex_(0)
145     , flags_(flags)
146     , ioBuf_(std::move(ioBuf))
147     , opsWritten_(0)
148     , partialBytes_(partialBytes)
149     , bytesWritten_(bytesWritten) {
150     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
151   }
152
153   // private destructor, to ensure callers use destroy()
154   ~BytesWriteRequest() override = default;
155
156   const struct iovec* getOps() const {
157     assert(opCount_ > opIndex_);
158     return writeOps_ + opIndex_;
159   }
160
161   uint32_t getOpCount() const {
162     assert(opCount_ > opIndex_);
163     return opCount_ - opIndex_;
164   }
165
166   uint32_t opCount_;            ///< number of entries in writeOps_
167   uint32_t opIndex_;            ///< current index into writeOps_
168   WriteFlags flags_;            ///< set for WriteFlags
169   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
170
171   // for consume(), how much we wrote on the last write
172   uint32_t opsWritten_;         ///< complete ops written
173   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
174   ssize_t bytesWritten_;        ///< bytes written altogether
175
176   struct iovec writeOps_[];     ///< write operation(s) list
177 };
178
179 AsyncSocket::AsyncSocket()
180     : eventBase_(nullptr),
181       writeTimeout_(this, nullptr),
182       ioHandler_(this, nullptr),
183       immediateReadHandler_(this) {
184   VLOG(5) << "new AsyncSocket()";
185   init();
186 }
187
188 AsyncSocket::AsyncSocket(EventBase* evb)
189     : eventBase_(evb),
190       writeTimeout_(this, evb),
191       ioHandler_(this, evb),
192       immediateReadHandler_(this) {
193   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
194   init();
195 }
196
197 AsyncSocket::AsyncSocket(EventBase* evb,
198                            const folly::SocketAddress& address,
199                            uint32_t connectTimeout)
200   : AsyncSocket(evb) {
201   connect(nullptr, address, connectTimeout);
202 }
203
204 AsyncSocket::AsyncSocket(EventBase* evb,
205                            const std::string& ip,
206                            uint16_t port,
207                            uint32_t connectTimeout)
208   : AsyncSocket(evb) {
209   connect(nullptr, ip, port, connectTimeout);
210 }
211
212 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
213     : eventBase_(evb),
214       writeTimeout_(this, evb),
215       ioHandler_(this, evb, fd),
216       immediateReadHandler_(this) {
217   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
218           << fd << ")";
219   init();
220   fd_ = fd;
221   setCloseOnExec();
222   state_ = StateEnum::ESTABLISHED;
223 }
224
225 // init() method, since constructor forwarding isn't supported in most
226 // compilers yet.
227 void AsyncSocket::init() {
228   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
229   shutdownFlags_ = 0;
230   state_ = StateEnum::UNINIT;
231   eventFlags_ = EventHandler::NONE;
232   fd_ = -1;
233   sendTimeout_ = 0;
234   maxReadsPerEvent_ = 16;
235   connectCallback_ = nullptr;
236   readCallback_ = nullptr;
237   writeReqHead_ = nullptr;
238   writeReqTail_ = nullptr;
239   shutdownSocketSet_ = nullptr;
240   appBytesWritten_ = 0;
241   appBytesReceived_ = 0;
242 }
243
244 AsyncSocket::~AsyncSocket() {
245   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
246           << ", evb=" << eventBase_ << ", fd=" << fd_
247           << ", state=" << state_ << ")";
248 }
249
250 void AsyncSocket::destroy() {
251   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
252           << ", fd=" << fd_ << ", state=" << state_;
253   // When destroy is called, close the socket immediately
254   closeNow();
255
256   // Then call DelayedDestruction::destroy() to take care of
257   // whether or not we need immediate or delayed destruction
258   DelayedDestruction::destroy();
259 }
260
261 int AsyncSocket::detachFd() {
262   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
263           << ", evb=" << eventBase_ << ", state=" << state_
264           << ", events=" << std::hex << eventFlags_ << ")";
265   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
266   // actually close the descriptor.
267   if (shutdownSocketSet_) {
268     shutdownSocketSet_->remove(fd_);
269   }
270   int fd = fd_;
271   fd_ = -1;
272   // Call closeNow() to invoke all pending callbacks with an error.
273   closeNow();
274   // Update the EventHandler to stop using this fd.
275   // This can only be done after closeNow() unregisters the handler.
276   ioHandler_.changeHandlerFD(-1);
277   return fd;
278 }
279
280 const folly::SocketAddress& AsyncSocket::anyAddress() {
281   static const folly::SocketAddress anyAddress =
282     folly::SocketAddress("0.0.0.0", 0);
283   return anyAddress;
284 }
285
286 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
287   if (shutdownSocketSet_ == newSS) {
288     return;
289   }
290   if (shutdownSocketSet_ && fd_ != -1) {
291     shutdownSocketSet_->remove(fd_);
292   }
293   shutdownSocketSet_ = newSS;
294   if (shutdownSocketSet_ && fd_ != -1) {
295     shutdownSocketSet_->add(fd_);
296   }
297 }
298
299 void AsyncSocket::setCloseOnExec() {
300   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
301   if (rv != 0) {
302     auto errnoCopy = errno;
303     throw AsyncSocketException(
304         AsyncSocketException::INTERNAL_ERROR,
305         withAddr("failed to set close-on-exec flag"),
306         errnoCopy);
307   }
308 }
309
310 void AsyncSocket::connect(ConnectCallback* callback,
311                            const folly::SocketAddress& address,
312                            int timeout,
313                            const OptionMap &options,
314                            const folly::SocketAddress& bindAddr) noexcept {
315   DestructorGuard dg(this);
316   assert(eventBase_->isInEventBaseThread());
317
318   addr_ = address;
319
320   // Make sure we're in the uninitialized state
321   if (state_ != StateEnum::UNINIT) {
322     return invalidState(callback);
323   }
324
325   connectTimeout_ = std::chrono::milliseconds(timeout);
326   connectStartTime_ = std::chrono::steady_clock::now();
327   // Make connect end time at least >= connectStartTime.
328   connectEndTime_ = connectStartTime_;
329
330   assert(fd_ == -1);
331   state_ = StateEnum::CONNECTING;
332   connectCallback_ = callback;
333
334   sockaddr_storage addrStorage;
335   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
336
337   try {
338     // Create the socket
339     // Technically the first parameter should actually be a protocol family
340     // constant (PF_xxx) rather than an address family (AF_xxx), but the
341     // distinction is mainly just historical.  In pretty much all
342     // implementations the PF_foo and AF_foo constants are identical.
343     fd_ = fsp::socket(address.getFamily(), SOCK_STREAM, 0);
344     if (fd_ < 0) {
345       auto errnoCopy = errno;
346       throw AsyncSocketException(
347           AsyncSocketException::INTERNAL_ERROR,
348           withAddr("failed to create socket"),
349           errnoCopy);
350     }
351     if (shutdownSocketSet_) {
352       shutdownSocketSet_->add(fd_);
353     }
354     ioHandler_.changeHandlerFD(fd_);
355
356     setCloseOnExec();
357
358     // Put the socket in non-blocking mode
359     int flags = fcntl(fd_, F_GETFL, 0);
360     if (flags == -1) {
361       auto errnoCopy = errno;
362       throw AsyncSocketException(
363           AsyncSocketException::INTERNAL_ERROR,
364           withAddr("failed to get socket flags"),
365           errnoCopy);
366     }
367     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
368     if (rv == -1) {
369       auto errnoCopy = errno;
370       throw AsyncSocketException(
371           AsyncSocketException::INTERNAL_ERROR,
372           withAddr("failed to put socket in non-blocking mode"),
373           errnoCopy);
374     }
375
376 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
377     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
378     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
379     if (rv == -1) {
380       auto errnoCopy = errno;
381       throw AsyncSocketException(
382           AsyncSocketException::INTERNAL_ERROR,
383           "failed to enable F_SETNOSIGPIPE on socket",
384           errnoCopy);
385     }
386 #endif
387
388     // By default, turn on TCP_NODELAY
389     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
390     // setNoDelay() will log an error message if it fails.
391     if (address.getFamily() != AF_UNIX) {
392       (void)setNoDelay(true);
393     }
394
395     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
396             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
397
398     // bind the socket
399     if (bindAddr != anyAddress()) {
400       int one = 1;
401       if (setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
402         auto errnoCopy = errno;
403         doClose();
404         throw AsyncSocketException(
405             AsyncSocketException::NOT_OPEN,
406             "failed to setsockopt prior to bind on " + bindAddr.describe(),
407             errnoCopy);
408       }
409
410       bindAddr.getAddress(&addrStorage);
411
412       if (bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
413         auto errnoCopy = errno;
414         doClose();
415         throw AsyncSocketException(
416             AsyncSocketException::NOT_OPEN,
417             "failed to bind to async socket: " + bindAddr.describe(),
418             errnoCopy);
419       }
420     }
421
422     // Apply the additional options if any.
423     for (const auto& opt: options) {
424       rv = opt.first.apply(fd_, opt.second);
425       if (rv != 0) {
426         auto errnoCopy = errno;
427         throw AsyncSocketException(
428             AsyncSocketException::INTERNAL_ERROR,
429             withAddr("failed to set socket option"),
430             errnoCopy);
431       }
432     }
433
434     // Perform the connect()
435     address.getAddress(&addrStorage);
436
437     if (tfoEnabled_) {
438       state_ = StateEnum::FAST_OPEN;
439       tfoAttempted_ = true;
440     } else {
441       if (socketConnect(saddr, addr_.getActualSize()) < 0) {
442         return;
443       }
444     }
445
446     // If we're still here the connect() succeeded immediately.
447     // Fall through to call the callback outside of this try...catch block
448   } catch (const AsyncSocketException& ex) {
449     return failConnect(__func__, ex);
450   } catch (const std::exception& ex) {
451     // shouldn't happen, but handle it just in case
452     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
453                << "): unexpected " << typeid(ex).name() << " exception: "
454                << ex.what();
455     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
456                             withAddr(string("unexpected exception: ") +
457                                      ex.what()));
458     return failConnect(__func__, tex);
459   }
460
461   // The connection succeeded immediately
462   // The read callback may not have been set yet, and no writes may be pending
463   // yet, so we don't have to register for any events at the moment.
464   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
465   assert(readCallback_ == nullptr);
466   assert(writeReqHead_ == nullptr);
467   if (state_ != StateEnum::FAST_OPEN) {
468     state_ = StateEnum::ESTABLISHED;
469   }
470   invokeConnectSuccess();
471 }
472
473 int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) {
474 #if __linux__
475   if (noTransparentTls_) {
476     // Ignore return value, errors are ok
477     setsockopt(fd_, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
478   }
479 #endif
480   int rv = fsp::connect(fd_, saddr, len);
481   if (rv < 0) {
482     auto errnoCopy = errno;
483     if (errnoCopy == EINPROGRESS) {
484       scheduleConnectTimeout();
485       registerForConnectEvents();
486     } else {
487       throw AsyncSocketException(
488           AsyncSocketException::NOT_OPEN,
489           "connect failed (immediately)",
490           errnoCopy);
491     }
492   }
493   return rv;
494 }
495
496 void AsyncSocket::scheduleConnectTimeout() {
497   // Connection in progress.
498   auto timeout = connectTimeout_.count();
499   if (timeout > 0) {
500     // Start a timer in case the connection takes too long.
501     if (!writeTimeout_.scheduleTimeout(uint32_t(timeout))) {
502       throw AsyncSocketException(
503           AsyncSocketException::INTERNAL_ERROR,
504           withAddr("failed to schedule AsyncSocket connect timeout"));
505     }
506   }
507 }
508
509 void AsyncSocket::registerForConnectEvents() {
510   // Register for write events, so we'll
511   // be notified when the connection finishes/fails.
512   // Note that we don't register for a persistent event here.
513   assert(eventFlags_ == EventHandler::NONE);
514   eventFlags_ = EventHandler::WRITE;
515   if (!ioHandler_.registerHandler(eventFlags_)) {
516     throw AsyncSocketException(
517         AsyncSocketException::INTERNAL_ERROR,
518         withAddr("failed to register AsyncSocket connect handler"));
519   }
520 }
521
522 void AsyncSocket::connect(ConnectCallback* callback,
523                            const string& ip, uint16_t port,
524                            int timeout,
525                            const OptionMap &options) noexcept {
526   DestructorGuard dg(this);
527   try {
528     connectCallback_ = callback;
529     connect(callback, folly::SocketAddress(ip, port), timeout, options);
530   } catch (const std::exception& ex) {
531     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
532                             ex.what());
533     return failConnect(__func__, tex);
534   }
535 }
536
537 void AsyncSocket::cancelConnect() {
538   connectCallback_ = nullptr;
539   if (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN) {
540     closeNow();
541   }
542 }
543
544 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
545   sendTimeout_ = milliseconds;
546   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
547
548   // If we are currently pending on write requests, immediately update
549   // writeTimeout_ with the new value.
550   if ((eventFlags_ & EventHandler::WRITE) &&
551       (state_ != StateEnum::CONNECTING && state_ != StateEnum::FAST_OPEN)) {
552     assert(state_ == StateEnum::ESTABLISHED);
553     assert((shutdownFlags_ & SHUT_WRITE) == 0);
554     if (sendTimeout_ > 0) {
555       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
556         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
557             withAddr("failed to reschedule send timeout in setSendTimeout"));
558         return failWrite(__func__, ex);
559       }
560     } else {
561       writeTimeout_.cancelTimeout();
562     }
563   }
564 }
565
566 void AsyncSocket::setReadCB(ReadCallback *callback) {
567   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
568           << ", callback=" << callback << ", state=" << state_;
569
570   // Short circuit if callback is the same as the existing readCallback_.
571   //
572   // Note that this is needed for proper functioning during some cleanup cases.
573   // During cleanup we allow setReadCallback(nullptr) to be called even if the
574   // read callback is already unset and we have been detached from an event
575   // base.  This check prevents us from asserting
576   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
577   if (callback == readCallback_) {
578     return;
579   }
580
581   /* We are removing a read callback */
582   if (callback == nullptr &&
583       immediateReadHandler_.isLoopCallbackScheduled()) {
584     immediateReadHandler_.cancelLoopCallback();
585   }
586
587   if (shutdownFlags_ & SHUT_READ) {
588     // Reads have already been shut down on this socket.
589     //
590     // Allow setReadCallback(nullptr) to be called in this case, but don't
591     // allow a new callback to be set.
592     //
593     // For example, setReadCallback(nullptr) can happen after an error if we
594     // invoke some other error callback before invoking readError().  The other
595     // error callback that is invoked first may go ahead and clear the read
596     // callback before we get a chance to invoke readError().
597     if (callback != nullptr) {
598       return invalidState(callback);
599     }
600     assert((eventFlags_ & EventHandler::READ) == 0);
601     readCallback_ = nullptr;
602     return;
603   }
604
605   DestructorGuard dg(this);
606   assert(eventBase_->isInEventBaseThread());
607
608   switch ((StateEnum)state_) {
609     case StateEnum::CONNECTING:
610     case StateEnum::FAST_OPEN:
611       // For convenience, we allow the read callback to be set while we are
612       // still connecting.  We just store the callback for now.  Once the
613       // connection completes we'll register for read events.
614       readCallback_ = callback;
615       return;
616     case StateEnum::ESTABLISHED:
617     {
618       readCallback_ = callback;
619       uint16_t oldFlags = eventFlags_;
620       if (readCallback_) {
621         eventFlags_ |= EventHandler::READ;
622       } else {
623         eventFlags_ &= ~EventHandler::READ;
624       }
625
626       // Update our registration if our flags have changed
627       if (eventFlags_ != oldFlags) {
628         // We intentionally ignore the return value here.
629         // updateEventRegistration() will move us into the error state if it
630         // fails, and we don't need to do anything else here afterwards.
631         (void)updateEventRegistration();
632       }
633
634       if (readCallback_) {
635         checkForImmediateRead();
636       }
637       return;
638     }
639     case StateEnum::CLOSED:
640     case StateEnum::ERROR:
641       // We should never reach here.  SHUT_READ should always be set
642       // if we are in STATE_CLOSED or STATE_ERROR.
643       assert(false);
644       return invalidState(callback);
645     case StateEnum::UNINIT:
646       // We do not allow setReadCallback() to be called before we start
647       // connecting.
648       return invalidState(callback);
649   }
650
651   // We don't put a default case in the switch statement, so that the compiler
652   // will warn us to update the switch statement if a new state is added.
653   return invalidState(callback);
654 }
655
656 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
657   return readCallback_;
658 }
659
660 void AsyncSocket::write(WriteCallback* callback,
661                          const void* buf, size_t bytes, WriteFlags flags) {
662   iovec op;
663   op.iov_base = const_cast<void*>(buf);
664   op.iov_len = bytes;
665   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
666 }
667
668 void AsyncSocket::writev(WriteCallback* callback,
669                           const iovec* vec,
670                           size_t count,
671                           WriteFlags flags) {
672   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
673 }
674
675 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
676                               WriteFlags flags) {
677   constexpr size_t kSmallSizeMax = 64;
678   size_t count = buf->countChainElements();
679   if (count <= kSmallSizeMax) {
680     // suppress "warning: variable length array 'vec' is used [-Wvla]"
681     FOLLY_PUSH_WARNING;
682     FOLLY_GCC_DISABLE_WARNING(vla);
683     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
684     FOLLY_POP_WARNING;
685
686     writeChainImpl(callback, vec, count, std::move(buf), flags);
687   } else {
688     iovec* vec = new iovec[count];
689     writeChainImpl(callback, vec, count, std::move(buf), flags);
690     delete[] vec;
691   }
692 }
693
694 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
695     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
696   size_t veclen = buf->fillIov(vec, count);
697   writeImpl(callback, vec, veclen, std::move(buf), flags);
698 }
699
700 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
701                              size_t count, unique_ptr<IOBuf>&& buf,
702                              WriteFlags flags) {
703   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
704           << ", callback=" << callback << ", count=" << count
705           << ", state=" << state_;
706   DestructorGuard dg(this);
707   unique_ptr<IOBuf>ioBuf(std::move(buf));
708   assert(eventBase_->isInEventBaseThread());
709
710   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
711     // No new writes may be performed after the write side of the socket has
712     // been shutdown.
713     //
714     // We could just call callback->writeError() here to fail just this write.
715     // However, fail hard and use invalidState() to fail all outstanding
716     // callbacks and move the socket into the error state.  There's most likely
717     // a bug in the caller's code, so we abort everything rather than trying to
718     // proceed as best we can.
719     return invalidState(callback);
720   }
721
722   uint32_t countWritten = 0;
723   uint32_t partialWritten = 0;
724   ssize_t bytesWritten = 0;
725   bool mustRegister = false;
726   if ((state_ == StateEnum::ESTABLISHED || state_ == StateEnum::FAST_OPEN) &&
727       !connecting()) {
728     if (writeReqHead_ == nullptr) {
729       // If we are established and there are no other writes pending,
730       // we can attempt to perform the write immediately.
731       assert(writeReqTail_ == nullptr);
732       assert((eventFlags_ & EventHandler::WRITE) == 0);
733
734       auto writeResult = performWrite(
735           vec, uint32_t(count), flags, &countWritten, &partialWritten);
736       bytesWritten = writeResult.writeReturn;
737       if (bytesWritten < 0) {
738         auto errnoCopy = errno;
739         if (writeResult.exception) {
740           return failWrite(__func__, callback, 0, *writeResult.exception);
741         }
742         AsyncSocketException ex(
743             AsyncSocketException::INTERNAL_ERROR,
744             withAddr("writev failed"),
745             errnoCopy);
746         return failWrite(__func__, callback, 0, ex);
747       } else if (countWritten == count) {
748         // We successfully wrote everything.
749         // Invoke the callback and return.
750         if (callback) {
751           callback->writeSuccess();
752         }
753         return;
754       } else { // continue writing the next writeReq
755         if (bufferCallback_) {
756           bufferCallback_->onEgressBuffered();
757         }
758       }
759       if (!connecting()) {
760         // Writes might put the socket back into connecting state
761         // if TFO is enabled, and using TFO fails.
762         // This means that write timeouts would not be active, however
763         // connect timeouts would affect this stage.
764         mustRegister = true;
765       }
766     }
767   } else if (!connecting()) {
768     // Invalid state for writing
769     return invalidState(callback);
770   }
771
772   // Create a new WriteRequest to add to the queue
773   WriteRequest* req;
774   try {
775     req = BytesWriteRequest::newRequest(
776         this,
777         callback,
778         vec + countWritten,
779         uint32_t(count - countWritten),
780         partialWritten,
781         uint32_t(bytesWritten),
782         std::move(ioBuf),
783         flags);
784   } catch (const std::exception& ex) {
785     // we mainly expect to catch std::bad_alloc here
786     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
787         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
788     return failWrite(__func__, callback, size_t(bytesWritten), tex);
789   }
790   req->consume();
791   if (writeReqTail_ == nullptr) {
792     assert(writeReqHead_ == nullptr);
793     writeReqHead_ = writeReqTail_ = req;
794   } else {
795     writeReqTail_->append(req);
796     writeReqTail_ = req;
797   }
798
799   // Register for write events if are established and not currently
800   // waiting on write events
801   if (mustRegister) {
802     assert(state_ == StateEnum::ESTABLISHED);
803     assert((eventFlags_ & EventHandler::WRITE) == 0);
804     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
805       assert(state_ == StateEnum::ERROR);
806       return;
807     }
808     if (sendTimeout_ > 0) {
809       // Schedule a timeout to fire if the write takes too long.
810       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
811         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
812                                withAddr("failed to schedule send timeout"));
813         return failWrite(__func__, ex);
814       }
815     }
816   }
817 }
818
819 void AsyncSocket::writeRequest(WriteRequest* req) {
820   if (writeReqTail_ == nullptr) {
821     assert(writeReqHead_ == nullptr);
822     writeReqHead_ = writeReqTail_ = req;
823     req->start();
824   } else {
825     writeReqTail_->append(req);
826     writeReqTail_ = req;
827   }
828 }
829
830 void AsyncSocket::close() {
831   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
832           << ", state=" << state_ << ", shutdownFlags="
833           << std::hex << (int) shutdownFlags_;
834
835   // close() is only different from closeNow() when there are pending writes
836   // that need to drain before we can close.  In all other cases, just call
837   // closeNow().
838   //
839   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
840   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
841   // is still running.  (e.g., If there are multiple pending writes, and we
842   // call writeError() on the first one, it may call close().  In this case we
843   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
844   // writes will still be in the queue.)
845   //
846   // We only need to drain pending writes if we are still in STATE_CONNECTING
847   // or STATE_ESTABLISHED
848   if ((writeReqHead_ == nullptr) ||
849       !(state_ == StateEnum::CONNECTING ||
850       state_ == StateEnum::ESTABLISHED)) {
851     closeNow();
852     return;
853   }
854
855   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
856   // destroyed until close() returns.
857   DestructorGuard dg(this);
858   assert(eventBase_->isInEventBaseThread());
859
860   // Since there are write requests pending, we have to set the
861   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
862   // connect finishes and we finish writing these requests.
863   //
864   // Set SHUT_READ to indicate that reads are shut down, and set the
865   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
866   // pending writes complete.
867   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
868
869   // If a read callback is set, invoke readEOF() immediately to inform it that
870   // the socket has been closed and no more data can be read.
871   if (readCallback_) {
872     // Disable reads if they are enabled
873     if (!updateEventRegistration(0, EventHandler::READ)) {
874       // We're now in the error state; callbacks have been cleaned up
875       assert(state_ == StateEnum::ERROR);
876       assert(readCallback_ == nullptr);
877     } else {
878       ReadCallback* callback = readCallback_;
879       readCallback_ = nullptr;
880       callback->readEOF();
881     }
882   }
883 }
884
885 void AsyncSocket::closeNow() {
886   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
887           << ", state=" << state_ << ", shutdownFlags="
888           << std::hex << (int) shutdownFlags_;
889   DestructorGuard dg(this);
890   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
891
892   switch (state_) {
893     case StateEnum::ESTABLISHED:
894     case StateEnum::CONNECTING:
895     case StateEnum::FAST_OPEN: {
896       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
897       state_ = StateEnum::CLOSED;
898
899       // If the write timeout was set, cancel it.
900       writeTimeout_.cancelTimeout();
901
902       // If we are registered for I/O events, unregister.
903       if (eventFlags_ != EventHandler::NONE) {
904         eventFlags_ = EventHandler::NONE;
905         if (!updateEventRegistration()) {
906           // We will have been moved into the error state.
907           assert(state_ == StateEnum::ERROR);
908           return;
909         }
910       }
911
912       if (immediateReadHandler_.isLoopCallbackScheduled()) {
913         immediateReadHandler_.cancelLoopCallback();
914       }
915
916       if (fd_ >= 0) {
917         ioHandler_.changeHandlerFD(-1);
918         doClose();
919       }
920
921       invokeConnectErr(socketClosedLocallyEx);
922
923       failAllWrites(socketClosedLocallyEx);
924
925       if (readCallback_) {
926         ReadCallback* callback = readCallback_;
927         readCallback_ = nullptr;
928         callback->readEOF();
929       }
930       return;
931     }
932     case StateEnum::CLOSED:
933       // Do nothing.  It's possible that we are being called recursively
934       // from inside a callback that we invoked inside another call to close()
935       // that is still running.
936       return;
937     case StateEnum::ERROR:
938       // Do nothing.  The error handling code has performed (or is performing)
939       // cleanup.
940       return;
941     case StateEnum::UNINIT:
942       assert(eventFlags_ == EventHandler::NONE);
943       assert(connectCallback_ == nullptr);
944       assert(readCallback_ == nullptr);
945       assert(writeReqHead_ == nullptr);
946       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
947       state_ = StateEnum::CLOSED;
948       return;
949   }
950
951   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
952               << ") called in unknown state " << state_;
953 }
954
955 void AsyncSocket::closeWithReset() {
956   // Enable SO_LINGER, with the linger timeout set to 0.
957   // This will trigger a TCP reset when we close the socket.
958   if (fd_ >= 0) {
959     struct linger optLinger = {1, 0};
960     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
961       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
962               << "on " << fd_ << ": errno=" << errno;
963     }
964   }
965
966   // Then let closeNow() take care of the rest
967   closeNow();
968 }
969
970 void AsyncSocket::shutdownWrite() {
971   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
972           << ", state=" << state_ << ", shutdownFlags="
973           << std::hex << (int) shutdownFlags_;
974
975   // If there are no pending writes, shutdownWrite() is identical to
976   // shutdownWriteNow().
977   if (writeReqHead_ == nullptr) {
978     shutdownWriteNow();
979     return;
980   }
981
982   assert(eventBase_->isInEventBaseThread());
983
984   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
985   // shutdown will be performed once all writes complete.
986   shutdownFlags_ |= SHUT_WRITE_PENDING;
987 }
988
989 void AsyncSocket::shutdownWriteNow() {
990   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
991           << ", fd=" << fd_ << ", state=" << state_
992           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
993
994   if (shutdownFlags_ & SHUT_WRITE) {
995     // Writes are already shutdown; nothing else to do.
996     return;
997   }
998
999   // If SHUT_READ is already set, just call closeNow() to completely
1000   // close the socket.  This can happen if close() was called with writes
1001   // pending, and then shutdownWriteNow() is called before all pending writes
1002   // complete.
1003   if (shutdownFlags_ & SHUT_READ) {
1004     closeNow();
1005     return;
1006   }
1007
1008   DestructorGuard dg(this);
1009   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
1010
1011   switch (static_cast<StateEnum>(state_)) {
1012     case StateEnum::ESTABLISHED:
1013     {
1014       shutdownFlags_ |= SHUT_WRITE;
1015
1016       // If the write timeout was set, cancel it.
1017       writeTimeout_.cancelTimeout();
1018
1019       // If we are registered for write events, unregister.
1020       if (!updateEventRegistration(0, EventHandler::WRITE)) {
1021         // We will have been moved into the error state.
1022         assert(state_ == StateEnum::ERROR);
1023         return;
1024       }
1025
1026       // Shutdown writes on the file descriptor
1027       shutdown(fd_, SHUT_WR);
1028
1029       // Immediately fail all write requests
1030       failAllWrites(socketShutdownForWritesEx);
1031       return;
1032     }
1033     case StateEnum::CONNECTING:
1034     {
1035       // Set the SHUT_WRITE_PENDING flag.
1036       // When the connection completes, it will check this flag,
1037       // shutdown the write half of the socket, and then set SHUT_WRITE.
1038       shutdownFlags_ |= SHUT_WRITE_PENDING;
1039
1040       // Immediately fail all write requests
1041       failAllWrites(socketShutdownForWritesEx);
1042       return;
1043     }
1044     case StateEnum::UNINIT:
1045       // Callers normally shouldn't call shutdownWriteNow() before the socket
1046       // even starts connecting.  Nonetheless, go ahead and set
1047       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
1048       // immediately shut down the write side of the socket.
1049       shutdownFlags_ |= SHUT_WRITE_PENDING;
1050       return;
1051     case StateEnum::FAST_OPEN:
1052       // In fast open state we haven't call connected yet, and if we shutdown
1053       // the writes, we will never try to call connect, so shut everything down
1054       shutdownFlags_ |= SHUT_WRITE;
1055       // Immediately fail all write requests
1056       failAllWrites(socketShutdownForWritesEx);
1057       return;
1058     case StateEnum::CLOSED:
1059     case StateEnum::ERROR:
1060       // We should never get here.  SHUT_WRITE should always be set
1061       // in STATE_CLOSED and STATE_ERROR.
1062       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
1063                  << ", fd=" << fd_ << ") in unexpected state " << state_
1064                  << " with SHUT_WRITE not set ("
1065                  << std::hex << (int) shutdownFlags_ << ")";
1066       assert(false);
1067       return;
1068   }
1069
1070   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
1071               << fd_ << ") called in unknown state " << state_;
1072 }
1073
1074 bool AsyncSocket::readable() const {
1075   if (fd_ == -1) {
1076     return false;
1077   }
1078   struct pollfd fds[1];
1079   fds[0].fd = fd_;
1080   fds[0].events = POLLIN;
1081   fds[0].revents = 0;
1082   int rc = poll(fds, 1, 0);
1083   return rc == 1;
1084 }
1085
1086 bool AsyncSocket::isPending() const {
1087   return ioHandler_.isPending();
1088 }
1089
1090 bool AsyncSocket::hangup() const {
1091   if (fd_ == -1) {
1092     // sanity check, no one should ask for hangup if we are not connected.
1093     assert(false);
1094     return false;
1095   }
1096 #ifdef POLLRDHUP // Linux-only
1097   struct pollfd fds[1];
1098   fds[0].fd = fd_;
1099   fds[0].events = POLLRDHUP|POLLHUP;
1100   fds[0].revents = 0;
1101   poll(fds, 1, 0);
1102   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1103 #else
1104   return false;
1105 #endif
1106 }
1107
1108 bool AsyncSocket::good() const {
1109   return (
1110       (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN ||
1111        state_ == StateEnum::ESTABLISHED) &&
1112       (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1113 }
1114
1115 bool AsyncSocket::error() const {
1116   return (state_ == StateEnum::ERROR);
1117 }
1118
1119 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1120   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1121           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1122           << ", state=" << state_ << ", events="
1123           << std::hex << eventFlags_ << ")";
1124   assert(eventBase_ == nullptr);
1125   assert(eventBase->isInEventBaseThread());
1126
1127   eventBase_ = eventBase;
1128   ioHandler_.attachEventBase(eventBase);
1129   writeTimeout_.attachEventBase(eventBase);
1130   if (evbChangeCb_) {
1131     evbChangeCb_->evbAttached(this);
1132   }
1133 }
1134
1135 void AsyncSocket::detachEventBase() {
1136   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1137           << ", old evb=" << eventBase_ << ", state=" << state_
1138           << ", events=" << std::hex << eventFlags_ << ")";
1139   assert(eventBase_ != nullptr);
1140   assert(eventBase_->isInEventBaseThread());
1141
1142   eventBase_ = nullptr;
1143   ioHandler_.detachEventBase();
1144   writeTimeout_.detachEventBase();
1145   if (evbChangeCb_) {
1146     evbChangeCb_->evbDetached(this);
1147   }
1148 }
1149
1150 bool AsyncSocket::isDetachable() const {
1151   DCHECK(eventBase_ != nullptr);
1152   DCHECK(eventBase_->isInEventBaseThread());
1153
1154   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1155 }
1156
1157 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1158   if (!localAddr_.isInitialized()) {
1159     localAddr_.setFromLocalAddress(fd_);
1160   }
1161   *address = localAddr_;
1162 }
1163
1164 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1165   if (!addr_.isInitialized()) {
1166     addr_.setFromPeerAddress(fd_);
1167   }
1168   *address = addr_;
1169 }
1170
1171 bool AsyncSocket::getTFOSucceded() const {
1172   return detail::tfo_succeeded(fd_);
1173 }
1174
1175 int AsyncSocket::setNoDelay(bool noDelay) {
1176   if (fd_ < 0) {
1177     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1178                << this << "(state=" << state_ << ")";
1179     return EINVAL;
1180
1181   }
1182
1183   int value = noDelay ? 1 : 0;
1184   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1185     int errnoCopy = errno;
1186     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1187             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1188             << strerror(errnoCopy);
1189     return errnoCopy;
1190   }
1191
1192   return 0;
1193 }
1194
1195 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1196
1197   #ifndef TCP_CONGESTION
1198   #define TCP_CONGESTION  13
1199   #endif
1200
1201   if (fd_ < 0) {
1202     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1203                << "socket " << this << "(state=" << state_ << ")";
1204     return EINVAL;
1205
1206   }
1207
1208   if (setsockopt(
1209           fd_,
1210           IPPROTO_TCP,
1211           TCP_CONGESTION,
1212           cname.c_str(),
1213           socklen_t(cname.length() + 1)) != 0) {
1214     int errnoCopy = errno;
1215     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1216             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1217             << strerror(errnoCopy);
1218     return errnoCopy;
1219   }
1220
1221   return 0;
1222 }
1223
1224 int AsyncSocket::setQuickAck(bool quickack) {
1225   if (fd_ < 0) {
1226     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1227                << this << "(state=" << state_ << ")";
1228     return EINVAL;
1229
1230   }
1231
1232 #ifdef TCP_QUICKACK // Linux-only
1233   int value = quickack ? 1 : 0;
1234   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1235     int errnoCopy = errno;
1236     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1237             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1238             << strerror(errnoCopy);
1239     return errnoCopy;
1240   }
1241
1242   return 0;
1243 #else
1244   return ENOSYS;
1245 #endif
1246 }
1247
1248 int AsyncSocket::setSendBufSize(size_t bufsize) {
1249   if (fd_ < 0) {
1250     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1251                << this << "(state=" << state_ << ")";
1252     return EINVAL;
1253   }
1254
1255   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1256     int errnoCopy = errno;
1257     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1258             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1259             << strerror(errnoCopy);
1260     return errnoCopy;
1261   }
1262
1263   return 0;
1264 }
1265
1266 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1267   if (fd_ < 0) {
1268     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1269                << this << "(state=" << state_ << ")";
1270     return EINVAL;
1271   }
1272
1273   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1274     int errnoCopy = errno;
1275     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1276             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1277             << strerror(errnoCopy);
1278     return errnoCopy;
1279   }
1280
1281   return 0;
1282 }
1283
1284 int AsyncSocket::setTCPProfile(int profd) {
1285   if (fd_ < 0) {
1286     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1287                << this << "(state=" << state_ << ")";
1288     return EINVAL;
1289   }
1290
1291   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1292     int errnoCopy = errno;
1293     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1294             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1295             << strerror(errnoCopy);
1296     return errnoCopy;
1297   }
1298
1299   return 0;
1300 }
1301
1302 void AsyncSocket::ioReady(uint16_t events) noexcept {
1303   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1304           << ", events=" << std::hex << events << ", state=" << state_;
1305   DestructorGuard dg(this);
1306   assert(events & EventHandler::READ_WRITE);
1307   assert(eventBase_->isInEventBaseThread());
1308
1309   uint16_t relevantEvents = uint16_t(events & EventHandler::READ_WRITE);
1310   if (relevantEvents == EventHandler::READ) {
1311     handleRead();
1312   } else if (relevantEvents == EventHandler::WRITE) {
1313     handleWrite();
1314   } else if (relevantEvents == EventHandler::READ_WRITE) {
1315     EventBase* originalEventBase = eventBase_;
1316     // If both read and write events are ready, process writes first.
1317     handleWrite();
1318
1319     // Return now if handleWrite() detached us from our EventBase
1320     if (eventBase_ != originalEventBase) {
1321       return;
1322     }
1323
1324     // Only call handleRead() if a read callback is still installed.
1325     // (It's possible that the read callback was uninstalled during
1326     // handleWrite().)
1327     if (readCallback_) {
1328       handleRead();
1329     }
1330   } else {
1331     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1332                << std::hex << events << "(this=" << this << ")";
1333     abort();
1334   }
1335 }
1336
1337 AsyncSocket::ReadResult
1338 AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) {
1339   VLOG(5) << "AsyncSocket::performRead() this=" << this << ", buf=" << *buf
1340           << ", buflen=" << *buflen;
1341
1342   int recvFlags = 0;
1343   if (peek_) {
1344     recvFlags |= MSG_PEEK;
1345   }
1346
1347   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1348   if (bytes < 0) {
1349     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1350       // No more data to read right now.
1351       return ReadResult(READ_BLOCKING);
1352     } else {
1353       return ReadResult(READ_ERROR);
1354     }
1355   } else {
1356     appBytesReceived_ += bytes;
1357     return ReadResult(bytes);
1358   }
1359 }
1360
1361 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) {
1362   // no matter what, buffer should be preapared for non-ssl socket
1363   CHECK(readCallback_);
1364   readCallback_->getReadBuffer(buf, buflen);
1365 }
1366
1367 void AsyncSocket::handleRead() noexcept {
1368   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1369           << ", state=" << state_;
1370   assert(state_ == StateEnum::ESTABLISHED);
1371   assert((shutdownFlags_ & SHUT_READ) == 0);
1372   assert(readCallback_ != nullptr);
1373   assert(eventFlags_ & EventHandler::READ);
1374
1375   // Loop until:
1376   // - a read attempt would block
1377   // - readCallback_ is uninstalled
1378   // - the number of loop iterations exceeds the optional maximum
1379   // - this AsyncSocket is moved to another EventBase
1380   //
1381   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1382   // which is why need to check for it here.
1383   //
1384   // The last bullet point is slightly subtle.  readDataAvailable() may also
1385   // detach this socket from this EventBase.  However, before
1386   // readDataAvailable() returns another thread may pick it up, attach it to
1387   // a different EventBase, and install another readCallback_.  We need to
1388   // exit immediately after readDataAvailable() returns if the eventBase_ has
1389   // changed.  (The caller must perform some sort of locking to transfer the
1390   // AsyncSocket between threads properly.  This will be sufficient to ensure
1391   // that this thread sees the updated eventBase_ variable after
1392   // readDataAvailable() returns.)
1393   uint16_t numReads = 0;
1394   EventBase* originalEventBase = eventBase_;
1395   while (readCallback_ && eventBase_ == originalEventBase) {
1396     // Get the buffer to read into.
1397     void* buf = nullptr;
1398     size_t buflen = 0, offset = 0;
1399     try {
1400       prepareReadBuffer(&buf, &buflen);
1401       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1402     } catch (const AsyncSocketException& ex) {
1403       return failRead(__func__, ex);
1404     } catch (const std::exception& ex) {
1405       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1406                               string("ReadCallback::getReadBuffer() "
1407                                      "threw exception: ") +
1408                               ex.what());
1409       return failRead(__func__, tex);
1410     } catch (...) {
1411       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1412                              "ReadCallback::getReadBuffer() threw "
1413                              "non-exception type");
1414       return failRead(__func__, ex);
1415     }
1416     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1417       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1418                              "ReadCallback::getReadBuffer() returned "
1419                              "empty buffer");
1420       return failRead(__func__, ex);
1421     }
1422
1423     // Perform the read
1424     auto readResult = performRead(&buf, &buflen, &offset);
1425     auto bytesRead = readResult.readReturn;
1426     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1427             << bytesRead << " bytes";
1428     if (bytesRead > 0) {
1429       if (!isBufferMovable_) {
1430         readCallback_->readDataAvailable(size_t(bytesRead));
1431       } else {
1432         CHECK(kOpenSslModeMoveBufferOwnership);
1433         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1434                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1435                 << ", offset=" << offset;
1436         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1437         readBuf->trimStart(offset);
1438         readBuf->trimEnd(buflen - offset - bytesRead);
1439         readCallback_->readBufferAvailable(std::move(readBuf));
1440       }
1441
1442       // Fall through and continue around the loop if the read
1443       // completely filled the available buffer.
1444       // Note that readCallback_ may have been uninstalled or changed inside
1445       // readDataAvailable().
1446       if (size_t(bytesRead) < buflen) {
1447         return;
1448       }
1449     } else if (bytesRead == READ_BLOCKING) {
1450         // No more data to read right now.
1451         return;
1452     } else if (bytesRead == READ_ERROR) {
1453       readErr_ = READ_ERROR;
1454       if (readResult.exception) {
1455         return failRead(__func__, *readResult.exception);
1456       }
1457       auto errnoCopy = errno;
1458       AsyncSocketException ex(
1459           AsyncSocketException::INTERNAL_ERROR,
1460           withAddr("recv() failed"),
1461           errnoCopy);
1462       return failRead(__func__, ex);
1463     } else {
1464       assert(bytesRead == READ_EOF);
1465       readErr_ = READ_EOF;
1466       // EOF
1467       shutdownFlags_ |= SHUT_READ;
1468       if (!updateEventRegistration(0, EventHandler::READ)) {
1469         // we've already been moved into STATE_ERROR
1470         assert(state_ == StateEnum::ERROR);
1471         assert(readCallback_ == nullptr);
1472         return;
1473       }
1474
1475       ReadCallback* callback = readCallback_;
1476       readCallback_ = nullptr;
1477       callback->readEOF();
1478       return;
1479     }
1480     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1481       if (readCallback_ != nullptr) {
1482         // We might still have data in the socket.
1483         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1484         scheduleImmediateRead();
1485       }
1486       return;
1487     }
1488   }
1489 }
1490
1491 /**
1492  * This function attempts to write as much data as possible, until no more data
1493  * can be written.
1494  *
1495  * - If it sends all available data, it unregisters for write events, and stops
1496  *   the writeTimeout_.
1497  *
1498  * - If not all of the data can be sent immediately, it reschedules
1499  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1500  *   registered for write events.
1501  */
1502 void AsyncSocket::handleWrite() noexcept {
1503   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1504           << ", state=" << state_;
1505   DestructorGuard dg(this);
1506
1507   if (state_ == StateEnum::CONNECTING) {
1508     handleConnect();
1509     return;
1510   }
1511
1512   // Normal write
1513   assert(state_ == StateEnum::ESTABLISHED);
1514   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1515   assert(writeReqHead_ != nullptr);
1516
1517   // Loop until we run out of write requests,
1518   // or until this socket is moved to another EventBase.
1519   // (See the comment in handleRead() explaining how this can happen.)
1520   EventBase* originalEventBase = eventBase_;
1521   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1522     auto writeResult = writeReqHead_->performWrite();
1523     if (writeResult.writeReturn < 0) {
1524       if (writeResult.exception) {
1525         return failWrite(__func__, *writeResult.exception);
1526       }
1527       auto errnoCopy = errno;
1528       AsyncSocketException ex(
1529           AsyncSocketException::INTERNAL_ERROR,
1530           withAddr("writev() failed"),
1531           errnoCopy);
1532       return failWrite(__func__, ex);
1533     } else if (writeReqHead_->isComplete()) {
1534       // We finished this request
1535       WriteRequest* req = writeReqHead_;
1536       writeReqHead_ = req->getNext();
1537
1538       if (writeReqHead_ == nullptr) {
1539         writeReqTail_ = nullptr;
1540         // This is the last write request.
1541         // Unregister for write events and cancel the send timer
1542         // before we invoke the callback.  We have to update the state properly
1543         // before calling the callback, since it may want to detach us from
1544         // the EventBase.
1545         if (eventFlags_ & EventHandler::WRITE) {
1546           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1547             assert(state_ == StateEnum::ERROR);
1548             return;
1549           }
1550           // Stop the send timeout
1551           writeTimeout_.cancelTimeout();
1552         }
1553         assert(!writeTimeout_.isScheduled());
1554
1555         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1556         // we finish sending the last write request.
1557         //
1558         // We have to do this before invoking writeSuccess(), since
1559         // writeSuccess() may detach us from our EventBase.
1560         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1561           assert(connectCallback_ == nullptr);
1562           shutdownFlags_ |= SHUT_WRITE;
1563
1564           if (shutdownFlags_ & SHUT_READ) {
1565             // Reads have already been shutdown.  Fully close the socket and
1566             // move to STATE_CLOSED.
1567             //
1568             // Note: This code currently moves us to STATE_CLOSED even if
1569             // close() hasn't ever been called.  This can occur if we have
1570             // received EOF from the peer and shutdownWrite() has been called
1571             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1572             // case, until close() is actually called?  I can't think of a
1573             // reason why we would need to do so.  No other operations besides
1574             // calling close() or destroying the socket can be performed at
1575             // this point.
1576             assert(readCallback_ == nullptr);
1577             state_ = StateEnum::CLOSED;
1578             if (fd_ >= 0) {
1579               ioHandler_.changeHandlerFD(-1);
1580               doClose();
1581             }
1582           } else {
1583             // Reads are still enabled, so we are only doing a half-shutdown
1584             shutdown(fd_, SHUT_WR);
1585           }
1586         }
1587       }
1588
1589       // Invoke the callback
1590       WriteCallback* callback = req->getCallback();
1591       req->destroy();
1592       if (callback) {
1593         callback->writeSuccess();
1594       }
1595       // We'll continue around the loop, trying to write another request
1596     } else {
1597       // Partial write.
1598       if (bufferCallback_) {
1599         bufferCallback_->onEgressBuffered();
1600       }
1601       writeReqHead_->consume();
1602       // Stop after a partial write; it's highly likely that a subsequent write
1603       // attempt will just return EAGAIN.
1604       //
1605       // Ensure that we are registered for write events.
1606       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1607         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1608           assert(state_ == StateEnum::ERROR);
1609           return;
1610         }
1611       }
1612
1613       // Reschedule the send timeout, since we have made some write progress.
1614       if (sendTimeout_ > 0) {
1615         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1616           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1617               withAddr("failed to reschedule write timeout"));
1618           return failWrite(__func__, ex);
1619         }
1620       }
1621       return;
1622     }
1623   }
1624   if (!writeReqHead_ && bufferCallback_) {
1625     bufferCallback_->onEgressBufferCleared();
1626   }
1627 }
1628
1629 void AsyncSocket::checkForImmediateRead() noexcept {
1630   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1631   // (However, note that some subclasses do override this method.)
1632   //
1633   // Simply calling handleRead() here would be bad, as this would call
1634   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1635   // buffer even though no data may be available.  This would waste lots of
1636   // memory, since the buffer will sit around unused until the socket actually
1637   // becomes readable.
1638   //
1639   // Checking if the socket is readable now also seems like it would probably
1640   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1641   // would just waste an extra system call.  Even if it is readable, waiting to
1642   // find out from libevent on the next event loop doesn't seem that bad.
1643 }
1644
1645 void AsyncSocket::handleInitialReadWrite() noexcept {
1646   // Our callers should already be holding a DestructorGuard, but grab
1647   // one here just to make sure, in case one of our calling code paths ever
1648   // changes.
1649   DestructorGuard dg(this);
1650   // If we have a readCallback_, make sure we enable read events.  We
1651   // may already be registered for reads if connectSuccess() set
1652   // the read calback.
1653   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1654     assert(state_ == StateEnum::ESTABLISHED);
1655     assert((shutdownFlags_ & SHUT_READ) == 0);
1656     if (!updateEventRegistration(EventHandler::READ, 0)) {
1657       assert(state_ == StateEnum::ERROR);
1658       return;
1659     }
1660     checkForImmediateRead();
1661   } else if (readCallback_ == nullptr) {
1662     // Unregister for read events.
1663     updateEventRegistration(0, EventHandler::READ);
1664   }
1665
1666   // If we have write requests pending, try to send them immediately.
1667   // Since we just finished accepting, there is a very good chance that we can
1668   // write without blocking.
1669   //
1670   // However, we only process them if EventHandler::WRITE is not already set,
1671   // which means that we're already blocked on a write attempt.  (This can
1672   // happen if connectSuccess() called write() before returning.)
1673   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1674     // Call handleWrite() to perform write processing.
1675     handleWrite();
1676   } else if (writeReqHead_ == nullptr) {
1677     // Unregister for write event.
1678     updateEventRegistration(0, EventHandler::WRITE);
1679   }
1680 }
1681
1682 void AsyncSocket::handleConnect() noexcept {
1683   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1684           << ", state=" << state_;
1685   assert(state_ == StateEnum::CONNECTING);
1686   // SHUT_WRITE can never be set while we are still connecting;
1687   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1688   // finishes
1689   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1690
1691   // In case we had a connect timeout, cancel the timeout
1692   writeTimeout_.cancelTimeout();
1693   // We don't use a persistent registration when waiting on a connect event,
1694   // so we have been automatically unregistered now.  Update eventFlags_ to
1695   // reflect reality.
1696   assert(eventFlags_ == EventHandler::WRITE);
1697   eventFlags_ = EventHandler::NONE;
1698
1699   // Call getsockopt() to check if the connect succeeded
1700   int error;
1701   socklen_t len = sizeof(error);
1702   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1703   if (rv != 0) {
1704     auto errnoCopy = errno;
1705     AsyncSocketException ex(
1706         AsyncSocketException::INTERNAL_ERROR,
1707         withAddr("error calling getsockopt() after connect"),
1708         errnoCopy);
1709     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1710                << fd_ << " host=" << addr_.describe()
1711                << ") exception:" << ex.what();
1712     return failConnect(__func__, ex);
1713   }
1714
1715   if (error != 0) {
1716     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1717                            "connect failed", error);
1718     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1719             << fd_ << " host=" << addr_.describe()
1720             << ") exception: " << ex.what();
1721     return failConnect(__func__, ex);
1722   }
1723
1724   // Move into STATE_ESTABLISHED
1725   state_ = StateEnum::ESTABLISHED;
1726
1727   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1728   // perform, immediately shutdown the write half of the socket.
1729   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1730     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1731     // are still connecting we just abort the connect rather than waiting for
1732     // it to complete.
1733     assert((shutdownFlags_ & SHUT_READ) == 0);
1734     shutdown(fd_, SHUT_WR);
1735     shutdownFlags_ |= SHUT_WRITE;
1736   }
1737
1738   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1739           << "successfully connected; state=" << state_;
1740
1741   // Remember the EventBase we are attached to, before we start invoking any
1742   // callbacks (since the callbacks may call detachEventBase()).
1743   EventBase* originalEventBase = eventBase_;
1744
1745   invokeConnectSuccess();
1746   // Note that the connect callback may have changed our state.
1747   // (set or unset the read callback, called write(), closed the socket, etc.)
1748   // The following code needs to handle these situations correctly.
1749   //
1750   // If the socket has been closed, readCallback_ and writeReqHead_ will
1751   // always be nullptr, so that will prevent us from trying to read or write.
1752   //
1753   // The main thing to check for is if eventBase_ is still originalEventBase.
1754   // If not, we have been detached from this event base, so we shouldn't
1755   // perform any more operations.
1756   if (eventBase_ != originalEventBase) {
1757     return;
1758   }
1759
1760   handleInitialReadWrite();
1761 }
1762
1763 void AsyncSocket::timeoutExpired() noexcept {
1764   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1765           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1766   DestructorGuard dg(this);
1767   assert(eventBase_->isInEventBaseThread());
1768
1769   if (state_ == StateEnum::CONNECTING) {
1770     // connect() timed out
1771     // Unregister for I/O events.
1772     if (connectCallback_) {
1773       AsyncSocketException ex(
1774           AsyncSocketException::TIMED_OUT,
1775           folly::sformat(
1776               "connect timed out after {}ms", connectTimeout_.count()));
1777       failConnect(__func__, ex);
1778     } else {
1779       // we faced a connect error without a connect callback, which could
1780       // happen due to TFO.
1781       AsyncSocketException ex(
1782           AsyncSocketException::TIMED_OUT, "write timed out during connection");
1783       failWrite(__func__, ex);
1784     }
1785   } else {
1786     // a normal write operation timed out
1787     AsyncSocketException ex(
1788         AsyncSocketException::TIMED_OUT,
1789         folly::sformat("write timed out after {}ms", sendTimeout_));
1790     failWrite(__func__, ex);
1791   }
1792 }
1793
1794 ssize_t AsyncSocket::tfoSendMsg(int fd, struct msghdr* msg, int msg_flags) {
1795   return detail::tfo_sendmsg(fd, msg, msg_flags);
1796 }
1797
1798 AsyncSocket::WriteResult
1799 AsyncSocket::sendSocketMessage(int fd, struct msghdr* msg, int msg_flags) {
1800   ssize_t totalWritten = 0;
1801   if (state_ == StateEnum::FAST_OPEN) {
1802     sockaddr_storage addr;
1803     auto len = addr_.getAddress(&addr);
1804     msg->msg_name = &addr;
1805     msg->msg_namelen = len;
1806     totalWritten = tfoSendMsg(fd_, msg, msg_flags);
1807     if (totalWritten >= 0) {
1808       tfoFinished_ = true;
1809       state_ = StateEnum::ESTABLISHED;
1810       // We schedule this asynchrously so that we don't end up
1811       // invoking initial read or write while a write is in progress.
1812       scheduleInitialReadWrite();
1813     } else if (errno == EINPROGRESS) {
1814       VLOG(4) << "TFO falling back to connecting";
1815       // A normal sendmsg doesn't return EINPROGRESS, however
1816       // TFO might fallback to connecting if there is no
1817       // cookie.
1818       state_ = StateEnum::CONNECTING;
1819       try {
1820         scheduleConnectTimeout();
1821         registerForConnectEvents();
1822       } catch (const AsyncSocketException& ex) {
1823         return WriteResult(
1824             WRITE_ERROR, folly::make_unique<AsyncSocketException>(ex));
1825       }
1826       // Let's fake it that no bytes were written and return an errno.
1827       errno = EAGAIN;
1828       totalWritten = -1;
1829     } else if (errno == EOPNOTSUPP) {
1830       // Try falling back to connecting.
1831       VLOG(4) << "TFO not supported";
1832       state_ = StateEnum::CONNECTING;
1833       try {
1834         int ret = socketConnect((const sockaddr*)&addr, len);
1835         if (ret == 0) {
1836           // connect succeeded immediately
1837           // Treat this like no data was written.
1838           state_ = StateEnum::ESTABLISHED;
1839           scheduleInitialReadWrite();
1840         }
1841         // If there was no exception during connections,
1842         // we would return that no bytes were written.
1843         errno = EAGAIN;
1844         totalWritten = -1;
1845       } catch (const AsyncSocketException& ex) {
1846         return WriteResult(
1847             WRITE_ERROR, folly::make_unique<AsyncSocketException>(ex));
1848       }
1849     } else if (errno == EAGAIN) {
1850       // Normally sendmsg would indicate that the write would block.
1851       // However in the fast open case, it would indicate that sendmsg
1852       // fell back to a connect. This is a return code from connect()
1853       // instead, and is an error condition indicating no fds available.
1854       return WriteResult(
1855           WRITE_ERROR,
1856           folly::make_unique<AsyncSocketException>(
1857               AsyncSocketException::UNKNOWN, "No more free local ports"));
1858     }
1859   } else {
1860     totalWritten = ::sendmsg(fd, msg, msg_flags);
1861   }
1862   return WriteResult(totalWritten);
1863 }
1864
1865 AsyncSocket::WriteResult AsyncSocket::performWrite(
1866     const iovec* vec,
1867     uint32_t count,
1868     WriteFlags flags,
1869     uint32_t* countWritten,
1870     uint32_t* partialWritten) {
1871   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1872   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1873   // (since it may terminate the program if the main program doesn't explicitly
1874   // ignore it).
1875   struct msghdr msg;
1876   msg.msg_name = nullptr;
1877   msg.msg_namelen = 0;
1878   msg.msg_iov = const_cast<iovec *>(vec);
1879   msg.msg_iovlen = std::min<size_t>(count, kIovMax);
1880   msg.msg_control = nullptr;
1881   msg.msg_controllen = 0;
1882   msg.msg_flags = 0;
1883
1884   int msg_flags = MSG_DONTWAIT;
1885
1886 #ifdef MSG_NOSIGNAL // Linux-only
1887   msg_flags |= MSG_NOSIGNAL;
1888   if (isSet(flags, WriteFlags::CORK)) {
1889     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1890     // give it the rest of the data rather than immediately sending a partial
1891     // frame, even when TCP_NODELAY is enabled.
1892     msg_flags |= MSG_MORE;
1893   }
1894 #endif
1895   if (isSet(flags, WriteFlags::EOR)) {
1896     // marks that this is the last byte of a record (response)
1897     msg_flags |= MSG_EOR;
1898   }
1899   auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
1900   auto totalWritten = writeResult.writeReturn;
1901   if (totalWritten < 0) {
1902     bool tryAgain = (errno == EAGAIN);
1903 #ifdef __APPLE__
1904     // Apple has a bug where doing a second write on a socket which we
1905     // have opened with TFO causes an ENOTCONN to be thrown. However the
1906     // socket is really connected, so treat ENOTCONN as a EAGAIN until
1907     // this bug is fixed.
1908     tryAgain |= (errno == ENOTCONN);
1909 #endif
1910     if (!writeResult.exception && tryAgain) {
1911       // TCP buffer is full; we can't write any more data right now.
1912       *countWritten = 0;
1913       *partialWritten = 0;
1914       return WriteResult(0);
1915     }
1916     // error
1917     *countWritten = 0;
1918     *partialWritten = 0;
1919     return writeResult;
1920   }
1921
1922   appBytesWritten_ += totalWritten;
1923
1924   uint32_t bytesWritten;
1925   uint32_t n;
1926   for (bytesWritten = uint32_t(totalWritten), n = 0; n < count; ++n) {
1927     const iovec* v = vec + n;
1928     if (v->iov_len > bytesWritten) {
1929       // Partial write finished in the middle of this iovec
1930       *countWritten = n;
1931       *partialWritten = bytesWritten;
1932       return WriteResult(totalWritten);
1933     }
1934
1935     bytesWritten -= uint32_t(v->iov_len);
1936   }
1937
1938   assert(bytesWritten == 0);
1939   *countWritten = n;
1940   *partialWritten = 0;
1941   return WriteResult(totalWritten);
1942 }
1943
1944 /**
1945  * Re-register the EventHandler after eventFlags_ has changed.
1946  *
1947  * If an error occurs, fail() is called to move the socket into the error state
1948  * and call all currently installed callbacks.  After an error, the
1949  * AsyncSocket is completely unregistered.
1950  *
1951  * @return Returns true on succcess, or false on error.
1952  */
1953 bool AsyncSocket::updateEventRegistration() {
1954   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1955           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1956           << ", events=" << std::hex << eventFlags_;
1957   assert(eventBase_->isInEventBaseThread());
1958   if (eventFlags_ == EventHandler::NONE) {
1959     ioHandler_.unregisterHandler();
1960     return true;
1961   }
1962
1963   // Always register for persistent events, so we don't have to re-register
1964   // after being called back.
1965   if (!ioHandler_.registerHandler(
1966           uint16_t(eventFlags_ | EventHandler::PERSIST))) {
1967     eventFlags_ = EventHandler::NONE; // we're not registered after error
1968     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1969         withAddr("failed to update AsyncSocket event registration"));
1970     fail("updateEventRegistration", ex);
1971     return false;
1972   }
1973
1974   return true;
1975 }
1976
1977 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1978                                            uint16_t disable) {
1979   uint16_t oldFlags = eventFlags_;
1980   eventFlags_ |= enable;
1981   eventFlags_ &= ~disable;
1982   if (eventFlags_ == oldFlags) {
1983     return true;
1984   } else {
1985     return updateEventRegistration();
1986   }
1987 }
1988
1989 void AsyncSocket::startFail() {
1990   // startFail() should only be called once
1991   assert(state_ != StateEnum::ERROR);
1992   assert(getDestructorGuardCount() > 0);
1993   state_ = StateEnum::ERROR;
1994   // Ensure that SHUT_READ and SHUT_WRITE are set,
1995   // so all future attempts to read or write will be rejected
1996   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1997
1998   if (eventFlags_ != EventHandler::NONE) {
1999     eventFlags_ = EventHandler::NONE;
2000     ioHandler_.unregisterHandler();
2001   }
2002   writeTimeout_.cancelTimeout();
2003
2004   if (fd_ >= 0) {
2005     ioHandler_.changeHandlerFD(-1);
2006     doClose();
2007   }
2008 }
2009
2010 void AsyncSocket::invokeAllErrors(const AsyncSocketException& ex) {
2011   invokeConnectErr(ex);
2012   failAllWrites(ex);
2013
2014   if (readCallback_) {
2015     ReadCallback* callback = readCallback_;
2016     readCallback_ = nullptr;
2017     callback->readErr(ex);
2018   }
2019 }
2020
2021 void AsyncSocket::finishFail() {
2022   assert(state_ == StateEnum::ERROR);
2023   assert(getDestructorGuardCount() > 0);
2024
2025   AsyncSocketException ex(
2026       AsyncSocketException::INTERNAL_ERROR,
2027       withAddr("socket closing after error"));
2028   invokeAllErrors(ex);
2029 }
2030
2031 void AsyncSocket::finishFail(const AsyncSocketException& ex) {
2032   assert(state_ == StateEnum::ERROR);
2033   assert(getDestructorGuardCount() > 0);
2034   invokeAllErrors(ex);
2035 }
2036
2037 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
2038   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2039              << state_ << " host=" << addr_.describe()
2040              << "): failed in " << fn << "(): "
2041              << ex.what();
2042   startFail();
2043   finishFail();
2044 }
2045
2046 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
2047   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2048                << state_ << " host=" << addr_.describe()
2049                << "): failed while connecting in " << fn << "(): "
2050                << ex.what();
2051   startFail();
2052
2053   invokeConnectErr(ex);
2054   finishFail(ex);
2055 }
2056
2057 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
2058   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2059                << state_ << " host=" << addr_.describe()
2060                << "): failed while reading in " << fn << "(): "
2061                << ex.what();
2062   startFail();
2063
2064   if (readCallback_ != nullptr) {
2065     ReadCallback* callback = readCallback_;
2066     readCallback_ = nullptr;
2067     callback->readErr(ex);
2068   }
2069
2070   finishFail();
2071 }
2072
2073 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
2074   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2075                << state_ << " host=" << addr_.describe()
2076                << "): failed while writing in " << fn << "(): "
2077                << ex.what();
2078   startFail();
2079
2080   // Only invoke the first write callback, since the error occurred while
2081   // writing this request.  Let any other pending write callbacks be invoked in
2082   // finishFail().
2083   if (writeReqHead_ != nullptr) {
2084     WriteRequest* req = writeReqHead_;
2085     writeReqHead_ = req->getNext();
2086     WriteCallback* callback = req->getCallback();
2087     uint32_t bytesWritten = req->getTotalBytesWritten();
2088     req->destroy();
2089     if (callback) {
2090       callback->writeErr(bytesWritten, ex);
2091     }
2092   }
2093
2094   finishFail();
2095 }
2096
2097 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
2098                              size_t bytesWritten,
2099                              const AsyncSocketException& ex) {
2100   // This version of failWrite() is used when the failure occurs before
2101   // we've added the callback to writeReqHead_.
2102   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2103              << state_ << " host=" << addr_.describe()
2104              <<"): failed while writing in " << fn << "(): "
2105              << ex.what();
2106   startFail();
2107
2108   if (callback != nullptr) {
2109     callback->writeErr(bytesWritten, ex);
2110   }
2111
2112   finishFail();
2113 }
2114
2115 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
2116   // Invoke writeError() on all write callbacks.
2117   // This is used when writes are forcibly shutdown with write requests
2118   // pending, or when an error occurs with writes pending.
2119   while (writeReqHead_ != nullptr) {
2120     WriteRequest* req = writeReqHead_;
2121     writeReqHead_ = req->getNext();
2122     WriteCallback* callback = req->getCallback();
2123     if (callback) {
2124       callback->writeErr(req->getTotalBytesWritten(), ex);
2125     }
2126     req->destroy();
2127   }
2128 }
2129
2130 void AsyncSocket::invalidState(ConnectCallback* callback) {
2131   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
2132              << "): connect() called in invalid state " << state_;
2133
2134   /*
2135    * The invalidState() methods don't use the normal failure mechanisms,
2136    * since we don't know what state we are in.  We don't want to call
2137    * startFail()/finishFail() recursively if we are already in the middle of
2138    * cleaning up.
2139    */
2140
2141   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
2142                          "connect() called with socket in invalid state");
2143   connectEndTime_ = std::chrono::steady_clock::now();
2144   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2145     if (callback) {
2146       callback->connectErr(ex);
2147     }
2148   } else {
2149     // We can't use failConnect() here since connectCallback_
2150     // may already be set to another callback.  Invoke this ConnectCallback
2151     // here; any other connectCallback_ will be invoked in finishFail()
2152     startFail();
2153     if (callback) {
2154       callback->connectErr(ex);
2155     }
2156     finishFail();
2157   }
2158 }
2159
2160 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
2161   connectEndTime_ = std::chrono::steady_clock::now();
2162   if (connectCallback_) {
2163     ConnectCallback* callback = connectCallback_;
2164     connectCallback_ = nullptr;
2165     callback->connectErr(ex);
2166   }
2167 }
2168
2169 void AsyncSocket::invokeConnectSuccess() {
2170   connectEndTime_ = std::chrono::steady_clock::now();
2171   if (connectCallback_) {
2172     ConnectCallback* callback = connectCallback_;
2173     connectCallback_ = nullptr;
2174     callback->connectSuccess();
2175   }
2176 }
2177
2178 void AsyncSocket::invalidState(ReadCallback* callback) {
2179   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2180              << "): setReadCallback(" << callback
2181              << ") called in invalid state " << state_;
2182
2183   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2184                          "setReadCallback() called with socket in "
2185                          "invalid state");
2186   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2187     if (callback) {
2188       callback->readErr(ex);
2189     }
2190   } else {
2191     startFail();
2192     if (callback) {
2193       callback->readErr(ex);
2194     }
2195     finishFail();
2196   }
2197 }
2198
2199 void AsyncSocket::invalidState(WriteCallback* callback) {
2200   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2201              << "): write() called in invalid state " << state_;
2202
2203   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2204                          withAddr("write() called with socket in invalid state"));
2205   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2206     if (callback) {
2207       callback->writeErr(0, ex);
2208     }
2209   } else {
2210     startFail();
2211     if (callback) {
2212       callback->writeErr(0, ex);
2213     }
2214     finishFail();
2215   }
2216 }
2217
2218 void AsyncSocket::doClose() {
2219   if (fd_ == -1) return;
2220   if (shutdownSocketSet_) {
2221     shutdownSocketSet_->close(fd_);
2222   } else {
2223     ::close(fd_);
2224   }
2225   fd_ = -1;
2226 }
2227
2228 std::ostream& operator << (std::ostream& os,
2229                            const AsyncSocket::StateEnum& state) {
2230   os << static_cast<int>(state);
2231   return os;
2232 }
2233
2234 std::string AsyncSocket::withAddr(const std::string& s) {
2235   // Don't use addr_ directly because it may not be initialized
2236   // e.g. if constructed from fd
2237   folly::SocketAddress peer, local;
2238   try {
2239     getPeerAddress(&peer);
2240     getLocalAddress(&local);
2241   } catch (const std::exception&) {
2242     // ignore
2243   } catch (...) {
2244     // ignore
2245   }
2246   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2247 }
2248
2249 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2250   bufferCallback_ = cb;
2251 }
2252
2253 } // folly