Remove per-write buffer callback from AsyncSocket
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23
24 #include <poll.h>
25 #include <errno.h>
26 #include <limits.h>
27 #include <unistd.h>
28 #include <thread>
29 #include <fcntl.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34 #include <boost/preprocessor/control/if.hpp>
35
36 using std::string;
37 using std::unique_ptr;
38
39 namespace folly {
40
41 // static members initializers
42 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
43
44 const AsyncSocketException socketClosedLocallyEx(
45     AsyncSocketException::END_OF_FILE, "socket closed locally");
46 const AsyncSocketException socketShutdownForWritesEx(
47     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
48
49 // TODO: It might help performance to provide a version of BytesWriteRequest that
50 // users could derive from, so we can avoid the extra allocation for each call
51 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
52 // protocols are currently templatized for transports.
53 //
54 // We would need the version for external users where they provide the iovec
55 // storage space, and only our internal version would allocate it at the end of
56 // the WriteRequest.
57
58 /* The default WriteRequest implementation, used for write(), writev() and
59  * writeChain()
60  *
61  * A new BytesWriteRequest operation is allocated on the heap for all write
62  * operations that cannot be completed immediately.
63  */
64 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
65  public:
66   static BytesWriteRequest* newRequest(AsyncSocket* socket,
67                                        WriteCallback* callback,
68                                        const iovec* ops,
69                                        uint32_t opCount,
70                                        uint32_t partialWritten,
71                                        uint32_t bytesWritten,
72                                        unique_ptr<IOBuf>&& ioBuf,
73                                        WriteFlags flags) {
74     assert(opCount > 0);
75     // Since we put a variable size iovec array at the end
76     // of each BytesWriteRequest, we have to manually allocate the memory.
77     void* buf = malloc(sizeof(BytesWriteRequest) +
78                        (opCount * sizeof(struct iovec)));
79     if (buf == nullptr) {
80       throw std::bad_alloc();
81     }
82
83     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
84                                       partialWritten, bytesWritten,
85                                       std::move(ioBuf), flags);
86   }
87
88   void destroy() override {
89     this->~BytesWriteRequest();
90     free(this);
91   }
92
93   bool performWrite() override {
94     WriteFlags writeFlags = flags_;
95     if (getNext() != nullptr) {
96       writeFlags = writeFlags | WriteFlags::CORK;
97     }
98     bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
99                                           &opsWritten_, &partialBytes_);
100     return bytesWritten_ >= 0;
101   }
102
103   bool isComplete() override {
104     return opsWritten_ == getOpCount();
105   }
106
107   void consume() override {
108     // Advance opIndex_ forward by opsWritten_
109     opIndex_ += opsWritten_;
110     assert(opIndex_ < opCount_);
111
112     // If we've finished writing any IOBufs, release them
113     if (ioBuf_) {
114       for (uint32_t i = opsWritten_; i != 0; --i) {
115         assert(ioBuf_);
116         ioBuf_ = ioBuf_->pop();
117       }
118     }
119
120     // Move partialBytes_ forward into the current iovec buffer
121     struct iovec* currentOp = writeOps_ + opIndex_;
122     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
123     currentOp->iov_base =
124       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
125     currentOp->iov_len -= partialBytes_;
126
127     // Increment the totalBytesWritten_ count by bytesWritten_;
128     totalBytesWritten_ += bytesWritten_;
129   }
130
131  private:
132   BytesWriteRequest(AsyncSocket* socket,
133                     WriteCallback* callback,
134                     const struct iovec* ops,
135                     uint32_t opCount,
136                     uint32_t partialBytes,
137                     uint32_t bytesWritten,
138                     unique_ptr<IOBuf>&& ioBuf,
139                     WriteFlags flags)
140     : AsyncSocket::WriteRequest(socket, callback)
141     , opCount_(opCount)
142     , opIndex_(0)
143     , flags_(flags)
144     , ioBuf_(std::move(ioBuf))
145     , opsWritten_(0)
146     , partialBytes_(partialBytes)
147     , bytesWritten_(bytesWritten) {
148     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
149   }
150
151   // private destructor, to ensure callers use destroy()
152   ~BytesWriteRequest() override = default;
153
154   const struct iovec* getOps() const {
155     assert(opCount_ > opIndex_);
156     return writeOps_ + opIndex_;
157   }
158
159   uint32_t getOpCount() const {
160     assert(opCount_ > opIndex_);
161     return opCount_ - opIndex_;
162   }
163
164   uint32_t opCount_;            ///< number of entries in writeOps_
165   uint32_t opIndex_;            ///< current index into writeOps_
166   WriteFlags flags_;            ///< set for WriteFlags
167   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
168
169   // for consume(), how much we wrote on the last write
170   uint32_t opsWritten_;         ///< complete ops written
171   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
172   ssize_t bytesWritten_;        ///< bytes written altogether
173
174   struct iovec writeOps_[];     ///< write operation(s) list
175 };
176
177 AsyncSocket::AsyncSocket()
178   : eventBase_(nullptr)
179   , writeTimeout_(this, nullptr)
180   , ioHandler_(this, nullptr)
181   , immediateReadHandler_(this) {
182   VLOG(5) << "new AsyncSocket()";
183   init();
184 }
185
186 AsyncSocket::AsyncSocket(EventBase* evb)
187   : eventBase_(evb)
188   , writeTimeout_(this, evb)
189   , ioHandler_(this, evb)
190   , immediateReadHandler_(this) {
191   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
192   init();
193 }
194
195 AsyncSocket::AsyncSocket(EventBase* evb,
196                            const folly::SocketAddress& address,
197                            uint32_t connectTimeout)
198   : AsyncSocket(evb) {
199   connect(nullptr, address, connectTimeout);
200 }
201
202 AsyncSocket::AsyncSocket(EventBase* evb,
203                            const std::string& ip,
204                            uint16_t port,
205                            uint32_t connectTimeout)
206   : AsyncSocket(evb) {
207   connect(nullptr, ip, port, connectTimeout);
208 }
209
210 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
211   : eventBase_(evb)
212   , writeTimeout_(this, evb)
213   , ioHandler_(this, evb, fd)
214   , immediateReadHandler_(this) {
215   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
216           << fd << ")";
217   init();
218   fd_ = fd;
219   setCloseOnExec();
220   state_ = StateEnum::ESTABLISHED;
221 }
222
223 // init() method, since constructor forwarding isn't supported in most
224 // compilers yet.
225 void AsyncSocket::init() {
226   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
227   shutdownFlags_ = 0;
228   state_ = StateEnum::UNINIT;
229   eventFlags_ = EventHandler::NONE;
230   fd_ = -1;
231   sendTimeout_ = 0;
232   maxReadsPerEvent_ = 16;
233   connectCallback_ = nullptr;
234   readCallback_ = nullptr;
235   writeReqHead_ = nullptr;
236   writeReqTail_ = nullptr;
237   shutdownSocketSet_ = nullptr;
238   appBytesWritten_ = 0;
239   appBytesReceived_ = 0;
240 }
241
242 AsyncSocket::~AsyncSocket() {
243   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
244           << ", evb=" << eventBase_ << ", fd=" << fd_
245           << ", state=" << state_ << ")";
246 }
247
248 void AsyncSocket::destroy() {
249   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
250           << ", fd=" << fd_ << ", state=" << state_;
251   // When destroy is called, close the socket immediately
252   closeNow();
253
254   // Then call DelayedDestruction::destroy() to take care of
255   // whether or not we need immediate or delayed destruction
256   DelayedDestruction::destroy();
257 }
258
259 int AsyncSocket::detachFd() {
260   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
261           << ", evb=" << eventBase_ << ", state=" << state_
262           << ", events=" << std::hex << eventFlags_ << ")";
263   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
264   // actually close the descriptor.
265   if (shutdownSocketSet_) {
266     shutdownSocketSet_->remove(fd_);
267   }
268   int fd = fd_;
269   fd_ = -1;
270   // Call closeNow() to invoke all pending callbacks with an error.
271   closeNow();
272   // Update the EventHandler to stop using this fd.
273   // This can only be done after closeNow() unregisters the handler.
274   ioHandler_.changeHandlerFD(-1);
275   return fd;
276 }
277
278 const folly::SocketAddress& AsyncSocket::anyAddress() {
279   static const folly::SocketAddress anyAddress =
280     folly::SocketAddress("0.0.0.0", 0);
281   return anyAddress;
282 }
283
284 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
285   if (shutdownSocketSet_ == newSS) {
286     return;
287   }
288   if (shutdownSocketSet_ && fd_ != -1) {
289     shutdownSocketSet_->remove(fd_);
290   }
291   shutdownSocketSet_ = newSS;
292   if (shutdownSocketSet_ && fd_ != -1) {
293     shutdownSocketSet_->add(fd_);
294   }
295 }
296
297 void AsyncSocket::setCloseOnExec() {
298   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
299   if (rv != 0) {
300     throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
301                                withAddr("failed to set close-on-exec flag"),
302                                errno);
303   }
304 }
305
306 void AsyncSocket::connect(ConnectCallback* callback,
307                            const folly::SocketAddress& address,
308                            int timeout,
309                            const OptionMap &options,
310                            const folly::SocketAddress& bindAddr) noexcept {
311   DestructorGuard dg(this);
312   assert(eventBase_->isInEventBaseThread());
313
314   addr_ = address;
315
316   // Make sure we're in the uninitialized state
317   if (state_ != StateEnum::UNINIT) {
318     return invalidState(callback);
319   }
320
321   connectStartTime_ = std::chrono::steady_clock::now();
322   // Make connect end time at least >= connectStartTime.
323   connectEndTime_ = connectStartTime_;
324
325   assert(fd_ == -1);
326   state_ = StateEnum::CONNECTING;
327   connectCallback_ = callback;
328
329   sockaddr_storage addrStorage;
330   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
331
332   try {
333     // Create the socket
334     // Technically the first parameter should actually be a protocol family
335     // constant (PF_xxx) rather than an address family (AF_xxx), but the
336     // distinction is mainly just historical.  In pretty much all
337     // implementations the PF_foo and AF_foo constants are identical.
338     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
339     if (fd_ < 0) {
340       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
341                                 withAddr("failed to create socket"), errno);
342     }
343     if (shutdownSocketSet_) {
344       shutdownSocketSet_->add(fd_);
345     }
346     ioHandler_.changeHandlerFD(fd_);
347
348     setCloseOnExec();
349
350     // Put the socket in non-blocking mode
351     int flags = fcntl(fd_, F_GETFL, 0);
352     if (flags == -1) {
353       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
354                                 withAddr("failed to get socket flags"), errno);
355     }
356     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
357     if (rv == -1) {
358       throw AsyncSocketException(
359           AsyncSocketException::INTERNAL_ERROR,
360           withAddr("failed to put socket in non-blocking mode"),
361           errno);
362     }
363
364 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
365     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
366     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
367     if (rv == -1) {
368       throw AsyncSocketException(
369           AsyncSocketException::INTERNAL_ERROR,
370           "failed to enable F_SETNOSIGPIPE on socket",
371           errno);
372     }
373 #endif
374
375     // By default, turn on TCP_NODELAY
376     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
377     // setNoDelay() will log an error message if it fails.
378     if (address.getFamily() != AF_UNIX) {
379       (void)setNoDelay(true);
380     }
381
382     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
383             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
384
385     // bind the socket
386     if (bindAddr != anyAddress()) {
387       int one = 1;
388       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
389         doClose();
390         throw AsyncSocketException(
391           AsyncSocketException::NOT_OPEN,
392           "failed to setsockopt prior to bind on " + bindAddr.describe(),
393           errno);
394       }
395
396       bindAddr.getAddress(&addrStorage);
397
398       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
399         doClose();
400         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
401                                   "failed to bind to async socket: " +
402                                   bindAddr.describe(),
403                                   errno);
404       }
405     }
406
407     // Apply the additional options if any.
408     for (const auto& opt: options) {
409       int rv = opt.first.apply(fd_, opt.second);
410       if (rv != 0) {
411         throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
412                                   withAddr("failed to set socket option"),
413                                   errno);
414       }
415     }
416
417     // Perform the connect()
418     address.getAddress(&addrStorage);
419
420     rv = ::connect(fd_, saddr, address.getActualSize());
421     if (rv < 0) {
422       if (errno == EINPROGRESS) {
423         // Connection in progress.
424         if (timeout > 0) {
425           // Start a timer in case the connection takes too long.
426           if (!writeTimeout_.scheduleTimeout(timeout)) {
427             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
428                 withAddr("failed to schedule AsyncSocket connect timeout"));
429           }
430         }
431
432         // Register for write events, so we'll
433         // be notified when the connection finishes/fails.
434         // Note that we don't register for a persistent event here.
435         assert(eventFlags_ == EventHandler::NONE);
436         eventFlags_ = EventHandler::WRITE;
437         if (!ioHandler_.registerHandler(eventFlags_)) {
438           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
439               withAddr("failed to register AsyncSocket connect handler"));
440         }
441         return;
442       } else {
443         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
444                                   "connect failed (immediately)", errno);
445       }
446     }
447
448     // If we're still here the connect() succeeded immediately.
449     // Fall through to call the callback outside of this try...catch block
450   } catch (const AsyncSocketException& ex) {
451     return failConnect(__func__, ex);
452   } catch (const std::exception& ex) {
453     // shouldn't happen, but handle it just in case
454     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
455                << "): unexpected " << typeid(ex).name() << " exception: "
456                << ex.what();
457     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
458                             withAddr(string("unexpected exception: ") +
459                                      ex.what()));
460     return failConnect(__func__, tex);
461   }
462
463   // The connection succeeded immediately
464   // The read callback may not have been set yet, and no writes may be pending
465   // yet, so we don't have to register for any events at the moment.
466   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
467   assert(readCallback_ == nullptr);
468   assert(writeReqHead_ == nullptr);
469   state_ = StateEnum::ESTABLISHED;
470   invokeConnectSuccess();
471 }
472
473 void AsyncSocket::connect(ConnectCallback* callback,
474                            const string& ip, uint16_t port,
475                            int timeout,
476                            const OptionMap &options) noexcept {
477   DestructorGuard dg(this);
478   try {
479     connectCallback_ = callback;
480     connect(callback, folly::SocketAddress(ip, port), timeout, options);
481   } catch (const std::exception& ex) {
482     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
483                             ex.what());
484     return failConnect(__func__, tex);
485   }
486 }
487
488 void AsyncSocket::cancelConnect() {
489   connectCallback_ = nullptr;
490   if (state_ == StateEnum::CONNECTING) {
491     closeNow();
492   }
493 }
494
495 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
496   sendTimeout_ = milliseconds;
497   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
498
499   // If we are currently pending on write requests, immediately update
500   // writeTimeout_ with the new value.
501   if ((eventFlags_ & EventHandler::WRITE) &&
502       (state_ != StateEnum::CONNECTING)) {
503     assert(state_ == StateEnum::ESTABLISHED);
504     assert((shutdownFlags_ & SHUT_WRITE) == 0);
505     if (sendTimeout_ > 0) {
506       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
507         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
508             withAddr("failed to reschedule send timeout in setSendTimeout"));
509         return failWrite(__func__, ex);
510       }
511     } else {
512       writeTimeout_.cancelTimeout();
513     }
514   }
515 }
516
517 void AsyncSocket::setReadCB(ReadCallback *callback) {
518   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
519           << ", callback=" << callback << ", state=" << state_;
520
521   // Short circuit if callback is the same as the existing readCallback_.
522   //
523   // Note that this is needed for proper functioning during some cleanup cases.
524   // During cleanup we allow setReadCallback(nullptr) to be called even if the
525   // read callback is already unset and we have been detached from an event
526   // base.  This check prevents us from asserting
527   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
528   if (callback == readCallback_) {
529     return;
530   }
531
532   /* We are removing a read callback */
533   if (callback == nullptr &&
534       immediateReadHandler_.isLoopCallbackScheduled()) {
535     immediateReadHandler_.cancelLoopCallback();
536   }
537
538   if (shutdownFlags_ & SHUT_READ) {
539     // Reads have already been shut down on this socket.
540     //
541     // Allow setReadCallback(nullptr) to be called in this case, but don't
542     // allow a new callback to be set.
543     //
544     // For example, setReadCallback(nullptr) can happen after an error if we
545     // invoke some other error callback before invoking readError().  The other
546     // error callback that is invoked first may go ahead and clear the read
547     // callback before we get a chance to invoke readError().
548     if (callback != nullptr) {
549       return invalidState(callback);
550     }
551     assert((eventFlags_ & EventHandler::READ) == 0);
552     readCallback_ = nullptr;
553     return;
554   }
555
556   DestructorGuard dg(this);
557   assert(eventBase_->isInEventBaseThread());
558
559   switch ((StateEnum)state_) {
560     case StateEnum::CONNECTING:
561       // For convenience, we allow the read callback to be set while we are
562       // still connecting.  We just store the callback for now.  Once the
563       // connection completes we'll register for read events.
564       readCallback_ = callback;
565       return;
566     case StateEnum::ESTABLISHED:
567     {
568       readCallback_ = callback;
569       uint16_t oldFlags = eventFlags_;
570       if (readCallback_) {
571         eventFlags_ |= EventHandler::READ;
572       } else {
573         eventFlags_ &= ~EventHandler::READ;
574       }
575
576       // Update our registration if our flags have changed
577       if (eventFlags_ != oldFlags) {
578         // We intentionally ignore the return value here.
579         // updateEventRegistration() will move us into the error state if it
580         // fails, and we don't need to do anything else here afterwards.
581         (void)updateEventRegistration();
582       }
583
584       if (readCallback_) {
585         checkForImmediateRead();
586       }
587       return;
588     }
589     case StateEnum::CLOSED:
590     case StateEnum::ERROR:
591       // We should never reach here.  SHUT_READ should always be set
592       // if we are in STATE_CLOSED or STATE_ERROR.
593       assert(false);
594       return invalidState(callback);
595     case StateEnum::UNINIT:
596       // We do not allow setReadCallback() to be called before we start
597       // connecting.
598       return invalidState(callback);
599   }
600
601   // We don't put a default case in the switch statement, so that the compiler
602   // will warn us to update the switch statement if a new state is added.
603   return invalidState(callback);
604 }
605
606 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
607   return readCallback_;
608 }
609
610 void AsyncSocket::write(WriteCallback* callback,
611                          const void* buf, size_t bytes, WriteFlags flags) {
612   iovec op;
613   op.iov_base = const_cast<void*>(buf);
614   op.iov_len = bytes;
615   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
616 }
617
618 void AsyncSocket::writev(WriteCallback* callback,
619                           const iovec* vec,
620                           size_t count,
621                           WriteFlags flags) {
622   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
623 }
624
625 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
626                               WriteFlags flags) {
627   constexpr size_t kSmallSizeMax = 64;
628   size_t count = buf->countChainElements();
629   if (count <= kSmallSizeMax) {
630     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
631     writeChainImpl(callback, vec, count, std::move(buf), flags);
632   } else {
633     iovec* vec = new iovec[count];
634     writeChainImpl(callback, vec, count, std::move(buf), flags);
635     delete[] vec;
636   }
637 }
638
639 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
640     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
641   size_t veclen = buf->fillIov(vec, count);
642   writeImpl(callback, vec, veclen, std::move(buf), flags);
643 }
644
645 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
646                              size_t count, unique_ptr<IOBuf>&& buf,
647                              WriteFlags flags) {
648   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
649           << ", callback=" << callback << ", count=" << count
650           << ", state=" << state_;
651   DestructorGuard dg(this);
652   unique_ptr<IOBuf>ioBuf(std::move(buf));
653   assert(eventBase_->isInEventBaseThread());
654
655   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
656     // No new writes may be performed after the write side of the socket has
657     // been shutdown.
658     //
659     // We could just call callback->writeError() here to fail just this write.
660     // However, fail hard and use invalidState() to fail all outstanding
661     // callbacks and move the socket into the error state.  There's most likely
662     // a bug in the caller's code, so we abort everything rather than trying to
663     // proceed as best we can.
664     return invalidState(callback);
665   }
666
667   uint32_t countWritten = 0;
668   uint32_t partialWritten = 0;
669   int bytesWritten = 0;
670   bool mustRegister = false;
671   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
672     if (writeReqHead_ == nullptr) {
673       // If we are established and there are no other writes pending,
674       // we can attempt to perform the write immediately.
675       assert(writeReqTail_ == nullptr);
676       assert((eventFlags_ & EventHandler::WRITE) == 0);
677
678       bytesWritten = performWrite(vec, count, flags,
679                                   &countWritten, &partialWritten);
680       if (bytesWritten < 0) {
681         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
682                                withAddr("writev failed"), errno);
683         return failWrite(__func__, callback, 0, ex);
684       } else if (countWritten == count) {
685         // We successfully wrote everything.
686         // Invoke the callback and return.
687         if (callback) {
688           callback->writeSuccess();
689         }
690         return;
691       } // else { continue writing the next writeReq }
692       mustRegister = true;
693     }
694   } else if (!connecting()) {
695     // Invalid state for writing
696     return invalidState(callback);
697   }
698
699   // Create a new WriteRequest to add to the queue
700   WriteRequest* req;
701   try {
702     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
703                                         count - countWritten, partialWritten,
704                                         bytesWritten, std::move(ioBuf), flags);
705   } catch (const std::exception& ex) {
706     // we mainly expect to catch std::bad_alloc here
707     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
708         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
709     return failWrite(__func__, callback, bytesWritten, tex);
710   }
711   req->consume();
712   if (writeReqTail_ == nullptr) {
713     assert(writeReqHead_ == nullptr);
714     writeReqHead_ = writeReqTail_ = req;
715   } else {
716     writeReqTail_->append(req);
717     writeReqTail_ = req;
718   }
719
720   // Register for write events if are established and not currently
721   // waiting on write events
722   if (mustRegister) {
723     assert(state_ == StateEnum::ESTABLISHED);
724     assert((eventFlags_ & EventHandler::WRITE) == 0);
725     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
726       assert(state_ == StateEnum::ERROR);
727       return;
728     }
729     if (sendTimeout_ > 0) {
730       // Schedule a timeout to fire if the write takes too long.
731       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
732         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
733                                withAddr("failed to schedule send timeout"));
734         return failWrite(__func__, ex);
735       }
736     }
737   }
738 }
739
740 void AsyncSocket::writeRequest(WriteRequest* req) {
741   if (writeReqTail_ == nullptr) {
742     assert(writeReqHead_ == nullptr);
743     writeReqHead_ = writeReqTail_ = req;
744     req->start();
745   } else {
746     writeReqTail_->append(req);
747     writeReqTail_ = req;
748   }
749 }
750
751 void AsyncSocket::close() {
752   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
753           << ", state=" << state_ << ", shutdownFlags="
754           << std::hex << (int) shutdownFlags_;
755
756   // close() is only different from closeNow() when there are pending writes
757   // that need to drain before we can close.  In all other cases, just call
758   // closeNow().
759   //
760   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
761   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
762   // is still running.  (e.g., If there are multiple pending writes, and we
763   // call writeError() on the first one, it may call close().  In this case we
764   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
765   // writes will still be in the queue.)
766   //
767   // We only need to drain pending writes if we are still in STATE_CONNECTING
768   // or STATE_ESTABLISHED
769   if ((writeReqHead_ == nullptr) ||
770       !(state_ == StateEnum::CONNECTING ||
771       state_ == StateEnum::ESTABLISHED)) {
772     closeNow();
773     return;
774   }
775
776   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
777   // destroyed until close() returns.
778   DestructorGuard dg(this);
779   assert(eventBase_->isInEventBaseThread());
780
781   // Since there are write requests pending, we have to set the
782   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
783   // connect finishes and we finish writing these requests.
784   //
785   // Set SHUT_READ to indicate that reads are shut down, and set the
786   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
787   // pending writes complete.
788   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
789
790   // If a read callback is set, invoke readEOF() immediately to inform it that
791   // the socket has been closed and no more data can be read.
792   if (readCallback_) {
793     // Disable reads if they are enabled
794     if (!updateEventRegistration(0, EventHandler::READ)) {
795       // We're now in the error state; callbacks have been cleaned up
796       assert(state_ == StateEnum::ERROR);
797       assert(readCallback_ == nullptr);
798     } else {
799       ReadCallback* callback = readCallback_;
800       readCallback_ = nullptr;
801       callback->readEOF();
802     }
803   }
804 }
805
806 void AsyncSocket::closeNow() {
807   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
808           << ", state=" << state_ << ", shutdownFlags="
809           << std::hex << (int) shutdownFlags_;
810   DestructorGuard dg(this);
811   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
812
813   switch (state_) {
814     case StateEnum::ESTABLISHED:
815     case StateEnum::CONNECTING:
816     {
817       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
818       state_ = StateEnum::CLOSED;
819
820       // If the write timeout was set, cancel it.
821       writeTimeout_.cancelTimeout();
822
823       // If we are registered for I/O events, unregister.
824       if (eventFlags_ != EventHandler::NONE) {
825         eventFlags_ = EventHandler::NONE;
826         if (!updateEventRegistration()) {
827           // We will have been moved into the error state.
828           assert(state_ == StateEnum::ERROR);
829           return;
830         }
831       }
832
833       if (immediateReadHandler_.isLoopCallbackScheduled()) {
834         immediateReadHandler_.cancelLoopCallback();
835       }
836
837       if (fd_ >= 0) {
838         ioHandler_.changeHandlerFD(-1);
839         doClose();
840       }
841
842       invokeConnectErr(socketClosedLocallyEx);
843
844       failAllWrites(socketClosedLocallyEx);
845
846       if (readCallback_) {
847         ReadCallback* callback = readCallback_;
848         readCallback_ = nullptr;
849         callback->readEOF();
850       }
851       return;
852     }
853     case StateEnum::CLOSED:
854       // Do nothing.  It's possible that we are being called recursively
855       // from inside a callback that we invoked inside another call to close()
856       // that is still running.
857       return;
858     case StateEnum::ERROR:
859       // Do nothing.  The error handling code has performed (or is performing)
860       // cleanup.
861       return;
862     case StateEnum::UNINIT:
863       assert(eventFlags_ == EventHandler::NONE);
864       assert(connectCallback_ == nullptr);
865       assert(readCallback_ == nullptr);
866       assert(writeReqHead_ == nullptr);
867       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
868       state_ = StateEnum::CLOSED;
869       return;
870   }
871
872   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
873               << ") called in unknown state " << state_;
874 }
875
876 void AsyncSocket::closeWithReset() {
877   // Enable SO_LINGER, with the linger timeout set to 0.
878   // This will trigger a TCP reset when we close the socket.
879   if (fd_ >= 0) {
880     struct linger optLinger = {1, 0};
881     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
882       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
883               << "on " << fd_ << ": errno=" << errno;
884     }
885   }
886
887   // Then let closeNow() take care of the rest
888   closeNow();
889 }
890
891 void AsyncSocket::shutdownWrite() {
892   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
893           << ", state=" << state_ << ", shutdownFlags="
894           << std::hex << (int) shutdownFlags_;
895
896   // If there are no pending writes, shutdownWrite() is identical to
897   // shutdownWriteNow().
898   if (writeReqHead_ == nullptr) {
899     shutdownWriteNow();
900     return;
901   }
902
903   assert(eventBase_->isInEventBaseThread());
904
905   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
906   // shutdown will be performed once all writes complete.
907   shutdownFlags_ |= SHUT_WRITE_PENDING;
908 }
909
910 void AsyncSocket::shutdownWriteNow() {
911   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
912           << ", fd=" << fd_ << ", state=" << state_
913           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
914
915   if (shutdownFlags_ & SHUT_WRITE) {
916     // Writes are already shutdown; nothing else to do.
917     return;
918   }
919
920   // If SHUT_READ is already set, just call closeNow() to completely
921   // close the socket.  This can happen if close() was called with writes
922   // pending, and then shutdownWriteNow() is called before all pending writes
923   // complete.
924   if (shutdownFlags_ & SHUT_READ) {
925     closeNow();
926     return;
927   }
928
929   DestructorGuard dg(this);
930   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
931
932   switch (static_cast<StateEnum>(state_)) {
933     case StateEnum::ESTABLISHED:
934     {
935       shutdownFlags_ |= SHUT_WRITE;
936
937       // If the write timeout was set, cancel it.
938       writeTimeout_.cancelTimeout();
939
940       // If we are registered for write events, unregister.
941       if (!updateEventRegistration(0, EventHandler::WRITE)) {
942         // We will have been moved into the error state.
943         assert(state_ == StateEnum::ERROR);
944         return;
945       }
946
947       // Shutdown writes on the file descriptor
948       ::shutdown(fd_, SHUT_WR);
949
950       // Immediately fail all write requests
951       failAllWrites(socketShutdownForWritesEx);
952       return;
953     }
954     case StateEnum::CONNECTING:
955     {
956       // Set the SHUT_WRITE_PENDING flag.
957       // When the connection completes, it will check this flag,
958       // shutdown the write half of the socket, and then set SHUT_WRITE.
959       shutdownFlags_ |= SHUT_WRITE_PENDING;
960
961       // Immediately fail all write requests
962       failAllWrites(socketShutdownForWritesEx);
963       return;
964     }
965     case StateEnum::UNINIT:
966       // Callers normally shouldn't call shutdownWriteNow() before the socket
967       // even starts connecting.  Nonetheless, go ahead and set
968       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
969       // immediately shut down the write side of the socket.
970       shutdownFlags_ |= SHUT_WRITE_PENDING;
971       return;
972     case StateEnum::CLOSED:
973     case StateEnum::ERROR:
974       // We should never get here.  SHUT_WRITE should always be set
975       // in STATE_CLOSED and STATE_ERROR.
976       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
977                  << ", fd=" << fd_ << ") in unexpected state " << state_
978                  << " with SHUT_WRITE not set ("
979                  << std::hex << (int) shutdownFlags_ << ")";
980       assert(false);
981       return;
982   }
983
984   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
985               << fd_ << ") called in unknown state " << state_;
986 }
987
988 bool AsyncSocket::readable() const {
989   if (fd_ == -1) {
990     return false;
991   }
992   struct pollfd fds[1];
993   fds[0].fd = fd_;
994   fds[0].events = POLLIN;
995   fds[0].revents = 0;
996   int rc = poll(fds, 1, 0);
997   return rc == 1;
998 }
999
1000 bool AsyncSocket::isPending() const {
1001   return ioHandler_.isPending();
1002 }
1003
1004 bool AsyncSocket::hangup() const {
1005   if (fd_ == -1) {
1006     // sanity check, no one should ask for hangup if we are not connected.
1007     assert(false);
1008     return false;
1009   }
1010 #ifdef POLLRDHUP // Linux-only
1011   struct pollfd fds[1];
1012   fds[0].fd = fd_;
1013   fds[0].events = POLLRDHUP|POLLHUP;
1014   fds[0].revents = 0;
1015   poll(fds, 1, 0);
1016   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1017 #else
1018   return false;
1019 #endif
1020 }
1021
1022 bool AsyncSocket::good() const {
1023   return ((state_ == StateEnum::CONNECTING ||
1024           state_ == StateEnum::ESTABLISHED) &&
1025           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1026 }
1027
1028 bool AsyncSocket::error() const {
1029   return (state_ == StateEnum::ERROR);
1030 }
1031
1032 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1033   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1034           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1035           << ", state=" << state_ << ", events="
1036           << std::hex << eventFlags_ << ")";
1037   assert(eventBase_ == nullptr);
1038   assert(eventBase->isInEventBaseThread());
1039
1040   eventBase_ = eventBase;
1041   ioHandler_.attachEventBase(eventBase);
1042   writeTimeout_.attachEventBase(eventBase);
1043 }
1044
1045 void AsyncSocket::detachEventBase() {
1046   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1047           << ", old evb=" << eventBase_ << ", state=" << state_
1048           << ", events=" << std::hex << eventFlags_ << ")";
1049   assert(eventBase_ != nullptr);
1050   assert(eventBase_->isInEventBaseThread());
1051
1052   eventBase_ = nullptr;
1053   ioHandler_.detachEventBase();
1054   writeTimeout_.detachEventBase();
1055 }
1056
1057 bool AsyncSocket::isDetachable() const {
1058   DCHECK(eventBase_ != nullptr);
1059   DCHECK(eventBase_->isInEventBaseThread());
1060
1061   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1062 }
1063
1064 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1065   address->setFromLocalAddress(fd_);
1066 }
1067
1068 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1069   if (!addr_.isInitialized()) {
1070     addr_.setFromPeerAddress(fd_);
1071   }
1072   *address = addr_;
1073 }
1074
1075 int AsyncSocket::setNoDelay(bool noDelay) {
1076   if (fd_ < 0) {
1077     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1078                << this << "(state=" << state_ << ")";
1079     return EINVAL;
1080
1081   }
1082
1083   int value = noDelay ? 1 : 0;
1084   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1085     int errnoCopy = errno;
1086     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1087             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1088             << strerror(errnoCopy);
1089     return errnoCopy;
1090   }
1091
1092   return 0;
1093 }
1094
1095 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1096
1097   #ifndef TCP_CONGESTION
1098   #define TCP_CONGESTION  13
1099   #endif
1100
1101   if (fd_ < 0) {
1102     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1103                << "socket " << this << "(state=" << state_ << ")";
1104     return EINVAL;
1105
1106   }
1107
1108   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1109         cname.length() + 1) != 0) {
1110     int errnoCopy = errno;
1111     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1112             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1113             << strerror(errnoCopy);
1114     return errnoCopy;
1115   }
1116
1117   return 0;
1118 }
1119
1120 int AsyncSocket::setQuickAck(bool quickack) {
1121   if (fd_ < 0) {
1122     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1123                << this << "(state=" << state_ << ")";
1124     return EINVAL;
1125
1126   }
1127
1128 #ifdef TCP_QUICKACK // Linux-only
1129   int value = quickack ? 1 : 0;
1130   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1131     int errnoCopy = errno;
1132     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1133             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1134             << strerror(errnoCopy);
1135     return errnoCopy;
1136   }
1137
1138   return 0;
1139 #else
1140   return ENOSYS;
1141 #endif
1142 }
1143
1144 int AsyncSocket::setSendBufSize(size_t bufsize) {
1145   if (fd_ < 0) {
1146     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1147                << this << "(state=" << state_ << ")";
1148     return EINVAL;
1149   }
1150
1151   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1152     int errnoCopy = errno;
1153     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1154             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1155             << strerror(errnoCopy);
1156     return errnoCopy;
1157   }
1158
1159   return 0;
1160 }
1161
1162 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1163   if (fd_ < 0) {
1164     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1165                << this << "(state=" << state_ << ")";
1166     return EINVAL;
1167   }
1168
1169   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1170     int errnoCopy = errno;
1171     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1172             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1173             << strerror(errnoCopy);
1174     return errnoCopy;
1175   }
1176
1177   return 0;
1178 }
1179
1180 int AsyncSocket::setTCPProfile(int profd) {
1181   if (fd_ < 0) {
1182     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1183                << this << "(state=" << state_ << ")";
1184     return EINVAL;
1185   }
1186
1187   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1188     int errnoCopy = errno;
1189     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1190             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1191             << strerror(errnoCopy);
1192     return errnoCopy;
1193   }
1194
1195   return 0;
1196 }
1197
1198 void AsyncSocket::setPersistentCork(bool cork) {
1199   if (setCork(cork) == 0) {
1200     persistentCork_ = cork;
1201   }
1202 }
1203
1204 int AsyncSocket::setCork(bool cork) {
1205 #ifdef TCP_CORK
1206   if (fd_ < 0) {
1207     VLOG(4) << "AsyncSocket::setCork() called on non-open socket "
1208             << this << "(stats=" << state_ << ")";
1209     return EINVAL;
1210   }
1211
1212   if (corked_ == cork) {
1213     return 0;
1214   }
1215
1216   int flag = cork ? 1 : 0;
1217   if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) != 0) {
1218     int errnoCopy = errno;
1219     VLOG(2) << "faield to turn on TCP_CORK option on AsyncSocket"
1220             << this << "(fd=" << fd_ << ", state=" << state_ << "):"
1221             << folly::errnoStr(errnoCopy);
1222     return errnoCopy;
1223   }
1224   corked_ = cork;
1225 #endif
1226   return 0;
1227 }
1228
1229 void AsyncSocket::ioReady(uint16_t events) noexcept {
1230   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1231           << ", events=" << std::hex << events << ", state=" << state_;
1232   DestructorGuard dg(this);
1233   assert(events & EventHandler::READ_WRITE);
1234   assert(eventBase_->isInEventBaseThread());
1235
1236   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1237   if (relevantEvents == EventHandler::READ) {
1238     handleRead();
1239   } else if (relevantEvents == EventHandler::WRITE) {
1240     handleWrite();
1241   } else if (relevantEvents == EventHandler::READ_WRITE) {
1242     EventBase* originalEventBase = eventBase_;
1243     // If both read and write events are ready, process writes first.
1244     handleWrite();
1245
1246     // Return now if handleWrite() detached us from our EventBase
1247     if (eventBase_ != originalEventBase) {
1248       return;
1249     }
1250
1251     // Only call handleRead() if a read callback is still installed.
1252     // (It's possible that the read callback was uninstalled during
1253     // handleWrite().)
1254     if (readCallback_) {
1255       handleRead();
1256     }
1257   } else {
1258     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1259                << std::hex << events << "(this=" << this << ")";
1260     abort();
1261   }
1262 }
1263
1264 ssize_t AsyncSocket::performRead(void** buf, size_t* buflen, size_t* offset) {
1265   VLOG(5) << "AsyncSocket::performRead() this=" << this
1266           << ", buf=" << *buf << ", buflen=" << *buflen;
1267
1268   int recvFlags = 0;
1269   if (peek_) {
1270     recvFlags |= MSG_PEEK;
1271   }
1272
1273   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1274   if (bytes < 0) {
1275     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1276       // No more data to read right now.
1277       return READ_BLOCKING;
1278     } else {
1279       return READ_ERROR;
1280     }
1281   } else {
1282     appBytesReceived_ += bytes;
1283     return bytes;
1284   }
1285 }
1286
1287 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1288   // no matter what, buffer should be preapared for non-ssl socket
1289   CHECK(readCallback_);
1290   readCallback_->getReadBuffer(buf, buflen);
1291 }
1292
1293 void AsyncSocket::handleRead() noexcept {
1294   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1295           << ", state=" << state_;
1296   assert(state_ == StateEnum::ESTABLISHED);
1297   assert((shutdownFlags_ & SHUT_READ) == 0);
1298   assert(readCallback_ != nullptr);
1299   assert(eventFlags_ & EventHandler::READ);
1300
1301   // Loop until:
1302   // - a read attempt would block
1303   // - readCallback_ is uninstalled
1304   // - the number of loop iterations exceeds the optional maximum
1305   // - this AsyncSocket is moved to another EventBase
1306   //
1307   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1308   // which is why need to check for it here.
1309   //
1310   // The last bullet point is slightly subtle.  readDataAvailable() may also
1311   // detach this socket from this EventBase.  However, before
1312   // readDataAvailable() returns another thread may pick it up, attach it to
1313   // a different EventBase, and install another readCallback_.  We need to
1314   // exit immediately after readDataAvailable() returns if the eventBase_ has
1315   // changed.  (The caller must perform some sort of locking to transfer the
1316   // AsyncSocket between threads properly.  This will be sufficient to ensure
1317   // that this thread sees the updated eventBase_ variable after
1318   // readDataAvailable() returns.)
1319   uint16_t numReads = 0;
1320   EventBase* originalEventBase = eventBase_;
1321   while (readCallback_ && eventBase_ == originalEventBase) {
1322     // Get the buffer to read into.
1323     void* buf = nullptr;
1324     size_t buflen = 0, offset = 0;
1325     try {
1326       prepareReadBuffer(&buf, &buflen);
1327       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1328     } catch (const AsyncSocketException& ex) {
1329       return failRead(__func__, ex);
1330     } catch (const std::exception& ex) {
1331       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1332                               string("ReadCallback::getReadBuffer() "
1333                                      "threw exception: ") +
1334                               ex.what());
1335       return failRead(__func__, tex);
1336     } catch (...) {
1337       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1338                              "ReadCallback::getReadBuffer() threw "
1339                              "non-exception type");
1340       return failRead(__func__, ex);
1341     }
1342     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1343       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1344                              "ReadCallback::getReadBuffer() returned "
1345                              "empty buffer");
1346       return failRead(__func__, ex);
1347     }
1348
1349     // Perform the read
1350     ssize_t bytesRead = performRead(&buf, &buflen, &offset);
1351     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1352             << bytesRead << " bytes";
1353     if (bytesRead > 0) {
1354       if (!isBufferMovable_) {
1355         readCallback_->readDataAvailable(bytesRead);
1356       } else {
1357         CHECK(kOpenSslModeMoveBufferOwnership);
1358         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1359                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1360                 << ", offset=" << offset;
1361         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1362         readBuf->trimStart(offset);
1363         readBuf->trimEnd(buflen - offset - bytesRead);
1364         readCallback_->readBufferAvailable(std::move(readBuf));
1365       }
1366
1367       // Fall through and continue around the loop if the read
1368       // completely filled the available buffer.
1369       // Note that readCallback_ may have been uninstalled or changed inside
1370       // readDataAvailable().
1371       if (size_t(bytesRead) < buflen) {
1372         return;
1373       }
1374     } else if (bytesRead == READ_BLOCKING) {
1375         // No more data to read right now.
1376         return;
1377     } else if (bytesRead == READ_ERROR) {
1378       readErr_ = READ_ERROR;
1379       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1380                              withAddr("recv() failed"), errno);
1381       return failRead(__func__, ex);
1382     } else {
1383       assert(bytesRead == READ_EOF);
1384       readErr_ = READ_EOF;
1385       // EOF
1386       shutdownFlags_ |= SHUT_READ;
1387       if (!updateEventRegistration(0, EventHandler::READ)) {
1388         // we've already been moved into STATE_ERROR
1389         assert(state_ == StateEnum::ERROR);
1390         assert(readCallback_ == nullptr);
1391         return;
1392       }
1393
1394       ReadCallback* callback = readCallback_;
1395       readCallback_ = nullptr;
1396       callback->readEOF();
1397       return;
1398     }
1399     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1400       if (readCallback_ != nullptr) {
1401         // We might still have data in the socket.
1402         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1403         scheduleImmediateRead();
1404       }
1405       return;
1406     }
1407   }
1408 }
1409
1410 /**
1411  * This function attempts to write as much data as possible, until no more data
1412  * can be written.
1413  *
1414  * - If it sends all available data, it unregisters for write events, and stops
1415  *   the writeTimeout_.
1416  *
1417  * - If not all of the data can be sent immediately, it reschedules
1418  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1419  *   registered for write events.
1420  */
1421 void AsyncSocket::handleWrite() noexcept {
1422   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1423           << ", state=" << state_;
1424   if (state_ == StateEnum::CONNECTING) {
1425     handleConnect();
1426     return;
1427   }
1428
1429   // Normal write
1430   assert(state_ == StateEnum::ESTABLISHED);
1431   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1432   assert(writeReqHead_ != nullptr);
1433
1434   // Loop until we run out of write requests,
1435   // or until this socket is moved to another EventBase.
1436   // (See the comment in handleRead() explaining how this can happen.)
1437   EventBase* originalEventBase = eventBase_;
1438   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1439     if (!writeReqHead_->performWrite()) {
1440       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1441                              withAddr("writev() failed"), errno);
1442       return failWrite(__func__, ex);
1443     } else if (writeReqHead_->isComplete()) {
1444       // We finished this request
1445       WriteRequest* req = writeReqHead_;
1446       writeReqHead_ = req->getNext();
1447
1448       if (writeReqHead_ == nullptr) {
1449         writeReqTail_ = nullptr;
1450         // This is the last write request.
1451         // Unregister for write events and cancel the send timer
1452         // before we invoke the callback.  We have to update the state properly
1453         // before calling the callback, since it may want to detach us from
1454         // the EventBase.
1455         if (eventFlags_ & EventHandler::WRITE) {
1456           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1457             assert(state_ == StateEnum::ERROR);
1458             return;
1459           }
1460           // Stop the send timeout
1461           writeTimeout_.cancelTimeout();
1462         }
1463         assert(!writeTimeout_.isScheduled());
1464
1465         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1466         // we finish sending the last write request.
1467         //
1468         // We have to do this before invoking writeSuccess(), since
1469         // writeSuccess() may detach us from our EventBase.
1470         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1471           assert(connectCallback_ == nullptr);
1472           shutdownFlags_ |= SHUT_WRITE;
1473
1474           if (shutdownFlags_ & SHUT_READ) {
1475             // Reads have already been shutdown.  Fully close the socket and
1476             // move to STATE_CLOSED.
1477             //
1478             // Note: This code currently moves us to STATE_CLOSED even if
1479             // close() hasn't ever been called.  This can occur if we have
1480             // received EOF from the peer and shutdownWrite() has been called
1481             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1482             // case, until close() is actually called?  I can't think of a
1483             // reason why we would need to do so.  No other operations besides
1484             // calling close() or destroying the socket can be performed at
1485             // this point.
1486             assert(readCallback_ == nullptr);
1487             state_ = StateEnum::CLOSED;
1488             if (fd_ >= 0) {
1489               ioHandler_.changeHandlerFD(-1);
1490               doClose();
1491             }
1492           } else {
1493             // Reads are still enabled, so we are only doing a half-shutdown
1494             ::shutdown(fd_, SHUT_WR);
1495           }
1496         }
1497       }
1498
1499       // Invoke the callback
1500       WriteCallback* callback = req->getCallback();
1501       req->destroy();
1502       if (callback) {
1503         callback->writeSuccess();
1504       }
1505       // We'll continue around the loop, trying to write another request
1506     } else {
1507       // Partial write.
1508       writeReqHead_->consume();
1509       // Stop after a partial write; it's highly likely that a subsequent write
1510       // attempt will just return EAGAIN.
1511       //
1512       // Ensure that we are registered for write events.
1513       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1514         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1515           assert(state_ == StateEnum::ERROR);
1516           return;
1517         }
1518       }
1519
1520       // Reschedule the send timeout, since we have made some write progress.
1521       if (sendTimeout_ > 0) {
1522         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1523           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1524               withAddr("failed to reschedule write timeout"));
1525           return failWrite(__func__, ex);
1526         }
1527       }
1528       return;
1529     }
1530   }
1531 }
1532
1533 void AsyncSocket::checkForImmediateRead() noexcept {
1534   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1535   // (However, note that some subclasses do override this method.)
1536   //
1537   // Simply calling handleRead() here would be bad, as this would call
1538   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1539   // buffer even though no data may be available.  This would waste lots of
1540   // memory, since the buffer will sit around unused until the socket actually
1541   // becomes readable.
1542   //
1543   // Checking if the socket is readable now also seems like it would probably
1544   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1545   // would just waste an extra system call.  Even if it is readable, waiting to
1546   // find out from libevent on the next event loop doesn't seem that bad.
1547 }
1548
1549 void AsyncSocket::handleInitialReadWrite() noexcept {
1550   // Our callers should already be holding a DestructorGuard, but grab
1551   // one here just to make sure, in case one of our calling code paths ever
1552   // changes.
1553   DestructorGuard dg(this);
1554
1555   // If we have a readCallback_, make sure we enable read events.  We
1556   // may already be registered for reads if connectSuccess() set
1557   // the read calback.
1558   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1559     assert(state_ == StateEnum::ESTABLISHED);
1560     assert((shutdownFlags_ & SHUT_READ) == 0);
1561     if (!updateEventRegistration(EventHandler::READ, 0)) {
1562       assert(state_ == StateEnum::ERROR);
1563       return;
1564     }
1565     checkForImmediateRead();
1566   } else if (readCallback_ == nullptr) {
1567     // Unregister for read events.
1568     updateEventRegistration(0, EventHandler::READ);
1569   }
1570
1571   // If we have write requests pending, try to send them immediately.
1572   // Since we just finished accepting, there is a very good chance that we can
1573   // write without blocking.
1574   //
1575   // However, we only process them if EventHandler::WRITE is not already set,
1576   // which means that we're already blocked on a write attempt.  (This can
1577   // happen if connectSuccess() called write() before returning.)
1578   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1579     // Call handleWrite() to perform write processing.
1580     handleWrite();
1581   } else if (writeReqHead_ == nullptr) {
1582     // Unregister for write event.
1583     updateEventRegistration(0, EventHandler::WRITE);
1584   }
1585 }
1586
1587 void AsyncSocket::handleConnect() noexcept {
1588   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1589           << ", state=" << state_;
1590   assert(state_ == StateEnum::CONNECTING);
1591   // SHUT_WRITE can never be set while we are still connecting;
1592   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1593   // finishes
1594   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1595
1596   // In case we had a connect timeout, cancel the timeout
1597   writeTimeout_.cancelTimeout();
1598   // We don't use a persistent registration when waiting on a connect event,
1599   // so we have been automatically unregistered now.  Update eventFlags_ to
1600   // reflect reality.
1601   assert(eventFlags_ == EventHandler::WRITE);
1602   eventFlags_ = EventHandler::NONE;
1603
1604   // Call getsockopt() to check if the connect succeeded
1605   int error;
1606   socklen_t len = sizeof(error);
1607   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1608   if (rv != 0) {
1609     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1610                            withAddr("error calling getsockopt() after connect"),
1611                            errno);
1612     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1613                << fd_ << " host=" << addr_.describe()
1614                << ") exception:" << ex.what();
1615     return failConnect(__func__, ex);
1616   }
1617
1618   if (error != 0) {
1619     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1620                            "connect failed", error);
1621     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1622             << fd_ << " host=" << addr_.describe()
1623             << ") exception: " << ex.what();
1624     return failConnect(__func__, ex);
1625   }
1626
1627   // Move into STATE_ESTABLISHED
1628   state_ = StateEnum::ESTABLISHED;
1629
1630   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1631   // perform, immediately shutdown the write half of the socket.
1632   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1633     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1634     // are still connecting we just abort the connect rather than waiting for
1635     // it to complete.
1636     assert((shutdownFlags_ & SHUT_READ) == 0);
1637     ::shutdown(fd_, SHUT_WR);
1638     shutdownFlags_ |= SHUT_WRITE;
1639   }
1640
1641   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1642           << "successfully connected; state=" << state_;
1643
1644   // Remember the EventBase we are attached to, before we start invoking any
1645   // callbacks (since the callbacks may call detachEventBase()).
1646   EventBase* originalEventBase = eventBase_;
1647
1648   invokeConnectSuccess();
1649   // Note that the connect callback may have changed our state.
1650   // (set or unset the read callback, called write(), closed the socket, etc.)
1651   // The following code needs to handle these situations correctly.
1652   //
1653   // If the socket has been closed, readCallback_ and writeReqHead_ will
1654   // always be nullptr, so that will prevent us from trying to read or write.
1655   //
1656   // The main thing to check for is if eventBase_ is still originalEventBase.
1657   // If not, we have been detached from this event base, so we shouldn't
1658   // perform any more operations.
1659   if (eventBase_ != originalEventBase) {
1660     return;
1661   }
1662
1663   handleInitialReadWrite();
1664 }
1665
1666 void AsyncSocket::timeoutExpired() noexcept {
1667   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1668           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1669   DestructorGuard dg(this);
1670   assert(eventBase_->isInEventBaseThread());
1671
1672   if (state_ == StateEnum::CONNECTING) {
1673     // connect() timed out
1674     // Unregister for I/O events.
1675     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1676                            "connect timed out");
1677     failConnect(__func__, ex);
1678   } else {
1679     // a normal write operation timed out
1680     assert(state_ == StateEnum::ESTABLISHED);
1681     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1682     failWrite(__func__, ex);
1683   }
1684 }
1685
1686 ssize_t AsyncSocket::performWrite(const iovec* vec,
1687                                    uint32_t count,
1688                                    WriteFlags flags,
1689                                    uint32_t* countWritten,
1690                                    uint32_t* partialWritten) {
1691   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1692   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1693   // (since it may terminate the program if the main program doesn't explicitly
1694   // ignore it).
1695   struct msghdr msg;
1696   msg.msg_name = nullptr;
1697   msg.msg_namelen = 0;
1698   msg.msg_iov = const_cast<iovec *>(vec);
1699 #ifdef IOV_MAX // not defined on Android
1700   msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX);
1701 #else
1702   msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV);
1703 #endif
1704   msg.msg_control = nullptr;
1705   msg.msg_controllen = 0;
1706   msg.msg_flags = 0;
1707
1708   int msg_flags = MSG_DONTWAIT;
1709
1710 #ifdef MSG_NOSIGNAL // Linux-only
1711   msg_flags |= MSG_NOSIGNAL;
1712   if (isSet(flags, WriteFlags::CORK)) {
1713     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1714     // give it the rest of the data rather than immediately sending a partial
1715     // frame, even when TCP_NODELAY is enabled.
1716     msg_flags |= MSG_MORE;
1717   }
1718 #endif
1719   if (isSet(flags, WriteFlags::EOR)) {
1720     // marks that this is the last byte of a record (response)
1721     msg_flags |= MSG_EOR;
1722   }
1723   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1724   if (totalWritten < 0) {
1725     if (errno == EAGAIN) {
1726       // TCP buffer is full; we can't write any more data right now.
1727       *countWritten = 0;
1728       *partialWritten = 0;
1729       return 0;
1730     }
1731     // error
1732     *countWritten = 0;
1733     *partialWritten = 0;
1734     return -1;
1735   }
1736
1737   appBytesWritten_ += totalWritten;
1738
1739   uint32_t bytesWritten;
1740   uint32_t n;
1741   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1742     const iovec* v = vec + n;
1743     if (v->iov_len > bytesWritten) {
1744       // Partial write finished in the middle of this iovec
1745       *countWritten = n;
1746       *partialWritten = bytesWritten;
1747       return totalWritten;
1748     }
1749
1750     bytesWritten -= v->iov_len;
1751   }
1752
1753   assert(bytesWritten == 0);
1754   *countWritten = n;
1755   *partialWritten = 0;
1756   return totalWritten;
1757 }
1758
1759 /**
1760  * Re-register the EventHandler after eventFlags_ has changed.
1761  *
1762  * If an error occurs, fail() is called to move the socket into the error state
1763  * and call all currently installed callbacks.  After an error, the
1764  * AsyncSocket is completely unregistered.
1765  *
1766  * @return Returns true on succcess, or false on error.
1767  */
1768 bool AsyncSocket::updateEventRegistration() {
1769   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1770           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1771           << ", events=" << std::hex << eventFlags_;
1772   assert(eventBase_->isInEventBaseThread());
1773   if (eventFlags_ == EventHandler::NONE) {
1774     ioHandler_.unregisterHandler();
1775     return true;
1776   }
1777
1778   // Always register for persistent events, so we don't have to re-register
1779   // after being called back.
1780   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1781     eventFlags_ = EventHandler::NONE; // we're not registered after error
1782     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1783         withAddr("failed to update AsyncSocket event registration"));
1784     fail("updateEventRegistration", ex);
1785     return false;
1786   }
1787
1788   return true;
1789 }
1790
1791 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1792                                            uint16_t disable) {
1793   uint16_t oldFlags = eventFlags_;
1794   eventFlags_ |= enable;
1795   eventFlags_ &= ~disable;
1796   if (eventFlags_ == oldFlags) {
1797     return true;
1798   } else {
1799     return updateEventRegistration();
1800   }
1801 }
1802
1803 void AsyncSocket::startFail() {
1804   // startFail() should only be called once
1805   assert(state_ != StateEnum::ERROR);
1806   assert(getDestructorGuardCount() > 0);
1807   state_ = StateEnum::ERROR;
1808   // Ensure that SHUT_READ and SHUT_WRITE are set,
1809   // so all future attempts to read or write will be rejected
1810   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1811
1812   if (eventFlags_ != EventHandler::NONE) {
1813     eventFlags_ = EventHandler::NONE;
1814     ioHandler_.unregisterHandler();
1815   }
1816   writeTimeout_.cancelTimeout();
1817
1818   if (fd_ >= 0) {
1819     ioHandler_.changeHandlerFD(-1);
1820     doClose();
1821   }
1822 }
1823
1824 void AsyncSocket::finishFail() {
1825   assert(state_ == StateEnum::ERROR);
1826   assert(getDestructorGuardCount() > 0);
1827
1828   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1829                          withAddr("socket closing after error"));
1830   invokeConnectErr(ex);
1831   failAllWrites(ex);
1832
1833   if (readCallback_) {
1834     ReadCallback* callback = readCallback_;
1835     readCallback_ = nullptr;
1836     callback->readErr(ex);
1837   }
1838 }
1839
1840 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1841   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1842              << state_ << " host=" << addr_.describe()
1843              << "): failed in " << fn << "(): "
1844              << ex.what();
1845   startFail();
1846   finishFail();
1847 }
1848
1849 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1850   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1851                << state_ << " host=" << addr_.describe()
1852                << "): failed while connecting in " << fn << "(): "
1853                << ex.what();
1854   startFail();
1855
1856   invokeConnectErr(ex);
1857   finishFail();
1858 }
1859
1860 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1861   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1862                << state_ << " host=" << addr_.describe()
1863                << "): failed while reading in " << fn << "(): "
1864                << ex.what();
1865   startFail();
1866
1867   if (readCallback_ != nullptr) {
1868     ReadCallback* callback = readCallback_;
1869     readCallback_ = nullptr;
1870     callback->readErr(ex);
1871   }
1872
1873   finishFail();
1874 }
1875
1876 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1877   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1878                << state_ << " host=" << addr_.describe()
1879                << "): failed while writing in " << fn << "(): "
1880                << ex.what();
1881   startFail();
1882
1883   // Only invoke the first write callback, since the error occurred while
1884   // writing this request.  Let any other pending write callbacks be invoked in
1885   // finishFail().
1886   if (writeReqHead_ != nullptr) {
1887     WriteRequest* req = writeReqHead_;
1888     writeReqHead_ = req->getNext();
1889     WriteCallback* callback = req->getCallback();
1890     uint32_t bytesWritten = req->getTotalBytesWritten();
1891     req->destroy();
1892     if (callback) {
1893       callback->writeErr(bytesWritten, ex);
1894     }
1895   }
1896
1897   finishFail();
1898 }
1899
1900 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1901                              size_t bytesWritten,
1902                              const AsyncSocketException& ex) {
1903   // This version of failWrite() is used when the failure occurs before
1904   // we've added the callback to writeReqHead_.
1905   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1906              << state_ << " host=" << addr_.describe()
1907              <<"): failed while writing in " << fn << "(): "
1908              << ex.what();
1909   startFail();
1910
1911   if (callback != nullptr) {
1912     callback->writeErr(bytesWritten, ex);
1913   }
1914
1915   finishFail();
1916 }
1917
1918 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1919   // Invoke writeError() on all write callbacks.
1920   // This is used when writes are forcibly shutdown with write requests
1921   // pending, or when an error occurs with writes pending.
1922   while (writeReqHead_ != nullptr) {
1923     WriteRequest* req = writeReqHead_;
1924     writeReqHead_ = req->getNext();
1925     WriteCallback* callback = req->getCallback();
1926     if (callback) {
1927       callback->writeErr(req->getTotalBytesWritten(), ex);
1928     }
1929     req->destroy();
1930   }
1931 }
1932
1933 void AsyncSocket::invalidState(ConnectCallback* callback) {
1934   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1935              << "): connect() called in invalid state " << state_;
1936
1937   /*
1938    * The invalidState() methods don't use the normal failure mechanisms,
1939    * since we don't know what state we are in.  We don't want to call
1940    * startFail()/finishFail() recursively if we are already in the middle of
1941    * cleaning up.
1942    */
1943
1944   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1945                          "connect() called with socket in invalid state");
1946   connectEndTime_ = std::chrono::steady_clock::now();
1947   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1948     if (callback) {
1949       callback->connectErr(ex);
1950     }
1951   } else {
1952     // We can't use failConnect() here since connectCallback_
1953     // may already be set to another callback.  Invoke this ConnectCallback
1954     // here; any other connectCallback_ will be invoked in finishFail()
1955     startFail();
1956     if (callback) {
1957       callback->connectErr(ex);
1958     }
1959     finishFail();
1960   }
1961 }
1962
1963 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
1964   connectEndTime_ = std::chrono::steady_clock::now();
1965   if (connectCallback_) {
1966     ConnectCallback* callback = connectCallback_;
1967     connectCallback_ = nullptr;
1968     callback->connectErr(ex);
1969   }
1970 }
1971
1972 void AsyncSocket::invokeConnectSuccess() {
1973   connectEndTime_ = std::chrono::steady_clock::now();
1974   if (connectCallback_) {
1975     ConnectCallback* callback = connectCallback_;
1976     connectCallback_ = nullptr;
1977     callback->connectSuccess();
1978   }
1979 }
1980
1981 void AsyncSocket::invalidState(ReadCallback* callback) {
1982   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1983              << "): setReadCallback(" << callback
1984              << ") called in invalid state " << state_;
1985
1986   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1987                          "setReadCallback() called with socket in "
1988                          "invalid state");
1989   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1990     if (callback) {
1991       callback->readErr(ex);
1992     }
1993   } else {
1994     startFail();
1995     if (callback) {
1996       callback->readErr(ex);
1997     }
1998     finishFail();
1999   }
2000 }
2001
2002 void AsyncSocket::invalidState(WriteCallback* callback) {
2003   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2004              << "): write() called in invalid state " << state_;
2005
2006   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2007                          withAddr("write() called with socket in invalid state"));
2008   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2009     if (callback) {
2010       callback->writeErr(0, ex);
2011     }
2012   } else {
2013     startFail();
2014     if (callback) {
2015       callback->writeErr(0, ex);
2016     }
2017     finishFail();
2018   }
2019 }
2020
2021 void AsyncSocket::doClose() {
2022   if (fd_ == -1) return;
2023   if (shutdownSocketSet_) {
2024     shutdownSocketSet_->close(fd_);
2025   } else {
2026     ::close(fd_);
2027   }
2028   fd_ = -1;
2029 }
2030
2031 std::ostream& operator << (std::ostream& os,
2032                            const AsyncSocket::StateEnum& state) {
2033   os << static_cast<int>(state);
2034   return os;
2035 }
2036
2037 std::string AsyncSocket::withAddr(const std::string& s) {
2038   // Don't use addr_ directly because it may not be initialized
2039   // e.g. if constructed from fd
2040   folly::SocketAddress peer, local;
2041   try {
2042     getPeerAddress(&peer);
2043     getLocalAddress(&local);
2044   } catch (const std::exception&) {
2045     // ignore
2046   } catch (...) {
2047     // ignore
2048   }
2049   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2050 }
2051
2052 } // folly