fa475faa7ba85de7ea7caf7f4efbbc704f1a2e7b
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23
24 #include <poll.h>
25 #include <errno.h>
26 #include <limits.h>
27 #include <unistd.h>
28 #include <thread>
29 #include <fcntl.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34 #include <boost/preprocessor/control/if.hpp>
35
36 using std::string;
37 using std::unique_ptr;
38
39 namespace folly {
40
41 // static members initializers
42 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
43
44 const AsyncSocketException socketClosedLocallyEx(
45     AsyncSocketException::END_OF_FILE, "socket closed locally");
46 const AsyncSocketException socketShutdownForWritesEx(
47     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
48
49 // TODO: It might help performance to provide a version of BytesWriteRequest that
50 // users could derive from, so we can avoid the extra allocation for each call
51 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
52 // protocols are currently templatized for transports.
53 //
54 // We would need the version for external users where they provide the iovec
55 // storage space, and only our internal version would allocate it at the end of
56 // the WriteRequest.
57
58 /* The default WriteRequest implementation, used for write(), writev() and
59  * writeChain()
60  *
61  * A new BytesWriteRequest operation is allocated on the heap for all write
62  * operations that cannot be completed immediately.
63  */
64 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
65  public:
66   static BytesWriteRequest* newRequest(AsyncSocket* socket,
67                                        WriteCallback* callback,
68                                        const iovec* ops,
69                                        uint32_t opCount,
70                                        uint32_t partialWritten,
71                                        uint32_t bytesWritten,
72                                        unique_ptr<IOBuf>&& ioBuf,
73                                        WriteFlags flags) {
74     assert(opCount > 0);
75     // Since we put a variable size iovec array at the end
76     // of each BytesWriteRequest, we have to manually allocate the memory.
77     void* buf = malloc(sizeof(BytesWriteRequest) +
78                        (opCount * sizeof(struct iovec)));
79     if (buf == nullptr) {
80       throw std::bad_alloc();
81     }
82
83     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
84                                       partialWritten, bytesWritten,
85                                       std::move(ioBuf), flags);
86   }
87
88   void destroy() override {
89     this->~BytesWriteRequest();
90     free(this);
91   }
92
93   bool performWrite() override {
94     WriteFlags writeFlags = flags_;
95     if (getNext() != nullptr) {
96       writeFlags = writeFlags | WriteFlags::CORK;
97     }
98     bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
99                                           &opsWritten_, &partialBytes_);
100     return bytesWritten_ >= 0;
101   }
102
103   bool isComplete() override {
104     return opsWritten_ == getOpCount();
105   }
106
107   void consume() override {
108     // Advance opIndex_ forward by opsWritten_
109     opIndex_ += opsWritten_;
110     assert(opIndex_ < opCount_);
111
112     // If we've finished writing any IOBufs, release them
113     if (ioBuf_) {
114       for (uint32_t i = opsWritten_; i != 0; --i) {
115         assert(ioBuf_);
116         ioBuf_ = ioBuf_->pop();
117       }
118     }
119
120     // Move partialBytes_ forward into the current iovec buffer
121     struct iovec* currentOp = writeOps_ + opIndex_;
122     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
123     currentOp->iov_base =
124       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
125     currentOp->iov_len -= partialBytes_;
126
127     // Increment the totalBytesWritten_ count by bytesWritten_;
128     totalBytesWritten_ += bytesWritten_;
129   }
130
131  private:
132   BytesWriteRequest(AsyncSocket* socket,
133                     WriteCallback* callback,
134                     const struct iovec* ops,
135                     uint32_t opCount,
136                     uint32_t partialBytes,
137                     uint32_t bytesWritten,
138                     unique_ptr<IOBuf>&& ioBuf,
139                     WriteFlags flags)
140     : AsyncSocket::WriteRequest(socket, callback)
141     , opCount_(opCount)
142     , opIndex_(0)
143     , flags_(flags)
144     , ioBuf_(std::move(ioBuf))
145     , opsWritten_(0)
146     , partialBytes_(partialBytes)
147     , bytesWritten_(bytesWritten) {
148     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
149   }
150
151   // private destructor, to ensure callers use destroy()
152   ~BytesWriteRequest() override = default;
153
154   const struct iovec* getOps() const {
155     assert(opCount_ > opIndex_);
156     return writeOps_ + opIndex_;
157   }
158
159   uint32_t getOpCount() const {
160     assert(opCount_ > opIndex_);
161     return opCount_ - opIndex_;
162   }
163
164   uint32_t opCount_;            ///< number of entries in writeOps_
165   uint32_t opIndex_;            ///< current index into writeOps_
166   WriteFlags flags_;            ///< set for WriteFlags
167   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
168
169   // for consume(), how much we wrote on the last write
170   uint32_t opsWritten_;         ///< complete ops written
171   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
172   ssize_t bytesWritten_;        ///< bytes written altogether
173
174   struct iovec writeOps_[];     ///< write operation(s) list
175 };
176
177 AsyncSocket::AsyncSocket()
178   : eventBase_(nullptr)
179   , writeTimeout_(this, nullptr)
180   , ioHandler_(this, nullptr)
181   , immediateReadHandler_(this) {
182   VLOG(5) << "new AsyncSocket()";
183   init();
184 }
185
186 AsyncSocket::AsyncSocket(EventBase* evb)
187   : eventBase_(evb)
188   , writeTimeout_(this, evb)
189   , ioHandler_(this, evb)
190   , immediateReadHandler_(this) {
191   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
192   init();
193 }
194
195 AsyncSocket::AsyncSocket(EventBase* evb,
196                            const folly::SocketAddress& address,
197                            uint32_t connectTimeout)
198   : AsyncSocket(evb) {
199   connect(nullptr, address, connectTimeout);
200 }
201
202 AsyncSocket::AsyncSocket(EventBase* evb,
203                            const std::string& ip,
204                            uint16_t port,
205                            uint32_t connectTimeout)
206   : AsyncSocket(evb) {
207   connect(nullptr, ip, port, connectTimeout);
208 }
209
210 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
211   : eventBase_(evb)
212   , writeTimeout_(this, evb)
213   , ioHandler_(this, evb, fd)
214   , immediateReadHandler_(this) {
215   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
216           << fd << ")";
217   init();
218   fd_ = fd;
219   setCloseOnExec();
220   state_ = StateEnum::ESTABLISHED;
221 }
222
223 // init() method, since constructor forwarding isn't supported in most
224 // compilers yet.
225 void AsyncSocket::init() {
226   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
227   shutdownFlags_ = 0;
228   state_ = StateEnum::UNINIT;
229   eventFlags_ = EventHandler::NONE;
230   fd_ = -1;
231   sendTimeout_ = 0;
232   maxReadsPerEvent_ = 16;
233   connectCallback_ = nullptr;
234   readCallback_ = nullptr;
235   writeReqHead_ = nullptr;
236   writeReqTail_ = nullptr;
237   shutdownSocketSet_ = nullptr;
238   appBytesWritten_ = 0;
239   appBytesReceived_ = 0;
240 }
241
242 AsyncSocket::~AsyncSocket() {
243   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
244           << ", evb=" << eventBase_ << ", fd=" << fd_
245           << ", state=" << state_ << ")";
246 }
247
248 void AsyncSocket::destroy() {
249   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
250           << ", fd=" << fd_ << ", state=" << state_;
251   // When destroy is called, close the socket immediately
252   closeNow();
253
254   // Then call DelayedDestruction::destroy() to take care of
255   // whether or not we need immediate or delayed destruction
256   DelayedDestruction::destroy();
257 }
258
259 int AsyncSocket::detachFd() {
260   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
261           << ", evb=" << eventBase_ << ", state=" << state_
262           << ", events=" << std::hex << eventFlags_ << ")";
263   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
264   // actually close the descriptor.
265   if (shutdownSocketSet_) {
266     shutdownSocketSet_->remove(fd_);
267   }
268   int fd = fd_;
269   fd_ = -1;
270   // Call closeNow() to invoke all pending callbacks with an error.
271   closeNow();
272   // Update the EventHandler to stop using this fd.
273   // This can only be done after closeNow() unregisters the handler.
274   ioHandler_.changeHandlerFD(-1);
275   return fd;
276 }
277
278 const folly::SocketAddress& AsyncSocket::anyAddress() {
279   static const folly::SocketAddress anyAddress =
280     folly::SocketAddress("0.0.0.0", 0);
281   return anyAddress;
282 }
283
284 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
285   if (shutdownSocketSet_ == newSS) {
286     return;
287   }
288   if (shutdownSocketSet_ && fd_ != -1) {
289     shutdownSocketSet_->remove(fd_);
290   }
291   shutdownSocketSet_ = newSS;
292   if (shutdownSocketSet_ && fd_ != -1) {
293     shutdownSocketSet_->add(fd_);
294   }
295 }
296
297 void AsyncSocket::setCloseOnExec() {
298   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
299   if (rv != 0) {
300     throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
301                                withAddr("failed to set close-on-exec flag"),
302                                errno);
303   }
304 }
305
306 void AsyncSocket::connect(ConnectCallback* callback,
307                            const folly::SocketAddress& address,
308                            int timeout,
309                            const OptionMap &options,
310                            const folly::SocketAddress& bindAddr) noexcept {
311   DestructorGuard dg(this);
312   assert(eventBase_->isInEventBaseThread());
313
314   addr_ = address;
315
316   // Make sure we're in the uninitialized state
317   if (state_ != StateEnum::UNINIT) {
318     return invalidState(callback);
319   }
320
321   connectStartTime_ = std::chrono::steady_clock::now();
322   // Make connect end time at least >= connectStartTime.
323   connectEndTime_ = connectStartTime_;
324
325   assert(fd_ == -1);
326   state_ = StateEnum::CONNECTING;
327   connectCallback_ = callback;
328
329   sockaddr_storage addrStorage;
330   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
331
332   try {
333     // Create the socket
334     // Technically the first parameter should actually be a protocol family
335     // constant (PF_xxx) rather than an address family (AF_xxx), but the
336     // distinction is mainly just historical.  In pretty much all
337     // implementations the PF_foo and AF_foo constants are identical.
338     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
339     if (fd_ < 0) {
340       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
341                                 withAddr("failed to create socket"), errno);
342     }
343     if (shutdownSocketSet_) {
344       shutdownSocketSet_->add(fd_);
345     }
346     ioHandler_.changeHandlerFD(fd_);
347
348     setCloseOnExec();
349
350     // Put the socket in non-blocking mode
351     int flags = fcntl(fd_, F_GETFL, 0);
352     if (flags == -1) {
353       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
354                                 withAddr("failed to get socket flags"), errno);
355     }
356     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
357     if (rv == -1) {
358       throw AsyncSocketException(
359           AsyncSocketException::INTERNAL_ERROR,
360           withAddr("failed to put socket in non-blocking mode"),
361           errno);
362     }
363
364 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
365     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
366     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
367     if (rv == -1) {
368       throw AsyncSocketException(
369           AsyncSocketException::INTERNAL_ERROR,
370           "failed to enable F_SETNOSIGPIPE on socket",
371           errno);
372     }
373 #endif
374
375     // By default, turn on TCP_NODELAY
376     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
377     // setNoDelay() will log an error message if it fails.
378     if (address.getFamily() != AF_UNIX) {
379       (void)setNoDelay(true);
380     }
381
382     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
383             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
384
385     // bind the socket
386     if (bindAddr != anyAddress()) {
387       int one = 1;
388       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
389         doClose();
390         throw AsyncSocketException(
391           AsyncSocketException::NOT_OPEN,
392           "failed to setsockopt prior to bind on " + bindAddr.describe(),
393           errno);
394       }
395
396       bindAddr.getAddress(&addrStorage);
397
398       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
399         doClose();
400         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
401                                   "failed to bind to async socket: " +
402                                   bindAddr.describe(),
403                                   errno);
404       }
405     }
406
407     // Apply the additional options if any.
408     for (const auto& opt: options) {
409       int rv = opt.first.apply(fd_, opt.second);
410       if (rv != 0) {
411         throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
412                                   withAddr("failed to set socket option"),
413                                   errno);
414       }
415     }
416
417     // Perform the connect()
418     address.getAddress(&addrStorage);
419
420     rv = ::connect(fd_, saddr, address.getActualSize());
421     if (rv < 0) {
422       if (errno == EINPROGRESS) {
423         // Connection in progress.
424         if (timeout > 0) {
425           // Start a timer in case the connection takes too long.
426           if (!writeTimeout_.scheduleTimeout(timeout)) {
427             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
428                 withAddr("failed to schedule AsyncSocket connect timeout"));
429           }
430         }
431
432         // Register for write events, so we'll
433         // be notified when the connection finishes/fails.
434         // Note that we don't register for a persistent event here.
435         assert(eventFlags_ == EventHandler::NONE);
436         eventFlags_ = EventHandler::WRITE;
437         if (!ioHandler_.registerHandler(eventFlags_)) {
438           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
439               withAddr("failed to register AsyncSocket connect handler"));
440         }
441         return;
442       } else {
443         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
444                                   "connect failed (immediately)", errno);
445       }
446     }
447
448     // If we're still here the connect() succeeded immediately.
449     // Fall through to call the callback outside of this try...catch block
450   } catch (const AsyncSocketException& ex) {
451     return failConnect(__func__, ex);
452   } catch (const std::exception& ex) {
453     // shouldn't happen, but handle it just in case
454     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
455                << "): unexpected " << typeid(ex).name() << " exception: "
456                << ex.what();
457     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
458                             withAddr(string("unexpected exception: ") +
459                                      ex.what()));
460     return failConnect(__func__, tex);
461   }
462
463   // The connection succeeded immediately
464   // The read callback may not have been set yet, and no writes may be pending
465   // yet, so we don't have to register for any events at the moment.
466   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
467   assert(readCallback_ == nullptr);
468   assert(writeReqHead_ == nullptr);
469   state_ = StateEnum::ESTABLISHED;
470   invokeConnectSuccess();
471 }
472
473 void AsyncSocket::connect(ConnectCallback* callback,
474                            const string& ip, uint16_t port,
475                            int timeout,
476                            const OptionMap &options) noexcept {
477   DestructorGuard dg(this);
478   try {
479     connectCallback_ = callback;
480     connect(callback, folly::SocketAddress(ip, port), timeout, options);
481   } catch (const std::exception& ex) {
482     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
483                             ex.what());
484     return failConnect(__func__, tex);
485   }
486 }
487
488 void AsyncSocket::cancelConnect() {
489   connectCallback_ = nullptr;
490   if (state_ == StateEnum::CONNECTING) {
491     closeNow();
492   }
493 }
494
495 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
496   sendTimeout_ = milliseconds;
497   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
498
499   // If we are currently pending on write requests, immediately update
500   // writeTimeout_ with the new value.
501   if ((eventFlags_ & EventHandler::WRITE) &&
502       (state_ != StateEnum::CONNECTING)) {
503     assert(state_ == StateEnum::ESTABLISHED);
504     assert((shutdownFlags_ & SHUT_WRITE) == 0);
505     if (sendTimeout_ > 0) {
506       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
507         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
508             withAddr("failed to reschedule send timeout in setSendTimeout"));
509         return failWrite(__func__, ex);
510       }
511     } else {
512       writeTimeout_.cancelTimeout();
513     }
514   }
515 }
516
517 void AsyncSocket::setReadCB(ReadCallback *callback) {
518   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
519           << ", callback=" << callback << ", state=" << state_;
520
521   // Short circuit if callback is the same as the existing readCallback_.
522   //
523   // Note that this is needed for proper functioning during some cleanup cases.
524   // During cleanup we allow setReadCallback(nullptr) to be called even if the
525   // read callback is already unset and we have been detached from an event
526   // base.  This check prevents us from asserting
527   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
528   if (callback == readCallback_) {
529     return;
530   }
531
532   /* We are removing a read callback */
533   if (callback == nullptr &&
534       immediateReadHandler_.isLoopCallbackScheduled()) {
535     immediateReadHandler_.cancelLoopCallback();
536   }
537
538   if (shutdownFlags_ & SHUT_READ) {
539     // Reads have already been shut down on this socket.
540     //
541     // Allow setReadCallback(nullptr) to be called in this case, but don't
542     // allow a new callback to be set.
543     //
544     // For example, setReadCallback(nullptr) can happen after an error if we
545     // invoke some other error callback before invoking readError().  The other
546     // error callback that is invoked first may go ahead and clear the read
547     // callback before we get a chance to invoke readError().
548     if (callback != nullptr) {
549       return invalidState(callback);
550     }
551     assert((eventFlags_ & EventHandler::READ) == 0);
552     readCallback_ = nullptr;
553     return;
554   }
555
556   DestructorGuard dg(this);
557   assert(eventBase_->isInEventBaseThread());
558
559   switch ((StateEnum)state_) {
560     case StateEnum::CONNECTING:
561       // For convenience, we allow the read callback to be set while we are
562       // still connecting.  We just store the callback for now.  Once the
563       // connection completes we'll register for read events.
564       readCallback_ = callback;
565       return;
566     case StateEnum::ESTABLISHED:
567     {
568       readCallback_ = callback;
569       uint16_t oldFlags = eventFlags_;
570       if (readCallback_) {
571         eventFlags_ |= EventHandler::READ;
572       } else {
573         eventFlags_ &= ~EventHandler::READ;
574       }
575
576       // Update our registration if our flags have changed
577       if (eventFlags_ != oldFlags) {
578         // We intentionally ignore the return value here.
579         // updateEventRegistration() will move us into the error state if it
580         // fails, and we don't need to do anything else here afterwards.
581         (void)updateEventRegistration();
582       }
583
584       if (readCallback_) {
585         checkForImmediateRead();
586       }
587       return;
588     }
589     case StateEnum::CLOSED:
590     case StateEnum::ERROR:
591       // We should never reach here.  SHUT_READ should always be set
592       // if we are in STATE_CLOSED or STATE_ERROR.
593       assert(false);
594       return invalidState(callback);
595     case StateEnum::UNINIT:
596       // We do not allow setReadCallback() to be called before we start
597       // connecting.
598       return invalidState(callback);
599   }
600
601   // We don't put a default case in the switch statement, so that the compiler
602   // will warn us to update the switch statement if a new state is added.
603   return invalidState(callback);
604 }
605
606 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
607   return readCallback_;
608 }
609
610 void AsyncSocket::write(WriteCallback* callback,
611                          const void* buf, size_t bytes, WriteFlags flags) {
612   iovec op;
613   op.iov_base = const_cast<void*>(buf);
614   op.iov_len = bytes;
615   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
616 }
617
618 void AsyncSocket::writev(WriteCallback* callback,
619                           const iovec* vec,
620                           size_t count,
621                           WriteFlags flags) {
622   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
623 }
624
625 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
626                               WriteFlags flags) {
627   constexpr size_t kSmallSizeMax = 64;
628   size_t count = buf->countChainElements();
629   if (count <= kSmallSizeMax) {
630     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
631     writeChainImpl(callback, vec, count, std::move(buf), flags);
632   } else {
633     iovec* vec = new iovec[count];
634     writeChainImpl(callback, vec, count, std::move(buf), flags);
635     delete[] vec;
636   }
637 }
638
639 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
640     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
641   size_t veclen = buf->fillIov(vec, count);
642   writeImpl(callback, vec, veclen, std::move(buf), flags);
643 }
644
645 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
646                              size_t count, unique_ptr<IOBuf>&& buf,
647                              WriteFlags flags) {
648   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
649           << ", callback=" << callback << ", count=" << count
650           << ", state=" << state_;
651   DestructorGuard dg(this);
652   unique_ptr<IOBuf>ioBuf(std::move(buf));
653   assert(eventBase_->isInEventBaseThread());
654
655   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
656     // No new writes may be performed after the write side of the socket has
657     // been shutdown.
658     //
659     // We could just call callback->writeError() here to fail just this write.
660     // However, fail hard and use invalidState() to fail all outstanding
661     // callbacks and move the socket into the error state.  There's most likely
662     // a bug in the caller's code, so we abort everything rather than trying to
663     // proceed as best we can.
664     return invalidState(callback);
665   }
666
667   uint32_t countWritten = 0;
668   uint32_t partialWritten = 0;
669   int bytesWritten = 0;
670   bool mustRegister = false;
671   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
672     if (writeReqHead_ == nullptr) {
673       // If we are established and there are no other writes pending,
674       // we can attempt to perform the write immediately.
675       assert(writeReqTail_ == nullptr);
676       assert((eventFlags_ & EventHandler::WRITE) == 0);
677
678       bytesWritten = performWrite(vec, count, flags,
679                                   &countWritten, &partialWritten);
680       if (bytesWritten < 0) {
681         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
682                                withAddr("writev failed"), errno);
683         return failWrite(__func__, callback, 0, ex);
684       } else if (countWritten == count) {
685         // We successfully wrote everything.
686         // Invoke the callback and return.
687         if (callback) {
688           callback->writeSuccess();
689         }
690         return;
691       } else { // continue writing the next writeReq
692         if (bufferCallback_) {
693           bufferCallback_->onEgressBuffered();
694         }
695       }
696       mustRegister = true;
697     }
698   } else if (!connecting()) {
699     // Invalid state for writing
700     return invalidState(callback);
701   }
702
703   // Create a new WriteRequest to add to the queue
704   WriteRequest* req;
705   try {
706     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
707                                         count - countWritten, partialWritten,
708                                         bytesWritten, std::move(ioBuf), flags);
709   } catch (const std::exception& ex) {
710     // we mainly expect to catch std::bad_alloc here
711     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
712         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
713     return failWrite(__func__, callback, bytesWritten, tex);
714   }
715   req->consume();
716   if (writeReqTail_ == nullptr) {
717     assert(writeReqHead_ == nullptr);
718     writeReqHead_ = writeReqTail_ = req;
719   } else {
720     writeReqTail_->append(req);
721     writeReqTail_ = req;
722   }
723
724   // Register for write events if are established and not currently
725   // waiting on write events
726   if (mustRegister) {
727     assert(state_ == StateEnum::ESTABLISHED);
728     assert((eventFlags_ & EventHandler::WRITE) == 0);
729     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
730       assert(state_ == StateEnum::ERROR);
731       return;
732     }
733     if (sendTimeout_ > 0) {
734       // Schedule a timeout to fire if the write takes too long.
735       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
736         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
737                                withAddr("failed to schedule send timeout"));
738         return failWrite(__func__, ex);
739       }
740     }
741   }
742 }
743
744 void AsyncSocket::writeRequest(WriteRequest* req) {
745   if (writeReqTail_ == nullptr) {
746     assert(writeReqHead_ == nullptr);
747     writeReqHead_ = writeReqTail_ = req;
748     req->start();
749   } else {
750     writeReqTail_->append(req);
751     writeReqTail_ = req;
752   }
753 }
754
755 void AsyncSocket::close() {
756   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
757           << ", state=" << state_ << ", shutdownFlags="
758           << std::hex << (int) shutdownFlags_;
759
760   // close() is only different from closeNow() when there are pending writes
761   // that need to drain before we can close.  In all other cases, just call
762   // closeNow().
763   //
764   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
765   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
766   // is still running.  (e.g., If there are multiple pending writes, and we
767   // call writeError() on the first one, it may call close().  In this case we
768   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
769   // writes will still be in the queue.)
770   //
771   // We only need to drain pending writes if we are still in STATE_CONNECTING
772   // or STATE_ESTABLISHED
773   if ((writeReqHead_ == nullptr) ||
774       !(state_ == StateEnum::CONNECTING ||
775       state_ == StateEnum::ESTABLISHED)) {
776     closeNow();
777     return;
778   }
779
780   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
781   // destroyed until close() returns.
782   DestructorGuard dg(this);
783   assert(eventBase_->isInEventBaseThread());
784
785   // Since there are write requests pending, we have to set the
786   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
787   // connect finishes and we finish writing these requests.
788   //
789   // Set SHUT_READ to indicate that reads are shut down, and set the
790   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
791   // pending writes complete.
792   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
793
794   // If a read callback is set, invoke readEOF() immediately to inform it that
795   // the socket has been closed and no more data can be read.
796   if (readCallback_) {
797     // Disable reads if they are enabled
798     if (!updateEventRegistration(0, EventHandler::READ)) {
799       // We're now in the error state; callbacks have been cleaned up
800       assert(state_ == StateEnum::ERROR);
801       assert(readCallback_ == nullptr);
802     } else {
803       ReadCallback* callback = readCallback_;
804       readCallback_ = nullptr;
805       callback->readEOF();
806     }
807   }
808 }
809
810 void AsyncSocket::closeNow() {
811   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
812           << ", state=" << state_ << ", shutdownFlags="
813           << std::hex << (int) shutdownFlags_;
814   DestructorGuard dg(this);
815   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
816
817   switch (state_) {
818     case StateEnum::ESTABLISHED:
819     case StateEnum::CONNECTING:
820     {
821       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
822       state_ = StateEnum::CLOSED;
823
824       // If the write timeout was set, cancel it.
825       writeTimeout_.cancelTimeout();
826
827       // If we are registered for I/O events, unregister.
828       if (eventFlags_ != EventHandler::NONE) {
829         eventFlags_ = EventHandler::NONE;
830         if (!updateEventRegistration()) {
831           // We will have been moved into the error state.
832           assert(state_ == StateEnum::ERROR);
833           return;
834         }
835       }
836
837       if (immediateReadHandler_.isLoopCallbackScheduled()) {
838         immediateReadHandler_.cancelLoopCallback();
839       }
840
841       if (fd_ >= 0) {
842         ioHandler_.changeHandlerFD(-1);
843         doClose();
844       }
845
846       invokeConnectErr(socketClosedLocallyEx);
847
848       failAllWrites(socketClosedLocallyEx);
849
850       if (readCallback_) {
851         ReadCallback* callback = readCallback_;
852         readCallback_ = nullptr;
853         callback->readEOF();
854       }
855       return;
856     }
857     case StateEnum::CLOSED:
858       // Do nothing.  It's possible that we are being called recursively
859       // from inside a callback that we invoked inside another call to close()
860       // that is still running.
861       return;
862     case StateEnum::ERROR:
863       // Do nothing.  The error handling code has performed (or is performing)
864       // cleanup.
865       return;
866     case StateEnum::UNINIT:
867       assert(eventFlags_ == EventHandler::NONE);
868       assert(connectCallback_ == nullptr);
869       assert(readCallback_ == nullptr);
870       assert(writeReqHead_ == nullptr);
871       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
872       state_ = StateEnum::CLOSED;
873       return;
874   }
875
876   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
877               << ") called in unknown state " << state_;
878 }
879
880 void AsyncSocket::closeWithReset() {
881   // Enable SO_LINGER, with the linger timeout set to 0.
882   // This will trigger a TCP reset when we close the socket.
883   if (fd_ >= 0) {
884     struct linger optLinger = {1, 0};
885     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
886       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
887               << "on " << fd_ << ": errno=" << errno;
888     }
889   }
890
891   // Then let closeNow() take care of the rest
892   closeNow();
893 }
894
895 void AsyncSocket::shutdownWrite() {
896   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
897           << ", state=" << state_ << ", shutdownFlags="
898           << std::hex << (int) shutdownFlags_;
899
900   // If there are no pending writes, shutdownWrite() is identical to
901   // shutdownWriteNow().
902   if (writeReqHead_ == nullptr) {
903     shutdownWriteNow();
904     return;
905   }
906
907   assert(eventBase_->isInEventBaseThread());
908
909   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
910   // shutdown will be performed once all writes complete.
911   shutdownFlags_ |= SHUT_WRITE_PENDING;
912 }
913
914 void AsyncSocket::shutdownWriteNow() {
915   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
916           << ", fd=" << fd_ << ", state=" << state_
917           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
918
919   if (shutdownFlags_ & SHUT_WRITE) {
920     // Writes are already shutdown; nothing else to do.
921     return;
922   }
923
924   // If SHUT_READ is already set, just call closeNow() to completely
925   // close the socket.  This can happen if close() was called with writes
926   // pending, and then shutdownWriteNow() is called before all pending writes
927   // complete.
928   if (shutdownFlags_ & SHUT_READ) {
929     closeNow();
930     return;
931   }
932
933   DestructorGuard dg(this);
934   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
935
936   switch (static_cast<StateEnum>(state_)) {
937     case StateEnum::ESTABLISHED:
938     {
939       shutdownFlags_ |= SHUT_WRITE;
940
941       // If the write timeout was set, cancel it.
942       writeTimeout_.cancelTimeout();
943
944       // If we are registered for write events, unregister.
945       if (!updateEventRegistration(0, EventHandler::WRITE)) {
946         // We will have been moved into the error state.
947         assert(state_ == StateEnum::ERROR);
948         return;
949       }
950
951       // Shutdown writes on the file descriptor
952       ::shutdown(fd_, SHUT_WR);
953
954       // Immediately fail all write requests
955       failAllWrites(socketShutdownForWritesEx);
956       return;
957     }
958     case StateEnum::CONNECTING:
959     {
960       // Set the SHUT_WRITE_PENDING flag.
961       // When the connection completes, it will check this flag,
962       // shutdown the write half of the socket, and then set SHUT_WRITE.
963       shutdownFlags_ |= SHUT_WRITE_PENDING;
964
965       // Immediately fail all write requests
966       failAllWrites(socketShutdownForWritesEx);
967       return;
968     }
969     case StateEnum::UNINIT:
970       // Callers normally shouldn't call shutdownWriteNow() before the socket
971       // even starts connecting.  Nonetheless, go ahead and set
972       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
973       // immediately shut down the write side of the socket.
974       shutdownFlags_ |= SHUT_WRITE_PENDING;
975       return;
976     case StateEnum::CLOSED:
977     case StateEnum::ERROR:
978       // We should never get here.  SHUT_WRITE should always be set
979       // in STATE_CLOSED and STATE_ERROR.
980       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
981                  << ", fd=" << fd_ << ") in unexpected state " << state_
982                  << " with SHUT_WRITE not set ("
983                  << std::hex << (int) shutdownFlags_ << ")";
984       assert(false);
985       return;
986   }
987
988   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
989               << fd_ << ") called in unknown state " << state_;
990 }
991
992 bool AsyncSocket::readable() const {
993   if (fd_ == -1) {
994     return false;
995   }
996   struct pollfd fds[1];
997   fds[0].fd = fd_;
998   fds[0].events = POLLIN;
999   fds[0].revents = 0;
1000   int rc = poll(fds, 1, 0);
1001   return rc == 1;
1002 }
1003
1004 bool AsyncSocket::isPending() const {
1005   return ioHandler_.isPending();
1006 }
1007
1008 bool AsyncSocket::hangup() const {
1009   if (fd_ == -1) {
1010     // sanity check, no one should ask for hangup if we are not connected.
1011     assert(false);
1012     return false;
1013   }
1014 #ifdef POLLRDHUP // Linux-only
1015   struct pollfd fds[1];
1016   fds[0].fd = fd_;
1017   fds[0].events = POLLRDHUP|POLLHUP;
1018   fds[0].revents = 0;
1019   poll(fds, 1, 0);
1020   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1021 #else
1022   return false;
1023 #endif
1024 }
1025
1026 bool AsyncSocket::good() const {
1027   return ((state_ == StateEnum::CONNECTING ||
1028           state_ == StateEnum::ESTABLISHED) &&
1029           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1030 }
1031
1032 bool AsyncSocket::error() const {
1033   return (state_ == StateEnum::ERROR);
1034 }
1035
1036 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1037   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1038           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1039           << ", state=" << state_ << ", events="
1040           << std::hex << eventFlags_ << ")";
1041   assert(eventBase_ == nullptr);
1042   assert(eventBase->isInEventBaseThread());
1043
1044   eventBase_ = eventBase;
1045   ioHandler_.attachEventBase(eventBase);
1046   writeTimeout_.attachEventBase(eventBase);
1047 }
1048
1049 void AsyncSocket::detachEventBase() {
1050   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1051           << ", old evb=" << eventBase_ << ", state=" << state_
1052           << ", events=" << std::hex << eventFlags_ << ")";
1053   assert(eventBase_ != nullptr);
1054   assert(eventBase_->isInEventBaseThread());
1055
1056   eventBase_ = nullptr;
1057   ioHandler_.detachEventBase();
1058   writeTimeout_.detachEventBase();
1059 }
1060
1061 bool AsyncSocket::isDetachable() const {
1062   DCHECK(eventBase_ != nullptr);
1063   DCHECK(eventBase_->isInEventBaseThread());
1064
1065   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1066 }
1067
1068 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1069   address->setFromLocalAddress(fd_);
1070 }
1071
1072 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1073   if (!addr_.isInitialized()) {
1074     addr_.setFromPeerAddress(fd_);
1075   }
1076   *address = addr_;
1077 }
1078
1079 int AsyncSocket::setNoDelay(bool noDelay) {
1080   if (fd_ < 0) {
1081     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1082                << this << "(state=" << state_ << ")";
1083     return EINVAL;
1084
1085   }
1086
1087   int value = noDelay ? 1 : 0;
1088   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1089     int errnoCopy = errno;
1090     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1091             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1092             << strerror(errnoCopy);
1093     return errnoCopy;
1094   }
1095
1096   return 0;
1097 }
1098
1099 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1100
1101   #ifndef TCP_CONGESTION
1102   #define TCP_CONGESTION  13
1103   #endif
1104
1105   if (fd_ < 0) {
1106     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1107                << "socket " << this << "(state=" << state_ << ")";
1108     return EINVAL;
1109
1110   }
1111
1112   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1113         cname.length() + 1) != 0) {
1114     int errnoCopy = errno;
1115     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1116             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1117             << strerror(errnoCopy);
1118     return errnoCopy;
1119   }
1120
1121   return 0;
1122 }
1123
1124 int AsyncSocket::setQuickAck(bool quickack) {
1125   if (fd_ < 0) {
1126     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1127                << this << "(state=" << state_ << ")";
1128     return EINVAL;
1129
1130   }
1131
1132 #ifdef TCP_QUICKACK // Linux-only
1133   int value = quickack ? 1 : 0;
1134   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1135     int errnoCopy = errno;
1136     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1137             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1138             << strerror(errnoCopy);
1139     return errnoCopy;
1140   }
1141
1142   return 0;
1143 #else
1144   return ENOSYS;
1145 #endif
1146 }
1147
1148 int AsyncSocket::setSendBufSize(size_t bufsize) {
1149   if (fd_ < 0) {
1150     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1151                << this << "(state=" << state_ << ")";
1152     return EINVAL;
1153   }
1154
1155   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1156     int errnoCopy = errno;
1157     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1158             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1159             << strerror(errnoCopy);
1160     return errnoCopy;
1161   }
1162
1163   return 0;
1164 }
1165
1166 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1167   if (fd_ < 0) {
1168     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1169                << this << "(state=" << state_ << ")";
1170     return EINVAL;
1171   }
1172
1173   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1174     int errnoCopy = errno;
1175     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1176             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1177             << strerror(errnoCopy);
1178     return errnoCopy;
1179   }
1180
1181   return 0;
1182 }
1183
1184 int AsyncSocket::setTCPProfile(int profd) {
1185   if (fd_ < 0) {
1186     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1187                << this << "(state=" << state_ << ")";
1188     return EINVAL;
1189   }
1190
1191   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1192     int errnoCopy = errno;
1193     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1194             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1195             << strerror(errnoCopy);
1196     return errnoCopy;
1197   }
1198
1199   return 0;
1200 }
1201
1202 void AsyncSocket::setPersistentCork(bool cork) {
1203   if (setCork(cork) == 0) {
1204     persistentCork_ = cork;
1205   }
1206 }
1207
1208 int AsyncSocket::setCork(bool cork) {
1209 #ifdef TCP_CORK
1210   if (fd_ < 0) {
1211     VLOG(4) << "AsyncSocket::setCork() called on non-open socket "
1212             << this << "(stats=" << state_ << ")";
1213     return EINVAL;
1214   }
1215
1216   if (corked_ == cork) {
1217     return 0;
1218   }
1219
1220   int flag = cork ? 1 : 0;
1221   if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) != 0) {
1222     int errnoCopy = errno;
1223     VLOG(2) << "faield to turn on TCP_CORK option on AsyncSocket"
1224             << this << "(fd=" << fd_ << ", state=" << state_ << "):"
1225             << folly::errnoStr(errnoCopy);
1226     return errnoCopy;
1227   }
1228   corked_ = cork;
1229 #endif
1230   return 0;
1231 }
1232
1233 void AsyncSocket::ioReady(uint16_t events) noexcept {
1234   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1235           << ", events=" << std::hex << events << ", state=" << state_;
1236   DestructorGuard dg(this);
1237   assert(events & EventHandler::READ_WRITE);
1238   assert(eventBase_->isInEventBaseThread());
1239
1240   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1241   if (relevantEvents == EventHandler::READ) {
1242     handleRead();
1243   } else if (relevantEvents == EventHandler::WRITE) {
1244     handleWrite();
1245   } else if (relevantEvents == EventHandler::READ_WRITE) {
1246     EventBase* originalEventBase = eventBase_;
1247     // If both read and write events are ready, process writes first.
1248     handleWrite();
1249
1250     // Return now if handleWrite() detached us from our EventBase
1251     if (eventBase_ != originalEventBase) {
1252       return;
1253     }
1254
1255     // Only call handleRead() if a read callback is still installed.
1256     // (It's possible that the read callback was uninstalled during
1257     // handleWrite().)
1258     if (readCallback_) {
1259       handleRead();
1260     }
1261   } else {
1262     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1263                << std::hex << events << "(this=" << this << ")";
1264     abort();
1265   }
1266 }
1267
1268 ssize_t AsyncSocket::performRead(void** buf, size_t* buflen, size_t* offset) {
1269   VLOG(5) << "AsyncSocket::performRead() this=" << this
1270           << ", buf=" << *buf << ", buflen=" << *buflen;
1271
1272   int recvFlags = 0;
1273   if (peek_) {
1274     recvFlags |= MSG_PEEK;
1275   }
1276
1277   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1278   if (bytes < 0) {
1279     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1280       // No more data to read right now.
1281       return READ_BLOCKING;
1282     } else {
1283       return READ_ERROR;
1284     }
1285   } else {
1286     appBytesReceived_ += bytes;
1287     return bytes;
1288   }
1289 }
1290
1291 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1292   // no matter what, buffer should be preapared for non-ssl socket
1293   CHECK(readCallback_);
1294   readCallback_->getReadBuffer(buf, buflen);
1295 }
1296
1297 void AsyncSocket::handleRead() noexcept {
1298   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1299           << ", state=" << state_;
1300   assert(state_ == StateEnum::ESTABLISHED);
1301   assert((shutdownFlags_ & SHUT_READ) == 0);
1302   assert(readCallback_ != nullptr);
1303   assert(eventFlags_ & EventHandler::READ);
1304
1305   // Loop until:
1306   // - a read attempt would block
1307   // - readCallback_ is uninstalled
1308   // - the number of loop iterations exceeds the optional maximum
1309   // - this AsyncSocket is moved to another EventBase
1310   //
1311   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1312   // which is why need to check for it here.
1313   //
1314   // The last bullet point is slightly subtle.  readDataAvailable() may also
1315   // detach this socket from this EventBase.  However, before
1316   // readDataAvailable() returns another thread may pick it up, attach it to
1317   // a different EventBase, and install another readCallback_.  We need to
1318   // exit immediately after readDataAvailable() returns if the eventBase_ has
1319   // changed.  (The caller must perform some sort of locking to transfer the
1320   // AsyncSocket between threads properly.  This will be sufficient to ensure
1321   // that this thread sees the updated eventBase_ variable after
1322   // readDataAvailable() returns.)
1323   uint16_t numReads = 0;
1324   EventBase* originalEventBase = eventBase_;
1325   while (readCallback_ && eventBase_ == originalEventBase) {
1326     // Get the buffer to read into.
1327     void* buf = nullptr;
1328     size_t buflen = 0, offset = 0;
1329     try {
1330       prepareReadBuffer(&buf, &buflen);
1331       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1332     } catch (const AsyncSocketException& ex) {
1333       return failRead(__func__, ex);
1334     } catch (const std::exception& ex) {
1335       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1336                               string("ReadCallback::getReadBuffer() "
1337                                      "threw exception: ") +
1338                               ex.what());
1339       return failRead(__func__, tex);
1340     } catch (...) {
1341       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1342                              "ReadCallback::getReadBuffer() threw "
1343                              "non-exception type");
1344       return failRead(__func__, ex);
1345     }
1346     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1347       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1348                              "ReadCallback::getReadBuffer() returned "
1349                              "empty buffer");
1350       return failRead(__func__, ex);
1351     }
1352
1353     // Perform the read
1354     ssize_t bytesRead = performRead(&buf, &buflen, &offset);
1355     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1356             << bytesRead << " bytes";
1357     if (bytesRead > 0) {
1358       if (!isBufferMovable_) {
1359         readCallback_->readDataAvailable(bytesRead);
1360       } else {
1361         CHECK(kOpenSslModeMoveBufferOwnership);
1362         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1363                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1364                 << ", offset=" << offset;
1365         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1366         readBuf->trimStart(offset);
1367         readBuf->trimEnd(buflen - offset - bytesRead);
1368         readCallback_->readBufferAvailable(std::move(readBuf));
1369       }
1370
1371       // Fall through and continue around the loop if the read
1372       // completely filled the available buffer.
1373       // Note that readCallback_ may have been uninstalled or changed inside
1374       // readDataAvailable().
1375       if (size_t(bytesRead) < buflen) {
1376         return;
1377       }
1378     } else if (bytesRead == READ_BLOCKING) {
1379         // No more data to read right now.
1380         return;
1381     } else if (bytesRead == READ_ERROR) {
1382       readErr_ = READ_ERROR;
1383       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1384                              withAddr("recv() failed"), errno);
1385       return failRead(__func__, ex);
1386     } else {
1387       assert(bytesRead == READ_EOF);
1388       readErr_ = READ_EOF;
1389       // EOF
1390       shutdownFlags_ |= SHUT_READ;
1391       if (!updateEventRegistration(0, EventHandler::READ)) {
1392         // we've already been moved into STATE_ERROR
1393         assert(state_ == StateEnum::ERROR);
1394         assert(readCallback_ == nullptr);
1395         return;
1396       }
1397
1398       ReadCallback* callback = readCallback_;
1399       readCallback_ = nullptr;
1400       callback->readEOF();
1401       return;
1402     }
1403     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1404       if (readCallback_ != nullptr) {
1405         // We might still have data in the socket.
1406         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1407         scheduleImmediateRead();
1408       }
1409       return;
1410     }
1411   }
1412 }
1413
1414 /**
1415  * This function attempts to write as much data as possible, until no more data
1416  * can be written.
1417  *
1418  * - If it sends all available data, it unregisters for write events, and stops
1419  *   the writeTimeout_.
1420  *
1421  * - If not all of the data can be sent immediately, it reschedules
1422  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1423  *   registered for write events.
1424  */
1425 void AsyncSocket::handleWrite() noexcept {
1426   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1427           << ", state=" << state_;
1428   if (state_ == StateEnum::CONNECTING) {
1429     handleConnect();
1430     return;
1431   }
1432
1433   // Normal write
1434   assert(state_ == StateEnum::ESTABLISHED);
1435   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1436   assert(writeReqHead_ != nullptr);
1437
1438   // Loop until we run out of write requests,
1439   // or until this socket is moved to another EventBase.
1440   // (See the comment in handleRead() explaining how this can happen.)
1441   EventBase* originalEventBase = eventBase_;
1442   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1443     if (!writeReqHead_->performWrite()) {
1444       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1445                              withAddr("writev() failed"), errno);
1446       return failWrite(__func__, ex);
1447     } else if (writeReqHead_->isComplete()) {
1448       // We finished this request
1449       WriteRequest* req = writeReqHead_;
1450       writeReqHead_ = req->getNext();
1451
1452       if (writeReqHead_ == nullptr) {
1453         writeReqTail_ = nullptr;
1454         // This is the last write request.
1455         // Unregister for write events and cancel the send timer
1456         // before we invoke the callback.  We have to update the state properly
1457         // before calling the callback, since it may want to detach us from
1458         // the EventBase.
1459         if (eventFlags_ & EventHandler::WRITE) {
1460           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1461             assert(state_ == StateEnum::ERROR);
1462             return;
1463           }
1464           // Stop the send timeout
1465           writeTimeout_.cancelTimeout();
1466         }
1467         assert(!writeTimeout_.isScheduled());
1468
1469         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1470         // we finish sending the last write request.
1471         //
1472         // We have to do this before invoking writeSuccess(), since
1473         // writeSuccess() may detach us from our EventBase.
1474         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1475           assert(connectCallback_ == nullptr);
1476           shutdownFlags_ |= SHUT_WRITE;
1477
1478           if (shutdownFlags_ & SHUT_READ) {
1479             // Reads have already been shutdown.  Fully close the socket and
1480             // move to STATE_CLOSED.
1481             //
1482             // Note: This code currently moves us to STATE_CLOSED even if
1483             // close() hasn't ever been called.  This can occur if we have
1484             // received EOF from the peer and shutdownWrite() has been called
1485             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1486             // case, until close() is actually called?  I can't think of a
1487             // reason why we would need to do so.  No other operations besides
1488             // calling close() or destroying the socket can be performed at
1489             // this point.
1490             assert(readCallback_ == nullptr);
1491             state_ = StateEnum::CLOSED;
1492             if (fd_ >= 0) {
1493               ioHandler_.changeHandlerFD(-1);
1494               doClose();
1495             }
1496           } else {
1497             // Reads are still enabled, so we are only doing a half-shutdown
1498             ::shutdown(fd_, SHUT_WR);
1499           }
1500         }
1501       }
1502
1503       // Invoke the callback
1504       WriteCallback* callback = req->getCallback();
1505       req->destroy();
1506       if (callback) {
1507         callback->writeSuccess();
1508       }
1509       // We'll continue around the loop, trying to write another request
1510     } else {
1511       // Partial write.
1512       if (bufferCallback_) {
1513         bufferCallback_->onEgressBuffered();
1514       }
1515       writeReqHead_->consume();
1516       // Stop after a partial write; it's highly likely that a subsequent write
1517       // attempt will just return EAGAIN.
1518       //
1519       // Ensure that we are registered for write events.
1520       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1521         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1522           assert(state_ == StateEnum::ERROR);
1523           return;
1524         }
1525       }
1526
1527       // Reschedule the send timeout, since we have made some write progress.
1528       if (sendTimeout_ > 0) {
1529         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1530           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1531               withAddr("failed to reschedule write timeout"));
1532           return failWrite(__func__, ex);
1533         }
1534       }
1535       return;
1536     }
1537   }
1538   if (!writeReqHead_ && bufferCallback_) {
1539     bufferCallback_->onEgressBufferCleared();
1540   }
1541 }
1542
1543 void AsyncSocket::checkForImmediateRead() noexcept {
1544   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1545   // (However, note that some subclasses do override this method.)
1546   //
1547   // Simply calling handleRead() here would be bad, as this would call
1548   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1549   // buffer even though no data may be available.  This would waste lots of
1550   // memory, since the buffer will sit around unused until the socket actually
1551   // becomes readable.
1552   //
1553   // Checking if the socket is readable now also seems like it would probably
1554   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1555   // would just waste an extra system call.  Even if it is readable, waiting to
1556   // find out from libevent on the next event loop doesn't seem that bad.
1557 }
1558
1559 void AsyncSocket::handleInitialReadWrite() noexcept {
1560   // Our callers should already be holding a DestructorGuard, but grab
1561   // one here just to make sure, in case one of our calling code paths ever
1562   // changes.
1563   DestructorGuard dg(this);
1564
1565   // If we have a readCallback_, make sure we enable read events.  We
1566   // may already be registered for reads if connectSuccess() set
1567   // the read calback.
1568   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1569     assert(state_ == StateEnum::ESTABLISHED);
1570     assert((shutdownFlags_ & SHUT_READ) == 0);
1571     if (!updateEventRegistration(EventHandler::READ, 0)) {
1572       assert(state_ == StateEnum::ERROR);
1573       return;
1574     }
1575     checkForImmediateRead();
1576   } else if (readCallback_ == nullptr) {
1577     // Unregister for read events.
1578     updateEventRegistration(0, EventHandler::READ);
1579   }
1580
1581   // If we have write requests pending, try to send them immediately.
1582   // Since we just finished accepting, there is a very good chance that we can
1583   // write without blocking.
1584   //
1585   // However, we only process them if EventHandler::WRITE is not already set,
1586   // which means that we're already blocked on a write attempt.  (This can
1587   // happen if connectSuccess() called write() before returning.)
1588   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1589     // Call handleWrite() to perform write processing.
1590     handleWrite();
1591   } else if (writeReqHead_ == nullptr) {
1592     // Unregister for write event.
1593     updateEventRegistration(0, EventHandler::WRITE);
1594   }
1595 }
1596
1597 void AsyncSocket::handleConnect() noexcept {
1598   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1599           << ", state=" << state_;
1600   assert(state_ == StateEnum::CONNECTING);
1601   // SHUT_WRITE can never be set while we are still connecting;
1602   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1603   // finishes
1604   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1605
1606   // In case we had a connect timeout, cancel the timeout
1607   writeTimeout_.cancelTimeout();
1608   // We don't use a persistent registration when waiting on a connect event,
1609   // so we have been automatically unregistered now.  Update eventFlags_ to
1610   // reflect reality.
1611   assert(eventFlags_ == EventHandler::WRITE);
1612   eventFlags_ = EventHandler::NONE;
1613
1614   // Call getsockopt() to check if the connect succeeded
1615   int error;
1616   socklen_t len = sizeof(error);
1617   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1618   if (rv != 0) {
1619     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1620                            withAddr("error calling getsockopt() after connect"),
1621                            errno);
1622     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1623                << fd_ << " host=" << addr_.describe()
1624                << ") exception:" << ex.what();
1625     return failConnect(__func__, ex);
1626   }
1627
1628   if (error != 0) {
1629     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1630                            "connect failed", error);
1631     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1632             << fd_ << " host=" << addr_.describe()
1633             << ") exception: " << ex.what();
1634     return failConnect(__func__, ex);
1635   }
1636
1637   // Move into STATE_ESTABLISHED
1638   state_ = StateEnum::ESTABLISHED;
1639
1640   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1641   // perform, immediately shutdown the write half of the socket.
1642   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1643     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1644     // are still connecting we just abort the connect rather than waiting for
1645     // it to complete.
1646     assert((shutdownFlags_ & SHUT_READ) == 0);
1647     ::shutdown(fd_, SHUT_WR);
1648     shutdownFlags_ |= SHUT_WRITE;
1649   }
1650
1651   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1652           << "successfully connected; state=" << state_;
1653
1654   // Remember the EventBase we are attached to, before we start invoking any
1655   // callbacks (since the callbacks may call detachEventBase()).
1656   EventBase* originalEventBase = eventBase_;
1657
1658   invokeConnectSuccess();
1659   // Note that the connect callback may have changed our state.
1660   // (set or unset the read callback, called write(), closed the socket, etc.)
1661   // The following code needs to handle these situations correctly.
1662   //
1663   // If the socket has been closed, readCallback_ and writeReqHead_ will
1664   // always be nullptr, so that will prevent us from trying to read or write.
1665   //
1666   // The main thing to check for is if eventBase_ is still originalEventBase.
1667   // If not, we have been detached from this event base, so we shouldn't
1668   // perform any more operations.
1669   if (eventBase_ != originalEventBase) {
1670     return;
1671   }
1672
1673   handleInitialReadWrite();
1674 }
1675
1676 void AsyncSocket::timeoutExpired() noexcept {
1677   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1678           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1679   DestructorGuard dg(this);
1680   assert(eventBase_->isInEventBaseThread());
1681
1682   if (state_ == StateEnum::CONNECTING) {
1683     // connect() timed out
1684     // Unregister for I/O events.
1685     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1686                            "connect timed out");
1687     failConnect(__func__, ex);
1688   } else {
1689     // a normal write operation timed out
1690     assert(state_ == StateEnum::ESTABLISHED);
1691     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1692     failWrite(__func__, ex);
1693   }
1694 }
1695
1696 ssize_t AsyncSocket::performWrite(const iovec* vec,
1697                                    uint32_t count,
1698                                    WriteFlags flags,
1699                                    uint32_t* countWritten,
1700                                    uint32_t* partialWritten) {
1701   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1702   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1703   // (since it may terminate the program if the main program doesn't explicitly
1704   // ignore it).
1705   struct msghdr msg;
1706   msg.msg_name = nullptr;
1707   msg.msg_namelen = 0;
1708   msg.msg_iov = const_cast<iovec *>(vec);
1709 #ifdef IOV_MAX // not defined on Android
1710   msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX);
1711 #else
1712   msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV);
1713 #endif
1714   msg.msg_control = nullptr;
1715   msg.msg_controllen = 0;
1716   msg.msg_flags = 0;
1717
1718   int msg_flags = MSG_DONTWAIT;
1719
1720 #ifdef MSG_NOSIGNAL // Linux-only
1721   msg_flags |= MSG_NOSIGNAL;
1722   if (isSet(flags, WriteFlags::CORK)) {
1723     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1724     // give it the rest of the data rather than immediately sending a partial
1725     // frame, even when TCP_NODELAY is enabled.
1726     msg_flags |= MSG_MORE;
1727   }
1728 #endif
1729   if (isSet(flags, WriteFlags::EOR)) {
1730     // marks that this is the last byte of a record (response)
1731     msg_flags |= MSG_EOR;
1732   }
1733   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1734   if (totalWritten < 0) {
1735     if (errno == EAGAIN) {
1736       // TCP buffer is full; we can't write any more data right now.
1737       *countWritten = 0;
1738       *partialWritten = 0;
1739       return 0;
1740     }
1741     // error
1742     *countWritten = 0;
1743     *partialWritten = 0;
1744     return -1;
1745   }
1746
1747   appBytesWritten_ += totalWritten;
1748
1749   uint32_t bytesWritten;
1750   uint32_t n;
1751   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1752     const iovec* v = vec + n;
1753     if (v->iov_len > bytesWritten) {
1754       // Partial write finished in the middle of this iovec
1755       *countWritten = n;
1756       *partialWritten = bytesWritten;
1757       return totalWritten;
1758     }
1759
1760     bytesWritten -= v->iov_len;
1761   }
1762
1763   assert(bytesWritten == 0);
1764   *countWritten = n;
1765   *partialWritten = 0;
1766   return totalWritten;
1767 }
1768
1769 /**
1770  * Re-register the EventHandler after eventFlags_ has changed.
1771  *
1772  * If an error occurs, fail() is called to move the socket into the error state
1773  * and call all currently installed callbacks.  After an error, the
1774  * AsyncSocket is completely unregistered.
1775  *
1776  * @return Returns true on succcess, or false on error.
1777  */
1778 bool AsyncSocket::updateEventRegistration() {
1779   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1780           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1781           << ", events=" << std::hex << eventFlags_;
1782   assert(eventBase_->isInEventBaseThread());
1783   if (eventFlags_ == EventHandler::NONE) {
1784     ioHandler_.unregisterHandler();
1785     return true;
1786   }
1787
1788   // Always register for persistent events, so we don't have to re-register
1789   // after being called back.
1790   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1791     eventFlags_ = EventHandler::NONE; // we're not registered after error
1792     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1793         withAddr("failed to update AsyncSocket event registration"));
1794     fail("updateEventRegistration", ex);
1795     return false;
1796   }
1797
1798   return true;
1799 }
1800
1801 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1802                                            uint16_t disable) {
1803   uint16_t oldFlags = eventFlags_;
1804   eventFlags_ |= enable;
1805   eventFlags_ &= ~disable;
1806   if (eventFlags_ == oldFlags) {
1807     return true;
1808   } else {
1809     return updateEventRegistration();
1810   }
1811 }
1812
1813 void AsyncSocket::startFail() {
1814   // startFail() should only be called once
1815   assert(state_ != StateEnum::ERROR);
1816   assert(getDestructorGuardCount() > 0);
1817   state_ = StateEnum::ERROR;
1818   // Ensure that SHUT_READ and SHUT_WRITE are set,
1819   // so all future attempts to read or write will be rejected
1820   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1821
1822   if (eventFlags_ != EventHandler::NONE) {
1823     eventFlags_ = EventHandler::NONE;
1824     ioHandler_.unregisterHandler();
1825   }
1826   writeTimeout_.cancelTimeout();
1827
1828   if (fd_ >= 0) {
1829     ioHandler_.changeHandlerFD(-1);
1830     doClose();
1831   }
1832 }
1833
1834 void AsyncSocket::finishFail() {
1835   assert(state_ == StateEnum::ERROR);
1836   assert(getDestructorGuardCount() > 0);
1837
1838   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1839                          withAddr("socket closing after error"));
1840   invokeConnectErr(ex);
1841   failAllWrites(ex);
1842
1843   if (readCallback_) {
1844     ReadCallback* callback = readCallback_;
1845     readCallback_ = nullptr;
1846     callback->readErr(ex);
1847   }
1848 }
1849
1850 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1851   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1852              << state_ << " host=" << addr_.describe()
1853              << "): failed in " << fn << "(): "
1854              << ex.what();
1855   startFail();
1856   finishFail();
1857 }
1858
1859 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1860   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1861                << state_ << " host=" << addr_.describe()
1862                << "): failed while connecting in " << fn << "(): "
1863                << ex.what();
1864   startFail();
1865
1866   invokeConnectErr(ex);
1867   finishFail();
1868 }
1869
1870 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1871   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1872                << state_ << " host=" << addr_.describe()
1873                << "): failed while reading in " << fn << "(): "
1874                << ex.what();
1875   startFail();
1876
1877   if (readCallback_ != nullptr) {
1878     ReadCallback* callback = readCallback_;
1879     readCallback_ = nullptr;
1880     callback->readErr(ex);
1881   }
1882
1883   finishFail();
1884 }
1885
1886 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1887   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1888                << state_ << " host=" << addr_.describe()
1889                << "): failed while writing in " << fn << "(): "
1890                << ex.what();
1891   startFail();
1892
1893   // Only invoke the first write callback, since the error occurred while
1894   // writing this request.  Let any other pending write callbacks be invoked in
1895   // finishFail().
1896   if (writeReqHead_ != nullptr) {
1897     WriteRequest* req = writeReqHead_;
1898     writeReqHead_ = req->getNext();
1899     WriteCallback* callback = req->getCallback();
1900     uint32_t bytesWritten = req->getTotalBytesWritten();
1901     req->destroy();
1902     if (callback) {
1903       callback->writeErr(bytesWritten, ex);
1904     }
1905   }
1906
1907   finishFail();
1908 }
1909
1910 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1911                              size_t bytesWritten,
1912                              const AsyncSocketException& ex) {
1913   // This version of failWrite() is used when the failure occurs before
1914   // we've added the callback to writeReqHead_.
1915   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1916              << state_ << " host=" << addr_.describe()
1917              <<"): failed while writing in " << fn << "(): "
1918              << ex.what();
1919   startFail();
1920
1921   if (callback != nullptr) {
1922     callback->writeErr(bytesWritten, ex);
1923   }
1924
1925   finishFail();
1926 }
1927
1928 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1929   // Invoke writeError() on all write callbacks.
1930   // This is used when writes are forcibly shutdown with write requests
1931   // pending, or when an error occurs with writes pending.
1932   while (writeReqHead_ != nullptr) {
1933     WriteRequest* req = writeReqHead_;
1934     writeReqHead_ = req->getNext();
1935     WriteCallback* callback = req->getCallback();
1936     if (callback) {
1937       callback->writeErr(req->getTotalBytesWritten(), ex);
1938     }
1939     req->destroy();
1940   }
1941 }
1942
1943 void AsyncSocket::invalidState(ConnectCallback* callback) {
1944   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1945              << "): connect() called in invalid state " << state_;
1946
1947   /*
1948    * The invalidState() methods don't use the normal failure mechanisms,
1949    * since we don't know what state we are in.  We don't want to call
1950    * startFail()/finishFail() recursively if we are already in the middle of
1951    * cleaning up.
1952    */
1953
1954   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1955                          "connect() called with socket in invalid state");
1956   connectEndTime_ = std::chrono::steady_clock::now();
1957   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1958     if (callback) {
1959       callback->connectErr(ex);
1960     }
1961   } else {
1962     // We can't use failConnect() here since connectCallback_
1963     // may already be set to another callback.  Invoke this ConnectCallback
1964     // here; any other connectCallback_ will be invoked in finishFail()
1965     startFail();
1966     if (callback) {
1967       callback->connectErr(ex);
1968     }
1969     finishFail();
1970   }
1971 }
1972
1973 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
1974   connectEndTime_ = std::chrono::steady_clock::now();
1975   if (connectCallback_) {
1976     ConnectCallback* callback = connectCallback_;
1977     connectCallback_ = nullptr;
1978     callback->connectErr(ex);
1979   }
1980 }
1981
1982 void AsyncSocket::invokeConnectSuccess() {
1983   connectEndTime_ = std::chrono::steady_clock::now();
1984   if (connectCallback_) {
1985     ConnectCallback* callback = connectCallback_;
1986     connectCallback_ = nullptr;
1987     callback->connectSuccess();
1988   }
1989 }
1990
1991 void AsyncSocket::invalidState(ReadCallback* callback) {
1992   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1993              << "): setReadCallback(" << callback
1994              << ") called in invalid state " << state_;
1995
1996   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1997                          "setReadCallback() called with socket in "
1998                          "invalid state");
1999   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2000     if (callback) {
2001       callback->readErr(ex);
2002     }
2003   } else {
2004     startFail();
2005     if (callback) {
2006       callback->readErr(ex);
2007     }
2008     finishFail();
2009   }
2010 }
2011
2012 void AsyncSocket::invalidState(WriteCallback* callback) {
2013   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2014              << "): write() called in invalid state " << state_;
2015
2016   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2017                          withAddr("write() called with socket in invalid state"));
2018   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2019     if (callback) {
2020       callback->writeErr(0, ex);
2021     }
2022   } else {
2023     startFail();
2024     if (callback) {
2025       callback->writeErr(0, ex);
2026     }
2027     finishFail();
2028   }
2029 }
2030
2031 void AsyncSocket::doClose() {
2032   if (fd_ == -1) return;
2033   if (shutdownSocketSet_) {
2034     shutdownSocketSet_->close(fd_);
2035   } else {
2036     ::close(fd_);
2037   }
2038   fd_ = -1;
2039 }
2040
2041 std::ostream& operator << (std::ostream& os,
2042                            const AsyncSocket::StateEnum& state) {
2043   os << static_cast<int>(state);
2044   return os;
2045 }
2046
2047 std::string AsyncSocket::withAddr(const std::string& s) {
2048   // Don't use addr_ directly because it may not be initialized
2049   // e.g. if constructed from fd
2050   folly::SocketAddress peer, local;
2051   try {
2052     getPeerAddress(&peer);
2053     getLocalAddress(&local);
2054   } catch (const std::exception&) {
2055     // ignore
2056   } catch (...) {
2057     // ignore
2058   }
2059   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2060 }
2061
2062 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2063   bufferCallback_ = cb;
2064 }
2065
2066 } // folly