Remove getTFOSucceeded
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/ExceptionWrapper.h>
20 #include <folly/SocketAddress.h>
21 #include <folly/io/IOBuf.h>
22 #include <folly/Portability.h>
23 #include <folly/portability/Fcntl.h>
24 #include <folly/portability/Sockets.h>
25 #include <folly/portability/SysUio.h>
26 #include <folly/portability/Unistd.h>
27
28 #include <errno.h>
29 #include <limits.h>
30 #include <thread>
31 #include <sys/types.h>
32 #include <boost/preprocessor/control/if.hpp>
33
34 using std::string;
35 using std::unique_ptr;
36
37 namespace fsp = folly::portability::sockets;
38
39 namespace folly {
40
41 // static members initializers
42 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
43
44 const AsyncSocketException socketClosedLocallyEx(
45     AsyncSocketException::END_OF_FILE, "socket closed locally");
46 const AsyncSocketException socketShutdownForWritesEx(
47     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
48
49 // TODO: It might help performance to provide a version of BytesWriteRequest that
50 // users could derive from, so we can avoid the extra allocation for each call
51 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
52 // protocols are currently templatized for transports.
53 //
54 // We would need the version for external users where they provide the iovec
55 // storage space, and only our internal version would allocate it at the end of
56 // the WriteRequest.
57
58 /* The default WriteRequest implementation, used for write(), writev() and
59  * writeChain()
60  *
61  * A new BytesWriteRequest operation is allocated on the heap for all write
62  * operations that cannot be completed immediately.
63  */
64 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
65  public:
66   static BytesWriteRequest* newRequest(AsyncSocket* socket,
67                                        WriteCallback* callback,
68                                        const iovec* ops,
69                                        uint32_t opCount,
70                                        uint32_t partialWritten,
71                                        uint32_t bytesWritten,
72                                        unique_ptr<IOBuf>&& ioBuf,
73                                        WriteFlags flags) {
74     assert(opCount > 0);
75     // Since we put a variable size iovec array at the end
76     // of each BytesWriteRequest, we have to manually allocate the memory.
77     void* buf = malloc(sizeof(BytesWriteRequest) +
78                        (opCount * sizeof(struct iovec)));
79     if (buf == nullptr) {
80       throw std::bad_alloc();
81     }
82
83     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
84                                       partialWritten, bytesWritten,
85                                       std::move(ioBuf), flags);
86   }
87
88   void destroy() override {
89     this->~BytesWriteRequest();
90     free(this);
91   }
92
93   WriteResult performWrite() override {
94     WriteFlags writeFlags = flags_;
95     if (getNext() != nullptr) {
96       writeFlags = writeFlags | WriteFlags::CORK;
97     }
98     return socket_->performWrite(
99         getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
100   }
101
102   bool isComplete() override {
103     return opsWritten_ == getOpCount();
104   }
105
106   void consume() override {
107     // Advance opIndex_ forward by opsWritten_
108     opIndex_ += opsWritten_;
109     assert(opIndex_ < opCount_);
110
111     // If we've finished writing any IOBufs, release them
112     if (ioBuf_) {
113       for (uint32_t i = opsWritten_; i != 0; --i) {
114         assert(ioBuf_);
115         ioBuf_ = ioBuf_->pop();
116       }
117     }
118
119     // Move partialBytes_ forward into the current iovec buffer
120     struct iovec* currentOp = writeOps_ + opIndex_;
121     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
122     currentOp->iov_base =
123       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
124     currentOp->iov_len -= partialBytes_;
125
126     // Increment the totalBytesWritten_ count by bytesWritten_;
127     totalBytesWritten_ += bytesWritten_;
128   }
129
130  private:
131   BytesWriteRequest(AsyncSocket* socket,
132                     WriteCallback* callback,
133                     const struct iovec* ops,
134                     uint32_t opCount,
135                     uint32_t partialBytes,
136                     uint32_t bytesWritten,
137                     unique_ptr<IOBuf>&& ioBuf,
138                     WriteFlags flags)
139     : AsyncSocket::WriteRequest(socket, callback)
140     , opCount_(opCount)
141     , opIndex_(0)
142     , flags_(flags)
143     , ioBuf_(std::move(ioBuf))
144     , opsWritten_(0)
145     , partialBytes_(partialBytes)
146     , bytesWritten_(bytesWritten) {
147     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
148   }
149
150   // private destructor, to ensure callers use destroy()
151   ~BytesWriteRequest() override = default;
152
153   const struct iovec* getOps() const {
154     assert(opCount_ > opIndex_);
155     return writeOps_ + opIndex_;
156   }
157
158   uint32_t getOpCount() const {
159     assert(opCount_ > opIndex_);
160     return opCount_ - opIndex_;
161   }
162
163   uint32_t opCount_;            ///< number of entries in writeOps_
164   uint32_t opIndex_;            ///< current index into writeOps_
165   WriteFlags flags_;            ///< set for WriteFlags
166   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
167
168   // for consume(), how much we wrote on the last write
169   uint32_t opsWritten_;         ///< complete ops written
170   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
171   ssize_t bytesWritten_;        ///< bytes written altogether
172
173   struct iovec writeOps_[];     ///< write operation(s) list
174 };
175
176 AsyncSocket::AsyncSocket()
177   : eventBase_(nullptr)
178   , writeTimeout_(this, nullptr)
179   , ioHandler_(this, nullptr)
180   , immediateReadHandler_(this) {
181   VLOG(5) << "new AsyncSocket()";
182   init();
183 }
184
185 AsyncSocket::AsyncSocket(EventBase* evb)
186   : eventBase_(evb)
187   , writeTimeout_(this, evb)
188   , ioHandler_(this, evb)
189   , immediateReadHandler_(this) {
190   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
191   init();
192 }
193
194 AsyncSocket::AsyncSocket(EventBase* evb,
195                            const folly::SocketAddress& address,
196                            uint32_t connectTimeout)
197   : AsyncSocket(evb) {
198   connect(nullptr, address, connectTimeout);
199 }
200
201 AsyncSocket::AsyncSocket(EventBase* evb,
202                            const std::string& ip,
203                            uint16_t port,
204                            uint32_t connectTimeout)
205   : AsyncSocket(evb) {
206   connect(nullptr, ip, port, connectTimeout);
207 }
208
209 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
210   : eventBase_(evb)
211   , writeTimeout_(this, evb)
212   , ioHandler_(this, evb, fd)
213   , immediateReadHandler_(this) {
214   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
215           << fd << ")";
216   init();
217   fd_ = fd;
218   setCloseOnExec();
219   state_ = StateEnum::ESTABLISHED;
220 }
221
222 // init() method, since constructor forwarding isn't supported in most
223 // compilers yet.
224 void AsyncSocket::init() {
225   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
226   shutdownFlags_ = 0;
227   state_ = StateEnum::UNINIT;
228   eventFlags_ = EventHandler::NONE;
229   fd_ = -1;
230   sendTimeout_ = 0;
231   maxReadsPerEvent_ = 16;
232   connectCallback_ = nullptr;
233   readCallback_ = nullptr;
234   writeReqHead_ = nullptr;
235   writeReqTail_ = nullptr;
236   shutdownSocketSet_ = nullptr;
237   appBytesWritten_ = 0;
238   appBytesReceived_ = 0;
239 }
240
241 AsyncSocket::~AsyncSocket() {
242   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
243           << ", evb=" << eventBase_ << ", fd=" << fd_
244           << ", state=" << state_ << ")";
245 }
246
247 void AsyncSocket::destroy() {
248   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
249           << ", fd=" << fd_ << ", state=" << state_;
250   // When destroy is called, close the socket immediately
251   closeNow();
252
253   // Then call DelayedDestruction::destroy() to take care of
254   // whether or not we need immediate or delayed destruction
255   DelayedDestruction::destroy();
256 }
257
258 int AsyncSocket::detachFd() {
259   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
260           << ", evb=" << eventBase_ << ", state=" << state_
261           << ", events=" << std::hex << eventFlags_ << ")";
262   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
263   // actually close the descriptor.
264   if (shutdownSocketSet_) {
265     shutdownSocketSet_->remove(fd_);
266   }
267   int fd = fd_;
268   fd_ = -1;
269   // Call closeNow() to invoke all pending callbacks with an error.
270   closeNow();
271   // Update the EventHandler to stop using this fd.
272   // This can only be done after closeNow() unregisters the handler.
273   ioHandler_.changeHandlerFD(-1);
274   return fd;
275 }
276
277 const folly::SocketAddress& AsyncSocket::anyAddress() {
278   static const folly::SocketAddress anyAddress =
279     folly::SocketAddress("0.0.0.0", 0);
280   return anyAddress;
281 }
282
283 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
284   if (shutdownSocketSet_ == newSS) {
285     return;
286   }
287   if (shutdownSocketSet_ && fd_ != -1) {
288     shutdownSocketSet_->remove(fd_);
289   }
290   shutdownSocketSet_ = newSS;
291   if (shutdownSocketSet_ && fd_ != -1) {
292     shutdownSocketSet_->add(fd_);
293   }
294 }
295
296 void AsyncSocket::setCloseOnExec() {
297   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
298   if (rv != 0) {
299     auto errnoCopy = errno;
300     throw AsyncSocketException(
301         AsyncSocketException::INTERNAL_ERROR,
302         withAddr("failed to set close-on-exec flag"),
303         errnoCopy);
304   }
305 }
306
307 void AsyncSocket::connect(ConnectCallback* callback,
308                            const folly::SocketAddress& address,
309                            int timeout,
310                            const OptionMap &options,
311                            const folly::SocketAddress& bindAddr) noexcept {
312   DestructorGuard dg(this);
313   assert(eventBase_->isInEventBaseThread());
314
315   addr_ = address;
316
317   // Make sure we're in the uninitialized state
318   if (state_ != StateEnum::UNINIT) {
319     return invalidState(callback);
320   }
321
322   connectTimeout_ = std::chrono::milliseconds(timeout);
323   connectStartTime_ = std::chrono::steady_clock::now();
324   // Make connect end time at least >= connectStartTime.
325   connectEndTime_ = connectStartTime_;
326
327   assert(fd_ == -1);
328   state_ = StateEnum::CONNECTING;
329   connectCallback_ = callback;
330
331   sockaddr_storage addrStorage;
332   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
333
334   try {
335     // Create the socket
336     // Technically the first parameter should actually be a protocol family
337     // constant (PF_xxx) rather than an address family (AF_xxx), but the
338     // distinction is mainly just historical.  In pretty much all
339     // implementations the PF_foo and AF_foo constants are identical.
340     fd_ = fsp::socket(address.getFamily(), SOCK_STREAM, 0);
341     if (fd_ < 0) {
342       auto errnoCopy = errno;
343       throw AsyncSocketException(
344           AsyncSocketException::INTERNAL_ERROR,
345           withAddr("failed to create socket"),
346           errnoCopy);
347     }
348     if (shutdownSocketSet_) {
349       shutdownSocketSet_->add(fd_);
350     }
351     ioHandler_.changeHandlerFD(fd_);
352
353     setCloseOnExec();
354
355     // Put the socket in non-blocking mode
356     int flags = fcntl(fd_, F_GETFL, 0);
357     if (flags == -1) {
358       auto errnoCopy = errno;
359       throw AsyncSocketException(
360           AsyncSocketException::INTERNAL_ERROR,
361           withAddr("failed to get socket flags"),
362           errnoCopy);
363     }
364     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
365     if (rv == -1) {
366       auto errnoCopy = errno;
367       throw AsyncSocketException(
368           AsyncSocketException::INTERNAL_ERROR,
369           withAddr("failed to put socket in non-blocking mode"),
370           errnoCopy);
371     }
372
373 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
374     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
375     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
376     if (rv == -1) {
377       auto errnoCopy = errno;
378       throw AsyncSocketException(
379           AsyncSocketException::INTERNAL_ERROR,
380           "failed to enable F_SETNOSIGPIPE on socket",
381           errnoCopy);
382     }
383 #endif
384
385     // By default, turn on TCP_NODELAY
386     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
387     // setNoDelay() will log an error message if it fails.
388     if (address.getFamily() != AF_UNIX) {
389       (void)setNoDelay(true);
390     }
391
392     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
393             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
394
395     // bind the socket
396     if (bindAddr != anyAddress()) {
397       int one = 1;
398       if (setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
399         auto errnoCopy = errno;
400         doClose();
401         throw AsyncSocketException(
402             AsyncSocketException::NOT_OPEN,
403             "failed to setsockopt prior to bind on " + bindAddr.describe(),
404             errnoCopy);
405       }
406
407       bindAddr.getAddress(&addrStorage);
408
409       if (bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
410         auto errnoCopy = errno;
411         doClose();
412         throw AsyncSocketException(
413             AsyncSocketException::NOT_OPEN,
414             "failed to bind to async socket: " + bindAddr.describe(),
415             errnoCopy);
416       }
417     }
418
419     // Apply the additional options if any.
420     for (const auto& opt: options) {
421       int rv = opt.first.apply(fd_, opt.second);
422       if (rv != 0) {
423         auto errnoCopy = errno;
424         throw AsyncSocketException(
425             AsyncSocketException::INTERNAL_ERROR,
426             withAddr("failed to set socket option"),
427             errnoCopy);
428       }
429     }
430
431     // Perform the connect()
432     address.getAddress(&addrStorage);
433
434     if (tfoEnabled_) {
435       state_ = StateEnum::FAST_OPEN;
436       tfoAttempted_ = true;
437     } else {
438       if (socketConnect(saddr, addr_.getActualSize()) < 0) {
439         return;
440       }
441     }
442
443     // If we're still here the connect() succeeded immediately.
444     // Fall through to call the callback outside of this try...catch block
445   } catch (const AsyncSocketException& ex) {
446     return failConnect(__func__, ex);
447   } catch (const std::exception& ex) {
448     // shouldn't happen, but handle it just in case
449     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
450                << "): unexpected " << typeid(ex).name() << " exception: "
451                << ex.what();
452     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
453                             withAddr(string("unexpected exception: ") +
454                                      ex.what()));
455     return failConnect(__func__, tex);
456   }
457
458   // The connection succeeded immediately
459   // The read callback may not have been set yet, and no writes may be pending
460   // yet, so we don't have to register for any events at the moment.
461   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
462   assert(readCallback_ == nullptr);
463   assert(writeReqHead_ == nullptr);
464   if (state_ != StateEnum::FAST_OPEN) {
465     state_ = StateEnum::ESTABLISHED;
466   }
467   invokeConnectSuccess();
468 }
469
470 int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) {
471   int rv = ::connect(fd_, saddr, len);
472   if (rv < 0) {
473     auto errnoCopy = errno;
474     if (errnoCopy == EINPROGRESS) {
475       scheduleConnectTimeoutAndRegisterForEvents();
476     } else {
477       throw AsyncSocketException(
478           AsyncSocketException::NOT_OPEN,
479           "connect failed (immediately)",
480           errnoCopy);
481     }
482   }
483   return rv;
484 }
485
486 void AsyncSocket::scheduleConnectTimeoutAndRegisterForEvents() {
487   // Connection in progress.
488   int timeout = connectTimeout_.count();
489   if (timeout > 0) {
490     // Start a timer in case the connection takes too long.
491     if (!writeTimeout_.scheduleTimeout(timeout)) {
492       throw AsyncSocketException(
493           AsyncSocketException::INTERNAL_ERROR,
494           withAddr("failed to schedule AsyncSocket connect timeout"));
495     }
496   }
497
498   // Register for write events, so we'll
499   // be notified when the connection finishes/fails.
500   // Note that we don't register for a persistent event here.
501   assert(eventFlags_ == EventHandler::NONE);
502   eventFlags_ = EventHandler::WRITE;
503   if (!ioHandler_.registerHandler(eventFlags_)) {
504     throw AsyncSocketException(
505         AsyncSocketException::INTERNAL_ERROR,
506         withAddr("failed to register AsyncSocket connect handler"));
507   }
508 }
509
510 void AsyncSocket::connect(ConnectCallback* callback,
511                            const string& ip, uint16_t port,
512                            int timeout,
513                            const OptionMap &options) noexcept {
514   DestructorGuard dg(this);
515   try {
516     connectCallback_ = callback;
517     connect(callback, folly::SocketAddress(ip, port), timeout, options);
518   } catch (const std::exception& ex) {
519     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
520                             ex.what());
521     return failConnect(__func__, tex);
522   }
523 }
524
525 void AsyncSocket::cancelConnect() {
526   connectCallback_ = nullptr;
527   if (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN) {
528     closeNow();
529   }
530 }
531
532 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
533   sendTimeout_ = milliseconds;
534   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
535
536   // If we are currently pending on write requests, immediately update
537   // writeTimeout_ with the new value.
538   if ((eventFlags_ & EventHandler::WRITE) &&
539       (state_ != StateEnum::CONNECTING && state_ != StateEnum::FAST_OPEN)) {
540     assert(state_ == StateEnum::ESTABLISHED);
541     assert((shutdownFlags_ & SHUT_WRITE) == 0);
542     if (sendTimeout_ > 0) {
543       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
544         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
545             withAddr("failed to reschedule send timeout in setSendTimeout"));
546         return failWrite(__func__, ex);
547       }
548     } else {
549       writeTimeout_.cancelTimeout();
550     }
551   }
552 }
553
554 void AsyncSocket::setReadCB(ReadCallback *callback) {
555   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
556           << ", callback=" << callback << ", state=" << state_;
557
558   // Short circuit if callback is the same as the existing readCallback_.
559   //
560   // Note that this is needed for proper functioning during some cleanup cases.
561   // During cleanup we allow setReadCallback(nullptr) to be called even if the
562   // read callback is already unset and we have been detached from an event
563   // base.  This check prevents us from asserting
564   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
565   if (callback == readCallback_) {
566     return;
567   }
568
569   /* We are removing a read callback */
570   if (callback == nullptr &&
571       immediateReadHandler_.isLoopCallbackScheduled()) {
572     immediateReadHandler_.cancelLoopCallback();
573   }
574
575   if (shutdownFlags_ & SHUT_READ) {
576     // Reads have already been shut down on this socket.
577     //
578     // Allow setReadCallback(nullptr) to be called in this case, but don't
579     // allow a new callback to be set.
580     //
581     // For example, setReadCallback(nullptr) can happen after an error if we
582     // invoke some other error callback before invoking readError().  The other
583     // error callback that is invoked first may go ahead and clear the read
584     // callback before we get a chance to invoke readError().
585     if (callback != nullptr) {
586       return invalidState(callback);
587     }
588     assert((eventFlags_ & EventHandler::READ) == 0);
589     readCallback_ = nullptr;
590     return;
591   }
592
593   DestructorGuard dg(this);
594   assert(eventBase_->isInEventBaseThread());
595
596   switch ((StateEnum)state_) {
597     case StateEnum::CONNECTING:
598     case StateEnum::FAST_OPEN:
599       // For convenience, we allow the read callback to be set while we are
600       // still connecting.  We just store the callback for now.  Once the
601       // connection completes we'll register for read events.
602       readCallback_ = callback;
603       return;
604     case StateEnum::ESTABLISHED:
605     {
606       readCallback_ = callback;
607       uint16_t oldFlags = eventFlags_;
608       if (readCallback_) {
609         eventFlags_ |= EventHandler::READ;
610       } else {
611         eventFlags_ &= ~EventHandler::READ;
612       }
613
614       // Update our registration if our flags have changed
615       if (eventFlags_ != oldFlags) {
616         // We intentionally ignore the return value here.
617         // updateEventRegistration() will move us into the error state if it
618         // fails, and we don't need to do anything else here afterwards.
619         (void)updateEventRegistration();
620       }
621
622       if (readCallback_) {
623         checkForImmediateRead();
624       }
625       return;
626     }
627     case StateEnum::CLOSED:
628     case StateEnum::ERROR:
629       // We should never reach here.  SHUT_READ should always be set
630       // if we are in STATE_CLOSED or STATE_ERROR.
631       assert(false);
632       return invalidState(callback);
633     case StateEnum::UNINIT:
634       // We do not allow setReadCallback() to be called before we start
635       // connecting.
636       return invalidState(callback);
637   }
638
639   // We don't put a default case in the switch statement, so that the compiler
640   // will warn us to update the switch statement if a new state is added.
641   return invalidState(callback);
642 }
643
644 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
645   return readCallback_;
646 }
647
648 void AsyncSocket::write(WriteCallback* callback,
649                          const void* buf, size_t bytes, WriteFlags flags) {
650   iovec op;
651   op.iov_base = const_cast<void*>(buf);
652   op.iov_len = bytes;
653   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
654 }
655
656 void AsyncSocket::writev(WriteCallback* callback,
657                           const iovec* vec,
658                           size_t count,
659                           WriteFlags flags) {
660   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
661 }
662
663 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
664                               WriteFlags flags) {
665   constexpr size_t kSmallSizeMax = 64;
666   size_t count = buf->countChainElements();
667   if (count <= kSmallSizeMax) {
668     // suppress "warning: variable length array 'vec' is used [-Wvla]"
669     FOLLY_PUSH_WARNING;
670     FOLLY_GCC_DISABLE_WARNING(vla);
671     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
672     FOLLY_POP_WARNING;
673
674     writeChainImpl(callback, vec, count, std::move(buf), flags);
675   } else {
676     iovec* vec = new iovec[count];
677     writeChainImpl(callback, vec, count, std::move(buf), flags);
678     delete[] vec;
679   }
680 }
681
682 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
683     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
684   size_t veclen = buf->fillIov(vec, count);
685   writeImpl(callback, vec, veclen, std::move(buf), flags);
686 }
687
688 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
689                              size_t count, unique_ptr<IOBuf>&& buf,
690                              WriteFlags flags) {
691   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
692           << ", callback=" << callback << ", count=" << count
693           << ", state=" << state_;
694   DestructorGuard dg(this);
695   unique_ptr<IOBuf>ioBuf(std::move(buf));
696   assert(eventBase_->isInEventBaseThread());
697
698   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
699     // No new writes may be performed after the write side of the socket has
700     // been shutdown.
701     //
702     // We could just call callback->writeError() here to fail just this write.
703     // However, fail hard and use invalidState() to fail all outstanding
704     // callbacks and move the socket into the error state.  There's most likely
705     // a bug in the caller's code, so we abort everything rather than trying to
706     // proceed as best we can.
707     return invalidState(callback);
708   }
709
710   uint32_t countWritten = 0;
711   uint32_t partialWritten = 0;
712   int bytesWritten = 0;
713   bool mustRegister = false;
714   if ((state_ == StateEnum::ESTABLISHED || state_ == StateEnum::FAST_OPEN) &&
715       !connecting()) {
716     if (writeReqHead_ == nullptr) {
717       // If we are established and there are no other writes pending,
718       // we can attempt to perform the write immediately.
719       assert(writeReqTail_ == nullptr);
720       assert((eventFlags_ & EventHandler::WRITE) == 0);
721
722       auto writeResult =
723           performWrite(vec, count, flags, &countWritten, &partialWritten);
724       bytesWritten = writeResult.writeReturn;
725       if (bytesWritten < 0) {
726         auto errnoCopy = errno;
727         if (writeResult.exception) {
728           return failWrite(__func__, callback, 0, *writeResult.exception);
729         }
730         AsyncSocketException ex(
731             AsyncSocketException::INTERNAL_ERROR,
732             withAddr("writev failed"),
733             errnoCopy);
734         return failWrite(__func__, callback, 0, ex);
735       } else if (countWritten == count) {
736         // We successfully wrote everything.
737         // Invoke the callback and return.
738         if (callback) {
739           callback->writeSuccess();
740         }
741         return;
742       } else { // continue writing the next writeReq
743         if (bufferCallback_) {
744           bufferCallback_->onEgressBuffered();
745         }
746       }
747       if (!connecting()) {
748         // Writes might put the socket back into connecting state
749         // if TFO is enabled, and using TFO fails.
750         // This means that write timeouts would not be active, however
751         // connect timeouts would affect this stage.
752         mustRegister = true;
753       }
754     }
755   } else if (!connecting()) {
756     // Invalid state for writing
757     return invalidState(callback);
758   }
759
760   // Create a new WriteRequest to add to the queue
761   WriteRequest* req;
762   try {
763     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
764                                         count - countWritten, partialWritten,
765                                         bytesWritten, std::move(ioBuf), flags);
766   } catch (const std::exception& ex) {
767     // we mainly expect to catch std::bad_alloc here
768     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
769         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
770     return failWrite(__func__, callback, bytesWritten, tex);
771   }
772   req->consume();
773   if (writeReqTail_ == nullptr) {
774     assert(writeReqHead_ == nullptr);
775     writeReqHead_ = writeReqTail_ = req;
776   } else {
777     writeReqTail_->append(req);
778     writeReqTail_ = req;
779   }
780
781   // Register for write events if are established and not currently
782   // waiting on write events
783   if (mustRegister) {
784     assert(state_ == StateEnum::ESTABLISHED);
785     assert((eventFlags_ & EventHandler::WRITE) == 0);
786     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
787       assert(state_ == StateEnum::ERROR);
788       return;
789     }
790     if (sendTimeout_ > 0) {
791       // Schedule a timeout to fire if the write takes too long.
792       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
793         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
794                                withAddr("failed to schedule send timeout"));
795         return failWrite(__func__, ex);
796       }
797     }
798   }
799 }
800
801 void AsyncSocket::writeRequest(WriteRequest* req) {
802   if (writeReqTail_ == nullptr) {
803     assert(writeReqHead_ == nullptr);
804     writeReqHead_ = writeReqTail_ = req;
805     req->start();
806   } else {
807     writeReqTail_->append(req);
808     writeReqTail_ = req;
809   }
810 }
811
812 void AsyncSocket::close() {
813   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
814           << ", state=" << state_ << ", shutdownFlags="
815           << std::hex << (int) shutdownFlags_;
816
817   // close() is only different from closeNow() when there are pending writes
818   // that need to drain before we can close.  In all other cases, just call
819   // closeNow().
820   //
821   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
822   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
823   // is still running.  (e.g., If there are multiple pending writes, and we
824   // call writeError() on the first one, it may call close().  In this case we
825   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
826   // writes will still be in the queue.)
827   //
828   // We only need to drain pending writes if we are still in STATE_CONNECTING
829   // or STATE_ESTABLISHED
830   if ((writeReqHead_ == nullptr) ||
831       !(state_ == StateEnum::CONNECTING ||
832       state_ == StateEnum::ESTABLISHED)) {
833     closeNow();
834     return;
835   }
836
837   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
838   // destroyed until close() returns.
839   DestructorGuard dg(this);
840   assert(eventBase_->isInEventBaseThread());
841
842   // Since there are write requests pending, we have to set the
843   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
844   // connect finishes and we finish writing these requests.
845   //
846   // Set SHUT_READ to indicate that reads are shut down, and set the
847   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
848   // pending writes complete.
849   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
850
851   // If a read callback is set, invoke readEOF() immediately to inform it that
852   // the socket has been closed and no more data can be read.
853   if (readCallback_) {
854     // Disable reads if they are enabled
855     if (!updateEventRegistration(0, EventHandler::READ)) {
856       // We're now in the error state; callbacks have been cleaned up
857       assert(state_ == StateEnum::ERROR);
858       assert(readCallback_ == nullptr);
859     } else {
860       ReadCallback* callback = readCallback_;
861       readCallback_ = nullptr;
862       callback->readEOF();
863     }
864   }
865 }
866
867 void AsyncSocket::closeNow() {
868   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
869           << ", state=" << state_ << ", shutdownFlags="
870           << std::hex << (int) shutdownFlags_;
871   DestructorGuard dg(this);
872   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
873
874   switch (state_) {
875     case StateEnum::ESTABLISHED:
876     case StateEnum::CONNECTING:
877     case StateEnum::FAST_OPEN: {
878       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
879       state_ = StateEnum::CLOSED;
880
881       // If the write timeout was set, cancel it.
882       writeTimeout_.cancelTimeout();
883
884       // If we are registered for I/O events, unregister.
885       if (eventFlags_ != EventHandler::NONE) {
886         eventFlags_ = EventHandler::NONE;
887         if (!updateEventRegistration()) {
888           // We will have been moved into the error state.
889           assert(state_ == StateEnum::ERROR);
890           return;
891         }
892       }
893
894       if (immediateReadHandler_.isLoopCallbackScheduled()) {
895         immediateReadHandler_.cancelLoopCallback();
896       }
897
898       if (fd_ >= 0) {
899         ioHandler_.changeHandlerFD(-1);
900         doClose();
901       }
902
903       invokeConnectErr(socketClosedLocallyEx);
904
905       failAllWrites(socketClosedLocallyEx);
906
907       if (readCallback_) {
908         ReadCallback* callback = readCallback_;
909         readCallback_ = nullptr;
910         callback->readEOF();
911       }
912       return;
913     }
914     case StateEnum::CLOSED:
915       // Do nothing.  It's possible that we are being called recursively
916       // from inside a callback that we invoked inside another call to close()
917       // that is still running.
918       return;
919     case StateEnum::ERROR:
920       // Do nothing.  The error handling code has performed (or is performing)
921       // cleanup.
922       return;
923     case StateEnum::UNINIT:
924       assert(eventFlags_ == EventHandler::NONE);
925       assert(connectCallback_ == nullptr);
926       assert(readCallback_ == nullptr);
927       assert(writeReqHead_ == nullptr);
928       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
929       state_ = StateEnum::CLOSED;
930       return;
931   }
932
933   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
934               << ") called in unknown state " << state_;
935 }
936
937 void AsyncSocket::closeWithReset() {
938   // Enable SO_LINGER, with the linger timeout set to 0.
939   // This will trigger a TCP reset when we close the socket.
940   if (fd_ >= 0) {
941     struct linger optLinger = {1, 0};
942     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
943       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
944               << "on " << fd_ << ": errno=" << errno;
945     }
946   }
947
948   // Then let closeNow() take care of the rest
949   closeNow();
950 }
951
952 void AsyncSocket::shutdownWrite() {
953   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
954           << ", state=" << state_ << ", shutdownFlags="
955           << std::hex << (int) shutdownFlags_;
956
957   // If there are no pending writes, shutdownWrite() is identical to
958   // shutdownWriteNow().
959   if (writeReqHead_ == nullptr) {
960     shutdownWriteNow();
961     return;
962   }
963
964   assert(eventBase_->isInEventBaseThread());
965
966   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
967   // shutdown will be performed once all writes complete.
968   shutdownFlags_ |= SHUT_WRITE_PENDING;
969 }
970
971 void AsyncSocket::shutdownWriteNow() {
972   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
973           << ", fd=" << fd_ << ", state=" << state_
974           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
975
976   if (shutdownFlags_ & SHUT_WRITE) {
977     // Writes are already shutdown; nothing else to do.
978     return;
979   }
980
981   // If SHUT_READ is already set, just call closeNow() to completely
982   // close the socket.  This can happen if close() was called with writes
983   // pending, and then shutdownWriteNow() is called before all pending writes
984   // complete.
985   if (shutdownFlags_ & SHUT_READ) {
986     closeNow();
987     return;
988   }
989
990   DestructorGuard dg(this);
991   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
992
993   switch (static_cast<StateEnum>(state_)) {
994     case StateEnum::ESTABLISHED:
995     {
996       shutdownFlags_ |= SHUT_WRITE;
997
998       // If the write timeout was set, cancel it.
999       writeTimeout_.cancelTimeout();
1000
1001       // If we are registered for write events, unregister.
1002       if (!updateEventRegistration(0, EventHandler::WRITE)) {
1003         // We will have been moved into the error state.
1004         assert(state_ == StateEnum::ERROR);
1005         return;
1006       }
1007
1008       // Shutdown writes on the file descriptor
1009       shutdown(fd_, SHUT_WR);
1010
1011       // Immediately fail all write requests
1012       failAllWrites(socketShutdownForWritesEx);
1013       return;
1014     }
1015     case StateEnum::CONNECTING:
1016     {
1017       // Set the SHUT_WRITE_PENDING flag.
1018       // When the connection completes, it will check this flag,
1019       // shutdown the write half of the socket, and then set SHUT_WRITE.
1020       shutdownFlags_ |= SHUT_WRITE_PENDING;
1021
1022       // Immediately fail all write requests
1023       failAllWrites(socketShutdownForWritesEx);
1024       return;
1025     }
1026     case StateEnum::UNINIT:
1027       // Callers normally shouldn't call shutdownWriteNow() before the socket
1028       // even starts connecting.  Nonetheless, go ahead and set
1029       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
1030       // immediately shut down the write side of the socket.
1031       shutdownFlags_ |= SHUT_WRITE_PENDING;
1032       return;
1033     case StateEnum::FAST_OPEN:
1034       // In fast open state we haven't call connected yet, and if we shutdown
1035       // the writes, we will never try to call connect, so shut everything down
1036       shutdownFlags_ |= SHUT_WRITE;
1037       // Immediately fail all write requests
1038       failAllWrites(socketShutdownForWritesEx);
1039       return;
1040     case StateEnum::CLOSED:
1041     case StateEnum::ERROR:
1042       // We should never get here.  SHUT_WRITE should always be set
1043       // in STATE_CLOSED and STATE_ERROR.
1044       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
1045                  << ", fd=" << fd_ << ") in unexpected state " << state_
1046                  << " with SHUT_WRITE not set ("
1047                  << std::hex << (int) shutdownFlags_ << ")";
1048       assert(false);
1049       return;
1050   }
1051
1052   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
1053               << fd_ << ") called in unknown state " << state_;
1054 }
1055
1056 bool AsyncSocket::readable() const {
1057   if (fd_ == -1) {
1058     return false;
1059   }
1060   struct pollfd fds[1];
1061   fds[0].fd = fd_;
1062   fds[0].events = POLLIN;
1063   fds[0].revents = 0;
1064   int rc = poll(fds, 1, 0);
1065   return rc == 1;
1066 }
1067
1068 bool AsyncSocket::isPending() const {
1069   return ioHandler_.isPending();
1070 }
1071
1072 bool AsyncSocket::hangup() const {
1073   if (fd_ == -1) {
1074     // sanity check, no one should ask for hangup if we are not connected.
1075     assert(false);
1076     return false;
1077   }
1078 #ifdef POLLRDHUP // Linux-only
1079   struct pollfd fds[1];
1080   fds[0].fd = fd_;
1081   fds[0].events = POLLRDHUP|POLLHUP;
1082   fds[0].revents = 0;
1083   poll(fds, 1, 0);
1084   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1085 #else
1086   return false;
1087 #endif
1088 }
1089
1090 bool AsyncSocket::good() const {
1091   return (
1092       (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN ||
1093        state_ == StateEnum::ESTABLISHED) &&
1094       (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1095 }
1096
1097 bool AsyncSocket::error() const {
1098   return (state_ == StateEnum::ERROR);
1099 }
1100
1101 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1102   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1103           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1104           << ", state=" << state_ << ", events="
1105           << std::hex << eventFlags_ << ")";
1106   assert(eventBase_ == nullptr);
1107   assert(eventBase->isInEventBaseThread());
1108
1109   eventBase_ = eventBase;
1110   ioHandler_.attachEventBase(eventBase);
1111   writeTimeout_.attachEventBase(eventBase);
1112 }
1113
1114 void AsyncSocket::detachEventBase() {
1115   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1116           << ", old evb=" << eventBase_ << ", state=" << state_
1117           << ", events=" << std::hex << eventFlags_ << ")";
1118   assert(eventBase_ != nullptr);
1119   assert(eventBase_->isInEventBaseThread());
1120
1121   eventBase_ = nullptr;
1122   ioHandler_.detachEventBase();
1123   writeTimeout_.detachEventBase();
1124 }
1125
1126 bool AsyncSocket::isDetachable() const {
1127   DCHECK(eventBase_ != nullptr);
1128   DCHECK(eventBase_->isInEventBaseThread());
1129
1130   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1131 }
1132
1133 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1134   if (!localAddr_.isInitialized()) {
1135     localAddr_.setFromLocalAddress(fd_);
1136   }
1137   *address = localAddr_;
1138 }
1139
1140 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1141   if (!addr_.isInitialized()) {
1142     addr_.setFromPeerAddress(fd_);
1143   }
1144   *address = addr_;
1145 }
1146
1147 int AsyncSocket::setNoDelay(bool noDelay) {
1148   if (fd_ < 0) {
1149     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1150                << this << "(state=" << state_ << ")";
1151     return EINVAL;
1152
1153   }
1154
1155   int value = noDelay ? 1 : 0;
1156   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1157     int errnoCopy = errno;
1158     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1159             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1160             << strerror(errnoCopy);
1161     return errnoCopy;
1162   }
1163
1164   return 0;
1165 }
1166
1167 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1168
1169   #ifndef TCP_CONGESTION
1170   #define TCP_CONGESTION  13
1171   #endif
1172
1173   if (fd_ < 0) {
1174     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1175                << "socket " << this << "(state=" << state_ << ")";
1176     return EINVAL;
1177
1178   }
1179
1180   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1181         cname.length() + 1) != 0) {
1182     int errnoCopy = errno;
1183     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1184             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1185             << strerror(errnoCopy);
1186     return errnoCopy;
1187   }
1188
1189   return 0;
1190 }
1191
1192 int AsyncSocket::setQuickAck(bool quickack) {
1193   if (fd_ < 0) {
1194     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1195                << this << "(state=" << state_ << ")";
1196     return EINVAL;
1197
1198   }
1199
1200 #ifdef TCP_QUICKACK // Linux-only
1201   int value = quickack ? 1 : 0;
1202   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1203     int errnoCopy = errno;
1204     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1205             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1206             << strerror(errnoCopy);
1207     return errnoCopy;
1208   }
1209
1210   return 0;
1211 #else
1212   return ENOSYS;
1213 #endif
1214 }
1215
1216 int AsyncSocket::setSendBufSize(size_t bufsize) {
1217   if (fd_ < 0) {
1218     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1219                << this << "(state=" << state_ << ")";
1220     return EINVAL;
1221   }
1222
1223   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1224     int errnoCopy = errno;
1225     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1226             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1227             << strerror(errnoCopy);
1228     return errnoCopy;
1229   }
1230
1231   return 0;
1232 }
1233
1234 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1235   if (fd_ < 0) {
1236     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1237                << this << "(state=" << state_ << ")";
1238     return EINVAL;
1239   }
1240
1241   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1242     int errnoCopy = errno;
1243     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1244             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1245             << strerror(errnoCopy);
1246     return errnoCopy;
1247   }
1248
1249   return 0;
1250 }
1251
1252 int AsyncSocket::setTCPProfile(int profd) {
1253   if (fd_ < 0) {
1254     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1255                << this << "(state=" << state_ << ")";
1256     return EINVAL;
1257   }
1258
1259   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1260     int errnoCopy = errno;
1261     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1262             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1263             << strerror(errnoCopy);
1264     return errnoCopy;
1265   }
1266
1267   return 0;
1268 }
1269
1270 void AsyncSocket::ioReady(uint16_t events) noexcept {
1271   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1272           << ", events=" << std::hex << events << ", state=" << state_;
1273   DestructorGuard dg(this);
1274   assert(events & EventHandler::READ_WRITE);
1275   assert(eventBase_->isInEventBaseThread());
1276
1277   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1278   if (relevantEvents == EventHandler::READ) {
1279     handleRead();
1280   } else if (relevantEvents == EventHandler::WRITE) {
1281     handleWrite();
1282   } else if (relevantEvents == EventHandler::READ_WRITE) {
1283     EventBase* originalEventBase = eventBase_;
1284     // If both read and write events are ready, process writes first.
1285     handleWrite();
1286
1287     // Return now if handleWrite() detached us from our EventBase
1288     if (eventBase_ != originalEventBase) {
1289       return;
1290     }
1291
1292     // Only call handleRead() if a read callback is still installed.
1293     // (It's possible that the read callback was uninstalled during
1294     // handleWrite().)
1295     if (readCallback_) {
1296       handleRead();
1297     }
1298   } else {
1299     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1300                << std::hex << events << "(this=" << this << ")";
1301     abort();
1302   }
1303 }
1304
1305 AsyncSocket::ReadResult
1306 AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) {
1307   VLOG(5) << "AsyncSocket::performRead() this=" << this << ", buf=" << *buf
1308           << ", buflen=" << *buflen;
1309
1310   int recvFlags = 0;
1311   if (peek_) {
1312     recvFlags |= MSG_PEEK;
1313   }
1314
1315   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1316   if (bytes < 0) {
1317     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1318       // No more data to read right now.
1319       return ReadResult(READ_BLOCKING);
1320     } else {
1321       return ReadResult(READ_ERROR);
1322     }
1323   } else {
1324     appBytesReceived_ += bytes;
1325     return ReadResult(bytes);
1326   }
1327 }
1328
1329 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1330   // no matter what, buffer should be preapared for non-ssl socket
1331   CHECK(readCallback_);
1332   readCallback_->getReadBuffer(buf, buflen);
1333 }
1334
1335 void AsyncSocket::handleRead() noexcept {
1336   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1337           << ", state=" << state_;
1338   assert(state_ == StateEnum::ESTABLISHED);
1339   assert((shutdownFlags_ & SHUT_READ) == 0);
1340   assert(readCallback_ != nullptr);
1341   assert(eventFlags_ & EventHandler::READ);
1342
1343   // Loop until:
1344   // - a read attempt would block
1345   // - readCallback_ is uninstalled
1346   // - the number of loop iterations exceeds the optional maximum
1347   // - this AsyncSocket is moved to another EventBase
1348   //
1349   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1350   // which is why need to check for it here.
1351   //
1352   // The last bullet point is slightly subtle.  readDataAvailable() may also
1353   // detach this socket from this EventBase.  However, before
1354   // readDataAvailable() returns another thread may pick it up, attach it to
1355   // a different EventBase, and install another readCallback_.  We need to
1356   // exit immediately after readDataAvailable() returns if the eventBase_ has
1357   // changed.  (The caller must perform some sort of locking to transfer the
1358   // AsyncSocket between threads properly.  This will be sufficient to ensure
1359   // that this thread sees the updated eventBase_ variable after
1360   // readDataAvailable() returns.)
1361   uint16_t numReads = 0;
1362   EventBase* originalEventBase = eventBase_;
1363   while (readCallback_ && eventBase_ == originalEventBase) {
1364     // Get the buffer to read into.
1365     void* buf = nullptr;
1366     size_t buflen = 0, offset = 0;
1367     try {
1368       prepareReadBuffer(&buf, &buflen);
1369       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1370     } catch (const AsyncSocketException& ex) {
1371       return failRead(__func__, ex);
1372     } catch (const std::exception& ex) {
1373       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1374                               string("ReadCallback::getReadBuffer() "
1375                                      "threw exception: ") +
1376                               ex.what());
1377       return failRead(__func__, tex);
1378     } catch (...) {
1379       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1380                              "ReadCallback::getReadBuffer() threw "
1381                              "non-exception type");
1382       return failRead(__func__, ex);
1383     }
1384     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1385       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1386                              "ReadCallback::getReadBuffer() returned "
1387                              "empty buffer");
1388       return failRead(__func__, ex);
1389     }
1390
1391     // Perform the read
1392     auto readResult = performRead(&buf, &buflen, &offset);
1393     auto bytesRead = readResult.readReturn;
1394     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1395             << bytesRead << " bytes";
1396     if (bytesRead > 0) {
1397       if (!isBufferMovable_) {
1398         readCallback_->readDataAvailable(bytesRead);
1399       } else {
1400         CHECK(kOpenSslModeMoveBufferOwnership);
1401         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1402                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1403                 << ", offset=" << offset;
1404         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1405         readBuf->trimStart(offset);
1406         readBuf->trimEnd(buflen - offset - bytesRead);
1407         readCallback_->readBufferAvailable(std::move(readBuf));
1408       }
1409
1410       // Fall through and continue around the loop if the read
1411       // completely filled the available buffer.
1412       // Note that readCallback_ may have been uninstalled or changed inside
1413       // readDataAvailable().
1414       if (size_t(bytesRead) < buflen) {
1415         return;
1416       }
1417     } else if (bytesRead == READ_BLOCKING) {
1418         // No more data to read right now.
1419         return;
1420     } else if (bytesRead == READ_ERROR) {
1421       readErr_ = READ_ERROR;
1422       if (readResult.exception) {
1423         return failRead(__func__, *readResult.exception);
1424       }
1425       auto errnoCopy = errno;
1426       AsyncSocketException ex(
1427           AsyncSocketException::INTERNAL_ERROR,
1428           withAddr("recv() failed"),
1429           errnoCopy);
1430       return failRead(__func__, ex);
1431     } else {
1432       assert(bytesRead == READ_EOF);
1433       readErr_ = READ_EOF;
1434       // EOF
1435       shutdownFlags_ |= SHUT_READ;
1436       if (!updateEventRegistration(0, EventHandler::READ)) {
1437         // we've already been moved into STATE_ERROR
1438         assert(state_ == StateEnum::ERROR);
1439         assert(readCallback_ == nullptr);
1440         return;
1441       }
1442
1443       ReadCallback* callback = readCallback_;
1444       readCallback_ = nullptr;
1445       callback->readEOF();
1446       return;
1447     }
1448     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1449       if (readCallback_ != nullptr) {
1450         // We might still have data in the socket.
1451         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1452         scheduleImmediateRead();
1453       }
1454       return;
1455     }
1456   }
1457 }
1458
1459 /**
1460  * This function attempts to write as much data as possible, until no more data
1461  * can be written.
1462  *
1463  * - If it sends all available data, it unregisters for write events, and stops
1464  *   the writeTimeout_.
1465  *
1466  * - If not all of the data can be sent immediately, it reschedules
1467  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1468  *   registered for write events.
1469  */
1470 void AsyncSocket::handleWrite() noexcept {
1471   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1472           << ", state=" << state_;
1473   DestructorGuard dg(this);
1474
1475   if (state_ == StateEnum::CONNECTING) {
1476     handleConnect();
1477     return;
1478   }
1479
1480   // Normal write
1481   assert(state_ == StateEnum::ESTABLISHED);
1482   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1483   assert(writeReqHead_ != nullptr);
1484
1485   // Loop until we run out of write requests,
1486   // or until this socket is moved to another EventBase.
1487   // (See the comment in handleRead() explaining how this can happen.)
1488   EventBase* originalEventBase = eventBase_;
1489   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1490     auto writeResult = writeReqHead_->performWrite();
1491     if (writeResult.writeReturn < 0) {
1492       if (writeResult.exception) {
1493         return failWrite(__func__, *writeResult.exception);
1494       }
1495       auto errnoCopy = errno;
1496       AsyncSocketException ex(
1497           AsyncSocketException::INTERNAL_ERROR,
1498           withAddr("writev() failed"),
1499           errnoCopy);
1500       return failWrite(__func__, ex);
1501     } else if (writeReqHead_->isComplete()) {
1502       // We finished this request
1503       WriteRequest* req = writeReqHead_;
1504       writeReqHead_ = req->getNext();
1505
1506       if (writeReqHead_ == nullptr) {
1507         writeReqTail_ = nullptr;
1508         // This is the last write request.
1509         // Unregister for write events and cancel the send timer
1510         // before we invoke the callback.  We have to update the state properly
1511         // before calling the callback, since it may want to detach us from
1512         // the EventBase.
1513         if (eventFlags_ & EventHandler::WRITE) {
1514           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1515             assert(state_ == StateEnum::ERROR);
1516             return;
1517           }
1518           // Stop the send timeout
1519           writeTimeout_.cancelTimeout();
1520         }
1521         assert(!writeTimeout_.isScheduled());
1522
1523         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1524         // we finish sending the last write request.
1525         //
1526         // We have to do this before invoking writeSuccess(), since
1527         // writeSuccess() may detach us from our EventBase.
1528         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1529           assert(connectCallback_ == nullptr);
1530           shutdownFlags_ |= SHUT_WRITE;
1531
1532           if (shutdownFlags_ & SHUT_READ) {
1533             // Reads have already been shutdown.  Fully close the socket and
1534             // move to STATE_CLOSED.
1535             //
1536             // Note: This code currently moves us to STATE_CLOSED even if
1537             // close() hasn't ever been called.  This can occur if we have
1538             // received EOF from the peer and shutdownWrite() has been called
1539             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1540             // case, until close() is actually called?  I can't think of a
1541             // reason why we would need to do so.  No other operations besides
1542             // calling close() or destroying the socket can be performed at
1543             // this point.
1544             assert(readCallback_ == nullptr);
1545             state_ = StateEnum::CLOSED;
1546             if (fd_ >= 0) {
1547               ioHandler_.changeHandlerFD(-1);
1548               doClose();
1549             }
1550           } else {
1551             // Reads are still enabled, so we are only doing a half-shutdown
1552             shutdown(fd_, SHUT_WR);
1553           }
1554         }
1555       }
1556
1557       // Invoke the callback
1558       WriteCallback* callback = req->getCallback();
1559       req->destroy();
1560       if (callback) {
1561         callback->writeSuccess();
1562       }
1563       // We'll continue around the loop, trying to write another request
1564     } else {
1565       // Partial write.
1566       if (bufferCallback_) {
1567         bufferCallback_->onEgressBuffered();
1568       }
1569       writeReqHead_->consume();
1570       // Stop after a partial write; it's highly likely that a subsequent write
1571       // attempt will just return EAGAIN.
1572       //
1573       // Ensure that we are registered for write events.
1574       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1575         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1576           assert(state_ == StateEnum::ERROR);
1577           return;
1578         }
1579       }
1580
1581       // Reschedule the send timeout, since we have made some write progress.
1582       if (sendTimeout_ > 0) {
1583         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1584           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1585               withAddr("failed to reschedule write timeout"));
1586           return failWrite(__func__, ex);
1587         }
1588       }
1589       return;
1590     }
1591   }
1592   if (!writeReqHead_ && bufferCallback_) {
1593     bufferCallback_->onEgressBufferCleared();
1594   }
1595 }
1596
1597 void AsyncSocket::checkForImmediateRead() noexcept {
1598   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1599   // (However, note that some subclasses do override this method.)
1600   //
1601   // Simply calling handleRead() here would be bad, as this would call
1602   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1603   // buffer even though no data may be available.  This would waste lots of
1604   // memory, since the buffer will sit around unused until the socket actually
1605   // becomes readable.
1606   //
1607   // Checking if the socket is readable now also seems like it would probably
1608   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1609   // would just waste an extra system call.  Even if it is readable, waiting to
1610   // find out from libevent on the next event loop doesn't seem that bad.
1611 }
1612
1613 void AsyncSocket::handleInitialReadWrite() noexcept {
1614   // Our callers should already be holding a DestructorGuard, but grab
1615   // one here just to make sure, in case one of our calling code paths ever
1616   // changes.
1617   DestructorGuard dg(this);
1618
1619   // If we have a readCallback_, make sure we enable read events.  We
1620   // may already be registered for reads if connectSuccess() set
1621   // the read calback.
1622   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1623     assert(state_ == StateEnum::ESTABLISHED);
1624     assert((shutdownFlags_ & SHUT_READ) == 0);
1625     if (!updateEventRegistration(EventHandler::READ, 0)) {
1626       assert(state_ == StateEnum::ERROR);
1627       return;
1628     }
1629     checkForImmediateRead();
1630   } else if (readCallback_ == nullptr) {
1631     // Unregister for read events.
1632     updateEventRegistration(0, EventHandler::READ);
1633   }
1634
1635   // If we have write requests pending, try to send them immediately.
1636   // Since we just finished accepting, there is a very good chance that we can
1637   // write without blocking.
1638   //
1639   // However, we only process them if EventHandler::WRITE is not already set,
1640   // which means that we're already blocked on a write attempt.  (This can
1641   // happen if connectSuccess() called write() before returning.)
1642   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1643     // Call handleWrite() to perform write processing.
1644     handleWrite();
1645   } else if (writeReqHead_ == nullptr) {
1646     // Unregister for write event.
1647     updateEventRegistration(0, EventHandler::WRITE);
1648   }
1649 }
1650
1651 void AsyncSocket::handleConnect() noexcept {
1652   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1653           << ", state=" << state_;
1654   assert(state_ == StateEnum::CONNECTING);
1655   // SHUT_WRITE can never be set while we are still connecting;
1656   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1657   // finishes
1658   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1659
1660   // In case we had a connect timeout, cancel the timeout
1661   writeTimeout_.cancelTimeout();
1662   // We don't use a persistent registration when waiting on a connect event,
1663   // so we have been automatically unregistered now.  Update eventFlags_ to
1664   // reflect reality.
1665   assert(eventFlags_ == EventHandler::WRITE);
1666   eventFlags_ = EventHandler::NONE;
1667
1668   // Call getsockopt() to check if the connect succeeded
1669   int error;
1670   socklen_t len = sizeof(error);
1671   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1672   if (rv != 0) {
1673     auto errnoCopy = errno;
1674     AsyncSocketException ex(
1675         AsyncSocketException::INTERNAL_ERROR,
1676         withAddr("error calling getsockopt() after connect"),
1677         errnoCopy);
1678     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1679                << fd_ << " host=" << addr_.describe()
1680                << ") exception:" << ex.what();
1681     return failConnect(__func__, ex);
1682   }
1683
1684   if (error != 0) {
1685     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1686                            "connect failed", error);
1687     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1688             << fd_ << " host=" << addr_.describe()
1689             << ") exception: " << ex.what();
1690     return failConnect(__func__, ex);
1691   }
1692
1693   // Move into STATE_ESTABLISHED
1694   state_ = StateEnum::ESTABLISHED;
1695
1696   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1697   // perform, immediately shutdown the write half of the socket.
1698   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1699     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1700     // are still connecting we just abort the connect rather than waiting for
1701     // it to complete.
1702     assert((shutdownFlags_ & SHUT_READ) == 0);
1703     shutdown(fd_, SHUT_WR);
1704     shutdownFlags_ |= SHUT_WRITE;
1705   }
1706
1707   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1708           << "successfully connected; state=" << state_;
1709
1710   // Remember the EventBase we are attached to, before we start invoking any
1711   // callbacks (since the callbacks may call detachEventBase()).
1712   EventBase* originalEventBase = eventBase_;
1713
1714   invokeConnectSuccess();
1715   // Note that the connect callback may have changed our state.
1716   // (set or unset the read callback, called write(), closed the socket, etc.)
1717   // The following code needs to handle these situations correctly.
1718   //
1719   // If the socket has been closed, readCallback_ and writeReqHead_ will
1720   // always be nullptr, so that will prevent us from trying to read or write.
1721   //
1722   // The main thing to check for is if eventBase_ is still originalEventBase.
1723   // If not, we have been detached from this event base, so we shouldn't
1724   // perform any more operations.
1725   if (eventBase_ != originalEventBase) {
1726     return;
1727   }
1728
1729   handleInitialReadWrite();
1730 }
1731
1732 void AsyncSocket::timeoutExpired() noexcept {
1733   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1734           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1735   DestructorGuard dg(this);
1736   assert(eventBase_->isInEventBaseThread());
1737
1738   if (state_ == StateEnum::CONNECTING) {
1739     // connect() timed out
1740     // Unregister for I/O events.
1741     if (connectCallback_) {
1742       AsyncSocketException ex(
1743           AsyncSocketException::TIMED_OUT, "connect timed out");
1744       failConnect(__func__, ex);
1745     } else {
1746       // we faced a connect error without a connect callback, which could
1747       // happen due to TFO.
1748       AsyncSocketException ex(
1749           AsyncSocketException::TIMED_OUT, "write timed out during connection");
1750       failWrite(__func__, ex);
1751     }
1752   } else {
1753     // a normal write operation timed out
1754     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1755     failWrite(__func__, ex);
1756   }
1757 }
1758
1759 ssize_t AsyncSocket::tfoSendMsg(int fd, struct msghdr* msg, int msg_flags) {
1760   return detail::tfo_sendmsg(fd, msg, msg_flags);
1761 }
1762
1763 AsyncSocket::WriteResult
1764 AsyncSocket::sendSocketMessage(int fd, struct msghdr* msg, int msg_flags) {
1765   ssize_t totalWritten = 0;
1766   if (state_ == StateEnum::FAST_OPEN) {
1767     sockaddr_storage addr;
1768     auto len = addr_.getAddress(&addr);
1769     msg->msg_name = &addr;
1770     msg->msg_namelen = len;
1771     totalWritten = tfoSendMsg(fd_, msg, msg_flags);
1772     if (totalWritten >= 0) {
1773       tfoFinished_ = true;
1774       state_ = StateEnum::ESTABLISHED;
1775       handleInitialReadWrite();
1776     } else if (errno == EINPROGRESS) {
1777       VLOG(4) << "TFO falling back to connecting";
1778       // A normal sendmsg doesn't return EINPROGRESS, however
1779       // TFO might fallback to connecting if there is no
1780       // cookie.
1781       state_ = StateEnum::CONNECTING;
1782       try {
1783         scheduleConnectTimeoutAndRegisterForEvents();
1784       } catch (const AsyncSocketException& ex) {
1785         return WriteResult(
1786             WRITE_ERROR, folly::make_unique<AsyncSocketException>(ex));
1787       }
1788       // Let's fake it that no bytes were written and return an errno.
1789       errno = EAGAIN;
1790       totalWritten = -1;
1791     } else if (errno == EOPNOTSUPP) {
1792       VLOG(4) << "TFO not supported";
1793       // Try falling back to connecting.
1794       state_ = StateEnum::CONNECTING;
1795       try {
1796         int ret = socketConnect((const sockaddr*)&addr, len);
1797         if (ret == 0) {
1798           // connect succeeded immediately
1799           // Treat this like no data was written.
1800           state_ = StateEnum::ESTABLISHED;
1801           handleInitialReadWrite();
1802         }
1803         // If there was no exception during connections,
1804         // we would return that no bytes were written.
1805         errno = EAGAIN;
1806         totalWritten = -1;
1807       } catch (const AsyncSocketException& ex) {
1808         return WriteResult(
1809             WRITE_ERROR, folly::make_unique<AsyncSocketException>(ex));
1810       }
1811     } else if (errno == EAGAIN) {
1812       // Normally sendmsg would indicate that the write would block.
1813       // However in the fast open case, it would indicate that sendmsg
1814       // fell back to a connect. This is a return code from connect()
1815       // instead, and is an error condition indicating no fds available.
1816       return WriteResult(
1817           WRITE_ERROR,
1818           folly::make_unique<AsyncSocketException>(
1819               AsyncSocketException::UNKNOWN, "No more free local ports"));
1820     }
1821   } else {
1822     totalWritten = ::sendmsg(fd, msg, msg_flags);
1823   }
1824   return WriteResult(totalWritten);
1825 }
1826
1827 AsyncSocket::WriteResult AsyncSocket::performWrite(
1828     const iovec* vec,
1829     uint32_t count,
1830     WriteFlags flags,
1831     uint32_t* countWritten,
1832     uint32_t* partialWritten) {
1833   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1834   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1835   // (since it may terminate the program if the main program doesn't explicitly
1836   // ignore it).
1837   struct msghdr msg;
1838   msg.msg_name = nullptr;
1839   msg.msg_namelen = 0;
1840   msg.msg_iov = const_cast<iovec *>(vec);
1841   msg.msg_iovlen = std::min<size_t>(count, kIovMax);
1842   msg.msg_control = nullptr;
1843   msg.msg_controllen = 0;
1844   msg.msg_flags = 0;
1845
1846   int msg_flags = MSG_DONTWAIT;
1847
1848 #ifdef MSG_NOSIGNAL // Linux-only
1849   msg_flags |= MSG_NOSIGNAL;
1850   if (isSet(flags, WriteFlags::CORK)) {
1851     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1852     // give it the rest of the data rather than immediately sending a partial
1853     // frame, even when TCP_NODELAY is enabled.
1854     msg_flags |= MSG_MORE;
1855   }
1856 #endif
1857   if (isSet(flags, WriteFlags::EOR)) {
1858     // marks that this is the last byte of a record (response)
1859     msg_flags |= MSG_EOR;
1860   }
1861   auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
1862   auto totalWritten = writeResult.writeReturn;
1863   if (totalWritten < 0) {
1864     if (!writeResult.exception && errno == EAGAIN) {
1865       // TCP buffer is full; we can't write any more data right now.
1866       *countWritten = 0;
1867       *partialWritten = 0;
1868       return WriteResult(0);
1869     }
1870     // error
1871     *countWritten = 0;
1872     *partialWritten = 0;
1873     return writeResult;
1874   }
1875
1876   appBytesWritten_ += totalWritten;
1877
1878   uint32_t bytesWritten;
1879   uint32_t n;
1880   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1881     const iovec* v = vec + n;
1882     if (v->iov_len > bytesWritten) {
1883       // Partial write finished in the middle of this iovec
1884       *countWritten = n;
1885       *partialWritten = bytesWritten;
1886       return WriteResult(totalWritten);
1887     }
1888
1889     bytesWritten -= v->iov_len;
1890   }
1891
1892   assert(bytesWritten == 0);
1893   *countWritten = n;
1894   *partialWritten = 0;
1895   return WriteResult(totalWritten);
1896 }
1897
1898 /**
1899  * Re-register the EventHandler after eventFlags_ has changed.
1900  *
1901  * If an error occurs, fail() is called to move the socket into the error state
1902  * and call all currently installed callbacks.  After an error, the
1903  * AsyncSocket is completely unregistered.
1904  *
1905  * @return Returns true on succcess, or false on error.
1906  */
1907 bool AsyncSocket::updateEventRegistration() {
1908   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1909           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1910           << ", events=" << std::hex << eventFlags_;
1911   assert(eventBase_->isInEventBaseThread());
1912   if (eventFlags_ == EventHandler::NONE) {
1913     ioHandler_.unregisterHandler();
1914     return true;
1915   }
1916
1917   // Always register for persistent events, so we don't have to re-register
1918   // after being called back.
1919   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1920     eventFlags_ = EventHandler::NONE; // we're not registered after error
1921     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1922         withAddr("failed to update AsyncSocket event registration"));
1923     fail("updateEventRegistration", ex);
1924     return false;
1925   }
1926
1927   return true;
1928 }
1929
1930 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1931                                            uint16_t disable) {
1932   uint16_t oldFlags = eventFlags_;
1933   eventFlags_ |= enable;
1934   eventFlags_ &= ~disable;
1935   if (eventFlags_ == oldFlags) {
1936     return true;
1937   } else {
1938     return updateEventRegistration();
1939   }
1940 }
1941
1942 void AsyncSocket::startFail() {
1943   // startFail() should only be called once
1944   assert(state_ != StateEnum::ERROR);
1945   assert(getDestructorGuardCount() > 0);
1946   state_ = StateEnum::ERROR;
1947   // Ensure that SHUT_READ and SHUT_WRITE are set,
1948   // so all future attempts to read or write will be rejected
1949   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1950
1951   if (eventFlags_ != EventHandler::NONE) {
1952     eventFlags_ = EventHandler::NONE;
1953     ioHandler_.unregisterHandler();
1954   }
1955   writeTimeout_.cancelTimeout();
1956
1957   if (fd_ >= 0) {
1958     ioHandler_.changeHandlerFD(-1);
1959     doClose();
1960   }
1961 }
1962
1963 void AsyncSocket::finishFail() {
1964   assert(state_ == StateEnum::ERROR);
1965   assert(getDestructorGuardCount() > 0);
1966
1967   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1968                          withAddr("socket closing after error"));
1969   invokeConnectErr(ex);
1970   failAllWrites(ex);
1971
1972   if (readCallback_) {
1973     ReadCallback* callback = readCallback_;
1974     readCallback_ = nullptr;
1975     callback->readErr(ex);
1976   }
1977 }
1978
1979 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1980   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1981              << state_ << " host=" << addr_.describe()
1982              << "): failed in " << fn << "(): "
1983              << ex.what();
1984   startFail();
1985   finishFail();
1986 }
1987
1988 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1989   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1990                << state_ << " host=" << addr_.describe()
1991                << "): failed while connecting in " << fn << "(): "
1992                << ex.what();
1993   startFail();
1994
1995   invokeConnectErr(ex);
1996   finishFail();
1997 }
1998
1999 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
2000   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2001                << state_ << " host=" << addr_.describe()
2002                << "): failed while reading in " << fn << "(): "
2003                << ex.what();
2004   startFail();
2005
2006   if (readCallback_ != nullptr) {
2007     ReadCallback* callback = readCallback_;
2008     readCallback_ = nullptr;
2009     callback->readErr(ex);
2010   }
2011
2012   finishFail();
2013 }
2014
2015 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
2016   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2017                << state_ << " host=" << addr_.describe()
2018                << "): failed while writing in " << fn << "(): "
2019                << ex.what();
2020   startFail();
2021
2022   // Only invoke the first write callback, since the error occurred while
2023   // writing this request.  Let any other pending write callbacks be invoked in
2024   // finishFail().
2025   if (writeReqHead_ != nullptr) {
2026     WriteRequest* req = writeReqHead_;
2027     writeReqHead_ = req->getNext();
2028     WriteCallback* callback = req->getCallback();
2029     uint32_t bytesWritten = req->getTotalBytesWritten();
2030     req->destroy();
2031     if (callback) {
2032       callback->writeErr(bytesWritten, ex);
2033     }
2034   }
2035
2036   finishFail();
2037 }
2038
2039 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
2040                              size_t bytesWritten,
2041                              const AsyncSocketException& ex) {
2042   // This version of failWrite() is used when the failure occurs before
2043   // we've added the callback to writeReqHead_.
2044   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2045              << state_ << " host=" << addr_.describe()
2046              <<"): failed while writing in " << fn << "(): "
2047              << ex.what();
2048   startFail();
2049
2050   if (callback != nullptr) {
2051     callback->writeErr(bytesWritten, ex);
2052   }
2053
2054   finishFail();
2055 }
2056
2057 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
2058   // Invoke writeError() on all write callbacks.
2059   // This is used when writes are forcibly shutdown with write requests
2060   // pending, or when an error occurs with writes pending.
2061   while (writeReqHead_ != nullptr) {
2062     WriteRequest* req = writeReqHead_;
2063     writeReqHead_ = req->getNext();
2064     WriteCallback* callback = req->getCallback();
2065     if (callback) {
2066       callback->writeErr(req->getTotalBytesWritten(), ex);
2067     }
2068     req->destroy();
2069   }
2070 }
2071
2072 void AsyncSocket::invalidState(ConnectCallback* callback) {
2073   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
2074              << "): connect() called in invalid state " << state_;
2075
2076   /*
2077    * The invalidState() methods don't use the normal failure mechanisms,
2078    * since we don't know what state we are in.  We don't want to call
2079    * startFail()/finishFail() recursively if we are already in the middle of
2080    * cleaning up.
2081    */
2082
2083   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
2084                          "connect() called with socket in invalid state");
2085   connectEndTime_ = std::chrono::steady_clock::now();
2086   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2087     if (callback) {
2088       callback->connectErr(ex);
2089     }
2090   } else {
2091     // We can't use failConnect() here since connectCallback_
2092     // may already be set to another callback.  Invoke this ConnectCallback
2093     // here; any other connectCallback_ will be invoked in finishFail()
2094     startFail();
2095     if (callback) {
2096       callback->connectErr(ex);
2097     }
2098     finishFail();
2099   }
2100 }
2101
2102 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
2103   connectEndTime_ = std::chrono::steady_clock::now();
2104   if (connectCallback_) {
2105     ConnectCallback* callback = connectCallback_;
2106     connectCallback_ = nullptr;
2107     callback->connectErr(ex);
2108   }
2109 }
2110
2111 void AsyncSocket::invokeConnectSuccess() {
2112   connectEndTime_ = std::chrono::steady_clock::now();
2113   if (connectCallback_) {
2114     ConnectCallback* callback = connectCallback_;
2115     connectCallback_ = nullptr;
2116     callback->connectSuccess();
2117   }
2118 }
2119
2120 void AsyncSocket::invalidState(ReadCallback* callback) {
2121   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2122              << "): setReadCallback(" << callback
2123              << ") called in invalid state " << state_;
2124
2125   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2126                          "setReadCallback() called with socket in "
2127                          "invalid state");
2128   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2129     if (callback) {
2130       callback->readErr(ex);
2131     }
2132   } else {
2133     startFail();
2134     if (callback) {
2135       callback->readErr(ex);
2136     }
2137     finishFail();
2138   }
2139 }
2140
2141 void AsyncSocket::invalidState(WriteCallback* callback) {
2142   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2143              << "): write() called in invalid state " << state_;
2144
2145   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2146                          withAddr("write() called with socket in invalid state"));
2147   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2148     if (callback) {
2149       callback->writeErr(0, ex);
2150     }
2151   } else {
2152     startFail();
2153     if (callback) {
2154       callback->writeErr(0, ex);
2155     }
2156     finishFail();
2157   }
2158 }
2159
2160 void AsyncSocket::doClose() {
2161   if (fd_ == -1) return;
2162   if (shutdownSocketSet_) {
2163     shutdownSocketSet_->close(fd_);
2164   } else {
2165     ::close(fd_);
2166   }
2167   fd_ = -1;
2168 }
2169
2170 std::ostream& operator << (std::ostream& os,
2171                            const AsyncSocket::StateEnum& state) {
2172   os << static_cast<int>(state);
2173   return os;
2174 }
2175
2176 std::string AsyncSocket::withAddr(const std::string& s) {
2177   // Don't use addr_ directly because it may not be initialized
2178   // e.g. if constructed from fd
2179   folly::SocketAddress peer, local;
2180   try {
2181     getPeerAddress(&peer);
2182     getLocalAddress(&local);
2183   } catch (const std::exception&) {
2184     // ignore
2185   } catch (...) {
2186     // ignore
2187   }
2188   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2189 }
2190
2191 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2192   bufferCallback_ = cb;
2193 }
2194
2195 } // folly