a8fb13e0425f25fce7b658fa8321e37a464b8617
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23
24 #include <poll.h>
25 #include <errno.h>
26 #include <limits.h>
27 #include <unistd.h>
28 #include <thread>
29 #include <fcntl.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34 #include <boost/preprocessor/control/if.hpp>
35
36 using std::string;
37 using std::unique_ptr;
38
39 namespace folly {
40
41 // static members initializers
42 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
43
44 const AsyncSocketException socketClosedLocallyEx(
45     AsyncSocketException::END_OF_FILE, "socket closed locally");
46 const AsyncSocketException socketShutdownForWritesEx(
47     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
48
49 // TODO: It might help performance to provide a version of BytesWriteRequest that
50 // users could derive from, so we can avoid the extra allocation for each call
51 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
52 // protocols are currently templatized for transports.
53 //
54 // We would need the version for external users where they provide the iovec
55 // storage space, and only our internal version would allocate it at the end of
56 // the WriteRequest.
57
58 /* The default WriteRequest implementation, used for write(), writev() and
59  * writeChain()
60  *
61  * A new BytesWriteRequest operation is allocated on the heap for all write
62  * operations that cannot be completed immediately.
63  */
64 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
65  public:
66   static BytesWriteRequest* newRequest(AsyncSocket* socket,
67                                        WriteCallback* callback,
68                                        const iovec* ops,
69                                        uint32_t opCount,
70                                        uint32_t partialWritten,
71                                        uint32_t bytesWritten,
72                                        unique_ptr<IOBuf>&& ioBuf,
73                                        WriteFlags flags) {
74     assert(opCount > 0);
75     // Since we put a variable size iovec array at the end
76     // of each BytesWriteRequest, we have to manually allocate the memory.
77     void* buf = malloc(sizeof(BytesWriteRequest) +
78                        (opCount * sizeof(struct iovec)));
79     if (buf == nullptr) {
80       throw std::bad_alloc();
81     }
82
83     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
84                                       partialWritten, bytesWritten,
85                                       std::move(ioBuf), flags);
86   }
87
88   void destroy() override {
89     this->~BytesWriteRequest();
90     free(this);
91   }
92
93   bool performWrite() override {
94     WriteFlags writeFlags = flags_;
95     if (getNext() != nullptr) {
96       writeFlags = writeFlags | WriteFlags::CORK;
97     }
98     bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
99                                           &opsWritten_, &partialBytes_);
100     return bytesWritten_ >= 0;
101   }
102
103   bool isComplete() override {
104     return opsWritten_ == getOpCount();
105   }
106
107   void consume() override {
108     // Advance opIndex_ forward by opsWritten_
109     opIndex_ += opsWritten_;
110     assert(opIndex_ < opCount_);
111
112     // If we've finished writing any IOBufs, release them
113     if (ioBuf_) {
114       for (uint32_t i = opsWritten_; i != 0; --i) {
115         assert(ioBuf_);
116         ioBuf_ = ioBuf_->pop();
117       }
118     }
119
120     // Move partialBytes_ forward into the current iovec buffer
121     struct iovec* currentOp = writeOps_ + opIndex_;
122     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
123     currentOp->iov_base =
124       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
125     currentOp->iov_len -= partialBytes_;
126
127     // Increment the totalBytesWritten_ count by bytesWritten_;
128     totalBytesWritten_ += bytesWritten_;
129   }
130
131  private:
132   BytesWriteRequest(AsyncSocket* socket,
133                     WriteCallback* callback,
134                     const struct iovec* ops,
135                     uint32_t opCount,
136                     uint32_t partialBytes,
137                     uint32_t bytesWritten,
138                     unique_ptr<IOBuf>&& ioBuf,
139                     WriteFlags flags)
140     : AsyncSocket::WriteRequest(socket, callback)
141     , opCount_(opCount)
142     , opIndex_(0)
143     , flags_(flags)
144     , ioBuf_(std::move(ioBuf))
145     , opsWritten_(0)
146     , partialBytes_(partialBytes)
147     , bytesWritten_(bytesWritten) {
148     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
149   }
150
151   // private destructor, to ensure callers use destroy()
152   ~BytesWriteRequest() override = default;
153
154   const struct iovec* getOps() const {
155     assert(opCount_ > opIndex_);
156     return writeOps_ + opIndex_;
157   }
158
159   uint32_t getOpCount() const {
160     assert(opCount_ > opIndex_);
161     return opCount_ - opIndex_;
162   }
163
164   uint32_t opCount_;            ///< number of entries in writeOps_
165   uint32_t opIndex_;            ///< current index into writeOps_
166   WriteFlags flags_;            ///< set for WriteFlags
167   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
168
169   // for consume(), how much we wrote on the last write
170   uint32_t opsWritten_;         ///< complete ops written
171   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
172   ssize_t bytesWritten_;        ///< bytes written altogether
173
174   struct iovec writeOps_[];     ///< write operation(s) list
175 };
176
177 AsyncSocket::AsyncSocket()
178   : eventBase_(nullptr)
179   , writeTimeout_(this, nullptr)
180   , ioHandler_(this, nullptr)
181   , immediateReadHandler_(this) {
182   VLOG(5) << "new AsyncSocket()";
183   init();
184 }
185
186 AsyncSocket::AsyncSocket(EventBase* evb)
187   : eventBase_(evb)
188   , writeTimeout_(this, evb)
189   , ioHandler_(this, evb)
190   , immediateReadHandler_(this) {
191   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
192   init();
193 }
194
195 AsyncSocket::AsyncSocket(EventBase* evb,
196                            const folly::SocketAddress& address,
197                            uint32_t connectTimeout)
198   : AsyncSocket(evb) {
199   connect(nullptr, address, connectTimeout);
200 }
201
202 AsyncSocket::AsyncSocket(EventBase* evb,
203                            const std::string& ip,
204                            uint16_t port,
205                            uint32_t connectTimeout)
206   : AsyncSocket(evb) {
207   connect(nullptr, ip, port, connectTimeout);
208 }
209
210 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
211   : eventBase_(evb)
212   , writeTimeout_(this, evb)
213   , ioHandler_(this, evb, fd)
214   , immediateReadHandler_(this) {
215   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
216           << fd << ")";
217   init();
218   fd_ = fd;
219   setCloseOnExec();
220   state_ = StateEnum::ESTABLISHED;
221 }
222
223 // init() method, since constructor forwarding isn't supported in most
224 // compilers yet.
225 void AsyncSocket::init() {
226   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
227   shutdownFlags_ = 0;
228   state_ = StateEnum::UNINIT;
229   eventFlags_ = EventHandler::NONE;
230   fd_ = -1;
231   sendTimeout_ = 0;
232   maxReadsPerEvent_ = 16;
233   connectCallback_ = nullptr;
234   readCallback_ = nullptr;
235   writeReqHead_ = nullptr;
236   writeReqTail_ = nullptr;
237   shutdownSocketSet_ = nullptr;
238   appBytesWritten_ = 0;
239   appBytesReceived_ = 0;
240 }
241
242 AsyncSocket::~AsyncSocket() {
243   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
244           << ", evb=" << eventBase_ << ", fd=" << fd_
245           << ", state=" << state_ << ")";
246 }
247
248 void AsyncSocket::destroy() {
249   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
250           << ", fd=" << fd_ << ", state=" << state_;
251   // When destroy is called, close the socket immediately
252   closeNow();
253
254   // Then call DelayedDestruction::destroy() to take care of
255   // whether or not we need immediate or delayed destruction
256   DelayedDestruction::destroy();
257 }
258
259 int AsyncSocket::detachFd() {
260   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
261           << ", evb=" << eventBase_ << ", state=" << state_
262           << ", events=" << std::hex << eventFlags_ << ")";
263   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
264   // actually close the descriptor.
265   if (shutdownSocketSet_) {
266     shutdownSocketSet_->remove(fd_);
267   }
268   int fd = fd_;
269   fd_ = -1;
270   // Call closeNow() to invoke all pending callbacks with an error.
271   closeNow();
272   // Update the EventHandler to stop using this fd.
273   // This can only be done after closeNow() unregisters the handler.
274   ioHandler_.changeHandlerFD(-1);
275   return fd;
276 }
277
278 const folly::SocketAddress& AsyncSocket::anyAddress() {
279   static const folly::SocketAddress anyAddress =
280     folly::SocketAddress("0.0.0.0", 0);
281   return anyAddress;
282 }
283
284 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
285   if (shutdownSocketSet_ == newSS) {
286     return;
287   }
288   if (shutdownSocketSet_ && fd_ != -1) {
289     shutdownSocketSet_->remove(fd_);
290   }
291   shutdownSocketSet_ = newSS;
292   if (shutdownSocketSet_ && fd_ != -1) {
293     shutdownSocketSet_->add(fd_);
294   }
295 }
296
297 void AsyncSocket::setCloseOnExec() {
298   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
299   if (rv != 0) {
300     throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
301                                withAddr("failed to set close-on-exec flag"),
302                                errno);
303   }
304 }
305
306 void AsyncSocket::connect(ConnectCallback* callback,
307                            const folly::SocketAddress& address,
308                            int timeout,
309                            const OptionMap &options,
310                            const folly::SocketAddress& bindAddr) noexcept {
311   DestructorGuard dg(this);
312   assert(eventBase_->isInEventBaseThread());
313
314   addr_ = address;
315
316   // Make sure we're in the uninitialized state
317   if (state_ != StateEnum::UNINIT) {
318     return invalidState(callback);
319   }
320
321   connectStartTime_ = std::chrono::steady_clock::now();
322   // Make connect end time at least >= connectStartTime.
323   connectEndTime_ = connectStartTime_;
324
325   assert(fd_ == -1);
326   state_ = StateEnum::CONNECTING;
327   connectCallback_ = callback;
328
329   sockaddr_storage addrStorage;
330   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
331
332   try {
333     // Create the socket
334     // Technically the first parameter should actually be a protocol family
335     // constant (PF_xxx) rather than an address family (AF_xxx), but the
336     // distinction is mainly just historical.  In pretty much all
337     // implementations the PF_foo and AF_foo constants are identical.
338     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
339     if (fd_ < 0) {
340       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
341                                 withAddr("failed to create socket"), errno);
342     }
343     if (shutdownSocketSet_) {
344       shutdownSocketSet_->add(fd_);
345     }
346     ioHandler_.changeHandlerFD(fd_);
347
348     setCloseOnExec();
349
350     // Put the socket in non-blocking mode
351     int flags = fcntl(fd_, F_GETFL, 0);
352     if (flags == -1) {
353       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
354                                 withAddr("failed to get socket flags"), errno);
355     }
356     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
357     if (rv == -1) {
358       throw AsyncSocketException(
359           AsyncSocketException::INTERNAL_ERROR,
360           withAddr("failed to put socket in non-blocking mode"),
361           errno);
362     }
363
364 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
365     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
366     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
367     if (rv == -1) {
368       throw AsyncSocketException(
369           AsyncSocketException::INTERNAL_ERROR,
370           "failed to enable F_SETNOSIGPIPE on socket",
371           errno);
372     }
373 #endif
374
375     // By default, turn on TCP_NODELAY
376     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
377     // setNoDelay() will log an error message if it fails.
378     if (address.getFamily() != AF_UNIX) {
379       (void)setNoDelay(true);
380     }
381
382     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
383             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
384
385     // bind the socket
386     if (bindAddr != anyAddress()) {
387       int one = 1;
388       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
389         doClose();
390         throw AsyncSocketException(
391           AsyncSocketException::NOT_OPEN,
392           "failed to setsockopt prior to bind on " + bindAddr.describe(),
393           errno);
394       }
395
396       bindAddr.getAddress(&addrStorage);
397
398       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
399         doClose();
400         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
401                                   "failed to bind to async socket: " +
402                                   bindAddr.describe(),
403                                   errno);
404       }
405     }
406
407     // Apply the additional options if any.
408     for (const auto& opt: options) {
409       int rv = opt.first.apply(fd_, opt.second);
410       if (rv != 0) {
411         throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
412                                   withAddr("failed to set socket option"),
413                                   errno);
414       }
415     }
416
417     // Perform the connect()
418     address.getAddress(&addrStorage);
419
420     rv = ::connect(fd_, saddr, address.getActualSize());
421     if (rv < 0) {
422       if (errno == EINPROGRESS) {
423         // Connection in progress.
424         if (timeout > 0) {
425           // Start a timer in case the connection takes too long.
426           if (!writeTimeout_.scheduleTimeout(timeout)) {
427             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
428                 withAddr("failed to schedule AsyncSocket connect timeout"));
429           }
430         }
431
432         // Register for write events, so we'll
433         // be notified when the connection finishes/fails.
434         // Note that we don't register for a persistent event here.
435         assert(eventFlags_ == EventHandler::NONE);
436         eventFlags_ = EventHandler::WRITE;
437         if (!ioHandler_.registerHandler(eventFlags_)) {
438           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
439               withAddr("failed to register AsyncSocket connect handler"));
440         }
441         return;
442       } else {
443         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
444                                   "connect failed (immediately)", errno);
445       }
446     }
447
448     // If we're still here the connect() succeeded immediately.
449     // Fall through to call the callback outside of this try...catch block
450   } catch (const AsyncSocketException& ex) {
451     return failConnect(__func__, ex);
452   } catch (const std::exception& ex) {
453     // shouldn't happen, but handle it just in case
454     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
455                << "): unexpected " << typeid(ex).name() << " exception: "
456                << ex.what();
457     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
458                             withAddr(string("unexpected exception: ") +
459                                      ex.what()));
460     return failConnect(__func__, tex);
461   }
462
463   // The connection succeeded immediately
464   // The read callback may not have been set yet, and no writes may be pending
465   // yet, so we don't have to register for any events at the moment.
466   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
467   assert(readCallback_ == nullptr);
468   assert(writeReqHead_ == nullptr);
469   state_ = StateEnum::ESTABLISHED;
470   invokeConnectSuccess();
471 }
472
473 void AsyncSocket::connect(ConnectCallback* callback,
474                            const string& ip, uint16_t port,
475                            int timeout,
476                            const OptionMap &options) noexcept {
477   DestructorGuard dg(this);
478   try {
479     connectCallback_ = callback;
480     connect(callback, folly::SocketAddress(ip, port), timeout, options);
481   } catch (const std::exception& ex) {
482     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
483                             ex.what());
484     return failConnect(__func__, tex);
485   }
486 }
487
488 void AsyncSocket::cancelConnect() {
489   connectCallback_ = nullptr;
490   if (state_ == StateEnum::CONNECTING) {
491     closeNow();
492   }
493 }
494
495 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
496   sendTimeout_ = milliseconds;
497   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
498
499   // If we are currently pending on write requests, immediately update
500   // writeTimeout_ with the new value.
501   if ((eventFlags_ & EventHandler::WRITE) &&
502       (state_ != StateEnum::CONNECTING)) {
503     assert(state_ == StateEnum::ESTABLISHED);
504     assert((shutdownFlags_ & SHUT_WRITE) == 0);
505     if (sendTimeout_ > 0) {
506       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
507         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
508             withAddr("failed to reschedule send timeout in setSendTimeout"));
509         return failWrite(__func__, ex);
510       }
511     } else {
512       writeTimeout_.cancelTimeout();
513     }
514   }
515 }
516
517 void AsyncSocket::setReadCB(ReadCallback *callback) {
518   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
519           << ", callback=" << callback << ", state=" << state_;
520
521   // Short circuit if callback is the same as the existing readCallback_.
522   //
523   // Note that this is needed for proper functioning during some cleanup cases.
524   // During cleanup we allow setReadCallback(nullptr) to be called even if the
525   // read callback is already unset and we have been detached from an event
526   // base.  This check prevents us from asserting
527   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
528   if (callback == readCallback_) {
529     return;
530   }
531
532   /* We are removing a read callback */
533   if (callback == nullptr &&
534       immediateReadHandler_.isLoopCallbackScheduled()) {
535     immediateReadHandler_.cancelLoopCallback();
536   }
537
538   if (shutdownFlags_ & SHUT_READ) {
539     // Reads have already been shut down on this socket.
540     //
541     // Allow setReadCallback(nullptr) to be called in this case, but don't
542     // allow a new callback to be set.
543     //
544     // For example, setReadCallback(nullptr) can happen after an error if we
545     // invoke some other error callback before invoking readError().  The other
546     // error callback that is invoked first may go ahead and clear the read
547     // callback before we get a chance to invoke readError().
548     if (callback != nullptr) {
549       return invalidState(callback);
550     }
551     assert((eventFlags_ & EventHandler::READ) == 0);
552     readCallback_ = nullptr;
553     return;
554   }
555
556   DestructorGuard dg(this);
557   assert(eventBase_->isInEventBaseThread());
558
559   switch ((StateEnum)state_) {
560     case StateEnum::CONNECTING:
561       // For convenience, we allow the read callback to be set while we are
562       // still connecting.  We just store the callback for now.  Once the
563       // connection completes we'll register for read events.
564       readCallback_ = callback;
565       return;
566     case StateEnum::ESTABLISHED:
567     {
568       readCallback_ = callback;
569       uint16_t oldFlags = eventFlags_;
570       if (readCallback_) {
571         eventFlags_ |= EventHandler::READ;
572       } else {
573         eventFlags_ &= ~EventHandler::READ;
574       }
575
576       // Update our registration if our flags have changed
577       if (eventFlags_ != oldFlags) {
578         // We intentionally ignore the return value here.
579         // updateEventRegistration() will move us into the error state if it
580         // fails, and we don't need to do anything else here afterwards.
581         (void)updateEventRegistration();
582       }
583
584       if (readCallback_) {
585         checkForImmediateRead();
586       }
587       return;
588     }
589     case StateEnum::CLOSED:
590     case StateEnum::ERROR:
591       // We should never reach here.  SHUT_READ should always be set
592       // if we are in STATE_CLOSED or STATE_ERROR.
593       assert(false);
594       return invalidState(callback);
595     case StateEnum::UNINIT:
596       // We do not allow setReadCallback() to be called before we start
597       // connecting.
598       return invalidState(callback);
599   }
600
601   // We don't put a default case in the switch statement, so that the compiler
602   // will warn us to update the switch statement if a new state is added.
603   return invalidState(callback);
604 }
605
606 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
607   return readCallback_;
608 }
609
610 void AsyncSocket::write(WriteCallback* callback,
611                          const void* buf, size_t bytes, WriteFlags flags) {
612   iovec op;
613   op.iov_base = const_cast<void*>(buf);
614   op.iov_len = bytes;
615   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
616 }
617
618 void AsyncSocket::writev(WriteCallback* callback,
619                           const iovec* vec,
620                           size_t count,
621                           WriteFlags flags) {
622   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
623 }
624
625 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
626                               WriteFlags flags) {
627   constexpr size_t kSmallSizeMax = 64;
628   size_t count = buf->countChainElements();
629   if (count <= kSmallSizeMax) {
630     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
631     writeChainImpl(callback, vec, count, std::move(buf), flags);
632   } else {
633     iovec* vec = new iovec[count];
634     writeChainImpl(callback, vec, count, std::move(buf), flags);
635     delete[] vec;
636   }
637 }
638
639 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
640     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
641   size_t veclen = buf->fillIov(vec, count);
642   writeImpl(callback, vec, veclen, std::move(buf), flags);
643 }
644
645 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
646                              size_t count, unique_ptr<IOBuf>&& buf,
647                              WriteFlags flags) {
648   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
649           << ", callback=" << callback << ", count=" << count
650           << ", state=" << state_;
651   DestructorGuard dg(this);
652   unique_ptr<IOBuf>ioBuf(std::move(buf));
653   assert(eventBase_->isInEventBaseThread());
654
655   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
656     // No new writes may be performed after the write side of the socket has
657     // been shutdown.
658     //
659     // We could just call callback->writeError() here to fail just this write.
660     // However, fail hard and use invalidState() to fail all outstanding
661     // callbacks and move the socket into the error state.  There's most likely
662     // a bug in the caller's code, so we abort everything rather than trying to
663     // proceed as best we can.
664     return invalidState(callback);
665   }
666
667   uint32_t countWritten = 0;
668   uint32_t partialWritten = 0;
669   int bytesWritten = 0;
670   bool mustRegister = false;
671   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
672     if (writeReqHead_ == nullptr) {
673       // If we are established and there are no other writes pending,
674       // we can attempt to perform the write immediately.
675       assert(writeReqTail_ == nullptr);
676       assert((eventFlags_ & EventHandler::WRITE) == 0);
677
678       bytesWritten = performWrite(vec, count, flags,
679                                   &countWritten, &partialWritten);
680       if (bytesWritten < 0) {
681         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
682                                withAddr("writev failed"), errno);
683         return failWrite(__func__, callback, 0, ex);
684       } else if (countWritten == count) {
685         // We successfully wrote everything.
686         // Invoke the callback and return.
687         if (callback) {
688           callback->writeSuccess();
689         }
690         return;
691       } else { // continue writing the next writeReq
692         if (bufferCallback_) {
693           bufferCallback_->onEgressBuffered();
694         }
695       }
696       mustRegister = true;
697     }
698   } else if (!connecting()) {
699     // Invalid state for writing
700     return invalidState(callback);
701   }
702
703   // Create a new WriteRequest to add to the queue
704   WriteRequest* req;
705   try {
706     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
707                                         count - countWritten, partialWritten,
708                                         bytesWritten, std::move(ioBuf), flags);
709   } catch (const std::exception& ex) {
710     // we mainly expect to catch std::bad_alloc here
711     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
712         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
713     return failWrite(__func__, callback, bytesWritten, tex);
714   }
715   req->consume();
716   if (writeReqTail_ == nullptr) {
717     assert(writeReqHead_ == nullptr);
718     writeReqHead_ = writeReqTail_ = req;
719   } else {
720     writeReqTail_->append(req);
721     writeReqTail_ = req;
722   }
723
724   // Register for write events if are established and not currently
725   // waiting on write events
726   if (mustRegister) {
727     assert(state_ == StateEnum::ESTABLISHED);
728     assert((eventFlags_ & EventHandler::WRITE) == 0);
729     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
730       assert(state_ == StateEnum::ERROR);
731       return;
732     }
733     if (sendTimeout_ > 0) {
734       // Schedule a timeout to fire if the write takes too long.
735       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
736         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
737                                withAddr("failed to schedule send timeout"));
738         return failWrite(__func__, ex);
739       }
740     }
741   }
742 }
743
744 void AsyncSocket::writeRequest(WriteRequest* req) {
745   if (writeReqTail_ == nullptr) {
746     assert(writeReqHead_ == nullptr);
747     writeReqHead_ = writeReqTail_ = req;
748     req->start();
749   } else {
750     writeReqTail_->append(req);
751     writeReqTail_ = req;
752   }
753 }
754
755 void AsyncSocket::close() {
756   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
757           << ", state=" << state_ << ", shutdownFlags="
758           << std::hex << (int) shutdownFlags_;
759
760   // close() is only different from closeNow() when there are pending writes
761   // that need to drain before we can close.  In all other cases, just call
762   // closeNow().
763   //
764   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
765   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
766   // is still running.  (e.g., If there are multiple pending writes, and we
767   // call writeError() on the first one, it may call close().  In this case we
768   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
769   // writes will still be in the queue.)
770   //
771   // We only need to drain pending writes if we are still in STATE_CONNECTING
772   // or STATE_ESTABLISHED
773   if ((writeReqHead_ == nullptr) ||
774       !(state_ == StateEnum::CONNECTING ||
775       state_ == StateEnum::ESTABLISHED)) {
776     closeNow();
777     return;
778   }
779
780   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
781   // destroyed until close() returns.
782   DestructorGuard dg(this);
783   assert(eventBase_->isInEventBaseThread());
784
785   // Since there are write requests pending, we have to set the
786   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
787   // connect finishes and we finish writing these requests.
788   //
789   // Set SHUT_READ to indicate that reads are shut down, and set the
790   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
791   // pending writes complete.
792   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
793
794   // If a read callback is set, invoke readEOF() immediately to inform it that
795   // the socket has been closed and no more data can be read.
796   if (readCallback_) {
797     // Disable reads if they are enabled
798     if (!updateEventRegistration(0, EventHandler::READ)) {
799       // We're now in the error state; callbacks have been cleaned up
800       assert(state_ == StateEnum::ERROR);
801       assert(readCallback_ == nullptr);
802     } else {
803       ReadCallback* callback = readCallback_;
804       readCallback_ = nullptr;
805       callback->readEOF();
806     }
807   }
808 }
809
810 void AsyncSocket::closeNow() {
811   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
812           << ", state=" << state_ << ", shutdownFlags="
813           << std::hex << (int) shutdownFlags_;
814   DestructorGuard dg(this);
815   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
816
817   switch (state_) {
818     case StateEnum::ESTABLISHED:
819     case StateEnum::CONNECTING:
820     {
821       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
822       state_ = StateEnum::CLOSED;
823
824       // If the write timeout was set, cancel it.
825       writeTimeout_.cancelTimeout();
826
827       // If we are registered for I/O events, unregister.
828       if (eventFlags_ != EventHandler::NONE) {
829         eventFlags_ = EventHandler::NONE;
830         if (!updateEventRegistration()) {
831           // We will have been moved into the error state.
832           assert(state_ == StateEnum::ERROR);
833           return;
834         }
835       }
836
837       if (immediateReadHandler_.isLoopCallbackScheduled()) {
838         immediateReadHandler_.cancelLoopCallback();
839       }
840
841       if (fd_ >= 0) {
842         ioHandler_.changeHandlerFD(-1);
843         doClose();
844       }
845
846       invokeConnectErr(socketClosedLocallyEx);
847
848       failAllWrites(socketClosedLocallyEx);
849
850       if (readCallback_) {
851         ReadCallback* callback = readCallback_;
852         readCallback_ = nullptr;
853         callback->readEOF();
854       }
855       return;
856     }
857     case StateEnum::CLOSED:
858       // Do nothing.  It's possible that we are being called recursively
859       // from inside a callback that we invoked inside another call to close()
860       // that is still running.
861       return;
862     case StateEnum::ERROR:
863       // Do nothing.  The error handling code has performed (or is performing)
864       // cleanup.
865       return;
866     case StateEnum::UNINIT:
867       assert(eventFlags_ == EventHandler::NONE);
868       assert(connectCallback_ == nullptr);
869       assert(readCallback_ == nullptr);
870       assert(writeReqHead_ == nullptr);
871       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
872       state_ = StateEnum::CLOSED;
873       return;
874   }
875
876   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
877               << ") called in unknown state " << state_;
878 }
879
880 void AsyncSocket::closeWithReset() {
881   // Enable SO_LINGER, with the linger timeout set to 0.
882   // This will trigger a TCP reset when we close the socket.
883   if (fd_ >= 0) {
884     struct linger optLinger = {1, 0};
885     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
886       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
887               << "on " << fd_ << ": errno=" << errno;
888     }
889   }
890
891   // Then let closeNow() take care of the rest
892   closeNow();
893 }
894
895 void AsyncSocket::shutdownWrite() {
896   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
897           << ", state=" << state_ << ", shutdownFlags="
898           << std::hex << (int) shutdownFlags_;
899
900   // If there are no pending writes, shutdownWrite() is identical to
901   // shutdownWriteNow().
902   if (writeReqHead_ == nullptr) {
903     shutdownWriteNow();
904     return;
905   }
906
907   assert(eventBase_->isInEventBaseThread());
908
909   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
910   // shutdown will be performed once all writes complete.
911   shutdownFlags_ |= SHUT_WRITE_PENDING;
912 }
913
914 void AsyncSocket::shutdownWriteNow() {
915   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
916           << ", fd=" << fd_ << ", state=" << state_
917           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
918
919   if (shutdownFlags_ & SHUT_WRITE) {
920     // Writes are already shutdown; nothing else to do.
921     return;
922   }
923
924   // If SHUT_READ is already set, just call closeNow() to completely
925   // close the socket.  This can happen if close() was called with writes
926   // pending, and then shutdownWriteNow() is called before all pending writes
927   // complete.
928   if (shutdownFlags_ & SHUT_READ) {
929     closeNow();
930     return;
931   }
932
933   DestructorGuard dg(this);
934   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
935
936   switch (static_cast<StateEnum>(state_)) {
937     case StateEnum::ESTABLISHED:
938     {
939       shutdownFlags_ |= SHUT_WRITE;
940
941       // If the write timeout was set, cancel it.
942       writeTimeout_.cancelTimeout();
943
944       // If we are registered for write events, unregister.
945       if (!updateEventRegistration(0, EventHandler::WRITE)) {
946         // We will have been moved into the error state.
947         assert(state_ == StateEnum::ERROR);
948         return;
949       }
950
951       // Shutdown writes on the file descriptor
952       ::shutdown(fd_, SHUT_WR);
953
954       // Immediately fail all write requests
955       failAllWrites(socketShutdownForWritesEx);
956       return;
957     }
958     case StateEnum::CONNECTING:
959     {
960       // Set the SHUT_WRITE_PENDING flag.
961       // When the connection completes, it will check this flag,
962       // shutdown the write half of the socket, and then set SHUT_WRITE.
963       shutdownFlags_ |= SHUT_WRITE_PENDING;
964
965       // Immediately fail all write requests
966       failAllWrites(socketShutdownForWritesEx);
967       return;
968     }
969     case StateEnum::UNINIT:
970       // Callers normally shouldn't call shutdownWriteNow() before the socket
971       // even starts connecting.  Nonetheless, go ahead and set
972       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
973       // immediately shut down the write side of the socket.
974       shutdownFlags_ |= SHUT_WRITE_PENDING;
975       return;
976     case StateEnum::CLOSED:
977     case StateEnum::ERROR:
978       // We should never get here.  SHUT_WRITE should always be set
979       // in STATE_CLOSED and STATE_ERROR.
980       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
981                  << ", fd=" << fd_ << ") in unexpected state " << state_
982                  << " with SHUT_WRITE not set ("
983                  << std::hex << (int) shutdownFlags_ << ")";
984       assert(false);
985       return;
986   }
987
988   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
989               << fd_ << ") called in unknown state " << state_;
990 }
991
992 bool AsyncSocket::readable() const {
993   if (fd_ == -1) {
994     return false;
995   }
996   struct pollfd fds[1];
997   fds[0].fd = fd_;
998   fds[0].events = POLLIN;
999   fds[0].revents = 0;
1000   int rc = poll(fds, 1, 0);
1001   return rc == 1;
1002 }
1003
1004 bool AsyncSocket::isPending() const {
1005   return ioHandler_.isPending();
1006 }
1007
1008 bool AsyncSocket::hangup() const {
1009   if (fd_ == -1) {
1010     // sanity check, no one should ask for hangup if we are not connected.
1011     assert(false);
1012     return false;
1013   }
1014 #ifdef POLLRDHUP // Linux-only
1015   struct pollfd fds[1];
1016   fds[0].fd = fd_;
1017   fds[0].events = POLLRDHUP|POLLHUP;
1018   fds[0].revents = 0;
1019   poll(fds, 1, 0);
1020   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1021 #else
1022   return false;
1023 #endif
1024 }
1025
1026 bool AsyncSocket::good() const {
1027   return ((state_ == StateEnum::CONNECTING ||
1028           state_ == StateEnum::ESTABLISHED) &&
1029           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1030 }
1031
1032 bool AsyncSocket::error() const {
1033   return (state_ == StateEnum::ERROR);
1034 }
1035
1036 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1037   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1038           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1039           << ", state=" << state_ << ", events="
1040           << std::hex << eventFlags_ << ")";
1041   assert(eventBase_ == nullptr);
1042   assert(eventBase->isInEventBaseThread());
1043
1044   eventBase_ = eventBase;
1045   ioHandler_.attachEventBase(eventBase);
1046   writeTimeout_.attachEventBase(eventBase);
1047 }
1048
1049 void AsyncSocket::detachEventBase() {
1050   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1051           << ", old evb=" << eventBase_ << ", state=" << state_
1052           << ", events=" << std::hex << eventFlags_ << ")";
1053   assert(eventBase_ != nullptr);
1054   assert(eventBase_->isInEventBaseThread());
1055
1056   eventBase_ = nullptr;
1057   ioHandler_.detachEventBase();
1058   writeTimeout_.detachEventBase();
1059 }
1060
1061 bool AsyncSocket::isDetachable() const {
1062   DCHECK(eventBase_ != nullptr);
1063   DCHECK(eventBase_->isInEventBaseThread());
1064
1065   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1066 }
1067
1068 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1069   if (!localAddr_.isInitialized()) {
1070     localAddr_.setFromLocalAddress(fd_);
1071   }
1072   *address = localAddr_;
1073 }
1074
1075 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1076   if (!addr_.isInitialized()) {
1077     addr_.setFromPeerAddress(fd_);
1078   }
1079   *address = addr_;
1080 }
1081
1082 int AsyncSocket::setNoDelay(bool noDelay) {
1083   if (fd_ < 0) {
1084     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1085                << this << "(state=" << state_ << ")";
1086     return EINVAL;
1087
1088   }
1089
1090   int value = noDelay ? 1 : 0;
1091   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1092     int errnoCopy = errno;
1093     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1094             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1095             << strerror(errnoCopy);
1096     return errnoCopy;
1097   }
1098
1099   return 0;
1100 }
1101
1102 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1103
1104   #ifndef TCP_CONGESTION
1105   #define TCP_CONGESTION  13
1106   #endif
1107
1108   if (fd_ < 0) {
1109     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1110                << "socket " << this << "(state=" << state_ << ")";
1111     return EINVAL;
1112
1113   }
1114
1115   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1116         cname.length() + 1) != 0) {
1117     int errnoCopy = errno;
1118     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1119             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1120             << strerror(errnoCopy);
1121     return errnoCopy;
1122   }
1123
1124   return 0;
1125 }
1126
1127 int AsyncSocket::setQuickAck(bool quickack) {
1128   if (fd_ < 0) {
1129     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1130                << this << "(state=" << state_ << ")";
1131     return EINVAL;
1132
1133   }
1134
1135 #ifdef TCP_QUICKACK // Linux-only
1136   int value = quickack ? 1 : 0;
1137   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1138     int errnoCopy = errno;
1139     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1140             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1141             << strerror(errnoCopy);
1142     return errnoCopy;
1143   }
1144
1145   return 0;
1146 #else
1147   return ENOSYS;
1148 #endif
1149 }
1150
1151 int AsyncSocket::setSendBufSize(size_t bufsize) {
1152   if (fd_ < 0) {
1153     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1154                << this << "(state=" << state_ << ")";
1155     return EINVAL;
1156   }
1157
1158   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1159     int errnoCopy = errno;
1160     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1161             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1162             << strerror(errnoCopy);
1163     return errnoCopy;
1164   }
1165
1166   return 0;
1167 }
1168
1169 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1170   if (fd_ < 0) {
1171     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1172                << this << "(state=" << state_ << ")";
1173     return EINVAL;
1174   }
1175
1176   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1177     int errnoCopy = errno;
1178     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1179             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1180             << strerror(errnoCopy);
1181     return errnoCopy;
1182   }
1183
1184   return 0;
1185 }
1186
1187 int AsyncSocket::setTCPProfile(int profd) {
1188   if (fd_ < 0) {
1189     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1190                << this << "(state=" << state_ << ")";
1191     return EINVAL;
1192   }
1193
1194   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1195     int errnoCopy = errno;
1196     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1197             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1198             << strerror(errnoCopy);
1199     return errnoCopy;
1200   }
1201
1202   return 0;
1203 }
1204
1205 void AsyncSocket::setPersistentCork(bool cork) {
1206   if (setCork(cork) == 0) {
1207     persistentCork_ = cork;
1208   }
1209 }
1210
1211 int AsyncSocket::setCork(bool cork) {
1212 #ifdef TCP_CORK
1213   if (fd_ < 0) {
1214     VLOG(4) << "AsyncSocket::setCork() called on non-open socket "
1215             << this << "(stats=" << state_ << ")";
1216     return EINVAL;
1217   }
1218
1219   if (corked_ == cork) {
1220     return 0;
1221   }
1222
1223   int flag = cork ? 1 : 0;
1224   if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) != 0) {
1225     int errnoCopy = errno;
1226     VLOG(2) << "faield to turn on TCP_CORK option on AsyncSocket"
1227             << this << "(fd=" << fd_ << ", state=" << state_ << "):"
1228             << folly::errnoStr(errnoCopy);
1229     return errnoCopy;
1230   }
1231   corked_ = cork;
1232 #endif
1233   return 0;
1234 }
1235
1236 void AsyncSocket::ioReady(uint16_t events) noexcept {
1237   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1238           << ", events=" << std::hex << events << ", state=" << state_;
1239   DestructorGuard dg(this);
1240   assert(events & EventHandler::READ_WRITE);
1241   assert(eventBase_->isInEventBaseThread());
1242
1243   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1244   if (relevantEvents == EventHandler::READ) {
1245     handleRead();
1246   } else if (relevantEvents == EventHandler::WRITE) {
1247     handleWrite();
1248   } else if (relevantEvents == EventHandler::READ_WRITE) {
1249     EventBase* originalEventBase = eventBase_;
1250     // If both read and write events are ready, process writes first.
1251     handleWrite();
1252
1253     // Return now if handleWrite() detached us from our EventBase
1254     if (eventBase_ != originalEventBase) {
1255       return;
1256     }
1257
1258     // Only call handleRead() if a read callback is still installed.
1259     // (It's possible that the read callback was uninstalled during
1260     // handleWrite().)
1261     if (readCallback_) {
1262       handleRead();
1263     }
1264   } else {
1265     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1266                << std::hex << events << "(this=" << this << ")";
1267     abort();
1268   }
1269 }
1270
1271 ssize_t AsyncSocket::performRead(void** buf,
1272                                  size_t* buflen,
1273                                  size_t* /* offset */) {
1274   VLOG(5) << "AsyncSocket::performRead() this=" << this
1275           << ", buf=" << *buf << ", buflen=" << *buflen;
1276
1277   int recvFlags = 0;
1278   if (peek_) {
1279     recvFlags |= MSG_PEEK;
1280   }
1281
1282   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1283   if (bytes < 0) {
1284     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1285       // No more data to read right now.
1286       return READ_BLOCKING;
1287     } else {
1288       return READ_ERROR;
1289     }
1290   } else {
1291     appBytesReceived_ += bytes;
1292     return bytes;
1293   }
1294 }
1295
1296 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1297   // no matter what, buffer should be preapared for non-ssl socket
1298   CHECK(readCallback_);
1299   readCallback_->getReadBuffer(buf, buflen);
1300 }
1301
1302 void AsyncSocket::handleRead() noexcept {
1303   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1304           << ", state=" << state_;
1305   assert(state_ == StateEnum::ESTABLISHED);
1306   assert((shutdownFlags_ & SHUT_READ) == 0);
1307   assert(readCallback_ != nullptr);
1308   assert(eventFlags_ & EventHandler::READ);
1309
1310   // Loop until:
1311   // - a read attempt would block
1312   // - readCallback_ is uninstalled
1313   // - the number of loop iterations exceeds the optional maximum
1314   // - this AsyncSocket is moved to another EventBase
1315   //
1316   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1317   // which is why need to check for it here.
1318   //
1319   // The last bullet point is slightly subtle.  readDataAvailable() may also
1320   // detach this socket from this EventBase.  However, before
1321   // readDataAvailable() returns another thread may pick it up, attach it to
1322   // a different EventBase, and install another readCallback_.  We need to
1323   // exit immediately after readDataAvailable() returns if the eventBase_ has
1324   // changed.  (The caller must perform some sort of locking to transfer the
1325   // AsyncSocket between threads properly.  This will be sufficient to ensure
1326   // that this thread sees the updated eventBase_ variable after
1327   // readDataAvailable() returns.)
1328   uint16_t numReads = 0;
1329   EventBase* originalEventBase = eventBase_;
1330   while (readCallback_ && eventBase_ == originalEventBase) {
1331     // Get the buffer to read into.
1332     void* buf = nullptr;
1333     size_t buflen = 0, offset = 0;
1334     try {
1335       prepareReadBuffer(&buf, &buflen);
1336       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1337     } catch (const AsyncSocketException& ex) {
1338       return failRead(__func__, ex);
1339     } catch (const std::exception& ex) {
1340       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1341                               string("ReadCallback::getReadBuffer() "
1342                                      "threw exception: ") +
1343                               ex.what());
1344       return failRead(__func__, tex);
1345     } catch (...) {
1346       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1347                              "ReadCallback::getReadBuffer() threw "
1348                              "non-exception type");
1349       return failRead(__func__, ex);
1350     }
1351     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1352       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1353                              "ReadCallback::getReadBuffer() returned "
1354                              "empty buffer");
1355       return failRead(__func__, ex);
1356     }
1357
1358     // Perform the read
1359     ssize_t bytesRead = performRead(&buf, &buflen, &offset);
1360     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1361             << bytesRead << " bytes";
1362     if (bytesRead > 0) {
1363       if (!isBufferMovable_) {
1364         readCallback_->readDataAvailable(bytesRead);
1365       } else {
1366         CHECK(kOpenSslModeMoveBufferOwnership);
1367         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1368                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1369                 << ", offset=" << offset;
1370         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1371         readBuf->trimStart(offset);
1372         readBuf->trimEnd(buflen - offset - bytesRead);
1373         readCallback_->readBufferAvailable(std::move(readBuf));
1374       }
1375
1376       // Fall through and continue around the loop if the read
1377       // completely filled the available buffer.
1378       // Note that readCallback_ may have been uninstalled or changed inside
1379       // readDataAvailable().
1380       if (size_t(bytesRead) < buflen) {
1381         return;
1382       }
1383     } else if (bytesRead == READ_BLOCKING) {
1384         // No more data to read right now.
1385         return;
1386     } else if (bytesRead == READ_ERROR) {
1387       readErr_ = READ_ERROR;
1388       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1389                              withAddr("recv() failed"), errno);
1390       return failRead(__func__, ex);
1391     } else {
1392       assert(bytesRead == READ_EOF);
1393       readErr_ = READ_EOF;
1394       // EOF
1395       shutdownFlags_ |= SHUT_READ;
1396       if (!updateEventRegistration(0, EventHandler::READ)) {
1397         // we've already been moved into STATE_ERROR
1398         assert(state_ == StateEnum::ERROR);
1399         assert(readCallback_ == nullptr);
1400         return;
1401       }
1402
1403       ReadCallback* callback = readCallback_;
1404       readCallback_ = nullptr;
1405       callback->readEOF();
1406       return;
1407     }
1408     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1409       if (readCallback_ != nullptr) {
1410         // We might still have data in the socket.
1411         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1412         scheduleImmediateRead();
1413       }
1414       return;
1415     }
1416   }
1417 }
1418
1419 /**
1420  * This function attempts to write as much data as possible, until no more data
1421  * can be written.
1422  *
1423  * - If it sends all available data, it unregisters for write events, and stops
1424  *   the writeTimeout_.
1425  *
1426  * - If not all of the data can be sent immediately, it reschedules
1427  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1428  *   registered for write events.
1429  */
1430 void AsyncSocket::handleWrite() noexcept {
1431   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1432           << ", state=" << state_;
1433   if (state_ == StateEnum::CONNECTING) {
1434     handleConnect();
1435     return;
1436   }
1437
1438   // Normal write
1439   assert(state_ == StateEnum::ESTABLISHED);
1440   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1441   assert(writeReqHead_ != nullptr);
1442
1443   // Loop until we run out of write requests,
1444   // or until this socket is moved to another EventBase.
1445   // (See the comment in handleRead() explaining how this can happen.)
1446   EventBase* originalEventBase = eventBase_;
1447   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1448     if (!writeReqHead_->performWrite()) {
1449       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1450                              withAddr("writev() failed"), errno);
1451       return failWrite(__func__, ex);
1452     } else if (writeReqHead_->isComplete()) {
1453       // We finished this request
1454       WriteRequest* req = writeReqHead_;
1455       writeReqHead_ = req->getNext();
1456
1457       if (writeReqHead_ == nullptr) {
1458         writeReqTail_ = nullptr;
1459         // This is the last write request.
1460         // Unregister for write events and cancel the send timer
1461         // before we invoke the callback.  We have to update the state properly
1462         // before calling the callback, since it may want to detach us from
1463         // the EventBase.
1464         if (eventFlags_ & EventHandler::WRITE) {
1465           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1466             assert(state_ == StateEnum::ERROR);
1467             return;
1468           }
1469           // Stop the send timeout
1470           writeTimeout_.cancelTimeout();
1471         }
1472         assert(!writeTimeout_.isScheduled());
1473
1474         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1475         // we finish sending the last write request.
1476         //
1477         // We have to do this before invoking writeSuccess(), since
1478         // writeSuccess() may detach us from our EventBase.
1479         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1480           assert(connectCallback_ == nullptr);
1481           shutdownFlags_ |= SHUT_WRITE;
1482
1483           if (shutdownFlags_ & SHUT_READ) {
1484             // Reads have already been shutdown.  Fully close the socket and
1485             // move to STATE_CLOSED.
1486             //
1487             // Note: This code currently moves us to STATE_CLOSED even if
1488             // close() hasn't ever been called.  This can occur if we have
1489             // received EOF from the peer and shutdownWrite() has been called
1490             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1491             // case, until close() is actually called?  I can't think of a
1492             // reason why we would need to do so.  No other operations besides
1493             // calling close() or destroying the socket can be performed at
1494             // this point.
1495             assert(readCallback_ == nullptr);
1496             state_ = StateEnum::CLOSED;
1497             if (fd_ >= 0) {
1498               ioHandler_.changeHandlerFD(-1);
1499               doClose();
1500             }
1501           } else {
1502             // Reads are still enabled, so we are only doing a half-shutdown
1503             ::shutdown(fd_, SHUT_WR);
1504           }
1505         }
1506       }
1507
1508       // Invoke the callback
1509       WriteCallback* callback = req->getCallback();
1510       req->destroy();
1511       if (callback) {
1512         callback->writeSuccess();
1513       }
1514       // We'll continue around the loop, trying to write another request
1515     } else {
1516       // Partial write.
1517       if (bufferCallback_) {
1518         bufferCallback_->onEgressBuffered();
1519       }
1520       writeReqHead_->consume();
1521       // Stop after a partial write; it's highly likely that a subsequent write
1522       // attempt will just return EAGAIN.
1523       //
1524       // Ensure that we are registered for write events.
1525       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1526         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1527           assert(state_ == StateEnum::ERROR);
1528           return;
1529         }
1530       }
1531
1532       // Reschedule the send timeout, since we have made some write progress.
1533       if (sendTimeout_ > 0) {
1534         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1535           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1536               withAddr("failed to reschedule write timeout"));
1537           return failWrite(__func__, ex);
1538         }
1539       }
1540       return;
1541     }
1542   }
1543   if (!writeReqHead_ && bufferCallback_) {
1544     bufferCallback_->onEgressBufferCleared();
1545   }
1546 }
1547
1548 void AsyncSocket::checkForImmediateRead() noexcept {
1549   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1550   // (However, note that some subclasses do override this method.)
1551   //
1552   // Simply calling handleRead() here would be bad, as this would call
1553   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1554   // buffer even though no data may be available.  This would waste lots of
1555   // memory, since the buffer will sit around unused until the socket actually
1556   // becomes readable.
1557   //
1558   // Checking if the socket is readable now also seems like it would probably
1559   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1560   // would just waste an extra system call.  Even if it is readable, waiting to
1561   // find out from libevent on the next event loop doesn't seem that bad.
1562 }
1563
1564 void AsyncSocket::handleInitialReadWrite() noexcept {
1565   // Our callers should already be holding a DestructorGuard, but grab
1566   // one here just to make sure, in case one of our calling code paths ever
1567   // changes.
1568   DestructorGuard dg(this);
1569
1570   // If we have a readCallback_, make sure we enable read events.  We
1571   // may already be registered for reads if connectSuccess() set
1572   // the read calback.
1573   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1574     assert(state_ == StateEnum::ESTABLISHED);
1575     assert((shutdownFlags_ & SHUT_READ) == 0);
1576     if (!updateEventRegistration(EventHandler::READ, 0)) {
1577       assert(state_ == StateEnum::ERROR);
1578       return;
1579     }
1580     checkForImmediateRead();
1581   } else if (readCallback_ == nullptr) {
1582     // Unregister for read events.
1583     updateEventRegistration(0, EventHandler::READ);
1584   }
1585
1586   // If we have write requests pending, try to send them immediately.
1587   // Since we just finished accepting, there is a very good chance that we can
1588   // write without blocking.
1589   //
1590   // However, we only process them if EventHandler::WRITE is not already set,
1591   // which means that we're already blocked on a write attempt.  (This can
1592   // happen if connectSuccess() called write() before returning.)
1593   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1594     // Call handleWrite() to perform write processing.
1595     handleWrite();
1596   } else if (writeReqHead_ == nullptr) {
1597     // Unregister for write event.
1598     updateEventRegistration(0, EventHandler::WRITE);
1599   }
1600 }
1601
1602 void AsyncSocket::handleConnect() noexcept {
1603   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1604           << ", state=" << state_;
1605   assert(state_ == StateEnum::CONNECTING);
1606   // SHUT_WRITE can never be set while we are still connecting;
1607   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1608   // finishes
1609   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1610
1611   // In case we had a connect timeout, cancel the timeout
1612   writeTimeout_.cancelTimeout();
1613   // We don't use a persistent registration when waiting on a connect event,
1614   // so we have been automatically unregistered now.  Update eventFlags_ to
1615   // reflect reality.
1616   assert(eventFlags_ == EventHandler::WRITE);
1617   eventFlags_ = EventHandler::NONE;
1618
1619   // Call getsockopt() to check if the connect succeeded
1620   int error;
1621   socklen_t len = sizeof(error);
1622   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1623   if (rv != 0) {
1624     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1625                            withAddr("error calling getsockopt() after connect"),
1626                            errno);
1627     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1628                << fd_ << " host=" << addr_.describe()
1629                << ") exception:" << ex.what();
1630     return failConnect(__func__, ex);
1631   }
1632
1633   if (error != 0) {
1634     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1635                            "connect failed", error);
1636     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1637             << fd_ << " host=" << addr_.describe()
1638             << ") exception: " << ex.what();
1639     return failConnect(__func__, ex);
1640   }
1641
1642   // Move into STATE_ESTABLISHED
1643   state_ = StateEnum::ESTABLISHED;
1644
1645   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1646   // perform, immediately shutdown the write half of the socket.
1647   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1648     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1649     // are still connecting we just abort the connect rather than waiting for
1650     // it to complete.
1651     assert((shutdownFlags_ & SHUT_READ) == 0);
1652     ::shutdown(fd_, SHUT_WR);
1653     shutdownFlags_ |= SHUT_WRITE;
1654   }
1655
1656   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1657           << "successfully connected; state=" << state_;
1658
1659   // Remember the EventBase we are attached to, before we start invoking any
1660   // callbacks (since the callbacks may call detachEventBase()).
1661   EventBase* originalEventBase = eventBase_;
1662
1663   invokeConnectSuccess();
1664   // Note that the connect callback may have changed our state.
1665   // (set or unset the read callback, called write(), closed the socket, etc.)
1666   // The following code needs to handle these situations correctly.
1667   //
1668   // If the socket has been closed, readCallback_ and writeReqHead_ will
1669   // always be nullptr, so that will prevent us from trying to read or write.
1670   //
1671   // The main thing to check for is if eventBase_ is still originalEventBase.
1672   // If not, we have been detached from this event base, so we shouldn't
1673   // perform any more operations.
1674   if (eventBase_ != originalEventBase) {
1675     return;
1676   }
1677
1678   handleInitialReadWrite();
1679 }
1680
1681 void AsyncSocket::timeoutExpired() noexcept {
1682   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1683           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1684   DestructorGuard dg(this);
1685   assert(eventBase_->isInEventBaseThread());
1686
1687   if (state_ == StateEnum::CONNECTING) {
1688     // connect() timed out
1689     // Unregister for I/O events.
1690     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1691                            "connect timed out");
1692     failConnect(__func__, ex);
1693   } else {
1694     // a normal write operation timed out
1695     assert(state_ == StateEnum::ESTABLISHED);
1696     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1697     failWrite(__func__, ex);
1698   }
1699 }
1700
1701 ssize_t AsyncSocket::performWrite(const iovec* vec,
1702                                    uint32_t count,
1703                                    WriteFlags flags,
1704                                    uint32_t* countWritten,
1705                                    uint32_t* partialWritten) {
1706   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1707   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1708   // (since it may terminate the program if the main program doesn't explicitly
1709   // ignore it).
1710   struct msghdr msg;
1711   msg.msg_name = nullptr;
1712   msg.msg_namelen = 0;
1713   msg.msg_iov = const_cast<iovec *>(vec);
1714 #ifdef IOV_MAX // not defined on Android
1715   msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX);
1716 #else
1717   msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV);
1718 #endif
1719   msg.msg_control = nullptr;
1720   msg.msg_controllen = 0;
1721   msg.msg_flags = 0;
1722
1723   int msg_flags = MSG_DONTWAIT;
1724
1725 #ifdef MSG_NOSIGNAL // Linux-only
1726   msg_flags |= MSG_NOSIGNAL;
1727   if (isSet(flags, WriteFlags::CORK)) {
1728     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1729     // give it the rest of the data rather than immediately sending a partial
1730     // frame, even when TCP_NODELAY is enabled.
1731     msg_flags |= MSG_MORE;
1732   }
1733 #endif
1734   if (isSet(flags, WriteFlags::EOR)) {
1735     // marks that this is the last byte of a record (response)
1736     msg_flags |= MSG_EOR;
1737   }
1738   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1739   if (totalWritten < 0) {
1740     if (errno == EAGAIN) {
1741       // TCP buffer is full; we can't write any more data right now.
1742       *countWritten = 0;
1743       *partialWritten = 0;
1744       return 0;
1745     }
1746     // error
1747     *countWritten = 0;
1748     *partialWritten = 0;
1749     return -1;
1750   }
1751
1752   appBytesWritten_ += totalWritten;
1753
1754   uint32_t bytesWritten;
1755   uint32_t n;
1756   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1757     const iovec* v = vec + n;
1758     if (v->iov_len > bytesWritten) {
1759       // Partial write finished in the middle of this iovec
1760       *countWritten = n;
1761       *partialWritten = bytesWritten;
1762       return totalWritten;
1763     }
1764
1765     bytesWritten -= v->iov_len;
1766   }
1767
1768   assert(bytesWritten == 0);
1769   *countWritten = n;
1770   *partialWritten = 0;
1771   return totalWritten;
1772 }
1773
1774 /**
1775  * Re-register the EventHandler after eventFlags_ has changed.
1776  *
1777  * If an error occurs, fail() is called to move the socket into the error state
1778  * and call all currently installed callbacks.  After an error, the
1779  * AsyncSocket is completely unregistered.
1780  *
1781  * @return Returns true on succcess, or false on error.
1782  */
1783 bool AsyncSocket::updateEventRegistration() {
1784   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1785           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1786           << ", events=" << std::hex << eventFlags_;
1787   assert(eventBase_->isInEventBaseThread());
1788   if (eventFlags_ == EventHandler::NONE) {
1789     ioHandler_.unregisterHandler();
1790     return true;
1791   }
1792
1793   // Always register for persistent events, so we don't have to re-register
1794   // after being called back.
1795   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1796     eventFlags_ = EventHandler::NONE; // we're not registered after error
1797     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1798         withAddr("failed to update AsyncSocket event registration"));
1799     fail("updateEventRegistration", ex);
1800     return false;
1801   }
1802
1803   return true;
1804 }
1805
1806 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1807                                            uint16_t disable) {
1808   uint16_t oldFlags = eventFlags_;
1809   eventFlags_ |= enable;
1810   eventFlags_ &= ~disable;
1811   if (eventFlags_ == oldFlags) {
1812     return true;
1813   } else {
1814     return updateEventRegistration();
1815   }
1816 }
1817
1818 void AsyncSocket::startFail() {
1819   // startFail() should only be called once
1820   assert(state_ != StateEnum::ERROR);
1821   assert(getDestructorGuardCount() > 0);
1822   state_ = StateEnum::ERROR;
1823   // Ensure that SHUT_READ and SHUT_WRITE are set,
1824   // so all future attempts to read or write will be rejected
1825   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1826
1827   if (eventFlags_ != EventHandler::NONE) {
1828     eventFlags_ = EventHandler::NONE;
1829     ioHandler_.unregisterHandler();
1830   }
1831   writeTimeout_.cancelTimeout();
1832
1833   if (fd_ >= 0) {
1834     ioHandler_.changeHandlerFD(-1);
1835     doClose();
1836   }
1837 }
1838
1839 void AsyncSocket::finishFail() {
1840   assert(state_ == StateEnum::ERROR);
1841   assert(getDestructorGuardCount() > 0);
1842
1843   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1844                          withAddr("socket closing after error"));
1845   invokeConnectErr(ex);
1846   failAllWrites(ex);
1847
1848   if (readCallback_) {
1849     ReadCallback* callback = readCallback_;
1850     readCallback_ = nullptr;
1851     callback->readErr(ex);
1852   }
1853 }
1854
1855 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1856   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1857              << state_ << " host=" << addr_.describe()
1858              << "): failed in " << fn << "(): "
1859              << ex.what();
1860   startFail();
1861   finishFail();
1862 }
1863
1864 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1865   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1866                << state_ << " host=" << addr_.describe()
1867                << "): failed while connecting in " << fn << "(): "
1868                << ex.what();
1869   startFail();
1870
1871   invokeConnectErr(ex);
1872   finishFail();
1873 }
1874
1875 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1876   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1877                << state_ << " host=" << addr_.describe()
1878                << "): failed while reading in " << fn << "(): "
1879                << ex.what();
1880   startFail();
1881
1882   if (readCallback_ != nullptr) {
1883     ReadCallback* callback = readCallback_;
1884     readCallback_ = nullptr;
1885     callback->readErr(ex);
1886   }
1887
1888   finishFail();
1889 }
1890
1891 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1892   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1893                << state_ << " host=" << addr_.describe()
1894                << "): failed while writing in " << fn << "(): "
1895                << ex.what();
1896   startFail();
1897
1898   // Only invoke the first write callback, since the error occurred while
1899   // writing this request.  Let any other pending write callbacks be invoked in
1900   // finishFail().
1901   if (writeReqHead_ != nullptr) {
1902     WriteRequest* req = writeReqHead_;
1903     writeReqHead_ = req->getNext();
1904     WriteCallback* callback = req->getCallback();
1905     uint32_t bytesWritten = req->getTotalBytesWritten();
1906     req->destroy();
1907     if (callback) {
1908       callback->writeErr(bytesWritten, ex);
1909     }
1910   }
1911
1912   finishFail();
1913 }
1914
1915 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1916                              size_t bytesWritten,
1917                              const AsyncSocketException& ex) {
1918   // This version of failWrite() is used when the failure occurs before
1919   // we've added the callback to writeReqHead_.
1920   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1921              << state_ << " host=" << addr_.describe()
1922              <<"): failed while writing in " << fn << "(): "
1923              << ex.what();
1924   startFail();
1925
1926   if (callback != nullptr) {
1927     callback->writeErr(bytesWritten, ex);
1928   }
1929
1930   finishFail();
1931 }
1932
1933 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1934   // Invoke writeError() on all write callbacks.
1935   // This is used when writes are forcibly shutdown with write requests
1936   // pending, or when an error occurs with writes pending.
1937   while (writeReqHead_ != nullptr) {
1938     WriteRequest* req = writeReqHead_;
1939     writeReqHead_ = req->getNext();
1940     WriteCallback* callback = req->getCallback();
1941     if (callback) {
1942       callback->writeErr(req->getTotalBytesWritten(), ex);
1943     }
1944     req->destroy();
1945   }
1946 }
1947
1948 void AsyncSocket::invalidState(ConnectCallback* callback) {
1949   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1950              << "): connect() called in invalid state " << state_;
1951
1952   /*
1953    * The invalidState() methods don't use the normal failure mechanisms,
1954    * since we don't know what state we are in.  We don't want to call
1955    * startFail()/finishFail() recursively if we are already in the middle of
1956    * cleaning up.
1957    */
1958
1959   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1960                          "connect() called with socket in invalid state");
1961   connectEndTime_ = std::chrono::steady_clock::now();
1962   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1963     if (callback) {
1964       callback->connectErr(ex);
1965     }
1966   } else {
1967     // We can't use failConnect() here since connectCallback_
1968     // may already be set to another callback.  Invoke this ConnectCallback
1969     // here; any other connectCallback_ will be invoked in finishFail()
1970     startFail();
1971     if (callback) {
1972       callback->connectErr(ex);
1973     }
1974     finishFail();
1975   }
1976 }
1977
1978 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
1979   connectEndTime_ = std::chrono::steady_clock::now();
1980   if (connectCallback_) {
1981     ConnectCallback* callback = connectCallback_;
1982     connectCallback_ = nullptr;
1983     callback->connectErr(ex);
1984   }
1985 }
1986
1987 void AsyncSocket::invokeConnectSuccess() {
1988   connectEndTime_ = std::chrono::steady_clock::now();
1989   if (connectCallback_) {
1990     ConnectCallback* callback = connectCallback_;
1991     connectCallback_ = nullptr;
1992     callback->connectSuccess();
1993   }
1994 }
1995
1996 void AsyncSocket::invalidState(ReadCallback* callback) {
1997   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1998              << "): setReadCallback(" << callback
1999              << ") called in invalid state " << state_;
2000
2001   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2002                          "setReadCallback() called with socket in "
2003                          "invalid state");
2004   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2005     if (callback) {
2006       callback->readErr(ex);
2007     }
2008   } else {
2009     startFail();
2010     if (callback) {
2011       callback->readErr(ex);
2012     }
2013     finishFail();
2014   }
2015 }
2016
2017 void AsyncSocket::invalidState(WriteCallback* callback) {
2018   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2019              << "): write() called in invalid state " << state_;
2020
2021   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2022                          withAddr("write() called with socket in invalid state"));
2023   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2024     if (callback) {
2025       callback->writeErr(0, ex);
2026     }
2027   } else {
2028     startFail();
2029     if (callback) {
2030       callback->writeErr(0, ex);
2031     }
2032     finishFail();
2033   }
2034 }
2035
2036 void AsyncSocket::doClose() {
2037   if (fd_ == -1) return;
2038   if (shutdownSocketSet_) {
2039     shutdownSocketSet_->close(fd_);
2040   } else {
2041     ::close(fd_);
2042   }
2043   fd_ = -1;
2044 }
2045
2046 std::ostream& operator << (std::ostream& os,
2047                            const AsyncSocket::StateEnum& state) {
2048   os << static_cast<int>(state);
2049   return os;
2050 }
2051
2052 std::string AsyncSocket::withAddr(const std::string& s) {
2053   // Don't use addr_ directly because it may not be initialized
2054   // e.g. if constructed from fd
2055   folly::SocketAddress peer, local;
2056   try {
2057     getPeerAddress(&peer);
2058     getLocalAddress(&local);
2059   } catch (const std::exception&) {
2060     // ignore
2061   } catch (...) {
2062     // ignore
2063   }
2064   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2065 }
2066
2067 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2068   bufferCallback_ = cb;
2069 }
2070
2071 } // folly