Add mechanizm for caching local and peer addresses in AsyncSSLSocket.
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23
24 #include <poll.h>
25 #include <errno.h>
26 #include <limits.h>
27 #include <unistd.h>
28 #include <thread>
29 #include <fcntl.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34 #include <boost/preprocessor/control/if.hpp>
35
36 using std::string;
37 using std::unique_ptr;
38
39 namespace folly {
40
41 // static members initializers
42 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
43
44 const AsyncSocketException socketClosedLocallyEx(
45     AsyncSocketException::END_OF_FILE, "socket closed locally");
46 const AsyncSocketException socketShutdownForWritesEx(
47     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
48
49 // TODO: It might help performance to provide a version of BytesWriteRequest that
50 // users could derive from, so we can avoid the extra allocation for each call
51 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
52 // protocols are currently templatized for transports.
53 //
54 // We would need the version for external users where they provide the iovec
55 // storage space, and only our internal version would allocate it at the end of
56 // the WriteRequest.
57
58 /* The default WriteRequest implementation, used for write(), writev() and
59  * writeChain()
60  *
61  * A new BytesWriteRequest operation is allocated on the heap for all write
62  * operations that cannot be completed immediately.
63  */
64 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
65  public:
66   static BytesWriteRequest* newRequest(AsyncSocket* socket,
67                                        WriteCallback* callback,
68                                        const iovec* ops,
69                                        uint32_t opCount,
70                                        uint32_t partialWritten,
71                                        uint32_t bytesWritten,
72                                        unique_ptr<IOBuf>&& ioBuf,
73                                        WriteFlags flags) {
74     assert(opCount > 0);
75     // Since we put a variable size iovec array at the end
76     // of each BytesWriteRequest, we have to manually allocate the memory.
77     void* buf = malloc(sizeof(BytesWriteRequest) +
78                        (opCount * sizeof(struct iovec)));
79     if (buf == nullptr) {
80       throw std::bad_alloc();
81     }
82
83     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
84                                       partialWritten, bytesWritten,
85                                       std::move(ioBuf), flags);
86   }
87
88   void destroy() override {
89     this->~BytesWriteRequest();
90     free(this);
91   }
92
93   bool performWrite() override {
94     WriteFlags writeFlags = flags_;
95     if (getNext() != nullptr) {
96       writeFlags = writeFlags | WriteFlags::CORK;
97     }
98     bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
99                                           &opsWritten_, &partialBytes_);
100     return bytesWritten_ >= 0;
101   }
102
103   bool isComplete() override {
104     return opsWritten_ == getOpCount();
105   }
106
107   void consume() override {
108     // Advance opIndex_ forward by opsWritten_
109     opIndex_ += opsWritten_;
110     assert(opIndex_ < opCount_);
111
112     // If we've finished writing any IOBufs, release them
113     if (ioBuf_) {
114       for (uint32_t i = opsWritten_; i != 0; --i) {
115         assert(ioBuf_);
116         ioBuf_ = ioBuf_->pop();
117       }
118     }
119
120     // Move partialBytes_ forward into the current iovec buffer
121     struct iovec* currentOp = writeOps_ + opIndex_;
122     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
123     currentOp->iov_base =
124       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
125     currentOp->iov_len -= partialBytes_;
126
127     // Increment the totalBytesWritten_ count by bytesWritten_;
128     totalBytesWritten_ += bytesWritten_;
129   }
130
131  private:
132   BytesWriteRequest(AsyncSocket* socket,
133                     WriteCallback* callback,
134                     const struct iovec* ops,
135                     uint32_t opCount,
136                     uint32_t partialBytes,
137                     uint32_t bytesWritten,
138                     unique_ptr<IOBuf>&& ioBuf,
139                     WriteFlags flags)
140     : AsyncSocket::WriteRequest(socket, callback)
141     , opCount_(opCount)
142     , opIndex_(0)
143     , flags_(flags)
144     , ioBuf_(std::move(ioBuf))
145     , opsWritten_(0)
146     , partialBytes_(partialBytes)
147     , bytesWritten_(bytesWritten) {
148     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
149   }
150
151   // private destructor, to ensure callers use destroy()
152   ~BytesWriteRequest() override = default;
153
154   const struct iovec* getOps() const {
155     assert(opCount_ > opIndex_);
156     return writeOps_ + opIndex_;
157   }
158
159   uint32_t getOpCount() const {
160     assert(opCount_ > opIndex_);
161     return opCount_ - opIndex_;
162   }
163
164   uint32_t opCount_;            ///< number of entries in writeOps_
165   uint32_t opIndex_;            ///< current index into writeOps_
166   WriteFlags flags_;            ///< set for WriteFlags
167   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
168
169   // for consume(), how much we wrote on the last write
170   uint32_t opsWritten_;         ///< complete ops written
171   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
172   ssize_t bytesWritten_;        ///< bytes written altogether
173
174   struct iovec writeOps_[];     ///< write operation(s) list
175 };
176
177 AsyncSocket::AsyncSocket()
178   : eventBase_(nullptr)
179   , writeTimeout_(this, nullptr)
180   , ioHandler_(this, nullptr)
181   , immediateReadHandler_(this) {
182   VLOG(5) << "new AsyncSocket()";
183   init();
184 }
185
186 AsyncSocket::AsyncSocket(EventBase* evb)
187   : eventBase_(evb)
188   , writeTimeout_(this, evb)
189   , ioHandler_(this, evb)
190   , immediateReadHandler_(this) {
191   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
192   init();
193 }
194
195 AsyncSocket::AsyncSocket(EventBase* evb,
196                            const folly::SocketAddress& address,
197                            uint32_t connectTimeout)
198   : AsyncSocket(evb) {
199   connect(nullptr, address, connectTimeout);
200 }
201
202 AsyncSocket::AsyncSocket(EventBase* evb,
203                            const std::string& ip,
204                            uint16_t port,
205                            uint32_t connectTimeout)
206   : AsyncSocket(evb) {
207   connect(nullptr, ip, port, connectTimeout);
208 }
209
210 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
211   : eventBase_(evb)
212   , writeTimeout_(this, evb)
213   , ioHandler_(this, evb, fd)
214   , immediateReadHandler_(this) {
215   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
216           << fd << ")";
217   init();
218   fd_ = fd;
219   setCloseOnExec();
220   state_ = StateEnum::ESTABLISHED;
221 }
222
223 // init() method, since constructor forwarding isn't supported in most
224 // compilers yet.
225 void AsyncSocket::init() {
226   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
227   shutdownFlags_ = 0;
228   state_ = StateEnum::UNINIT;
229   eventFlags_ = EventHandler::NONE;
230   fd_ = -1;
231   sendTimeout_ = 0;
232   maxReadsPerEvent_ = 16;
233   connectCallback_ = nullptr;
234   readCallback_ = nullptr;
235   writeReqHead_ = nullptr;
236   writeReqTail_ = nullptr;
237   shutdownSocketSet_ = nullptr;
238   appBytesWritten_ = 0;
239   appBytesReceived_ = 0;
240 }
241
242 AsyncSocket::~AsyncSocket() {
243   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
244           << ", evb=" << eventBase_ << ", fd=" << fd_
245           << ", state=" << state_ << ")";
246 }
247
248 void AsyncSocket::destroy() {
249   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
250           << ", fd=" << fd_ << ", state=" << state_;
251   // When destroy is called, close the socket immediately
252   closeNow();
253
254   // Then call DelayedDestruction::destroy() to take care of
255   // whether or not we need immediate or delayed destruction
256   DelayedDestruction::destroy();
257 }
258
259 int AsyncSocket::detachFd() {
260   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
261           << ", evb=" << eventBase_ << ", state=" << state_
262           << ", events=" << std::hex << eventFlags_ << ")";
263   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
264   // actually close the descriptor.
265   if (shutdownSocketSet_) {
266     shutdownSocketSet_->remove(fd_);
267   }
268   int fd = fd_;
269   fd_ = -1;
270   // Call closeNow() to invoke all pending callbacks with an error.
271   closeNow();
272   // Update the EventHandler to stop using this fd.
273   // This can only be done after closeNow() unregisters the handler.
274   ioHandler_.changeHandlerFD(-1);
275   return fd;
276 }
277
278 const folly::SocketAddress& AsyncSocket::anyAddress() {
279   static const folly::SocketAddress anyAddress =
280     folly::SocketAddress("0.0.0.0", 0);
281   return anyAddress;
282 }
283
284 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
285   if (shutdownSocketSet_ == newSS) {
286     return;
287   }
288   if (shutdownSocketSet_ && fd_ != -1) {
289     shutdownSocketSet_->remove(fd_);
290   }
291   shutdownSocketSet_ = newSS;
292   if (shutdownSocketSet_ && fd_ != -1) {
293     shutdownSocketSet_->add(fd_);
294   }
295 }
296
297 void AsyncSocket::setCloseOnExec() {
298   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
299   if (rv != 0) {
300     throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
301                                withAddr("failed to set close-on-exec flag"),
302                                errno);
303   }
304 }
305
306 void AsyncSocket::connect(ConnectCallback* callback,
307                            const folly::SocketAddress& address,
308                            int timeout,
309                            const OptionMap &options,
310                            const folly::SocketAddress& bindAddr) noexcept {
311   DestructorGuard dg(this);
312   assert(eventBase_->isInEventBaseThread());
313
314   addr_ = address;
315
316   // Make sure we're in the uninitialized state
317   if (state_ != StateEnum::UNINIT) {
318     return invalidState(callback);
319   }
320
321   connectStartTime_ = std::chrono::steady_clock::now();
322   // Make connect end time at least >= connectStartTime.
323   connectEndTime_ = connectStartTime_;
324
325   assert(fd_ == -1);
326   state_ = StateEnum::CONNECTING;
327   connectCallback_ = callback;
328
329   sockaddr_storage addrStorage;
330   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
331
332   try {
333     // Create the socket
334     // Technically the first parameter should actually be a protocol family
335     // constant (PF_xxx) rather than an address family (AF_xxx), but the
336     // distinction is mainly just historical.  In pretty much all
337     // implementations the PF_foo and AF_foo constants are identical.
338     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
339     if (fd_ < 0) {
340       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
341                                 withAddr("failed to create socket"), errno);
342     }
343     if (shutdownSocketSet_) {
344       shutdownSocketSet_->add(fd_);
345     }
346     ioHandler_.changeHandlerFD(fd_);
347
348     setCloseOnExec();
349
350     // Put the socket in non-blocking mode
351     int flags = fcntl(fd_, F_GETFL, 0);
352     if (flags == -1) {
353       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
354                                 withAddr("failed to get socket flags"), errno);
355     }
356     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
357     if (rv == -1) {
358       throw AsyncSocketException(
359           AsyncSocketException::INTERNAL_ERROR,
360           withAddr("failed to put socket in non-blocking mode"),
361           errno);
362     }
363
364 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
365     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
366     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
367     if (rv == -1) {
368       throw AsyncSocketException(
369           AsyncSocketException::INTERNAL_ERROR,
370           "failed to enable F_SETNOSIGPIPE on socket",
371           errno);
372     }
373 #endif
374
375     // By default, turn on TCP_NODELAY
376     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
377     // setNoDelay() will log an error message if it fails.
378     if (address.getFamily() != AF_UNIX) {
379       (void)setNoDelay(true);
380     }
381
382     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
383             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
384
385     // bind the socket
386     if (bindAddr != anyAddress()) {
387       int one = 1;
388       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
389         doClose();
390         throw AsyncSocketException(
391           AsyncSocketException::NOT_OPEN,
392           "failed to setsockopt prior to bind on " + bindAddr.describe(),
393           errno);
394       }
395
396       bindAddr.getAddress(&addrStorage);
397
398       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
399         doClose();
400         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
401                                   "failed to bind to async socket: " +
402                                   bindAddr.describe(),
403                                   errno);
404       }
405     }
406
407     // Apply the additional options if any.
408     for (const auto& opt: options) {
409       int rv = opt.first.apply(fd_, opt.second);
410       if (rv != 0) {
411         throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
412                                   withAddr("failed to set socket option"),
413                                   errno);
414       }
415     }
416
417     // Perform the connect()
418     address.getAddress(&addrStorage);
419
420     rv = ::connect(fd_, saddr, address.getActualSize());
421     if (rv < 0) {
422       if (errno == EINPROGRESS) {
423         // Connection in progress.
424         if (timeout > 0) {
425           // Start a timer in case the connection takes too long.
426           if (!writeTimeout_.scheduleTimeout(timeout)) {
427             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
428                 withAddr("failed to schedule AsyncSocket connect timeout"));
429           }
430         }
431
432         // Register for write events, so we'll
433         // be notified when the connection finishes/fails.
434         // Note that we don't register for a persistent event here.
435         assert(eventFlags_ == EventHandler::NONE);
436         eventFlags_ = EventHandler::WRITE;
437         if (!ioHandler_.registerHandler(eventFlags_)) {
438           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
439               withAddr("failed to register AsyncSocket connect handler"));
440         }
441         return;
442       } else {
443         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
444                                   "connect failed (immediately)", errno);
445       }
446     }
447
448     // If we're still here the connect() succeeded immediately.
449     // Fall through to call the callback outside of this try...catch block
450   } catch (const AsyncSocketException& ex) {
451     return failConnect(__func__, ex);
452   } catch (const std::exception& ex) {
453     // shouldn't happen, but handle it just in case
454     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
455                << "): unexpected " << typeid(ex).name() << " exception: "
456                << ex.what();
457     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
458                             withAddr(string("unexpected exception: ") +
459                                      ex.what()));
460     return failConnect(__func__, tex);
461   }
462
463   // The connection succeeded immediately
464   // The read callback may not have been set yet, and no writes may be pending
465   // yet, so we don't have to register for any events at the moment.
466   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
467   assert(readCallback_ == nullptr);
468   assert(writeReqHead_ == nullptr);
469   state_ = StateEnum::ESTABLISHED;
470   invokeConnectSuccess();
471 }
472
473 void AsyncSocket::connect(ConnectCallback* callback,
474                            const string& ip, uint16_t port,
475                            int timeout,
476                            const OptionMap &options) noexcept {
477   DestructorGuard dg(this);
478   try {
479     connectCallback_ = callback;
480     connect(callback, folly::SocketAddress(ip, port), timeout, options);
481   } catch (const std::exception& ex) {
482     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
483                             ex.what());
484     return failConnect(__func__, tex);
485   }
486 }
487
488 void AsyncSocket::cancelConnect() {
489   connectCallback_ = nullptr;
490   if (state_ == StateEnum::CONNECTING) {
491     closeNow();
492   }
493 }
494
495 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
496   sendTimeout_ = milliseconds;
497   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
498
499   // If we are currently pending on write requests, immediately update
500   // writeTimeout_ with the new value.
501   if ((eventFlags_ & EventHandler::WRITE) &&
502       (state_ != StateEnum::CONNECTING)) {
503     assert(state_ == StateEnum::ESTABLISHED);
504     assert((shutdownFlags_ & SHUT_WRITE) == 0);
505     if (sendTimeout_ > 0) {
506       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
507         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
508             withAddr("failed to reschedule send timeout in setSendTimeout"));
509         return failWrite(__func__, ex);
510       }
511     } else {
512       writeTimeout_.cancelTimeout();
513     }
514   }
515 }
516
517 void AsyncSocket::setReadCB(ReadCallback *callback) {
518   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
519           << ", callback=" << callback << ", state=" << state_;
520
521   // Short circuit if callback is the same as the existing readCallback_.
522   //
523   // Note that this is needed for proper functioning during some cleanup cases.
524   // During cleanup we allow setReadCallback(nullptr) to be called even if the
525   // read callback is already unset and we have been detached from an event
526   // base.  This check prevents us from asserting
527   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
528   if (callback == readCallback_) {
529     return;
530   }
531
532   /* We are removing a read callback */
533   if (callback == nullptr &&
534       immediateReadHandler_.isLoopCallbackScheduled()) {
535     immediateReadHandler_.cancelLoopCallback();
536   }
537
538   if (shutdownFlags_ & SHUT_READ) {
539     // Reads have already been shut down on this socket.
540     //
541     // Allow setReadCallback(nullptr) to be called in this case, but don't
542     // allow a new callback to be set.
543     //
544     // For example, setReadCallback(nullptr) can happen after an error if we
545     // invoke some other error callback before invoking readError().  The other
546     // error callback that is invoked first may go ahead and clear the read
547     // callback before we get a chance to invoke readError().
548     if (callback != nullptr) {
549       return invalidState(callback);
550     }
551     assert((eventFlags_ & EventHandler::READ) == 0);
552     readCallback_ = nullptr;
553     return;
554   }
555
556   DestructorGuard dg(this);
557   assert(eventBase_->isInEventBaseThread());
558
559   switch ((StateEnum)state_) {
560     case StateEnum::CONNECTING:
561       // For convenience, we allow the read callback to be set while we are
562       // still connecting.  We just store the callback for now.  Once the
563       // connection completes we'll register for read events.
564       readCallback_ = callback;
565       return;
566     case StateEnum::ESTABLISHED:
567     {
568       readCallback_ = callback;
569       uint16_t oldFlags = eventFlags_;
570       if (readCallback_) {
571         eventFlags_ |= EventHandler::READ;
572       } else {
573         eventFlags_ &= ~EventHandler::READ;
574       }
575
576       // Update our registration if our flags have changed
577       if (eventFlags_ != oldFlags) {
578         // We intentionally ignore the return value here.
579         // updateEventRegistration() will move us into the error state if it
580         // fails, and we don't need to do anything else here afterwards.
581         (void)updateEventRegistration();
582       }
583
584       if (readCallback_) {
585         checkForImmediateRead();
586       }
587       return;
588     }
589     case StateEnum::CLOSED:
590     case StateEnum::ERROR:
591       // We should never reach here.  SHUT_READ should always be set
592       // if we are in STATE_CLOSED or STATE_ERROR.
593       assert(false);
594       return invalidState(callback);
595     case StateEnum::UNINIT:
596       // We do not allow setReadCallback() to be called before we start
597       // connecting.
598       return invalidState(callback);
599   }
600
601   // We don't put a default case in the switch statement, so that the compiler
602   // will warn us to update the switch statement if a new state is added.
603   return invalidState(callback);
604 }
605
606 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
607   return readCallback_;
608 }
609
610 void AsyncSocket::write(WriteCallback* callback,
611                          const void* buf, size_t bytes, WriteFlags flags) {
612   iovec op;
613   op.iov_base = const_cast<void*>(buf);
614   op.iov_len = bytes;
615   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
616 }
617
618 void AsyncSocket::writev(WriteCallback* callback,
619                           const iovec* vec,
620                           size_t count,
621                           WriteFlags flags) {
622   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
623 }
624
625 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
626                               WriteFlags flags) {
627   constexpr size_t kSmallSizeMax = 64;
628   size_t count = buf->countChainElements();
629   if (count <= kSmallSizeMax) {
630     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
631     writeChainImpl(callback, vec, count, std::move(buf), flags);
632   } else {
633     iovec* vec = new iovec[count];
634     writeChainImpl(callback, vec, count, std::move(buf), flags);
635     delete[] vec;
636   }
637 }
638
639 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
640     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
641   size_t veclen = buf->fillIov(vec, count);
642   writeImpl(callback, vec, veclen, std::move(buf), flags);
643 }
644
645 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
646                              size_t count, unique_ptr<IOBuf>&& buf,
647                              WriteFlags flags) {
648   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
649           << ", callback=" << callback << ", count=" << count
650           << ", state=" << state_;
651   DestructorGuard dg(this);
652   unique_ptr<IOBuf>ioBuf(std::move(buf));
653   assert(eventBase_->isInEventBaseThread());
654
655   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
656     // No new writes may be performed after the write side of the socket has
657     // been shutdown.
658     //
659     // We could just call callback->writeError() here to fail just this write.
660     // However, fail hard and use invalidState() to fail all outstanding
661     // callbacks and move the socket into the error state.  There's most likely
662     // a bug in the caller's code, so we abort everything rather than trying to
663     // proceed as best we can.
664     return invalidState(callback);
665   }
666
667   uint32_t countWritten = 0;
668   uint32_t partialWritten = 0;
669   int bytesWritten = 0;
670   bool mustRegister = false;
671   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
672     if (writeReqHead_ == nullptr) {
673       // If we are established and there are no other writes pending,
674       // we can attempt to perform the write immediately.
675       assert(writeReqTail_ == nullptr);
676       assert((eventFlags_ & EventHandler::WRITE) == 0);
677
678       bytesWritten = performWrite(vec, count, flags,
679                                   &countWritten, &partialWritten);
680       if (bytesWritten < 0) {
681         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
682                                withAddr("writev failed"), errno);
683         return failWrite(__func__, callback, 0, ex);
684       } else if (countWritten == count) {
685         // We successfully wrote everything.
686         // Invoke the callback and return.
687         if (callback) {
688           callback->writeSuccess();
689         }
690         return;
691       } else { // continue writing the next writeReq
692         if (bufferCallback_) {
693           bufferCallback_->onEgressBuffered();
694         }
695       }
696       mustRegister = true;
697     }
698   } else if (!connecting()) {
699     // Invalid state for writing
700     return invalidState(callback);
701   }
702
703   // Create a new WriteRequest to add to the queue
704   WriteRequest* req;
705   try {
706     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
707                                         count - countWritten, partialWritten,
708                                         bytesWritten, std::move(ioBuf), flags);
709   } catch (const std::exception& ex) {
710     // we mainly expect to catch std::bad_alloc here
711     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
712         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
713     return failWrite(__func__, callback, bytesWritten, tex);
714   }
715   req->consume();
716   if (writeReqTail_ == nullptr) {
717     assert(writeReqHead_ == nullptr);
718     writeReqHead_ = writeReqTail_ = req;
719   } else {
720     writeReqTail_->append(req);
721     writeReqTail_ = req;
722   }
723
724   // Register for write events if are established and not currently
725   // waiting on write events
726   if (mustRegister) {
727     assert(state_ == StateEnum::ESTABLISHED);
728     assert((eventFlags_ & EventHandler::WRITE) == 0);
729     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
730       assert(state_ == StateEnum::ERROR);
731       return;
732     }
733     if (sendTimeout_ > 0) {
734       // Schedule a timeout to fire if the write takes too long.
735       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
736         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
737                                withAddr("failed to schedule send timeout"));
738         return failWrite(__func__, ex);
739       }
740     }
741   }
742 }
743
744 void AsyncSocket::writeRequest(WriteRequest* req) {
745   if (writeReqTail_ == nullptr) {
746     assert(writeReqHead_ == nullptr);
747     writeReqHead_ = writeReqTail_ = req;
748     req->start();
749   } else {
750     writeReqTail_->append(req);
751     writeReqTail_ = req;
752   }
753 }
754
755 void AsyncSocket::close() {
756   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
757           << ", state=" << state_ << ", shutdownFlags="
758           << std::hex << (int) shutdownFlags_;
759
760   // close() is only different from closeNow() when there are pending writes
761   // that need to drain before we can close.  In all other cases, just call
762   // closeNow().
763   //
764   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
765   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
766   // is still running.  (e.g., If there are multiple pending writes, and we
767   // call writeError() on the first one, it may call close().  In this case we
768   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
769   // writes will still be in the queue.)
770   //
771   // We only need to drain pending writes if we are still in STATE_CONNECTING
772   // or STATE_ESTABLISHED
773   if ((writeReqHead_ == nullptr) ||
774       !(state_ == StateEnum::CONNECTING ||
775       state_ == StateEnum::ESTABLISHED)) {
776     closeNow();
777     return;
778   }
779
780   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
781   // destroyed until close() returns.
782   DestructorGuard dg(this);
783   assert(eventBase_->isInEventBaseThread());
784
785   // Since there are write requests pending, we have to set the
786   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
787   // connect finishes and we finish writing these requests.
788   //
789   // Set SHUT_READ to indicate that reads are shut down, and set the
790   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
791   // pending writes complete.
792   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
793
794   // If a read callback is set, invoke readEOF() immediately to inform it that
795   // the socket has been closed and no more data can be read.
796   if (readCallback_) {
797     // Disable reads if they are enabled
798     if (!updateEventRegistration(0, EventHandler::READ)) {
799       // We're now in the error state; callbacks have been cleaned up
800       assert(state_ == StateEnum::ERROR);
801       assert(readCallback_ == nullptr);
802     } else {
803       ReadCallback* callback = readCallback_;
804       readCallback_ = nullptr;
805       callback->readEOF();
806     }
807   }
808 }
809
810 void AsyncSocket::closeNow() {
811   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
812           << ", state=" << state_ << ", shutdownFlags="
813           << std::hex << (int) shutdownFlags_;
814   DestructorGuard dg(this);
815   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
816
817   switch (state_) {
818     case StateEnum::ESTABLISHED:
819     case StateEnum::CONNECTING:
820     {
821       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
822       state_ = StateEnum::CLOSED;
823
824       // If the write timeout was set, cancel it.
825       writeTimeout_.cancelTimeout();
826
827       // If we are registered for I/O events, unregister.
828       if (eventFlags_ != EventHandler::NONE) {
829         eventFlags_ = EventHandler::NONE;
830         if (!updateEventRegistration()) {
831           // We will have been moved into the error state.
832           assert(state_ == StateEnum::ERROR);
833           return;
834         }
835       }
836
837       if (immediateReadHandler_.isLoopCallbackScheduled()) {
838         immediateReadHandler_.cancelLoopCallback();
839       }
840
841       if (fd_ >= 0) {
842         ioHandler_.changeHandlerFD(-1);
843         doClose();
844       }
845
846       invokeConnectErr(socketClosedLocallyEx);
847
848       failAllWrites(socketClosedLocallyEx);
849
850       if (readCallback_) {
851         ReadCallback* callback = readCallback_;
852         readCallback_ = nullptr;
853         callback->readEOF();
854       }
855       return;
856     }
857     case StateEnum::CLOSED:
858       // Do nothing.  It's possible that we are being called recursively
859       // from inside a callback that we invoked inside another call to close()
860       // that is still running.
861       return;
862     case StateEnum::ERROR:
863       // Do nothing.  The error handling code has performed (or is performing)
864       // cleanup.
865       return;
866     case StateEnum::UNINIT:
867       assert(eventFlags_ == EventHandler::NONE);
868       assert(connectCallback_ == nullptr);
869       assert(readCallback_ == nullptr);
870       assert(writeReqHead_ == nullptr);
871       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
872       state_ = StateEnum::CLOSED;
873       return;
874   }
875
876   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
877               << ") called in unknown state " << state_;
878 }
879
880 void AsyncSocket::closeWithReset() {
881   // Enable SO_LINGER, with the linger timeout set to 0.
882   // This will trigger a TCP reset when we close the socket.
883   if (fd_ >= 0) {
884     struct linger optLinger = {1, 0};
885     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
886       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
887               << "on " << fd_ << ": errno=" << errno;
888     }
889   }
890
891   // Then let closeNow() take care of the rest
892   closeNow();
893 }
894
895 void AsyncSocket::shutdownWrite() {
896   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
897           << ", state=" << state_ << ", shutdownFlags="
898           << std::hex << (int) shutdownFlags_;
899
900   // If there are no pending writes, shutdownWrite() is identical to
901   // shutdownWriteNow().
902   if (writeReqHead_ == nullptr) {
903     shutdownWriteNow();
904     return;
905   }
906
907   assert(eventBase_->isInEventBaseThread());
908
909   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
910   // shutdown will be performed once all writes complete.
911   shutdownFlags_ |= SHUT_WRITE_PENDING;
912 }
913
914 void AsyncSocket::shutdownWriteNow() {
915   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
916           << ", fd=" << fd_ << ", state=" << state_
917           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
918
919   if (shutdownFlags_ & SHUT_WRITE) {
920     // Writes are already shutdown; nothing else to do.
921     return;
922   }
923
924   // If SHUT_READ is already set, just call closeNow() to completely
925   // close the socket.  This can happen if close() was called with writes
926   // pending, and then shutdownWriteNow() is called before all pending writes
927   // complete.
928   if (shutdownFlags_ & SHUT_READ) {
929     closeNow();
930     return;
931   }
932
933   DestructorGuard dg(this);
934   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
935
936   switch (static_cast<StateEnum>(state_)) {
937     case StateEnum::ESTABLISHED:
938     {
939       shutdownFlags_ |= SHUT_WRITE;
940
941       // If the write timeout was set, cancel it.
942       writeTimeout_.cancelTimeout();
943
944       // If we are registered for write events, unregister.
945       if (!updateEventRegistration(0, EventHandler::WRITE)) {
946         // We will have been moved into the error state.
947         assert(state_ == StateEnum::ERROR);
948         return;
949       }
950
951       // Shutdown writes on the file descriptor
952       ::shutdown(fd_, SHUT_WR);
953
954       // Immediately fail all write requests
955       failAllWrites(socketShutdownForWritesEx);
956       return;
957     }
958     case StateEnum::CONNECTING:
959     {
960       // Set the SHUT_WRITE_PENDING flag.
961       // When the connection completes, it will check this flag,
962       // shutdown the write half of the socket, and then set SHUT_WRITE.
963       shutdownFlags_ |= SHUT_WRITE_PENDING;
964
965       // Immediately fail all write requests
966       failAllWrites(socketShutdownForWritesEx);
967       return;
968     }
969     case StateEnum::UNINIT:
970       // Callers normally shouldn't call shutdownWriteNow() before the socket
971       // even starts connecting.  Nonetheless, go ahead and set
972       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
973       // immediately shut down the write side of the socket.
974       shutdownFlags_ |= SHUT_WRITE_PENDING;
975       return;
976     case StateEnum::CLOSED:
977     case StateEnum::ERROR:
978       // We should never get here.  SHUT_WRITE should always be set
979       // in STATE_CLOSED and STATE_ERROR.
980       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
981                  << ", fd=" << fd_ << ") in unexpected state " << state_
982                  << " with SHUT_WRITE not set ("
983                  << std::hex << (int) shutdownFlags_ << ")";
984       assert(false);
985       return;
986   }
987
988   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
989               << fd_ << ") called in unknown state " << state_;
990 }
991
992 bool AsyncSocket::readable() const {
993   if (fd_ == -1) {
994     return false;
995   }
996   struct pollfd fds[1];
997   fds[0].fd = fd_;
998   fds[0].events = POLLIN;
999   fds[0].revents = 0;
1000   int rc = poll(fds, 1, 0);
1001   return rc == 1;
1002 }
1003
1004 bool AsyncSocket::isPending() const {
1005   return ioHandler_.isPending();
1006 }
1007
1008 bool AsyncSocket::hangup() const {
1009   if (fd_ == -1) {
1010     // sanity check, no one should ask for hangup if we are not connected.
1011     assert(false);
1012     return false;
1013   }
1014 #ifdef POLLRDHUP // Linux-only
1015   struct pollfd fds[1];
1016   fds[0].fd = fd_;
1017   fds[0].events = POLLRDHUP|POLLHUP;
1018   fds[0].revents = 0;
1019   poll(fds, 1, 0);
1020   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1021 #else
1022   return false;
1023 #endif
1024 }
1025
1026 bool AsyncSocket::good() const {
1027   return ((state_ == StateEnum::CONNECTING ||
1028           state_ == StateEnum::ESTABLISHED) &&
1029           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1030 }
1031
1032 bool AsyncSocket::error() const {
1033   return (state_ == StateEnum::ERROR);
1034 }
1035
1036 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1037   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1038           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1039           << ", state=" << state_ << ", events="
1040           << std::hex << eventFlags_ << ")";
1041   assert(eventBase_ == nullptr);
1042   assert(eventBase->isInEventBaseThread());
1043
1044   eventBase_ = eventBase;
1045   ioHandler_.attachEventBase(eventBase);
1046   writeTimeout_.attachEventBase(eventBase);
1047 }
1048
1049 void AsyncSocket::detachEventBase() {
1050   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1051           << ", old evb=" << eventBase_ << ", state=" << state_
1052           << ", events=" << std::hex << eventFlags_ << ")";
1053   assert(eventBase_ != nullptr);
1054   assert(eventBase_->isInEventBaseThread());
1055
1056   eventBase_ = nullptr;
1057   ioHandler_.detachEventBase();
1058   writeTimeout_.detachEventBase();
1059 }
1060
1061 bool AsyncSocket::isDetachable() const {
1062   DCHECK(eventBase_ != nullptr);
1063   DCHECK(eventBase_->isInEventBaseThread());
1064
1065   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1066 }
1067
1068 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1069   if (!localAddr_.isInitialized()) {
1070     localAddr_.setFromLocalAddress(fd_);
1071   }
1072   *address = localAddr_;
1073 }
1074
1075 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1076   if (!addr_.isInitialized()) {
1077     addr_.setFromPeerAddress(fd_);
1078   }
1079   *address = addr_;
1080 }
1081
1082 int AsyncSocket::setNoDelay(bool noDelay) {
1083   if (fd_ < 0) {
1084     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1085                << this << "(state=" << state_ << ")";
1086     return EINVAL;
1087
1088   }
1089
1090   int value = noDelay ? 1 : 0;
1091   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1092     int errnoCopy = errno;
1093     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1094             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1095             << strerror(errnoCopy);
1096     return errnoCopy;
1097   }
1098
1099   return 0;
1100 }
1101
1102 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1103
1104   #ifndef TCP_CONGESTION
1105   #define TCP_CONGESTION  13
1106   #endif
1107
1108   if (fd_ < 0) {
1109     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1110                << "socket " << this << "(state=" << state_ << ")";
1111     return EINVAL;
1112
1113   }
1114
1115   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1116         cname.length() + 1) != 0) {
1117     int errnoCopy = errno;
1118     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1119             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1120             << strerror(errnoCopy);
1121     return errnoCopy;
1122   }
1123
1124   return 0;
1125 }
1126
1127 int AsyncSocket::setQuickAck(bool quickack) {
1128   if (fd_ < 0) {
1129     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1130                << this << "(state=" << state_ << ")";
1131     return EINVAL;
1132
1133   }
1134
1135 #ifdef TCP_QUICKACK // Linux-only
1136   int value = quickack ? 1 : 0;
1137   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1138     int errnoCopy = errno;
1139     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1140             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1141             << strerror(errnoCopy);
1142     return errnoCopy;
1143   }
1144
1145   return 0;
1146 #else
1147   return ENOSYS;
1148 #endif
1149 }
1150
1151 int AsyncSocket::setSendBufSize(size_t bufsize) {
1152   if (fd_ < 0) {
1153     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1154                << this << "(state=" << state_ << ")";
1155     return EINVAL;
1156   }
1157
1158   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1159     int errnoCopy = errno;
1160     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1161             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1162             << strerror(errnoCopy);
1163     return errnoCopy;
1164   }
1165
1166   return 0;
1167 }
1168
1169 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1170   if (fd_ < 0) {
1171     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1172                << this << "(state=" << state_ << ")";
1173     return EINVAL;
1174   }
1175
1176   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1177     int errnoCopy = errno;
1178     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1179             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1180             << strerror(errnoCopy);
1181     return errnoCopy;
1182   }
1183
1184   return 0;
1185 }
1186
1187 int AsyncSocket::setTCPProfile(int profd) {
1188   if (fd_ < 0) {
1189     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1190                << this << "(state=" << state_ << ")";
1191     return EINVAL;
1192   }
1193
1194   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1195     int errnoCopy = errno;
1196     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1197             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1198             << strerror(errnoCopy);
1199     return errnoCopy;
1200   }
1201
1202   return 0;
1203 }
1204
1205 void AsyncSocket::setPersistentCork(bool cork) {
1206   if (setCork(cork) == 0) {
1207     persistentCork_ = cork;
1208   }
1209 }
1210
1211 int AsyncSocket::setCork(bool cork) {
1212 #ifdef TCP_CORK
1213   if (fd_ < 0) {
1214     VLOG(4) << "AsyncSocket::setCork() called on non-open socket "
1215             << this << "(stats=" << state_ << ")";
1216     return EINVAL;
1217   }
1218
1219   if (corked_ == cork) {
1220     return 0;
1221   }
1222
1223   int flag = cork ? 1 : 0;
1224   if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) != 0) {
1225     int errnoCopy = errno;
1226     VLOG(2) << "faield to turn on TCP_CORK option on AsyncSocket"
1227             << this << "(fd=" << fd_ << ", state=" << state_ << "):"
1228             << folly::errnoStr(errnoCopy);
1229     return errnoCopy;
1230   }
1231   corked_ = cork;
1232 #endif
1233   return 0;
1234 }
1235
1236 void AsyncSocket::ioReady(uint16_t events) noexcept {
1237   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1238           << ", events=" << std::hex << events << ", state=" << state_;
1239   DestructorGuard dg(this);
1240   assert(events & EventHandler::READ_WRITE);
1241   assert(eventBase_->isInEventBaseThread());
1242
1243   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1244   if (relevantEvents == EventHandler::READ) {
1245     handleRead();
1246   } else if (relevantEvents == EventHandler::WRITE) {
1247     handleWrite();
1248   } else if (relevantEvents == EventHandler::READ_WRITE) {
1249     EventBase* originalEventBase = eventBase_;
1250     // If both read and write events are ready, process writes first.
1251     handleWrite();
1252
1253     // Return now if handleWrite() detached us from our EventBase
1254     if (eventBase_ != originalEventBase) {
1255       return;
1256     }
1257
1258     // Only call handleRead() if a read callback is still installed.
1259     // (It's possible that the read callback was uninstalled during
1260     // handleWrite().)
1261     if (readCallback_) {
1262       handleRead();
1263     }
1264   } else {
1265     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1266                << std::hex << events << "(this=" << this << ")";
1267     abort();
1268   }
1269 }
1270
1271 ssize_t AsyncSocket::performRead(void** buf, size_t* buflen, size_t* offset) {
1272   VLOG(5) << "AsyncSocket::performRead() this=" << this
1273           << ", buf=" << *buf << ", buflen=" << *buflen;
1274
1275   int recvFlags = 0;
1276   if (peek_) {
1277     recvFlags |= MSG_PEEK;
1278   }
1279
1280   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1281   if (bytes < 0) {
1282     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1283       // No more data to read right now.
1284       return READ_BLOCKING;
1285     } else {
1286       return READ_ERROR;
1287     }
1288   } else {
1289     appBytesReceived_ += bytes;
1290     return bytes;
1291   }
1292 }
1293
1294 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1295   // no matter what, buffer should be preapared for non-ssl socket
1296   CHECK(readCallback_);
1297   readCallback_->getReadBuffer(buf, buflen);
1298 }
1299
1300 void AsyncSocket::handleRead() noexcept {
1301   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1302           << ", state=" << state_;
1303   assert(state_ == StateEnum::ESTABLISHED);
1304   assert((shutdownFlags_ & SHUT_READ) == 0);
1305   assert(readCallback_ != nullptr);
1306   assert(eventFlags_ & EventHandler::READ);
1307
1308   // Loop until:
1309   // - a read attempt would block
1310   // - readCallback_ is uninstalled
1311   // - the number of loop iterations exceeds the optional maximum
1312   // - this AsyncSocket is moved to another EventBase
1313   //
1314   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1315   // which is why need to check for it here.
1316   //
1317   // The last bullet point is slightly subtle.  readDataAvailable() may also
1318   // detach this socket from this EventBase.  However, before
1319   // readDataAvailable() returns another thread may pick it up, attach it to
1320   // a different EventBase, and install another readCallback_.  We need to
1321   // exit immediately after readDataAvailable() returns if the eventBase_ has
1322   // changed.  (The caller must perform some sort of locking to transfer the
1323   // AsyncSocket between threads properly.  This will be sufficient to ensure
1324   // that this thread sees the updated eventBase_ variable after
1325   // readDataAvailable() returns.)
1326   uint16_t numReads = 0;
1327   EventBase* originalEventBase = eventBase_;
1328   while (readCallback_ && eventBase_ == originalEventBase) {
1329     // Get the buffer to read into.
1330     void* buf = nullptr;
1331     size_t buflen = 0, offset = 0;
1332     try {
1333       prepareReadBuffer(&buf, &buflen);
1334       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1335     } catch (const AsyncSocketException& ex) {
1336       return failRead(__func__, ex);
1337     } catch (const std::exception& ex) {
1338       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1339                               string("ReadCallback::getReadBuffer() "
1340                                      "threw exception: ") +
1341                               ex.what());
1342       return failRead(__func__, tex);
1343     } catch (...) {
1344       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1345                              "ReadCallback::getReadBuffer() threw "
1346                              "non-exception type");
1347       return failRead(__func__, ex);
1348     }
1349     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1350       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1351                              "ReadCallback::getReadBuffer() returned "
1352                              "empty buffer");
1353       return failRead(__func__, ex);
1354     }
1355
1356     // Perform the read
1357     ssize_t bytesRead = performRead(&buf, &buflen, &offset);
1358     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1359             << bytesRead << " bytes";
1360     if (bytesRead > 0) {
1361       if (!isBufferMovable_) {
1362         readCallback_->readDataAvailable(bytesRead);
1363       } else {
1364         CHECK(kOpenSslModeMoveBufferOwnership);
1365         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1366                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1367                 << ", offset=" << offset;
1368         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1369         readBuf->trimStart(offset);
1370         readBuf->trimEnd(buflen - offset - bytesRead);
1371         readCallback_->readBufferAvailable(std::move(readBuf));
1372       }
1373
1374       // Fall through and continue around the loop if the read
1375       // completely filled the available buffer.
1376       // Note that readCallback_ may have been uninstalled or changed inside
1377       // readDataAvailable().
1378       if (size_t(bytesRead) < buflen) {
1379         return;
1380       }
1381     } else if (bytesRead == READ_BLOCKING) {
1382         // No more data to read right now.
1383         return;
1384     } else if (bytesRead == READ_ERROR) {
1385       readErr_ = READ_ERROR;
1386       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1387                              withAddr("recv() failed"), errno);
1388       return failRead(__func__, ex);
1389     } else {
1390       assert(bytesRead == READ_EOF);
1391       readErr_ = READ_EOF;
1392       // EOF
1393       shutdownFlags_ |= SHUT_READ;
1394       if (!updateEventRegistration(0, EventHandler::READ)) {
1395         // we've already been moved into STATE_ERROR
1396         assert(state_ == StateEnum::ERROR);
1397         assert(readCallback_ == nullptr);
1398         return;
1399       }
1400
1401       ReadCallback* callback = readCallback_;
1402       readCallback_ = nullptr;
1403       callback->readEOF();
1404       return;
1405     }
1406     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1407       if (readCallback_ != nullptr) {
1408         // We might still have data in the socket.
1409         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1410         scheduleImmediateRead();
1411       }
1412       return;
1413     }
1414   }
1415 }
1416
1417 /**
1418  * This function attempts to write as much data as possible, until no more data
1419  * can be written.
1420  *
1421  * - If it sends all available data, it unregisters for write events, and stops
1422  *   the writeTimeout_.
1423  *
1424  * - If not all of the data can be sent immediately, it reschedules
1425  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1426  *   registered for write events.
1427  */
1428 void AsyncSocket::handleWrite() noexcept {
1429   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1430           << ", state=" << state_;
1431   if (state_ == StateEnum::CONNECTING) {
1432     handleConnect();
1433     return;
1434   }
1435
1436   // Normal write
1437   assert(state_ == StateEnum::ESTABLISHED);
1438   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1439   assert(writeReqHead_ != nullptr);
1440
1441   // Loop until we run out of write requests,
1442   // or until this socket is moved to another EventBase.
1443   // (See the comment in handleRead() explaining how this can happen.)
1444   EventBase* originalEventBase = eventBase_;
1445   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1446     if (!writeReqHead_->performWrite()) {
1447       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1448                              withAddr("writev() failed"), errno);
1449       return failWrite(__func__, ex);
1450     } else if (writeReqHead_->isComplete()) {
1451       // We finished this request
1452       WriteRequest* req = writeReqHead_;
1453       writeReqHead_ = req->getNext();
1454
1455       if (writeReqHead_ == nullptr) {
1456         writeReqTail_ = nullptr;
1457         // This is the last write request.
1458         // Unregister for write events and cancel the send timer
1459         // before we invoke the callback.  We have to update the state properly
1460         // before calling the callback, since it may want to detach us from
1461         // the EventBase.
1462         if (eventFlags_ & EventHandler::WRITE) {
1463           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1464             assert(state_ == StateEnum::ERROR);
1465             return;
1466           }
1467           // Stop the send timeout
1468           writeTimeout_.cancelTimeout();
1469         }
1470         assert(!writeTimeout_.isScheduled());
1471
1472         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1473         // we finish sending the last write request.
1474         //
1475         // We have to do this before invoking writeSuccess(), since
1476         // writeSuccess() may detach us from our EventBase.
1477         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1478           assert(connectCallback_ == nullptr);
1479           shutdownFlags_ |= SHUT_WRITE;
1480
1481           if (shutdownFlags_ & SHUT_READ) {
1482             // Reads have already been shutdown.  Fully close the socket and
1483             // move to STATE_CLOSED.
1484             //
1485             // Note: This code currently moves us to STATE_CLOSED even if
1486             // close() hasn't ever been called.  This can occur if we have
1487             // received EOF from the peer and shutdownWrite() has been called
1488             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1489             // case, until close() is actually called?  I can't think of a
1490             // reason why we would need to do so.  No other operations besides
1491             // calling close() or destroying the socket can be performed at
1492             // this point.
1493             assert(readCallback_ == nullptr);
1494             state_ = StateEnum::CLOSED;
1495             if (fd_ >= 0) {
1496               ioHandler_.changeHandlerFD(-1);
1497               doClose();
1498             }
1499           } else {
1500             // Reads are still enabled, so we are only doing a half-shutdown
1501             ::shutdown(fd_, SHUT_WR);
1502           }
1503         }
1504       }
1505
1506       // Invoke the callback
1507       WriteCallback* callback = req->getCallback();
1508       req->destroy();
1509       if (callback) {
1510         callback->writeSuccess();
1511       }
1512       // We'll continue around the loop, trying to write another request
1513     } else {
1514       // Partial write.
1515       if (bufferCallback_) {
1516         bufferCallback_->onEgressBuffered();
1517       }
1518       writeReqHead_->consume();
1519       // Stop after a partial write; it's highly likely that a subsequent write
1520       // attempt will just return EAGAIN.
1521       //
1522       // Ensure that we are registered for write events.
1523       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1524         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1525           assert(state_ == StateEnum::ERROR);
1526           return;
1527         }
1528       }
1529
1530       // Reschedule the send timeout, since we have made some write progress.
1531       if (sendTimeout_ > 0) {
1532         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1533           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1534               withAddr("failed to reschedule write timeout"));
1535           return failWrite(__func__, ex);
1536         }
1537       }
1538       return;
1539     }
1540   }
1541   if (!writeReqHead_ && bufferCallback_) {
1542     bufferCallback_->onEgressBufferCleared();
1543   }
1544 }
1545
1546 void AsyncSocket::checkForImmediateRead() noexcept {
1547   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1548   // (However, note that some subclasses do override this method.)
1549   //
1550   // Simply calling handleRead() here would be bad, as this would call
1551   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1552   // buffer even though no data may be available.  This would waste lots of
1553   // memory, since the buffer will sit around unused until the socket actually
1554   // becomes readable.
1555   //
1556   // Checking if the socket is readable now also seems like it would probably
1557   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1558   // would just waste an extra system call.  Even if it is readable, waiting to
1559   // find out from libevent on the next event loop doesn't seem that bad.
1560 }
1561
1562 void AsyncSocket::handleInitialReadWrite() noexcept {
1563   // Our callers should already be holding a DestructorGuard, but grab
1564   // one here just to make sure, in case one of our calling code paths ever
1565   // changes.
1566   DestructorGuard dg(this);
1567
1568   // If we have a readCallback_, make sure we enable read events.  We
1569   // may already be registered for reads if connectSuccess() set
1570   // the read calback.
1571   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1572     assert(state_ == StateEnum::ESTABLISHED);
1573     assert((shutdownFlags_ & SHUT_READ) == 0);
1574     if (!updateEventRegistration(EventHandler::READ, 0)) {
1575       assert(state_ == StateEnum::ERROR);
1576       return;
1577     }
1578     checkForImmediateRead();
1579   } else if (readCallback_ == nullptr) {
1580     // Unregister for read events.
1581     updateEventRegistration(0, EventHandler::READ);
1582   }
1583
1584   // If we have write requests pending, try to send them immediately.
1585   // Since we just finished accepting, there is a very good chance that we can
1586   // write without blocking.
1587   //
1588   // However, we only process them if EventHandler::WRITE is not already set,
1589   // which means that we're already blocked on a write attempt.  (This can
1590   // happen if connectSuccess() called write() before returning.)
1591   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1592     // Call handleWrite() to perform write processing.
1593     handleWrite();
1594   } else if (writeReqHead_ == nullptr) {
1595     // Unregister for write event.
1596     updateEventRegistration(0, EventHandler::WRITE);
1597   }
1598 }
1599
1600 void AsyncSocket::handleConnect() noexcept {
1601   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1602           << ", state=" << state_;
1603   assert(state_ == StateEnum::CONNECTING);
1604   // SHUT_WRITE can never be set while we are still connecting;
1605   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1606   // finishes
1607   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1608
1609   // In case we had a connect timeout, cancel the timeout
1610   writeTimeout_.cancelTimeout();
1611   // We don't use a persistent registration when waiting on a connect event,
1612   // so we have been automatically unregistered now.  Update eventFlags_ to
1613   // reflect reality.
1614   assert(eventFlags_ == EventHandler::WRITE);
1615   eventFlags_ = EventHandler::NONE;
1616
1617   // Call getsockopt() to check if the connect succeeded
1618   int error;
1619   socklen_t len = sizeof(error);
1620   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1621   if (rv != 0) {
1622     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1623                            withAddr("error calling getsockopt() after connect"),
1624                            errno);
1625     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1626                << fd_ << " host=" << addr_.describe()
1627                << ") exception:" << ex.what();
1628     return failConnect(__func__, ex);
1629   }
1630
1631   if (error != 0) {
1632     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1633                            "connect failed", error);
1634     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1635             << fd_ << " host=" << addr_.describe()
1636             << ") exception: " << ex.what();
1637     return failConnect(__func__, ex);
1638   }
1639
1640   // Move into STATE_ESTABLISHED
1641   state_ = StateEnum::ESTABLISHED;
1642
1643   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1644   // perform, immediately shutdown the write half of the socket.
1645   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1646     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1647     // are still connecting we just abort the connect rather than waiting for
1648     // it to complete.
1649     assert((shutdownFlags_ & SHUT_READ) == 0);
1650     ::shutdown(fd_, SHUT_WR);
1651     shutdownFlags_ |= SHUT_WRITE;
1652   }
1653
1654   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1655           << "successfully connected; state=" << state_;
1656
1657   // Remember the EventBase we are attached to, before we start invoking any
1658   // callbacks (since the callbacks may call detachEventBase()).
1659   EventBase* originalEventBase = eventBase_;
1660
1661   invokeConnectSuccess();
1662   // Note that the connect callback may have changed our state.
1663   // (set or unset the read callback, called write(), closed the socket, etc.)
1664   // The following code needs to handle these situations correctly.
1665   //
1666   // If the socket has been closed, readCallback_ and writeReqHead_ will
1667   // always be nullptr, so that will prevent us from trying to read or write.
1668   //
1669   // The main thing to check for is if eventBase_ is still originalEventBase.
1670   // If not, we have been detached from this event base, so we shouldn't
1671   // perform any more operations.
1672   if (eventBase_ != originalEventBase) {
1673     return;
1674   }
1675
1676   handleInitialReadWrite();
1677 }
1678
1679 void AsyncSocket::timeoutExpired() noexcept {
1680   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1681           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1682   DestructorGuard dg(this);
1683   assert(eventBase_->isInEventBaseThread());
1684
1685   if (state_ == StateEnum::CONNECTING) {
1686     // connect() timed out
1687     // Unregister for I/O events.
1688     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1689                            "connect timed out");
1690     failConnect(__func__, ex);
1691   } else {
1692     // a normal write operation timed out
1693     assert(state_ == StateEnum::ESTABLISHED);
1694     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1695     failWrite(__func__, ex);
1696   }
1697 }
1698
1699 ssize_t AsyncSocket::performWrite(const iovec* vec,
1700                                    uint32_t count,
1701                                    WriteFlags flags,
1702                                    uint32_t* countWritten,
1703                                    uint32_t* partialWritten) {
1704   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1705   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1706   // (since it may terminate the program if the main program doesn't explicitly
1707   // ignore it).
1708   struct msghdr msg;
1709   msg.msg_name = nullptr;
1710   msg.msg_namelen = 0;
1711   msg.msg_iov = const_cast<iovec *>(vec);
1712 #ifdef IOV_MAX // not defined on Android
1713   msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX);
1714 #else
1715   msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV);
1716 #endif
1717   msg.msg_control = nullptr;
1718   msg.msg_controllen = 0;
1719   msg.msg_flags = 0;
1720
1721   int msg_flags = MSG_DONTWAIT;
1722
1723 #ifdef MSG_NOSIGNAL // Linux-only
1724   msg_flags |= MSG_NOSIGNAL;
1725   if (isSet(flags, WriteFlags::CORK)) {
1726     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1727     // give it the rest of the data rather than immediately sending a partial
1728     // frame, even when TCP_NODELAY is enabled.
1729     msg_flags |= MSG_MORE;
1730   }
1731 #endif
1732   if (isSet(flags, WriteFlags::EOR)) {
1733     // marks that this is the last byte of a record (response)
1734     msg_flags |= MSG_EOR;
1735   }
1736   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1737   if (totalWritten < 0) {
1738     if (errno == EAGAIN) {
1739       // TCP buffer is full; we can't write any more data right now.
1740       *countWritten = 0;
1741       *partialWritten = 0;
1742       return 0;
1743     }
1744     // error
1745     *countWritten = 0;
1746     *partialWritten = 0;
1747     return -1;
1748   }
1749
1750   appBytesWritten_ += totalWritten;
1751
1752   uint32_t bytesWritten;
1753   uint32_t n;
1754   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1755     const iovec* v = vec + n;
1756     if (v->iov_len > bytesWritten) {
1757       // Partial write finished in the middle of this iovec
1758       *countWritten = n;
1759       *partialWritten = bytesWritten;
1760       return totalWritten;
1761     }
1762
1763     bytesWritten -= v->iov_len;
1764   }
1765
1766   assert(bytesWritten == 0);
1767   *countWritten = n;
1768   *partialWritten = 0;
1769   return totalWritten;
1770 }
1771
1772 /**
1773  * Re-register the EventHandler after eventFlags_ has changed.
1774  *
1775  * If an error occurs, fail() is called to move the socket into the error state
1776  * and call all currently installed callbacks.  After an error, the
1777  * AsyncSocket is completely unregistered.
1778  *
1779  * @return Returns true on succcess, or false on error.
1780  */
1781 bool AsyncSocket::updateEventRegistration() {
1782   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1783           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1784           << ", events=" << std::hex << eventFlags_;
1785   assert(eventBase_->isInEventBaseThread());
1786   if (eventFlags_ == EventHandler::NONE) {
1787     ioHandler_.unregisterHandler();
1788     return true;
1789   }
1790
1791   // Always register for persistent events, so we don't have to re-register
1792   // after being called back.
1793   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1794     eventFlags_ = EventHandler::NONE; // we're not registered after error
1795     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1796         withAddr("failed to update AsyncSocket event registration"));
1797     fail("updateEventRegistration", ex);
1798     return false;
1799   }
1800
1801   return true;
1802 }
1803
1804 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1805                                            uint16_t disable) {
1806   uint16_t oldFlags = eventFlags_;
1807   eventFlags_ |= enable;
1808   eventFlags_ &= ~disable;
1809   if (eventFlags_ == oldFlags) {
1810     return true;
1811   } else {
1812     return updateEventRegistration();
1813   }
1814 }
1815
1816 void AsyncSocket::startFail() {
1817   // startFail() should only be called once
1818   assert(state_ != StateEnum::ERROR);
1819   assert(getDestructorGuardCount() > 0);
1820   state_ = StateEnum::ERROR;
1821   // Ensure that SHUT_READ and SHUT_WRITE are set,
1822   // so all future attempts to read or write will be rejected
1823   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1824
1825   if (eventFlags_ != EventHandler::NONE) {
1826     eventFlags_ = EventHandler::NONE;
1827     ioHandler_.unregisterHandler();
1828   }
1829   writeTimeout_.cancelTimeout();
1830
1831   if (fd_ >= 0) {
1832     ioHandler_.changeHandlerFD(-1);
1833     doClose();
1834   }
1835 }
1836
1837 void AsyncSocket::finishFail() {
1838   assert(state_ == StateEnum::ERROR);
1839   assert(getDestructorGuardCount() > 0);
1840
1841   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1842                          withAddr("socket closing after error"));
1843   invokeConnectErr(ex);
1844   failAllWrites(ex);
1845
1846   if (readCallback_) {
1847     ReadCallback* callback = readCallback_;
1848     readCallback_ = nullptr;
1849     callback->readErr(ex);
1850   }
1851 }
1852
1853 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1854   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1855              << state_ << " host=" << addr_.describe()
1856              << "): failed in " << fn << "(): "
1857              << ex.what();
1858   startFail();
1859   finishFail();
1860 }
1861
1862 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1863   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1864                << state_ << " host=" << addr_.describe()
1865                << "): failed while connecting in " << fn << "(): "
1866                << ex.what();
1867   startFail();
1868
1869   invokeConnectErr(ex);
1870   finishFail();
1871 }
1872
1873 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1874   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1875                << state_ << " host=" << addr_.describe()
1876                << "): failed while reading in " << fn << "(): "
1877                << ex.what();
1878   startFail();
1879
1880   if (readCallback_ != nullptr) {
1881     ReadCallback* callback = readCallback_;
1882     readCallback_ = nullptr;
1883     callback->readErr(ex);
1884   }
1885
1886   finishFail();
1887 }
1888
1889 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1890   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1891                << state_ << " host=" << addr_.describe()
1892                << "): failed while writing in " << fn << "(): "
1893                << ex.what();
1894   startFail();
1895
1896   // Only invoke the first write callback, since the error occurred while
1897   // writing this request.  Let any other pending write callbacks be invoked in
1898   // finishFail().
1899   if (writeReqHead_ != nullptr) {
1900     WriteRequest* req = writeReqHead_;
1901     writeReqHead_ = req->getNext();
1902     WriteCallback* callback = req->getCallback();
1903     uint32_t bytesWritten = req->getTotalBytesWritten();
1904     req->destroy();
1905     if (callback) {
1906       callback->writeErr(bytesWritten, ex);
1907     }
1908   }
1909
1910   finishFail();
1911 }
1912
1913 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1914                              size_t bytesWritten,
1915                              const AsyncSocketException& ex) {
1916   // This version of failWrite() is used when the failure occurs before
1917   // we've added the callback to writeReqHead_.
1918   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1919              << state_ << " host=" << addr_.describe()
1920              <<"): failed while writing in " << fn << "(): "
1921              << ex.what();
1922   startFail();
1923
1924   if (callback != nullptr) {
1925     callback->writeErr(bytesWritten, ex);
1926   }
1927
1928   finishFail();
1929 }
1930
1931 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1932   // Invoke writeError() on all write callbacks.
1933   // This is used when writes are forcibly shutdown with write requests
1934   // pending, or when an error occurs with writes pending.
1935   while (writeReqHead_ != nullptr) {
1936     WriteRequest* req = writeReqHead_;
1937     writeReqHead_ = req->getNext();
1938     WriteCallback* callback = req->getCallback();
1939     if (callback) {
1940       callback->writeErr(req->getTotalBytesWritten(), ex);
1941     }
1942     req->destroy();
1943   }
1944 }
1945
1946 void AsyncSocket::invalidState(ConnectCallback* callback) {
1947   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1948              << "): connect() called in invalid state " << state_;
1949
1950   /*
1951    * The invalidState() methods don't use the normal failure mechanisms,
1952    * since we don't know what state we are in.  We don't want to call
1953    * startFail()/finishFail() recursively if we are already in the middle of
1954    * cleaning up.
1955    */
1956
1957   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1958                          "connect() called with socket in invalid state");
1959   connectEndTime_ = std::chrono::steady_clock::now();
1960   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1961     if (callback) {
1962       callback->connectErr(ex);
1963     }
1964   } else {
1965     // We can't use failConnect() here since connectCallback_
1966     // may already be set to another callback.  Invoke this ConnectCallback
1967     // here; any other connectCallback_ will be invoked in finishFail()
1968     startFail();
1969     if (callback) {
1970       callback->connectErr(ex);
1971     }
1972     finishFail();
1973   }
1974 }
1975
1976 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
1977   connectEndTime_ = std::chrono::steady_clock::now();
1978   if (connectCallback_) {
1979     ConnectCallback* callback = connectCallback_;
1980     connectCallback_ = nullptr;
1981     callback->connectErr(ex);
1982   }
1983 }
1984
1985 void AsyncSocket::invokeConnectSuccess() {
1986   connectEndTime_ = std::chrono::steady_clock::now();
1987   if (connectCallback_) {
1988     ConnectCallback* callback = connectCallback_;
1989     connectCallback_ = nullptr;
1990     callback->connectSuccess();
1991   }
1992 }
1993
1994 void AsyncSocket::invalidState(ReadCallback* callback) {
1995   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1996              << "): setReadCallback(" << callback
1997              << ") called in invalid state " << state_;
1998
1999   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2000                          "setReadCallback() called with socket in "
2001                          "invalid state");
2002   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2003     if (callback) {
2004       callback->readErr(ex);
2005     }
2006   } else {
2007     startFail();
2008     if (callback) {
2009       callback->readErr(ex);
2010     }
2011     finishFail();
2012   }
2013 }
2014
2015 void AsyncSocket::invalidState(WriteCallback* callback) {
2016   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2017              << "): write() called in invalid state " << state_;
2018
2019   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2020                          withAddr("write() called with socket in invalid state"));
2021   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2022     if (callback) {
2023       callback->writeErr(0, ex);
2024     }
2025   } else {
2026     startFail();
2027     if (callback) {
2028       callback->writeErr(0, ex);
2029     }
2030     finishFail();
2031   }
2032 }
2033
2034 void AsyncSocket::doClose() {
2035   if (fd_ == -1) return;
2036   if (shutdownSocketSet_) {
2037     shutdownSocketSet_->close(fd_);
2038   } else {
2039     ::close(fd_);
2040   }
2041   fd_ = -1;
2042 }
2043
2044 std::ostream& operator << (std::ostream& os,
2045                            const AsyncSocket::StateEnum& state) {
2046   os << static_cast<int>(state);
2047   return os;
2048 }
2049
2050 std::string AsyncSocket::withAddr(const std::string& s) {
2051   // Don't use addr_ directly because it may not be initialized
2052   // e.g. if constructed from fd
2053   folly::SocketAddress peer, local;
2054   try {
2055     getPeerAddress(&peer);
2056     getLocalAddress(&local);
2057   } catch (const std::exception&) {
2058     // ignore
2059   } catch (...) {
2060     // ignore
2061   }
2062   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2063 }
2064
2065 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2066   bufferCallback_ = cb;
2067 }
2068
2069 } // folly