Include <folly/portability/Fcntl.h> where needed
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23 #include <folly/portability/Fcntl.h>
24 #include <folly/portability/SysUio.h>
25
26 #include <poll.h>
27 #include <errno.h>
28 #include <limits.h>
29 #include <unistd.h>
30 #include <thread>
31 #include <sys/types.h>
32 #include <sys/socket.h>
33 #include <netinet/in.h>
34 #include <netinet/tcp.h>
35 #include <boost/preprocessor/control/if.hpp>
36
37 using std::string;
38 using std::unique_ptr;
39
40 namespace folly {
41
42 // static members initializers
43 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
44
45 const AsyncSocketException socketClosedLocallyEx(
46     AsyncSocketException::END_OF_FILE, "socket closed locally");
47 const AsyncSocketException socketShutdownForWritesEx(
48     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
49
50 // TODO: It might help performance to provide a version of BytesWriteRequest that
51 // users could derive from, so we can avoid the extra allocation for each call
52 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
53 // protocols are currently templatized for transports.
54 //
55 // We would need the version for external users where they provide the iovec
56 // storage space, and only our internal version would allocate it at the end of
57 // the WriteRequest.
58
59 /* The default WriteRequest implementation, used for write(), writev() and
60  * writeChain()
61  *
62  * A new BytesWriteRequest operation is allocated on the heap for all write
63  * operations that cannot be completed immediately.
64  */
65 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
66  public:
67   static BytesWriteRequest* newRequest(AsyncSocket* socket,
68                                        WriteCallback* callback,
69                                        const iovec* ops,
70                                        uint32_t opCount,
71                                        uint32_t partialWritten,
72                                        uint32_t bytesWritten,
73                                        unique_ptr<IOBuf>&& ioBuf,
74                                        WriteFlags flags) {
75     assert(opCount > 0);
76     // Since we put a variable size iovec array at the end
77     // of each BytesWriteRequest, we have to manually allocate the memory.
78     void* buf = malloc(sizeof(BytesWriteRequest) +
79                        (opCount * sizeof(struct iovec)));
80     if (buf == nullptr) {
81       throw std::bad_alloc();
82     }
83
84     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
85                                       partialWritten, bytesWritten,
86                                       std::move(ioBuf), flags);
87   }
88
89   void destroy() override {
90     this->~BytesWriteRequest();
91     free(this);
92   }
93
94   WriteResult performWrite() override {
95     WriteFlags writeFlags = flags_;
96     if (getNext() != nullptr) {
97       writeFlags = writeFlags | WriteFlags::CORK;
98     }
99     return socket_->performWrite(
100         getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
101   }
102
103   bool isComplete() override {
104     return opsWritten_ == getOpCount();
105   }
106
107   void consume() override {
108     // Advance opIndex_ forward by opsWritten_
109     opIndex_ += opsWritten_;
110     assert(opIndex_ < opCount_);
111
112     // If we've finished writing any IOBufs, release them
113     if (ioBuf_) {
114       for (uint32_t i = opsWritten_; i != 0; --i) {
115         assert(ioBuf_);
116         ioBuf_ = ioBuf_->pop();
117       }
118     }
119
120     // Move partialBytes_ forward into the current iovec buffer
121     struct iovec* currentOp = writeOps_ + opIndex_;
122     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
123     currentOp->iov_base =
124       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
125     currentOp->iov_len -= partialBytes_;
126
127     // Increment the totalBytesWritten_ count by bytesWritten_;
128     totalBytesWritten_ += bytesWritten_;
129   }
130
131  private:
132   BytesWriteRequest(AsyncSocket* socket,
133                     WriteCallback* callback,
134                     const struct iovec* ops,
135                     uint32_t opCount,
136                     uint32_t partialBytes,
137                     uint32_t bytesWritten,
138                     unique_ptr<IOBuf>&& ioBuf,
139                     WriteFlags flags)
140     : AsyncSocket::WriteRequest(socket, callback)
141     , opCount_(opCount)
142     , opIndex_(0)
143     , flags_(flags)
144     , ioBuf_(std::move(ioBuf))
145     , opsWritten_(0)
146     , partialBytes_(partialBytes)
147     , bytesWritten_(bytesWritten) {
148     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
149   }
150
151   // private destructor, to ensure callers use destroy()
152   ~BytesWriteRequest() override = default;
153
154   const struct iovec* getOps() const {
155     assert(opCount_ > opIndex_);
156     return writeOps_ + opIndex_;
157   }
158
159   uint32_t getOpCount() const {
160     assert(opCount_ > opIndex_);
161     return opCount_ - opIndex_;
162   }
163
164   uint32_t opCount_;            ///< number of entries in writeOps_
165   uint32_t opIndex_;            ///< current index into writeOps_
166   WriteFlags flags_;            ///< set for WriteFlags
167   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
168
169   // for consume(), how much we wrote on the last write
170   uint32_t opsWritten_;         ///< complete ops written
171   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
172   ssize_t bytesWritten_;        ///< bytes written altogether
173
174   struct iovec writeOps_[];     ///< write operation(s) list
175 };
176
177 AsyncSocket::AsyncSocket()
178   : eventBase_(nullptr)
179   , writeTimeout_(this, nullptr)
180   , ioHandler_(this, nullptr)
181   , immediateReadHandler_(this) {
182   VLOG(5) << "new AsyncSocket()";
183   init();
184 }
185
186 AsyncSocket::AsyncSocket(EventBase* evb)
187   : eventBase_(evb)
188   , writeTimeout_(this, evb)
189   , ioHandler_(this, evb)
190   , immediateReadHandler_(this) {
191   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
192   init();
193 }
194
195 AsyncSocket::AsyncSocket(EventBase* evb,
196                            const folly::SocketAddress& address,
197                            uint32_t connectTimeout)
198   : AsyncSocket(evb) {
199   connect(nullptr, address, connectTimeout);
200 }
201
202 AsyncSocket::AsyncSocket(EventBase* evb,
203                            const std::string& ip,
204                            uint16_t port,
205                            uint32_t connectTimeout)
206   : AsyncSocket(evb) {
207   connect(nullptr, ip, port, connectTimeout);
208 }
209
210 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
211   : eventBase_(evb)
212   , writeTimeout_(this, evb)
213   , ioHandler_(this, evb, fd)
214   , immediateReadHandler_(this) {
215   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
216           << fd << ")";
217   init();
218   fd_ = fd;
219   setCloseOnExec();
220   state_ = StateEnum::ESTABLISHED;
221 }
222
223 // init() method, since constructor forwarding isn't supported in most
224 // compilers yet.
225 void AsyncSocket::init() {
226   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
227   shutdownFlags_ = 0;
228   state_ = StateEnum::UNINIT;
229   eventFlags_ = EventHandler::NONE;
230   fd_ = -1;
231   sendTimeout_ = 0;
232   maxReadsPerEvent_ = 16;
233   connectCallback_ = nullptr;
234   readCallback_ = nullptr;
235   writeReqHead_ = nullptr;
236   writeReqTail_ = nullptr;
237   shutdownSocketSet_ = nullptr;
238   appBytesWritten_ = 0;
239   appBytesReceived_ = 0;
240 }
241
242 AsyncSocket::~AsyncSocket() {
243   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
244           << ", evb=" << eventBase_ << ", fd=" << fd_
245           << ", state=" << state_ << ")";
246 }
247
248 void AsyncSocket::destroy() {
249   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
250           << ", fd=" << fd_ << ", state=" << state_;
251   // When destroy is called, close the socket immediately
252   closeNow();
253
254   // Then call DelayedDestruction::destroy() to take care of
255   // whether or not we need immediate or delayed destruction
256   DelayedDestruction::destroy();
257 }
258
259 int AsyncSocket::detachFd() {
260   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
261           << ", evb=" << eventBase_ << ", state=" << state_
262           << ", events=" << std::hex << eventFlags_ << ")";
263   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
264   // actually close the descriptor.
265   if (shutdownSocketSet_) {
266     shutdownSocketSet_->remove(fd_);
267   }
268   int fd = fd_;
269   fd_ = -1;
270   // Call closeNow() to invoke all pending callbacks with an error.
271   closeNow();
272   // Update the EventHandler to stop using this fd.
273   // This can only be done after closeNow() unregisters the handler.
274   ioHandler_.changeHandlerFD(-1);
275   return fd;
276 }
277
278 const folly::SocketAddress& AsyncSocket::anyAddress() {
279   static const folly::SocketAddress anyAddress =
280     folly::SocketAddress("0.0.0.0", 0);
281   return anyAddress;
282 }
283
284 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
285   if (shutdownSocketSet_ == newSS) {
286     return;
287   }
288   if (shutdownSocketSet_ && fd_ != -1) {
289     shutdownSocketSet_->remove(fd_);
290   }
291   shutdownSocketSet_ = newSS;
292   if (shutdownSocketSet_ && fd_ != -1) {
293     shutdownSocketSet_->add(fd_);
294   }
295 }
296
297 void AsyncSocket::setCloseOnExec() {
298   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
299   if (rv != 0) {
300     auto errnoCopy = errno;
301     throw AsyncSocketException(
302         AsyncSocketException::INTERNAL_ERROR,
303         withAddr("failed to set close-on-exec flag"),
304         errnoCopy);
305   }
306 }
307
308 void AsyncSocket::connect(ConnectCallback* callback,
309                            const folly::SocketAddress& address,
310                            int timeout,
311                            const OptionMap &options,
312                            const folly::SocketAddress& bindAddr) noexcept {
313   DestructorGuard dg(this);
314   assert(eventBase_->isInEventBaseThread());
315
316   addr_ = address;
317
318   // Make sure we're in the uninitialized state
319   if (state_ != StateEnum::UNINIT) {
320     return invalidState(callback);
321   }
322
323   connectTimeout_ = std::chrono::milliseconds(timeout);
324   connectStartTime_ = std::chrono::steady_clock::now();
325   // Make connect end time at least >= connectStartTime.
326   connectEndTime_ = connectStartTime_;
327
328   assert(fd_ == -1);
329   state_ = StateEnum::CONNECTING;
330   connectCallback_ = callback;
331
332   sockaddr_storage addrStorage;
333   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
334
335   try {
336     // Create the socket
337     // Technically the first parameter should actually be a protocol family
338     // constant (PF_xxx) rather than an address family (AF_xxx), but the
339     // distinction is mainly just historical.  In pretty much all
340     // implementations the PF_foo and AF_foo constants are identical.
341     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
342     if (fd_ < 0) {
343       auto errnoCopy = errno;
344       throw AsyncSocketException(
345           AsyncSocketException::INTERNAL_ERROR,
346           withAddr("failed to create socket"),
347           errnoCopy);
348     }
349     if (shutdownSocketSet_) {
350       shutdownSocketSet_->add(fd_);
351     }
352     ioHandler_.changeHandlerFD(fd_);
353
354     setCloseOnExec();
355
356     // Put the socket in non-blocking mode
357     int flags = fcntl(fd_, F_GETFL, 0);
358     if (flags == -1) {
359       auto errnoCopy = errno;
360       throw AsyncSocketException(
361           AsyncSocketException::INTERNAL_ERROR,
362           withAddr("failed to get socket flags"),
363           errnoCopy);
364     }
365     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
366     if (rv == -1) {
367       auto errnoCopy = errno;
368       throw AsyncSocketException(
369           AsyncSocketException::INTERNAL_ERROR,
370           withAddr("failed to put socket in non-blocking mode"),
371           errnoCopy);
372     }
373
374 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
375     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
376     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
377     if (rv == -1) {
378       auto errnoCopy = errno;
379       throw AsyncSocketException(
380           AsyncSocketException::INTERNAL_ERROR,
381           "failed to enable F_SETNOSIGPIPE on socket",
382           errnoCopy);
383     }
384 #endif
385
386     // By default, turn on TCP_NODELAY
387     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
388     // setNoDelay() will log an error message if it fails.
389     if (address.getFamily() != AF_UNIX) {
390       (void)setNoDelay(true);
391     }
392
393     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
394             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
395
396     // bind the socket
397     if (bindAddr != anyAddress()) {
398       int one = 1;
399       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
400         auto errnoCopy = errno;
401         doClose();
402         throw AsyncSocketException(
403             AsyncSocketException::NOT_OPEN,
404             "failed to setsockopt prior to bind on " + bindAddr.describe(),
405             errnoCopy);
406       }
407
408       bindAddr.getAddress(&addrStorage);
409
410       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
411         auto errnoCopy = errno;
412         doClose();
413         throw AsyncSocketException(
414             AsyncSocketException::NOT_OPEN,
415             "failed to bind to async socket: " + bindAddr.describe(),
416             errnoCopy);
417       }
418     }
419
420     // Apply the additional options if any.
421     for (const auto& opt: options) {
422       int rv = opt.first.apply(fd_, opt.second);
423       if (rv != 0) {
424         auto errnoCopy = errno;
425         throw AsyncSocketException(
426             AsyncSocketException::INTERNAL_ERROR,
427             withAddr("failed to set socket option"),
428             errnoCopy);
429       }
430     }
431
432     // Perform the connect()
433     address.getAddress(&addrStorage);
434
435     rv = ::connect(fd_, saddr, address.getActualSize());
436     if (rv < 0) {
437       auto errnoCopy = errno;
438       if (errnoCopy == EINPROGRESS) {
439         // Connection in progress.
440         if (timeout > 0) {
441           // Start a timer in case the connection takes too long.
442           if (!writeTimeout_.scheduleTimeout(timeout)) {
443             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
444                 withAddr("failed to schedule AsyncSocket connect timeout"));
445           }
446         }
447
448         // Register for write events, so we'll
449         // be notified when the connection finishes/fails.
450         // Note that we don't register for a persistent event here.
451         assert(eventFlags_ == EventHandler::NONE);
452         eventFlags_ = EventHandler::WRITE;
453         if (!ioHandler_.registerHandler(eventFlags_)) {
454           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
455               withAddr("failed to register AsyncSocket connect handler"));
456         }
457         return;
458       } else {
459         throw AsyncSocketException(
460             AsyncSocketException::NOT_OPEN,
461             "connect failed (immediately)",
462             errnoCopy);
463       }
464     }
465
466     // If we're still here the connect() succeeded immediately.
467     // Fall through to call the callback outside of this try...catch block
468   } catch (const AsyncSocketException& ex) {
469     return failConnect(__func__, ex);
470   } catch (const std::exception& ex) {
471     // shouldn't happen, but handle it just in case
472     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
473                << "): unexpected " << typeid(ex).name() << " exception: "
474                << ex.what();
475     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
476                             withAddr(string("unexpected exception: ") +
477                                      ex.what()));
478     return failConnect(__func__, tex);
479   }
480
481   // The connection succeeded immediately
482   // The read callback may not have been set yet, and no writes may be pending
483   // yet, so we don't have to register for any events at the moment.
484   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
485   assert(readCallback_ == nullptr);
486   assert(writeReqHead_ == nullptr);
487   state_ = StateEnum::ESTABLISHED;
488   invokeConnectSuccess();
489 }
490
491 void AsyncSocket::connect(ConnectCallback* callback,
492                            const string& ip, uint16_t port,
493                            int timeout,
494                            const OptionMap &options) noexcept {
495   DestructorGuard dg(this);
496   try {
497     connectCallback_ = callback;
498     connect(callback, folly::SocketAddress(ip, port), timeout, options);
499   } catch (const std::exception& ex) {
500     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
501                             ex.what());
502     return failConnect(__func__, tex);
503   }
504 }
505
506 void AsyncSocket::cancelConnect() {
507   connectCallback_ = nullptr;
508   if (state_ == StateEnum::CONNECTING) {
509     closeNow();
510   }
511 }
512
513 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
514   sendTimeout_ = milliseconds;
515   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
516
517   // If we are currently pending on write requests, immediately update
518   // writeTimeout_ with the new value.
519   if ((eventFlags_ & EventHandler::WRITE) &&
520       (state_ != StateEnum::CONNECTING)) {
521     assert(state_ == StateEnum::ESTABLISHED);
522     assert((shutdownFlags_ & SHUT_WRITE) == 0);
523     if (sendTimeout_ > 0) {
524       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
525         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
526             withAddr("failed to reschedule send timeout in setSendTimeout"));
527         return failWrite(__func__, ex);
528       }
529     } else {
530       writeTimeout_.cancelTimeout();
531     }
532   }
533 }
534
535 void AsyncSocket::setReadCB(ReadCallback *callback) {
536   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
537           << ", callback=" << callback << ", state=" << state_;
538
539   // Short circuit if callback is the same as the existing readCallback_.
540   //
541   // Note that this is needed for proper functioning during some cleanup cases.
542   // During cleanup we allow setReadCallback(nullptr) to be called even if the
543   // read callback is already unset and we have been detached from an event
544   // base.  This check prevents us from asserting
545   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
546   if (callback == readCallback_) {
547     return;
548   }
549
550   /* We are removing a read callback */
551   if (callback == nullptr &&
552       immediateReadHandler_.isLoopCallbackScheduled()) {
553     immediateReadHandler_.cancelLoopCallback();
554   }
555
556   if (shutdownFlags_ & SHUT_READ) {
557     // Reads have already been shut down on this socket.
558     //
559     // Allow setReadCallback(nullptr) to be called in this case, but don't
560     // allow a new callback to be set.
561     //
562     // For example, setReadCallback(nullptr) can happen after an error if we
563     // invoke some other error callback before invoking readError().  The other
564     // error callback that is invoked first may go ahead and clear the read
565     // callback before we get a chance to invoke readError().
566     if (callback != nullptr) {
567       return invalidState(callback);
568     }
569     assert((eventFlags_ & EventHandler::READ) == 0);
570     readCallback_ = nullptr;
571     return;
572   }
573
574   DestructorGuard dg(this);
575   assert(eventBase_->isInEventBaseThread());
576
577   switch ((StateEnum)state_) {
578     case StateEnum::CONNECTING:
579       // For convenience, we allow the read callback to be set while we are
580       // still connecting.  We just store the callback for now.  Once the
581       // connection completes we'll register for read events.
582       readCallback_ = callback;
583       return;
584     case StateEnum::ESTABLISHED:
585     {
586       readCallback_ = callback;
587       uint16_t oldFlags = eventFlags_;
588       if (readCallback_) {
589         eventFlags_ |= EventHandler::READ;
590       } else {
591         eventFlags_ &= ~EventHandler::READ;
592       }
593
594       // Update our registration if our flags have changed
595       if (eventFlags_ != oldFlags) {
596         // We intentionally ignore the return value here.
597         // updateEventRegistration() will move us into the error state if it
598         // fails, and we don't need to do anything else here afterwards.
599         (void)updateEventRegistration();
600       }
601
602       if (readCallback_) {
603         checkForImmediateRead();
604       }
605       return;
606     }
607     case StateEnum::CLOSED:
608     case StateEnum::ERROR:
609       // We should never reach here.  SHUT_READ should always be set
610       // if we are in STATE_CLOSED or STATE_ERROR.
611       assert(false);
612       return invalidState(callback);
613     case StateEnum::UNINIT:
614       // We do not allow setReadCallback() to be called before we start
615       // connecting.
616       return invalidState(callback);
617   }
618
619   // We don't put a default case in the switch statement, so that the compiler
620   // will warn us to update the switch statement if a new state is added.
621   return invalidState(callback);
622 }
623
624 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
625   return readCallback_;
626 }
627
628 void AsyncSocket::write(WriteCallback* callback,
629                          const void* buf, size_t bytes, WriteFlags flags) {
630   iovec op;
631   op.iov_base = const_cast<void*>(buf);
632   op.iov_len = bytes;
633   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
634 }
635
636 void AsyncSocket::writev(WriteCallback* callback,
637                           const iovec* vec,
638                           size_t count,
639                           WriteFlags flags) {
640   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
641 }
642
643 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
644                               WriteFlags flags) {
645   constexpr size_t kSmallSizeMax = 64;
646   size_t count = buf->countChainElements();
647   if (count <= kSmallSizeMax) {
648     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
649     writeChainImpl(callback, vec, count, std::move(buf), flags);
650   } else {
651     iovec* vec = new iovec[count];
652     writeChainImpl(callback, vec, count, std::move(buf), flags);
653     delete[] vec;
654   }
655 }
656
657 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
658     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
659   size_t veclen = buf->fillIov(vec, count);
660   writeImpl(callback, vec, veclen, std::move(buf), flags);
661 }
662
663 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
664                              size_t count, unique_ptr<IOBuf>&& buf,
665                              WriteFlags flags) {
666   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
667           << ", callback=" << callback << ", count=" << count
668           << ", state=" << state_;
669   DestructorGuard dg(this);
670   unique_ptr<IOBuf>ioBuf(std::move(buf));
671   assert(eventBase_->isInEventBaseThread());
672
673   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
674     // No new writes may be performed after the write side of the socket has
675     // been shutdown.
676     //
677     // We could just call callback->writeError() here to fail just this write.
678     // However, fail hard and use invalidState() to fail all outstanding
679     // callbacks and move the socket into the error state.  There's most likely
680     // a bug in the caller's code, so we abort everything rather than trying to
681     // proceed as best we can.
682     return invalidState(callback);
683   }
684
685   uint32_t countWritten = 0;
686   uint32_t partialWritten = 0;
687   int bytesWritten = 0;
688   bool mustRegister = false;
689   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
690     if (writeReqHead_ == nullptr) {
691       // If we are established and there are no other writes pending,
692       // we can attempt to perform the write immediately.
693       assert(writeReqTail_ == nullptr);
694       assert((eventFlags_ & EventHandler::WRITE) == 0);
695
696       auto writeResult =
697           performWrite(vec, count, flags, &countWritten, &partialWritten);
698       bytesWritten = writeResult.writeReturn;
699       if (bytesWritten < 0) {
700         auto errnoCopy = errno;
701         if (writeResult.exception) {
702           return failWrite(__func__, callback, 0, *writeResult.exception);
703         }
704         AsyncSocketException ex(
705             AsyncSocketException::INTERNAL_ERROR,
706             withAddr("writev failed"),
707             errnoCopy);
708         return failWrite(__func__, callback, 0, ex);
709       } else if (countWritten == count) {
710         // We successfully wrote everything.
711         // Invoke the callback and return.
712         if (callback) {
713           callback->writeSuccess();
714         }
715         return;
716       } else { // continue writing the next writeReq
717         if (bufferCallback_) {
718           bufferCallback_->onEgressBuffered();
719         }
720       }
721       mustRegister = true;
722     }
723   } else if (!connecting()) {
724     // Invalid state for writing
725     return invalidState(callback);
726   }
727
728   // Create a new WriteRequest to add to the queue
729   WriteRequest* req;
730   try {
731     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
732                                         count - countWritten, partialWritten,
733                                         bytesWritten, std::move(ioBuf), flags);
734   } catch (const std::exception& ex) {
735     // we mainly expect to catch std::bad_alloc here
736     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
737         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
738     return failWrite(__func__, callback, bytesWritten, tex);
739   }
740   req->consume();
741   if (writeReqTail_ == nullptr) {
742     assert(writeReqHead_ == nullptr);
743     writeReqHead_ = writeReqTail_ = req;
744   } else {
745     writeReqTail_->append(req);
746     writeReqTail_ = req;
747   }
748
749   // Register for write events if are established and not currently
750   // waiting on write events
751   if (mustRegister) {
752     assert(state_ == StateEnum::ESTABLISHED);
753     assert((eventFlags_ & EventHandler::WRITE) == 0);
754     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
755       assert(state_ == StateEnum::ERROR);
756       return;
757     }
758     if (sendTimeout_ > 0) {
759       // Schedule a timeout to fire if the write takes too long.
760       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
761         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
762                                withAddr("failed to schedule send timeout"));
763         return failWrite(__func__, ex);
764       }
765     }
766   }
767 }
768
769 void AsyncSocket::writeRequest(WriteRequest* req) {
770   if (writeReqTail_ == nullptr) {
771     assert(writeReqHead_ == nullptr);
772     writeReqHead_ = writeReqTail_ = req;
773     req->start();
774   } else {
775     writeReqTail_->append(req);
776     writeReqTail_ = req;
777   }
778 }
779
780 void AsyncSocket::close() {
781   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
782           << ", state=" << state_ << ", shutdownFlags="
783           << std::hex << (int) shutdownFlags_;
784
785   // close() is only different from closeNow() when there are pending writes
786   // that need to drain before we can close.  In all other cases, just call
787   // closeNow().
788   //
789   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
790   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
791   // is still running.  (e.g., If there are multiple pending writes, and we
792   // call writeError() on the first one, it may call close().  In this case we
793   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
794   // writes will still be in the queue.)
795   //
796   // We only need to drain pending writes if we are still in STATE_CONNECTING
797   // or STATE_ESTABLISHED
798   if ((writeReqHead_ == nullptr) ||
799       !(state_ == StateEnum::CONNECTING ||
800       state_ == StateEnum::ESTABLISHED)) {
801     closeNow();
802     return;
803   }
804
805   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
806   // destroyed until close() returns.
807   DestructorGuard dg(this);
808   assert(eventBase_->isInEventBaseThread());
809
810   // Since there are write requests pending, we have to set the
811   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
812   // connect finishes and we finish writing these requests.
813   //
814   // Set SHUT_READ to indicate that reads are shut down, and set the
815   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
816   // pending writes complete.
817   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
818
819   // If a read callback is set, invoke readEOF() immediately to inform it that
820   // the socket has been closed and no more data can be read.
821   if (readCallback_) {
822     // Disable reads if they are enabled
823     if (!updateEventRegistration(0, EventHandler::READ)) {
824       // We're now in the error state; callbacks have been cleaned up
825       assert(state_ == StateEnum::ERROR);
826       assert(readCallback_ == nullptr);
827     } else {
828       ReadCallback* callback = readCallback_;
829       readCallback_ = nullptr;
830       callback->readEOF();
831     }
832   }
833 }
834
835 void AsyncSocket::closeNow() {
836   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
837           << ", state=" << state_ << ", shutdownFlags="
838           << std::hex << (int) shutdownFlags_;
839   DestructorGuard dg(this);
840   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
841
842   switch (state_) {
843     case StateEnum::ESTABLISHED:
844     case StateEnum::CONNECTING:
845     {
846       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
847       state_ = StateEnum::CLOSED;
848
849       // If the write timeout was set, cancel it.
850       writeTimeout_.cancelTimeout();
851
852       // If we are registered for I/O events, unregister.
853       if (eventFlags_ != EventHandler::NONE) {
854         eventFlags_ = EventHandler::NONE;
855         if (!updateEventRegistration()) {
856           // We will have been moved into the error state.
857           assert(state_ == StateEnum::ERROR);
858           return;
859         }
860       }
861
862       if (immediateReadHandler_.isLoopCallbackScheduled()) {
863         immediateReadHandler_.cancelLoopCallback();
864       }
865
866       if (fd_ >= 0) {
867         ioHandler_.changeHandlerFD(-1);
868         doClose();
869       }
870
871       invokeConnectErr(socketClosedLocallyEx);
872
873       failAllWrites(socketClosedLocallyEx);
874
875       if (readCallback_) {
876         ReadCallback* callback = readCallback_;
877         readCallback_ = nullptr;
878         callback->readEOF();
879       }
880       return;
881     }
882     case StateEnum::CLOSED:
883       // Do nothing.  It's possible that we are being called recursively
884       // from inside a callback that we invoked inside another call to close()
885       // that is still running.
886       return;
887     case StateEnum::ERROR:
888       // Do nothing.  The error handling code has performed (or is performing)
889       // cleanup.
890       return;
891     case StateEnum::UNINIT:
892       assert(eventFlags_ == EventHandler::NONE);
893       assert(connectCallback_ == nullptr);
894       assert(readCallback_ == nullptr);
895       assert(writeReqHead_ == nullptr);
896       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
897       state_ = StateEnum::CLOSED;
898       return;
899   }
900
901   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
902               << ") called in unknown state " << state_;
903 }
904
905 void AsyncSocket::closeWithReset() {
906   // Enable SO_LINGER, with the linger timeout set to 0.
907   // This will trigger a TCP reset when we close the socket.
908   if (fd_ >= 0) {
909     struct linger optLinger = {1, 0};
910     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
911       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
912               << "on " << fd_ << ": errno=" << errno;
913     }
914   }
915
916   // Then let closeNow() take care of the rest
917   closeNow();
918 }
919
920 void AsyncSocket::shutdownWrite() {
921   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
922           << ", state=" << state_ << ", shutdownFlags="
923           << std::hex << (int) shutdownFlags_;
924
925   // If there are no pending writes, shutdownWrite() is identical to
926   // shutdownWriteNow().
927   if (writeReqHead_ == nullptr) {
928     shutdownWriteNow();
929     return;
930   }
931
932   assert(eventBase_->isInEventBaseThread());
933
934   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
935   // shutdown will be performed once all writes complete.
936   shutdownFlags_ |= SHUT_WRITE_PENDING;
937 }
938
939 void AsyncSocket::shutdownWriteNow() {
940   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
941           << ", fd=" << fd_ << ", state=" << state_
942           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
943
944   if (shutdownFlags_ & SHUT_WRITE) {
945     // Writes are already shutdown; nothing else to do.
946     return;
947   }
948
949   // If SHUT_READ is already set, just call closeNow() to completely
950   // close the socket.  This can happen if close() was called with writes
951   // pending, and then shutdownWriteNow() is called before all pending writes
952   // complete.
953   if (shutdownFlags_ & SHUT_READ) {
954     closeNow();
955     return;
956   }
957
958   DestructorGuard dg(this);
959   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
960
961   switch (static_cast<StateEnum>(state_)) {
962     case StateEnum::ESTABLISHED:
963     {
964       shutdownFlags_ |= SHUT_WRITE;
965
966       // If the write timeout was set, cancel it.
967       writeTimeout_.cancelTimeout();
968
969       // If we are registered for write events, unregister.
970       if (!updateEventRegistration(0, EventHandler::WRITE)) {
971         // We will have been moved into the error state.
972         assert(state_ == StateEnum::ERROR);
973         return;
974       }
975
976       // Shutdown writes on the file descriptor
977       ::shutdown(fd_, SHUT_WR);
978
979       // Immediately fail all write requests
980       failAllWrites(socketShutdownForWritesEx);
981       return;
982     }
983     case StateEnum::CONNECTING:
984     {
985       // Set the SHUT_WRITE_PENDING flag.
986       // When the connection completes, it will check this flag,
987       // shutdown the write half of the socket, and then set SHUT_WRITE.
988       shutdownFlags_ |= SHUT_WRITE_PENDING;
989
990       // Immediately fail all write requests
991       failAllWrites(socketShutdownForWritesEx);
992       return;
993     }
994     case StateEnum::UNINIT:
995       // Callers normally shouldn't call shutdownWriteNow() before the socket
996       // even starts connecting.  Nonetheless, go ahead and set
997       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
998       // immediately shut down the write side of the socket.
999       shutdownFlags_ |= SHUT_WRITE_PENDING;
1000       return;
1001     case StateEnum::CLOSED:
1002     case StateEnum::ERROR:
1003       // We should never get here.  SHUT_WRITE should always be set
1004       // in STATE_CLOSED and STATE_ERROR.
1005       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
1006                  << ", fd=" << fd_ << ") in unexpected state " << state_
1007                  << " with SHUT_WRITE not set ("
1008                  << std::hex << (int) shutdownFlags_ << ")";
1009       assert(false);
1010       return;
1011   }
1012
1013   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
1014               << fd_ << ") called in unknown state " << state_;
1015 }
1016
1017 bool AsyncSocket::readable() const {
1018   if (fd_ == -1) {
1019     return false;
1020   }
1021   struct pollfd fds[1];
1022   fds[0].fd = fd_;
1023   fds[0].events = POLLIN;
1024   fds[0].revents = 0;
1025   int rc = poll(fds, 1, 0);
1026   return rc == 1;
1027 }
1028
1029 bool AsyncSocket::isPending() const {
1030   return ioHandler_.isPending();
1031 }
1032
1033 bool AsyncSocket::hangup() const {
1034   if (fd_ == -1) {
1035     // sanity check, no one should ask for hangup if we are not connected.
1036     assert(false);
1037     return false;
1038   }
1039 #ifdef POLLRDHUP // Linux-only
1040   struct pollfd fds[1];
1041   fds[0].fd = fd_;
1042   fds[0].events = POLLRDHUP|POLLHUP;
1043   fds[0].revents = 0;
1044   poll(fds, 1, 0);
1045   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1046 #else
1047   return false;
1048 #endif
1049 }
1050
1051 bool AsyncSocket::good() const {
1052   return ((state_ == StateEnum::CONNECTING ||
1053           state_ == StateEnum::ESTABLISHED) &&
1054           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1055 }
1056
1057 bool AsyncSocket::error() const {
1058   return (state_ == StateEnum::ERROR);
1059 }
1060
1061 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1062   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1063           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1064           << ", state=" << state_ << ", events="
1065           << std::hex << eventFlags_ << ")";
1066   assert(eventBase_ == nullptr);
1067   assert(eventBase->isInEventBaseThread());
1068
1069   eventBase_ = eventBase;
1070   ioHandler_.attachEventBase(eventBase);
1071   writeTimeout_.attachEventBase(eventBase);
1072 }
1073
1074 void AsyncSocket::detachEventBase() {
1075   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1076           << ", old evb=" << eventBase_ << ", state=" << state_
1077           << ", events=" << std::hex << eventFlags_ << ")";
1078   assert(eventBase_ != nullptr);
1079   assert(eventBase_->isInEventBaseThread());
1080
1081   eventBase_ = nullptr;
1082   ioHandler_.detachEventBase();
1083   writeTimeout_.detachEventBase();
1084 }
1085
1086 bool AsyncSocket::isDetachable() const {
1087   DCHECK(eventBase_ != nullptr);
1088   DCHECK(eventBase_->isInEventBaseThread());
1089
1090   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1091 }
1092
1093 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1094   if (!localAddr_.isInitialized()) {
1095     localAddr_.setFromLocalAddress(fd_);
1096   }
1097   *address = localAddr_;
1098 }
1099
1100 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1101   if (!addr_.isInitialized()) {
1102     addr_.setFromPeerAddress(fd_);
1103   }
1104   *address = addr_;
1105 }
1106
1107 int AsyncSocket::setNoDelay(bool noDelay) {
1108   if (fd_ < 0) {
1109     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1110                << this << "(state=" << state_ << ")";
1111     return EINVAL;
1112
1113   }
1114
1115   int value = noDelay ? 1 : 0;
1116   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1117     int errnoCopy = errno;
1118     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1119             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1120             << strerror(errnoCopy);
1121     return errnoCopy;
1122   }
1123
1124   return 0;
1125 }
1126
1127 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1128
1129   #ifndef TCP_CONGESTION
1130   #define TCP_CONGESTION  13
1131   #endif
1132
1133   if (fd_ < 0) {
1134     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1135                << "socket " << this << "(state=" << state_ << ")";
1136     return EINVAL;
1137
1138   }
1139
1140   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1141         cname.length() + 1) != 0) {
1142     int errnoCopy = errno;
1143     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1144             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1145             << strerror(errnoCopy);
1146     return errnoCopy;
1147   }
1148
1149   return 0;
1150 }
1151
1152 int AsyncSocket::setQuickAck(bool quickack) {
1153   if (fd_ < 0) {
1154     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1155                << this << "(state=" << state_ << ")";
1156     return EINVAL;
1157
1158   }
1159
1160 #ifdef TCP_QUICKACK // Linux-only
1161   int value = quickack ? 1 : 0;
1162   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1163     int errnoCopy = errno;
1164     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1165             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1166             << strerror(errnoCopy);
1167     return errnoCopy;
1168   }
1169
1170   return 0;
1171 #else
1172   return ENOSYS;
1173 #endif
1174 }
1175
1176 int AsyncSocket::setSendBufSize(size_t bufsize) {
1177   if (fd_ < 0) {
1178     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1179                << this << "(state=" << state_ << ")";
1180     return EINVAL;
1181   }
1182
1183   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1184     int errnoCopy = errno;
1185     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1186             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1187             << strerror(errnoCopy);
1188     return errnoCopy;
1189   }
1190
1191   return 0;
1192 }
1193
1194 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1195   if (fd_ < 0) {
1196     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1197                << this << "(state=" << state_ << ")";
1198     return EINVAL;
1199   }
1200
1201   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1202     int errnoCopy = errno;
1203     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1204             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1205             << strerror(errnoCopy);
1206     return errnoCopy;
1207   }
1208
1209   return 0;
1210 }
1211
1212 int AsyncSocket::setTCPProfile(int profd) {
1213   if (fd_ < 0) {
1214     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1215                << this << "(state=" << state_ << ")";
1216     return EINVAL;
1217   }
1218
1219   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1220     int errnoCopy = errno;
1221     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1222             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1223             << strerror(errnoCopy);
1224     return errnoCopy;
1225   }
1226
1227   return 0;
1228 }
1229
1230 void AsyncSocket::ioReady(uint16_t events) noexcept {
1231   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1232           << ", events=" << std::hex << events << ", state=" << state_;
1233   DestructorGuard dg(this);
1234   assert(events & EventHandler::READ_WRITE);
1235   assert(eventBase_->isInEventBaseThread());
1236
1237   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1238   if (relevantEvents == EventHandler::READ) {
1239     handleRead();
1240   } else if (relevantEvents == EventHandler::WRITE) {
1241     handleWrite();
1242   } else if (relevantEvents == EventHandler::READ_WRITE) {
1243     EventBase* originalEventBase = eventBase_;
1244     // If both read and write events are ready, process writes first.
1245     handleWrite();
1246
1247     // Return now if handleWrite() detached us from our EventBase
1248     if (eventBase_ != originalEventBase) {
1249       return;
1250     }
1251
1252     // Only call handleRead() if a read callback is still installed.
1253     // (It's possible that the read callback was uninstalled during
1254     // handleWrite().)
1255     if (readCallback_) {
1256       handleRead();
1257     }
1258   } else {
1259     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1260                << std::hex << events << "(this=" << this << ")";
1261     abort();
1262   }
1263 }
1264
1265 AsyncSocket::ReadResult
1266 AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) {
1267   VLOG(5) << "AsyncSocket::performRead() this=" << this << ", buf=" << *buf
1268           << ", buflen=" << *buflen;
1269
1270   int recvFlags = 0;
1271   if (peek_) {
1272     recvFlags |= MSG_PEEK;
1273   }
1274
1275   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1276   if (bytes < 0) {
1277     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1278       // No more data to read right now.
1279       return ReadResult(READ_BLOCKING);
1280     } else {
1281       return ReadResult(READ_ERROR);
1282     }
1283   } else {
1284     appBytesReceived_ += bytes;
1285     return ReadResult(bytes);
1286   }
1287 }
1288
1289 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1290   // no matter what, buffer should be preapared for non-ssl socket
1291   CHECK(readCallback_);
1292   readCallback_->getReadBuffer(buf, buflen);
1293 }
1294
1295 void AsyncSocket::handleRead() noexcept {
1296   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1297           << ", state=" << state_;
1298   assert(state_ == StateEnum::ESTABLISHED);
1299   assert((shutdownFlags_ & SHUT_READ) == 0);
1300   assert(readCallback_ != nullptr);
1301   assert(eventFlags_ & EventHandler::READ);
1302
1303   // Loop until:
1304   // - a read attempt would block
1305   // - readCallback_ is uninstalled
1306   // - the number of loop iterations exceeds the optional maximum
1307   // - this AsyncSocket is moved to another EventBase
1308   //
1309   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1310   // which is why need to check for it here.
1311   //
1312   // The last bullet point is slightly subtle.  readDataAvailable() may also
1313   // detach this socket from this EventBase.  However, before
1314   // readDataAvailable() returns another thread may pick it up, attach it to
1315   // a different EventBase, and install another readCallback_.  We need to
1316   // exit immediately after readDataAvailable() returns if the eventBase_ has
1317   // changed.  (The caller must perform some sort of locking to transfer the
1318   // AsyncSocket between threads properly.  This will be sufficient to ensure
1319   // that this thread sees the updated eventBase_ variable after
1320   // readDataAvailable() returns.)
1321   uint16_t numReads = 0;
1322   EventBase* originalEventBase = eventBase_;
1323   while (readCallback_ && eventBase_ == originalEventBase) {
1324     // Get the buffer to read into.
1325     void* buf = nullptr;
1326     size_t buflen = 0, offset = 0;
1327     try {
1328       prepareReadBuffer(&buf, &buflen);
1329       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1330     } catch (const AsyncSocketException& ex) {
1331       return failRead(__func__, ex);
1332     } catch (const std::exception& ex) {
1333       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1334                               string("ReadCallback::getReadBuffer() "
1335                                      "threw exception: ") +
1336                               ex.what());
1337       return failRead(__func__, tex);
1338     } catch (...) {
1339       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1340                              "ReadCallback::getReadBuffer() threw "
1341                              "non-exception type");
1342       return failRead(__func__, ex);
1343     }
1344     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1345       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1346                              "ReadCallback::getReadBuffer() returned "
1347                              "empty buffer");
1348       return failRead(__func__, ex);
1349     }
1350
1351     // Perform the read
1352     auto readResult = performRead(&buf, &buflen, &offset);
1353     auto bytesRead = readResult.readReturn;
1354     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1355             << bytesRead << " bytes";
1356     if (bytesRead > 0) {
1357       if (!isBufferMovable_) {
1358         readCallback_->readDataAvailable(bytesRead);
1359       } else {
1360         CHECK(kOpenSslModeMoveBufferOwnership);
1361         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1362                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1363                 << ", offset=" << offset;
1364         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1365         readBuf->trimStart(offset);
1366         readBuf->trimEnd(buflen - offset - bytesRead);
1367         readCallback_->readBufferAvailable(std::move(readBuf));
1368       }
1369
1370       // Fall through and continue around the loop if the read
1371       // completely filled the available buffer.
1372       // Note that readCallback_ may have been uninstalled or changed inside
1373       // readDataAvailable().
1374       if (size_t(bytesRead) < buflen) {
1375         return;
1376       }
1377     } else if (bytesRead == READ_BLOCKING) {
1378         // No more data to read right now.
1379         return;
1380     } else if (bytesRead == READ_ERROR) {
1381       readErr_ = READ_ERROR;
1382       if (readResult.exception) {
1383         return failRead(__func__, *readResult.exception);
1384       }
1385       auto errnoCopy = errno;
1386       AsyncSocketException ex(
1387           AsyncSocketException::INTERNAL_ERROR,
1388           withAddr("recv() failed"),
1389           errnoCopy);
1390       return failRead(__func__, ex);
1391     } else {
1392       assert(bytesRead == READ_EOF);
1393       readErr_ = READ_EOF;
1394       // EOF
1395       shutdownFlags_ |= SHUT_READ;
1396       if (!updateEventRegistration(0, EventHandler::READ)) {
1397         // we've already been moved into STATE_ERROR
1398         assert(state_ == StateEnum::ERROR);
1399         assert(readCallback_ == nullptr);
1400         return;
1401       }
1402
1403       ReadCallback* callback = readCallback_;
1404       readCallback_ = nullptr;
1405       callback->readEOF();
1406       return;
1407     }
1408     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1409       if (readCallback_ != nullptr) {
1410         // We might still have data in the socket.
1411         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1412         scheduleImmediateRead();
1413       }
1414       return;
1415     }
1416   }
1417 }
1418
1419 /**
1420  * This function attempts to write as much data as possible, until no more data
1421  * can be written.
1422  *
1423  * - If it sends all available data, it unregisters for write events, and stops
1424  *   the writeTimeout_.
1425  *
1426  * - If not all of the data can be sent immediately, it reschedules
1427  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1428  *   registered for write events.
1429  */
1430 void AsyncSocket::handleWrite() noexcept {
1431   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1432           << ", state=" << state_;
1433   DestructorGuard dg(this);
1434
1435   if (state_ == StateEnum::CONNECTING) {
1436     handleConnect();
1437     return;
1438   }
1439
1440   // Normal write
1441   assert(state_ == StateEnum::ESTABLISHED);
1442   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1443   assert(writeReqHead_ != nullptr);
1444
1445   // Loop until we run out of write requests,
1446   // or until this socket is moved to another EventBase.
1447   // (See the comment in handleRead() explaining how this can happen.)
1448   EventBase* originalEventBase = eventBase_;
1449   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1450     auto writeResult = writeReqHead_->performWrite();
1451     if (writeResult.writeReturn < 0) {
1452       if (writeResult.exception) {
1453         return failWrite(__func__, *writeResult.exception);
1454       }
1455       auto errnoCopy = errno;
1456       AsyncSocketException ex(
1457           AsyncSocketException::INTERNAL_ERROR,
1458           withAddr("writev() failed"),
1459           errnoCopy);
1460       return failWrite(__func__, ex);
1461     } else if (writeReqHead_->isComplete()) {
1462       // We finished this request
1463       WriteRequest* req = writeReqHead_;
1464       writeReqHead_ = req->getNext();
1465
1466       if (writeReqHead_ == nullptr) {
1467         writeReqTail_ = nullptr;
1468         // This is the last write request.
1469         // Unregister for write events and cancel the send timer
1470         // before we invoke the callback.  We have to update the state properly
1471         // before calling the callback, since it may want to detach us from
1472         // the EventBase.
1473         if (eventFlags_ & EventHandler::WRITE) {
1474           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1475             assert(state_ == StateEnum::ERROR);
1476             return;
1477           }
1478           // Stop the send timeout
1479           writeTimeout_.cancelTimeout();
1480         }
1481         assert(!writeTimeout_.isScheduled());
1482
1483         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1484         // we finish sending the last write request.
1485         //
1486         // We have to do this before invoking writeSuccess(), since
1487         // writeSuccess() may detach us from our EventBase.
1488         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1489           assert(connectCallback_ == nullptr);
1490           shutdownFlags_ |= SHUT_WRITE;
1491
1492           if (shutdownFlags_ & SHUT_READ) {
1493             // Reads have already been shutdown.  Fully close the socket and
1494             // move to STATE_CLOSED.
1495             //
1496             // Note: This code currently moves us to STATE_CLOSED even if
1497             // close() hasn't ever been called.  This can occur if we have
1498             // received EOF from the peer and shutdownWrite() has been called
1499             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1500             // case, until close() is actually called?  I can't think of a
1501             // reason why we would need to do so.  No other operations besides
1502             // calling close() or destroying the socket can be performed at
1503             // this point.
1504             assert(readCallback_ == nullptr);
1505             state_ = StateEnum::CLOSED;
1506             if (fd_ >= 0) {
1507               ioHandler_.changeHandlerFD(-1);
1508               doClose();
1509             }
1510           } else {
1511             // Reads are still enabled, so we are only doing a half-shutdown
1512             ::shutdown(fd_, SHUT_WR);
1513           }
1514         }
1515       }
1516
1517       // Invoke the callback
1518       WriteCallback* callback = req->getCallback();
1519       req->destroy();
1520       if (callback) {
1521         callback->writeSuccess();
1522       }
1523       // We'll continue around the loop, trying to write another request
1524     } else {
1525       // Partial write.
1526       if (bufferCallback_) {
1527         bufferCallback_->onEgressBuffered();
1528       }
1529       writeReqHead_->consume();
1530       // Stop after a partial write; it's highly likely that a subsequent write
1531       // attempt will just return EAGAIN.
1532       //
1533       // Ensure that we are registered for write events.
1534       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1535         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1536           assert(state_ == StateEnum::ERROR);
1537           return;
1538         }
1539       }
1540
1541       // Reschedule the send timeout, since we have made some write progress.
1542       if (sendTimeout_ > 0) {
1543         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1544           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1545               withAddr("failed to reschedule write timeout"));
1546           return failWrite(__func__, ex);
1547         }
1548       }
1549       return;
1550     }
1551   }
1552   if (!writeReqHead_ && bufferCallback_) {
1553     bufferCallback_->onEgressBufferCleared();
1554   }
1555 }
1556
1557 void AsyncSocket::checkForImmediateRead() noexcept {
1558   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1559   // (However, note that some subclasses do override this method.)
1560   //
1561   // Simply calling handleRead() here would be bad, as this would call
1562   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1563   // buffer even though no data may be available.  This would waste lots of
1564   // memory, since the buffer will sit around unused until the socket actually
1565   // becomes readable.
1566   //
1567   // Checking if the socket is readable now also seems like it would probably
1568   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1569   // would just waste an extra system call.  Even if it is readable, waiting to
1570   // find out from libevent on the next event loop doesn't seem that bad.
1571 }
1572
1573 void AsyncSocket::handleInitialReadWrite() noexcept {
1574   // Our callers should already be holding a DestructorGuard, but grab
1575   // one here just to make sure, in case one of our calling code paths ever
1576   // changes.
1577   DestructorGuard dg(this);
1578
1579   // If we have a readCallback_, make sure we enable read events.  We
1580   // may already be registered for reads if connectSuccess() set
1581   // the read calback.
1582   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1583     assert(state_ == StateEnum::ESTABLISHED);
1584     assert((shutdownFlags_ & SHUT_READ) == 0);
1585     if (!updateEventRegistration(EventHandler::READ, 0)) {
1586       assert(state_ == StateEnum::ERROR);
1587       return;
1588     }
1589     checkForImmediateRead();
1590   } else if (readCallback_ == nullptr) {
1591     // Unregister for read events.
1592     updateEventRegistration(0, EventHandler::READ);
1593   }
1594
1595   // If we have write requests pending, try to send them immediately.
1596   // Since we just finished accepting, there is a very good chance that we can
1597   // write without blocking.
1598   //
1599   // However, we only process them if EventHandler::WRITE is not already set,
1600   // which means that we're already blocked on a write attempt.  (This can
1601   // happen if connectSuccess() called write() before returning.)
1602   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1603     // Call handleWrite() to perform write processing.
1604     handleWrite();
1605   } else if (writeReqHead_ == nullptr) {
1606     // Unregister for write event.
1607     updateEventRegistration(0, EventHandler::WRITE);
1608   }
1609 }
1610
1611 void AsyncSocket::handleConnect() noexcept {
1612   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1613           << ", state=" << state_;
1614   assert(state_ == StateEnum::CONNECTING);
1615   // SHUT_WRITE can never be set while we are still connecting;
1616   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1617   // finishes
1618   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1619
1620   // In case we had a connect timeout, cancel the timeout
1621   writeTimeout_.cancelTimeout();
1622   // We don't use a persistent registration when waiting on a connect event,
1623   // so we have been automatically unregistered now.  Update eventFlags_ to
1624   // reflect reality.
1625   assert(eventFlags_ == EventHandler::WRITE);
1626   eventFlags_ = EventHandler::NONE;
1627
1628   // Call getsockopt() to check if the connect succeeded
1629   int error;
1630   socklen_t len = sizeof(error);
1631   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1632   if (rv != 0) {
1633     auto errnoCopy = errno;
1634     AsyncSocketException ex(
1635         AsyncSocketException::INTERNAL_ERROR,
1636         withAddr("error calling getsockopt() after connect"),
1637         errnoCopy);
1638     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1639                << fd_ << " host=" << addr_.describe()
1640                << ") exception:" << ex.what();
1641     return failConnect(__func__, ex);
1642   }
1643
1644   if (error != 0) {
1645     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1646                            "connect failed", error);
1647     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1648             << fd_ << " host=" << addr_.describe()
1649             << ") exception: " << ex.what();
1650     return failConnect(__func__, ex);
1651   }
1652
1653   // Move into STATE_ESTABLISHED
1654   state_ = StateEnum::ESTABLISHED;
1655
1656   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1657   // perform, immediately shutdown the write half of the socket.
1658   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1659     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1660     // are still connecting we just abort the connect rather than waiting for
1661     // it to complete.
1662     assert((shutdownFlags_ & SHUT_READ) == 0);
1663     ::shutdown(fd_, SHUT_WR);
1664     shutdownFlags_ |= SHUT_WRITE;
1665   }
1666
1667   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1668           << "successfully connected; state=" << state_;
1669
1670   // Remember the EventBase we are attached to, before we start invoking any
1671   // callbacks (since the callbacks may call detachEventBase()).
1672   EventBase* originalEventBase = eventBase_;
1673
1674   invokeConnectSuccess();
1675   // Note that the connect callback may have changed our state.
1676   // (set or unset the read callback, called write(), closed the socket, etc.)
1677   // The following code needs to handle these situations correctly.
1678   //
1679   // If the socket has been closed, readCallback_ and writeReqHead_ will
1680   // always be nullptr, so that will prevent us from trying to read or write.
1681   //
1682   // The main thing to check for is if eventBase_ is still originalEventBase.
1683   // If not, we have been detached from this event base, so we shouldn't
1684   // perform any more operations.
1685   if (eventBase_ != originalEventBase) {
1686     return;
1687   }
1688
1689   handleInitialReadWrite();
1690 }
1691
1692 void AsyncSocket::timeoutExpired() noexcept {
1693   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1694           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1695   DestructorGuard dg(this);
1696   assert(eventBase_->isInEventBaseThread());
1697
1698   if (state_ == StateEnum::CONNECTING) {
1699     // connect() timed out
1700     // Unregister for I/O events.
1701     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1702                            "connect timed out");
1703     failConnect(__func__, ex);
1704   } else {
1705     // a normal write operation timed out
1706     assert(state_ == StateEnum::ESTABLISHED);
1707     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1708     failWrite(__func__, ex);
1709   }
1710 }
1711
1712 AsyncSocket::WriteResult AsyncSocket::performWrite(
1713     const iovec* vec,
1714     uint32_t count,
1715     WriteFlags flags,
1716     uint32_t* countWritten,
1717     uint32_t* partialWritten) {
1718   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1719   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1720   // (since it may terminate the program if the main program doesn't explicitly
1721   // ignore it).
1722   struct msghdr msg;
1723   msg.msg_name = nullptr;
1724   msg.msg_namelen = 0;
1725   msg.msg_iov = const_cast<iovec *>(vec);
1726   msg.msg_iovlen = std::min<size_t>(count, kIovMax);
1727   msg.msg_control = nullptr;
1728   msg.msg_controllen = 0;
1729   msg.msg_flags = 0;
1730
1731   int msg_flags = MSG_DONTWAIT;
1732
1733 #ifdef MSG_NOSIGNAL // Linux-only
1734   msg_flags |= MSG_NOSIGNAL;
1735   if (isSet(flags, WriteFlags::CORK)) {
1736     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1737     // give it the rest of the data rather than immediately sending a partial
1738     // frame, even when TCP_NODELAY is enabled.
1739     msg_flags |= MSG_MORE;
1740   }
1741 #endif
1742   if (isSet(flags, WriteFlags::EOR)) {
1743     // marks that this is the last byte of a record (response)
1744     msg_flags |= MSG_EOR;
1745   }
1746   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1747   if (totalWritten < 0) {
1748     if (errno == EAGAIN) {
1749       // TCP buffer is full; we can't write any more data right now.
1750       *countWritten = 0;
1751       *partialWritten = 0;
1752       return WriteResult(0);
1753     }
1754     // error
1755     *countWritten = 0;
1756     *partialWritten = 0;
1757     return WriteResult(WRITE_ERROR);
1758   }
1759
1760   appBytesWritten_ += totalWritten;
1761
1762   uint32_t bytesWritten;
1763   uint32_t n;
1764   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1765     const iovec* v = vec + n;
1766     if (v->iov_len > bytesWritten) {
1767       // Partial write finished in the middle of this iovec
1768       *countWritten = n;
1769       *partialWritten = bytesWritten;
1770       return WriteResult(totalWritten);
1771     }
1772
1773     bytesWritten -= v->iov_len;
1774   }
1775
1776   assert(bytesWritten == 0);
1777   *countWritten = n;
1778   *partialWritten = 0;
1779   return WriteResult(totalWritten);
1780 }
1781
1782 /**
1783  * Re-register the EventHandler after eventFlags_ has changed.
1784  *
1785  * If an error occurs, fail() is called to move the socket into the error state
1786  * and call all currently installed callbacks.  After an error, the
1787  * AsyncSocket is completely unregistered.
1788  *
1789  * @return Returns true on succcess, or false on error.
1790  */
1791 bool AsyncSocket::updateEventRegistration() {
1792   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1793           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1794           << ", events=" << std::hex << eventFlags_;
1795   assert(eventBase_->isInEventBaseThread());
1796   if (eventFlags_ == EventHandler::NONE) {
1797     ioHandler_.unregisterHandler();
1798     return true;
1799   }
1800
1801   // Always register for persistent events, so we don't have to re-register
1802   // after being called back.
1803   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1804     eventFlags_ = EventHandler::NONE; // we're not registered after error
1805     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1806         withAddr("failed to update AsyncSocket event registration"));
1807     fail("updateEventRegistration", ex);
1808     return false;
1809   }
1810
1811   return true;
1812 }
1813
1814 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1815                                            uint16_t disable) {
1816   uint16_t oldFlags = eventFlags_;
1817   eventFlags_ |= enable;
1818   eventFlags_ &= ~disable;
1819   if (eventFlags_ == oldFlags) {
1820     return true;
1821   } else {
1822     return updateEventRegistration();
1823   }
1824 }
1825
1826 void AsyncSocket::startFail() {
1827   // startFail() should only be called once
1828   assert(state_ != StateEnum::ERROR);
1829   assert(getDestructorGuardCount() > 0);
1830   state_ = StateEnum::ERROR;
1831   // Ensure that SHUT_READ and SHUT_WRITE are set,
1832   // so all future attempts to read or write will be rejected
1833   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1834
1835   if (eventFlags_ != EventHandler::NONE) {
1836     eventFlags_ = EventHandler::NONE;
1837     ioHandler_.unregisterHandler();
1838   }
1839   writeTimeout_.cancelTimeout();
1840
1841   if (fd_ >= 0) {
1842     ioHandler_.changeHandlerFD(-1);
1843     doClose();
1844   }
1845 }
1846
1847 void AsyncSocket::finishFail() {
1848   assert(state_ == StateEnum::ERROR);
1849   assert(getDestructorGuardCount() > 0);
1850
1851   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1852                          withAddr("socket closing after error"));
1853   invokeConnectErr(ex);
1854   failAllWrites(ex);
1855
1856   if (readCallback_) {
1857     ReadCallback* callback = readCallback_;
1858     readCallback_ = nullptr;
1859     callback->readErr(ex);
1860   }
1861 }
1862
1863 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1864   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1865              << state_ << " host=" << addr_.describe()
1866              << "): failed in " << fn << "(): "
1867              << ex.what();
1868   startFail();
1869   finishFail();
1870 }
1871
1872 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1873   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1874                << state_ << " host=" << addr_.describe()
1875                << "): failed while connecting in " << fn << "(): "
1876                << ex.what();
1877   startFail();
1878
1879   invokeConnectErr(ex);
1880   finishFail();
1881 }
1882
1883 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1884   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1885                << state_ << " host=" << addr_.describe()
1886                << "): failed while reading in " << fn << "(): "
1887                << ex.what();
1888   startFail();
1889
1890   if (readCallback_ != nullptr) {
1891     ReadCallback* callback = readCallback_;
1892     readCallback_ = nullptr;
1893     callback->readErr(ex);
1894   }
1895
1896   finishFail();
1897 }
1898
1899 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1900   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1901                << state_ << " host=" << addr_.describe()
1902                << "): failed while writing in " << fn << "(): "
1903                << ex.what();
1904   startFail();
1905
1906   // Only invoke the first write callback, since the error occurred while
1907   // writing this request.  Let any other pending write callbacks be invoked in
1908   // finishFail().
1909   if (writeReqHead_ != nullptr) {
1910     WriteRequest* req = writeReqHead_;
1911     writeReqHead_ = req->getNext();
1912     WriteCallback* callback = req->getCallback();
1913     uint32_t bytesWritten = req->getTotalBytesWritten();
1914     req->destroy();
1915     if (callback) {
1916       callback->writeErr(bytesWritten, ex);
1917     }
1918   }
1919
1920   finishFail();
1921 }
1922
1923 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1924                              size_t bytesWritten,
1925                              const AsyncSocketException& ex) {
1926   // This version of failWrite() is used when the failure occurs before
1927   // we've added the callback to writeReqHead_.
1928   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1929              << state_ << " host=" << addr_.describe()
1930              <<"): failed while writing in " << fn << "(): "
1931              << ex.what();
1932   startFail();
1933
1934   if (callback != nullptr) {
1935     callback->writeErr(bytesWritten, ex);
1936   }
1937
1938   finishFail();
1939 }
1940
1941 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1942   // Invoke writeError() on all write callbacks.
1943   // This is used when writes are forcibly shutdown with write requests
1944   // pending, or when an error occurs with writes pending.
1945   while (writeReqHead_ != nullptr) {
1946     WriteRequest* req = writeReqHead_;
1947     writeReqHead_ = req->getNext();
1948     WriteCallback* callback = req->getCallback();
1949     if (callback) {
1950       callback->writeErr(req->getTotalBytesWritten(), ex);
1951     }
1952     req->destroy();
1953   }
1954 }
1955
1956 void AsyncSocket::invalidState(ConnectCallback* callback) {
1957   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1958              << "): connect() called in invalid state " << state_;
1959
1960   /*
1961    * The invalidState() methods don't use the normal failure mechanisms,
1962    * since we don't know what state we are in.  We don't want to call
1963    * startFail()/finishFail() recursively if we are already in the middle of
1964    * cleaning up.
1965    */
1966
1967   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1968                          "connect() called with socket in invalid state");
1969   connectEndTime_ = std::chrono::steady_clock::now();
1970   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1971     if (callback) {
1972       callback->connectErr(ex);
1973     }
1974   } else {
1975     // We can't use failConnect() here since connectCallback_
1976     // may already be set to another callback.  Invoke this ConnectCallback
1977     // here; any other connectCallback_ will be invoked in finishFail()
1978     startFail();
1979     if (callback) {
1980       callback->connectErr(ex);
1981     }
1982     finishFail();
1983   }
1984 }
1985
1986 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
1987   connectEndTime_ = std::chrono::steady_clock::now();
1988   if (connectCallback_) {
1989     ConnectCallback* callback = connectCallback_;
1990     connectCallback_ = nullptr;
1991     callback->connectErr(ex);
1992   }
1993 }
1994
1995 void AsyncSocket::invokeConnectSuccess() {
1996   connectEndTime_ = std::chrono::steady_clock::now();
1997   if (connectCallback_) {
1998     ConnectCallback* callback = connectCallback_;
1999     connectCallback_ = nullptr;
2000     callback->connectSuccess();
2001   }
2002 }
2003
2004 void AsyncSocket::invalidState(ReadCallback* callback) {
2005   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2006              << "): setReadCallback(" << callback
2007              << ") called in invalid state " << state_;
2008
2009   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2010                          "setReadCallback() called with socket in "
2011                          "invalid state");
2012   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2013     if (callback) {
2014       callback->readErr(ex);
2015     }
2016   } else {
2017     startFail();
2018     if (callback) {
2019       callback->readErr(ex);
2020     }
2021     finishFail();
2022   }
2023 }
2024
2025 void AsyncSocket::invalidState(WriteCallback* callback) {
2026   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2027              << "): write() called in invalid state " << state_;
2028
2029   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2030                          withAddr("write() called with socket in invalid state"));
2031   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2032     if (callback) {
2033       callback->writeErr(0, ex);
2034     }
2035   } else {
2036     startFail();
2037     if (callback) {
2038       callback->writeErr(0, ex);
2039     }
2040     finishFail();
2041   }
2042 }
2043
2044 void AsyncSocket::doClose() {
2045   if (fd_ == -1) return;
2046   if (shutdownSocketSet_) {
2047     shutdownSocketSet_->close(fd_);
2048   } else {
2049     ::close(fd_);
2050   }
2051   fd_ = -1;
2052 }
2053
2054 std::ostream& operator << (std::ostream& os,
2055                            const AsyncSocket::StateEnum& state) {
2056   os << static_cast<int>(state);
2057   return os;
2058 }
2059
2060 std::string AsyncSocket::withAddr(const std::string& s) {
2061   // Don't use addr_ directly because it may not be initialized
2062   // e.g. if constructed from fd
2063   folly::SocketAddress peer, local;
2064   try {
2065     getPeerAddress(&peer);
2066     getLocalAddress(&local);
2067   } catch (const std::exception&) {
2068     // ignore
2069   } catch (...) {
2070     // ignore
2071   }
2072   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2073 }
2074
2075 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2076   bufferCallback_ = cb;
2077 }
2078
2079 } // folly