0fe8e88613c013b759771b476ed6b7db33344d3c
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23 #include <folly/portability/Fcntl.h>
24 #include <folly/portability/Sockets.h>
25 #include <folly/portability/SysUio.h>
26 #include <folly/portability/Unistd.h>
27
28 #include <errno.h>
29 #include <limits.h>
30 #include <thread>
31 #include <sys/types.h>
32 #include <boost/preprocessor/control/if.hpp>
33
34 using std::string;
35 using std::unique_ptr;
36
37 namespace folly {
38
39 // static members initializers
40 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
41
42 const AsyncSocketException socketClosedLocallyEx(
43     AsyncSocketException::END_OF_FILE, "socket closed locally");
44 const AsyncSocketException socketShutdownForWritesEx(
45     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
46
47 // TODO: It might help performance to provide a version of BytesWriteRequest that
48 // users could derive from, so we can avoid the extra allocation for each call
49 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
50 // protocols are currently templatized for transports.
51 //
52 // We would need the version for external users where they provide the iovec
53 // storage space, and only our internal version would allocate it at the end of
54 // the WriteRequest.
55
56 /* The default WriteRequest implementation, used for write(), writev() and
57  * writeChain()
58  *
59  * A new BytesWriteRequest operation is allocated on the heap for all write
60  * operations that cannot be completed immediately.
61  */
62 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
63  public:
64   static BytesWriteRequest* newRequest(AsyncSocket* socket,
65                                        WriteCallback* callback,
66                                        const iovec* ops,
67                                        uint32_t opCount,
68                                        uint32_t partialWritten,
69                                        uint32_t bytesWritten,
70                                        unique_ptr<IOBuf>&& ioBuf,
71                                        WriteFlags flags) {
72     assert(opCount > 0);
73     // Since we put a variable size iovec array at the end
74     // of each BytesWriteRequest, we have to manually allocate the memory.
75     void* buf = malloc(sizeof(BytesWriteRequest) +
76                        (opCount * sizeof(struct iovec)));
77     if (buf == nullptr) {
78       throw std::bad_alloc();
79     }
80
81     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
82                                       partialWritten, bytesWritten,
83                                       std::move(ioBuf), flags);
84   }
85
86   void destroy() override {
87     this->~BytesWriteRequest();
88     free(this);
89   }
90
91   WriteResult performWrite() override {
92     WriteFlags writeFlags = flags_;
93     if (getNext() != nullptr) {
94       writeFlags = writeFlags | WriteFlags::CORK;
95     }
96     return socket_->performWrite(
97         getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
98   }
99
100   bool isComplete() override {
101     return opsWritten_ == getOpCount();
102   }
103
104   void consume() override {
105     // Advance opIndex_ forward by opsWritten_
106     opIndex_ += opsWritten_;
107     assert(opIndex_ < opCount_);
108
109     // If we've finished writing any IOBufs, release them
110     if (ioBuf_) {
111       for (uint32_t i = opsWritten_; i != 0; --i) {
112         assert(ioBuf_);
113         ioBuf_ = ioBuf_->pop();
114       }
115     }
116
117     // Move partialBytes_ forward into the current iovec buffer
118     struct iovec* currentOp = writeOps_ + opIndex_;
119     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
120     currentOp->iov_base =
121       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
122     currentOp->iov_len -= partialBytes_;
123
124     // Increment the totalBytesWritten_ count by bytesWritten_;
125     totalBytesWritten_ += bytesWritten_;
126   }
127
128  private:
129   BytesWriteRequest(AsyncSocket* socket,
130                     WriteCallback* callback,
131                     const struct iovec* ops,
132                     uint32_t opCount,
133                     uint32_t partialBytes,
134                     uint32_t bytesWritten,
135                     unique_ptr<IOBuf>&& ioBuf,
136                     WriteFlags flags)
137     : AsyncSocket::WriteRequest(socket, callback)
138     , opCount_(opCount)
139     , opIndex_(0)
140     , flags_(flags)
141     , ioBuf_(std::move(ioBuf))
142     , opsWritten_(0)
143     , partialBytes_(partialBytes)
144     , bytesWritten_(bytesWritten) {
145     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
146   }
147
148   // private destructor, to ensure callers use destroy()
149   ~BytesWriteRequest() override = default;
150
151   const struct iovec* getOps() const {
152     assert(opCount_ > opIndex_);
153     return writeOps_ + opIndex_;
154   }
155
156   uint32_t getOpCount() const {
157     assert(opCount_ > opIndex_);
158     return opCount_ - opIndex_;
159   }
160
161   uint32_t opCount_;            ///< number of entries in writeOps_
162   uint32_t opIndex_;            ///< current index into writeOps_
163   WriteFlags flags_;            ///< set for WriteFlags
164   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
165
166   // for consume(), how much we wrote on the last write
167   uint32_t opsWritten_;         ///< complete ops written
168   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
169   ssize_t bytesWritten_;        ///< bytes written altogether
170
171   struct iovec writeOps_[];     ///< write operation(s) list
172 };
173
174 AsyncSocket::AsyncSocket()
175   : eventBase_(nullptr)
176   , writeTimeout_(this, nullptr)
177   , ioHandler_(this, nullptr)
178   , immediateReadHandler_(this) {
179   VLOG(5) << "new AsyncSocket()";
180   init();
181 }
182
183 AsyncSocket::AsyncSocket(EventBase* evb)
184   : eventBase_(evb)
185   , writeTimeout_(this, evb)
186   , ioHandler_(this, evb)
187   , immediateReadHandler_(this) {
188   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
189   init();
190 }
191
192 AsyncSocket::AsyncSocket(EventBase* evb,
193                            const folly::SocketAddress& address,
194                            uint32_t connectTimeout)
195   : AsyncSocket(evb) {
196   connect(nullptr, address, connectTimeout);
197 }
198
199 AsyncSocket::AsyncSocket(EventBase* evb,
200                            const std::string& ip,
201                            uint16_t port,
202                            uint32_t connectTimeout)
203   : AsyncSocket(evb) {
204   connect(nullptr, ip, port, connectTimeout);
205 }
206
207 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
208   : eventBase_(evb)
209   , writeTimeout_(this, evb)
210   , ioHandler_(this, evb, fd)
211   , immediateReadHandler_(this) {
212   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
213           << fd << ")";
214   init();
215   fd_ = fd;
216   setCloseOnExec();
217   state_ = StateEnum::ESTABLISHED;
218 }
219
220 // init() method, since constructor forwarding isn't supported in most
221 // compilers yet.
222 void AsyncSocket::init() {
223   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
224   shutdownFlags_ = 0;
225   state_ = StateEnum::UNINIT;
226   eventFlags_ = EventHandler::NONE;
227   fd_ = -1;
228   sendTimeout_ = 0;
229   maxReadsPerEvent_ = 16;
230   connectCallback_ = nullptr;
231   readCallback_ = nullptr;
232   writeReqHead_ = nullptr;
233   writeReqTail_ = nullptr;
234   shutdownSocketSet_ = nullptr;
235   appBytesWritten_ = 0;
236   appBytesReceived_ = 0;
237 }
238
239 AsyncSocket::~AsyncSocket() {
240   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
241           << ", evb=" << eventBase_ << ", fd=" << fd_
242           << ", state=" << state_ << ")";
243 }
244
245 void AsyncSocket::destroy() {
246   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
247           << ", fd=" << fd_ << ", state=" << state_;
248   // When destroy is called, close the socket immediately
249   closeNow();
250
251   // Then call DelayedDestruction::destroy() to take care of
252   // whether or not we need immediate or delayed destruction
253   DelayedDestruction::destroy();
254 }
255
256 int AsyncSocket::detachFd() {
257   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
258           << ", evb=" << eventBase_ << ", state=" << state_
259           << ", events=" << std::hex << eventFlags_ << ")";
260   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
261   // actually close the descriptor.
262   if (shutdownSocketSet_) {
263     shutdownSocketSet_->remove(fd_);
264   }
265   int fd = fd_;
266   fd_ = -1;
267   // Call closeNow() to invoke all pending callbacks with an error.
268   closeNow();
269   // Update the EventHandler to stop using this fd.
270   // This can only be done after closeNow() unregisters the handler.
271   ioHandler_.changeHandlerFD(-1);
272   return fd;
273 }
274
275 const folly::SocketAddress& AsyncSocket::anyAddress() {
276   static const folly::SocketAddress anyAddress =
277     folly::SocketAddress("0.0.0.0", 0);
278   return anyAddress;
279 }
280
281 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
282   if (shutdownSocketSet_ == newSS) {
283     return;
284   }
285   if (shutdownSocketSet_ && fd_ != -1) {
286     shutdownSocketSet_->remove(fd_);
287   }
288   shutdownSocketSet_ = newSS;
289   if (shutdownSocketSet_ && fd_ != -1) {
290     shutdownSocketSet_->add(fd_);
291   }
292 }
293
294 void AsyncSocket::setCloseOnExec() {
295   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
296   if (rv != 0) {
297     auto errnoCopy = errno;
298     throw AsyncSocketException(
299         AsyncSocketException::INTERNAL_ERROR,
300         withAddr("failed to set close-on-exec flag"),
301         errnoCopy);
302   }
303 }
304
305 void AsyncSocket::connect(ConnectCallback* callback,
306                            const folly::SocketAddress& address,
307                            int timeout,
308                            const OptionMap &options,
309                            const folly::SocketAddress& bindAddr) noexcept {
310   DestructorGuard dg(this);
311   assert(eventBase_->isInEventBaseThread());
312
313   addr_ = address;
314
315   // Make sure we're in the uninitialized state
316   if (state_ != StateEnum::UNINIT) {
317     return invalidState(callback);
318   }
319
320   connectTimeout_ = std::chrono::milliseconds(timeout);
321   connectStartTime_ = std::chrono::steady_clock::now();
322   // Make connect end time at least >= connectStartTime.
323   connectEndTime_ = connectStartTime_;
324
325   assert(fd_ == -1);
326   state_ = StateEnum::CONNECTING;
327   connectCallback_ = callback;
328
329   sockaddr_storage addrStorage;
330   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
331
332   try {
333     // Create the socket
334     // Technically the first parameter should actually be a protocol family
335     // constant (PF_xxx) rather than an address family (AF_xxx), but the
336     // distinction is mainly just historical.  In pretty much all
337     // implementations the PF_foo and AF_foo constants are identical.
338     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
339     if (fd_ < 0) {
340       auto errnoCopy = errno;
341       throw AsyncSocketException(
342           AsyncSocketException::INTERNAL_ERROR,
343           withAddr("failed to create socket"),
344           errnoCopy);
345     }
346     if (shutdownSocketSet_) {
347       shutdownSocketSet_->add(fd_);
348     }
349     ioHandler_.changeHandlerFD(fd_);
350
351     setCloseOnExec();
352
353     // Put the socket in non-blocking mode
354     int flags = fcntl(fd_, F_GETFL, 0);
355     if (flags == -1) {
356       auto errnoCopy = errno;
357       throw AsyncSocketException(
358           AsyncSocketException::INTERNAL_ERROR,
359           withAddr("failed to get socket flags"),
360           errnoCopy);
361     }
362     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
363     if (rv == -1) {
364       auto errnoCopy = errno;
365       throw AsyncSocketException(
366           AsyncSocketException::INTERNAL_ERROR,
367           withAddr("failed to put socket in non-blocking mode"),
368           errnoCopy);
369     }
370
371 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
372     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
373     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
374     if (rv == -1) {
375       auto errnoCopy = errno;
376       throw AsyncSocketException(
377           AsyncSocketException::INTERNAL_ERROR,
378           "failed to enable F_SETNOSIGPIPE on socket",
379           errnoCopy);
380     }
381 #endif
382
383     // By default, turn on TCP_NODELAY
384     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
385     // setNoDelay() will log an error message if it fails.
386     if (address.getFamily() != AF_UNIX) {
387       (void)setNoDelay(true);
388     }
389
390     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
391             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
392
393     // bind the socket
394     if (bindAddr != anyAddress()) {
395       int one = 1;
396       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
397         auto errnoCopy = errno;
398         doClose();
399         throw AsyncSocketException(
400             AsyncSocketException::NOT_OPEN,
401             "failed to setsockopt prior to bind on " + bindAddr.describe(),
402             errnoCopy);
403       }
404
405       bindAddr.getAddress(&addrStorage);
406
407       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
408         auto errnoCopy = errno;
409         doClose();
410         throw AsyncSocketException(
411             AsyncSocketException::NOT_OPEN,
412             "failed to bind to async socket: " + bindAddr.describe(),
413             errnoCopy);
414       }
415     }
416
417     // Apply the additional options if any.
418     for (const auto& opt: options) {
419       int rv = opt.first.apply(fd_, opt.second);
420       if (rv != 0) {
421         auto errnoCopy = errno;
422         throw AsyncSocketException(
423             AsyncSocketException::INTERNAL_ERROR,
424             withAddr("failed to set socket option"),
425             errnoCopy);
426       }
427     }
428
429     // Perform the connect()
430     address.getAddress(&addrStorage);
431
432     rv = ::connect(fd_, saddr, address.getActualSize());
433     if (rv < 0) {
434       auto errnoCopy = errno;
435       if (errnoCopy == EINPROGRESS) {
436         // Connection in progress.
437         if (timeout > 0) {
438           // Start a timer in case the connection takes too long.
439           if (!writeTimeout_.scheduleTimeout(timeout)) {
440             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
441                 withAddr("failed to schedule AsyncSocket connect timeout"));
442           }
443         }
444
445         // Register for write events, so we'll
446         // be notified when the connection finishes/fails.
447         // Note that we don't register for a persistent event here.
448         assert(eventFlags_ == EventHandler::NONE);
449         eventFlags_ = EventHandler::WRITE;
450         if (!ioHandler_.registerHandler(eventFlags_)) {
451           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
452               withAddr("failed to register AsyncSocket connect handler"));
453         }
454         return;
455       } else {
456         throw AsyncSocketException(
457             AsyncSocketException::NOT_OPEN,
458             "connect failed (immediately)",
459             errnoCopy);
460       }
461     }
462
463     // If we're still here the connect() succeeded immediately.
464     // Fall through to call the callback outside of this try...catch block
465   } catch (const AsyncSocketException& ex) {
466     return failConnect(__func__, ex);
467   } catch (const std::exception& ex) {
468     // shouldn't happen, but handle it just in case
469     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
470                << "): unexpected " << typeid(ex).name() << " exception: "
471                << ex.what();
472     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
473                             withAddr(string("unexpected exception: ") +
474                                      ex.what()));
475     return failConnect(__func__, tex);
476   }
477
478   // The connection succeeded immediately
479   // The read callback may not have been set yet, and no writes may be pending
480   // yet, so we don't have to register for any events at the moment.
481   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
482   assert(readCallback_ == nullptr);
483   assert(writeReqHead_ == nullptr);
484   state_ = StateEnum::ESTABLISHED;
485   invokeConnectSuccess();
486 }
487
488 void AsyncSocket::connect(ConnectCallback* callback,
489                            const string& ip, uint16_t port,
490                            int timeout,
491                            const OptionMap &options) noexcept {
492   DestructorGuard dg(this);
493   try {
494     connectCallback_ = callback;
495     connect(callback, folly::SocketAddress(ip, port), timeout, options);
496   } catch (const std::exception& ex) {
497     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
498                             ex.what());
499     return failConnect(__func__, tex);
500   }
501 }
502
503 void AsyncSocket::cancelConnect() {
504   connectCallback_ = nullptr;
505   if (state_ == StateEnum::CONNECTING) {
506     closeNow();
507   }
508 }
509
510 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
511   sendTimeout_ = milliseconds;
512   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
513
514   // If we are currently pending on write requests, immediately update
515   // writeTimeout_ with the new value.
516   if ((eventFlags_ & EventHandler::WRITE) &&
517       (state_ != StateEnum::CONNECTING)) {
518     assert(state_ == StateEnum::ESTABLISHED);
519     assert((shutdownFlags_ & SHUT_WRITE) == 0);
520     if (sendTimeout_ > 0) {
521       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
522         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
523             withAddr("failed to reschedule send timeout in setSendTimeout"));
524         return failWrite(__func__, ex);
525       }
526     } else {
527       writeTimeout_.cancelTimeout();
528     }
529   }
530 }
531
532 void AsyncSocket::setReadCB(ReadCallback *callback) {
533   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
534           << ", callback=" << callback << ", state=" << state_;
535
536   // Short circuit if callback is the same as the existing readCallback_.
537   //
538   // Note that this is needed for proper functioning during some cleanup cases.
539   // During cleanup we allow setReadCallback(nullptr) to be called even if the
540   // read callback is already unset and we have been detached from an event
541   // base.  This check prevents us from asserting
542   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
543   if (callback == readCallback_) {
544     return;
545   }
546
547   /* We are removing a read callback */
548   if (callback == nullptr &&
549       immediateReadHandler_.isLoopCallbackScheduled()) {
550     immediateReadHandler_.cancelLoopCallback();
551   }
552
553   if (shutdownFlags_ & SHUT_READ) {
554     // Reads have already been shut down on this socket.
555     //
556     // Allow setReadCallback(nullptr) to be called in this case, but don't
557     // allow a new callback to be set.
558     //
559     // For example, setReadCallback(nullptr) can happen after an error if we
560     // invoke some other error callback before invoking readError().  The other
561     // error callback that is invoked first may go ahead and clear the read
562     // callback before we get a chance to invoke readError().
563     if (callback != nullptr) {
564       return invalidState(callback);
565     }
566     assert((eventFlags_ & EventHandler::READ) == 0);
567     readCallback_ = nullptr;
568     return;
569   }
570
571   DestructorGuard dg(this);
572   assert(eventBase_->isInEventBaseThread());
573
574   switch ((StateEnum)state_) {
575     case StateEnum::CONNECTING:
576       // For convenience, we allow the read callback to be set while we are
577       // still connecting.  We just store the callback for now.  Once the
578       // connection completes we'll register for read events.
579       readCallback_ = callback;
580       return;
581     case StateEnum::ESTABLISHED:
582     {
583       readCallback_ = callback;
584       uint16_t oldFlags = eventFlags_;
585       if (readCallback_) {
586         eventFlags_ |= EventHandler::READ;
587       } else {
588         eventFlags_ &= ~EventHandler::READ;
589       }
590
591       // Update our registration if our flags have changed
592       if (eventFlags_ != oldFlags) {
593         // We intentionally ignore the return value here.
594         // updateEventRegistration() will move us into the error state if it
595         // fails, and we don't need to do anything else here afterwards.
596         (void)updateEventRegistration();
597       }
598
599       if (readCallback_) {
600         checkForImmediateRead();
601       }
602       return;
603     }
604     case StateEnum::CLOSED:
605     case StateEnum::ERROR:
606       // We should never reach here.  SHUT_READ should always be set
607       // if we are in STATE_CLOSED or STATE_ERROR.
608       assert(false);
609       return invalidState(callback);
610     case StateEnum::UNINIT:
611       // We do not allow setReadCallback() to be called before we start
612       // connecting.
613       return invalidState(callback);
614   }
615
616   // We don't put a default case in the switch statement, so that the compiler
617   // will warn us to update the switch statement if a new state is added.
618   return invalidState(callback);
619 }
620
621 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
622   return readCallback_;
623 }
624
625 void AsyncSocket::write(WriteCallback* callback,
626                          const void* buf, size_t bytes, WriteFlags flags) {
627   iovec op;
628   op.iov_base = const_cast<void*>(buf);
629   op.iov_len = bytes;
630   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
631 }
632
633 void AsyncSocket::writev(WriteCallback* callback,
634                           const iovec* vec,
635                           size_t count,
636                           WriteFlags flags) {
637   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
638 }
639
640 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
641                               WriteFlags flags) {
642   constexpr size_t kSmallSizeMax = 64;
643   size_t count = buf->countChainElements();
644   if (count <= kSmallSizeMax) {
645     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
646     writeChainImpl(callback, vec, count, std::move(buf), flags);
647   } else {
648     iovec* vec = new iovec[count];
649     writeChainImpl(callback, vec, count, std::move(buf), flags);
650     delete[] vec;
651   }
652 }
653
654 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
655     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
656   size_t veclen = buf->fillIov(vec, count);
657   writeImpl(callback, vec, veclen, std::move(buf), flags);
658 }
659
660 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
661                              size_t count, unique_ptr<IOBuf>&& buf,
662                              WriteFlags flags) {
663   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
664           << ", callback=" << callback << ", count=" << count
665           << ", state=" << state_;
666   DestructorGuard dg(this);
667   unique_ptr<IOBuf>ioBuf(std::move(buf));
668   assert(eventBase_->isInEventBaseThread());
669
670   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
671     // No new writes may be performed after the write side of the socket has
672     // been shutdown.
673     //
674     // We could just call callback->writeError() here to fail just this write.
675     // However, fail hard and use invalidState() to fail all outstanding
676     // callbacks and move the socket into the error state.  There's most likely
677     // a bug in the caller's code, so we abort everything rather than trying to
678     // proceed as best we can.
679     return invalidState(callback);
680   }
681
682   uint32_t countWritten = 0;
683   uint32_t partialWritten = 0;
684   int bytesWritten = 0;
685   bool mustRegister = false;
686   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
687     if (writeReqHead_ == nullptr) {
688       // If we are established and there are no other writes pending,
689       // we can attempt to perform the write immediately.
690       assert(writeReqTail_ == nullptr);
691       assert((eventFlags_ & EventHandler::WRITE) == 0);
692
693       auto writeResult =
694           performWrite(vec, count, flags, &countWritten, &partialWritten);
695       bytesWritten = writeResult.writeReturn;
696       if (bytesWritten < 0) {
697         auto errnoCopy = errno;
698         if (writeResult.exception) {
699           return failWrite(__func__, callback, 0, *writeResult.exception);
700         }
701         AsyncSocketException ex(
702             AsyncSocketException::INTERNAL_ERROR,
703             withAddr("writev failed"),
704             errnoCopy);
705         return failWrite(__func__, callback, 0, ex);
706       } else if (countWritten == count) {
707         // We successfully wrote everything.
708         // Invoke the callback and return.
709         if (callback) {
710           callback->writeSuccess();
711         }
712         return;
713       } else { // continue writing the next writeReq
714         if (bufferCallback_) {
715           bufferCallback_->onEgressBuffered();
716         }
717       }
718       mustRegister = true;
719     }
720   } else if (!connecting()) {
721     // Invalid state for writing
722     return invalidState(callback);
723   }
724
725   // Create a new WriteRequest to add to the queue
726   WriteRequest* req;
727   try {
728     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
729                                         count - countWritten, partialWritten,
730                                         bytesWritten, std::move(ioBuf), flags);
731   } catch (const std::exception& ex) {
732     // we mainly expect to catch std::bad_alloc here
733     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
734         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
735     return failWrite(__func__, callback, bytesWritten, tex);
736   }
737   req->consume();
738   if (writeReqTail_ == nullptr) {
739     assert(writeReqHead_ == nullptr);
740     writeReqHead_ = writeReqTail_ = req;
741   } else {
742     writeReqTail_->append(req);
743     writeReqTail_ = req;
744   }
745
746   // Register for write events if are established and not currently
747   // waiting on write events
748   if (mustRegister) {
749     assert(state_ == StateEnum::ESTABLISHED);
750     assert((eventFlags_ & EventHandler::WRITE) == 0);
751     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
752       assert(state_ == StateEnum::ERROR);
753       return;
754     }
755     if (sendTimeout_ > 0) {
756       // Schedule a timeout to fire if the write takes too long.
757       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
758         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
759                                withAddr("failed to schedule send timeout"));
760         return failWrite(__func__, ex);
761       }
762     }
763   }
764 }
765
766 void AsyncSocket::writeRequest(WriteRequest* req) {
767   if (writeReqTail_ == nullptr) {
768     assert(writeReqHead_ == nullptr);
769     writeReqHead_ = writeReqTail_ = req;
770     req->start();
771   } else {
772     writeReqTail_->append(req);
773     writeReqTail_ = req;
774   }
775 }
776
777 void AsyncSocket::close() {
778   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
779           << ", state=" << state_ << ", shutdownFlags="
780           << std::hex << (int) shutdownFlags_;
781
782   // close() is only different from closeNow() when there are pending writes
783   // that need to drain before we can close.  In all other cases, just call
784   // closeNow().
785   //
786   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
787   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
788   // is still running.  (e.g., If there are multiple pending writes, and we
789   // call writeError() on the first one, it may call close().  In this case we
790   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
791   // writes will still be in the queue.)
792   //
793   // We only need to drain pending writes if we are still in STATE_CONNECTING
794   // or STATE_ESTABLISHED
795   if ((writeReqHead_ == nullptr) ||
796       !(state_ == StateEnum::CONNECTING ||
797       state_ == StateEnum::ESTABLISHED)) {
798     closeNow();
799     return;
800   }
801
802   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
803   // destroyed until close() returns.
804   DestructorGuard dg(this);
805   assert(eventBase_->isInEventBaseThread());
806
807   // Since there are write requests pending, we have to set the
808   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
809   // connect finishes and we finish writing these requests.
810   //
811   // Set SHUT_READ to indicate that reads are shut down, and set the
812   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
813   // pending writes complete.
814   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
815
816   // If a read callback is set, invoke readEOF() immediately to inform it that
817   // the socket has been closed and no more data can be read.
818   if (readCallback_) {
819     // Disable reads if they are enabled
820     if (!updateEventRegistration(0, EventHandler::READ)) {
821       // We're now in the error state; callbacks have been cleaned up
822       assert(state_ == StateEnum::ERROR);
823       assert(readCallback_ == nullptr);
824     } else {
825       ReadCallback* callback = readCallback_;
826       readCallback_ = nullptr;
827       callback->readEOF();
828     }
829   }
830 }
831
832 void AsyncSocket::closeNow() {
833   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
834           << ", state=" << state_ << ", shutdownFlags="
835           << std::hex << (int) shutdownFlags_;
836   DestructorGuard dg(this);
837   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
838
839   switch (state_) {
840     case StateEnum::ESTABLISHED:
841     case StateEnum::CONNECTING:
842     {
843       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
844       state_ = StateEnum::CLOSED;
845
846       // If the write timeout was set, cancel it.
847       writeTimeout_.cancelTimeout();
848
849       // If we are registered for I/O events, unregister.
850       if (eventFlags_ != EventHandler::NONE) {
851         eventFlags_ = EventHandler::NONE;
852         if (!updateEventRegistration()) {
853           // We will have been moved into the error state.
854           assert(state_ == StateEnum::ERROR);
855           return;
856         }
857       }
858
859       if (immediateReadHandler_.isLoopCallbackScheduled()) {
860         immediateReadHandler_.cancelLoopCallback();
861       }
862
863       if (fd_ >= 0) {
864         ioHandler_.changeHandlerFD(-1);
865         doClose();
866       }
867
868       invokeConnectErr(socketClosedLocallyEx);
869
870       failAllWrites(socketClosedLocallyEx);
871
872       if (readCallback_) {
873         ReadCallback* callback = readCallback_;
874         readCallback_ = nullptr;
875         callback->readEOF();
876       }
877       return;
878     }
879     case StateEnum::CLOSED:
880       // Do nothing.  It's possible that we are being called recursively
881       // from inside a callback that we invoked inside another call to close()
882       // that is still running.
883       return;
884     case StateEnum::ERROR:
885       // Do nothing.  The error handling code has performed (or is performing)
886       // cleanup.
887       return;
888     case StateEnum::UNINIT:
889       assert(eventFlags_ == EventHandler::NONE);
890       assert(connectCallback_ == nullptr);
891       assert(readCallback_ == nullptr);
892       assert(writeReqHead_ == nullptr);
893       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
894       state_ = StateEnum::CLOSED;
895       return;
896   }
897
898   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
899               << ") called in unknown state " << state_;
900 }
901
902 void AsyncSocket::closeWithReset() {
903   // Enable SO_LINGER, with the linger timeout set to 0.
904   // This will trigger a TCP reset when we close the socket.
905   if (fd_ >= 0) {
906     struct linger optLinger = {1, 0};
907     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
908       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
909               << "on " << fd_ << ": errno=" << errno;
910     }
911   }
912
913   // Then let closeNow() take care of the rest
914   closeNow();
915 }
916
917 void AsyncSocket::shutdownWrite() {
918   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
919           << ", state=" << state_ << ", shutdownFlags="
920           << std::hex << (int) shutdownFlags_;
921
922   // If there are no pending writes, shutdownWrite() is identical to
923   // shutdownWriteNow().
924   if (writeReqHead_ == nullptr) {
925     shutdownWriteNow();
926     return;
927   }
928
929   assert(eventBase_->isInEventBaseThread());
930
931   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
932   // shutdown will be performed once all writes complete.
933   shutdownFlags_ |= SHUT_WRITE_PENDING;
934 }
935
936 void AsyncSocket::shutdownWriteNow() {
937   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
938           << ", fd=" << fd_ << ", state=" << state_
939           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
940
941   if (shutdownFlags_ & SHUT_WRITE) {
942     // Writes are already shutdown; nothing else to do.
943     return;
944   }
945
946   // If SHUT_READ is already set, just call closeNow() to completely
947   // close the socket.  This can happen if close() was called with writes
948   // pending, and then shutdownWriteNow() is called before all pending writes
949   // complete.
950   if (shutdownFlags_ & SHUT_READ) {
951     closeNow();
952     return;
953   }
954
955   DestructorGuard dg(this);
956   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
957
958   switch (static_cast<StateEnum>(state_)) {
959     case StateEnum::ESTABLISHED:
960     {
961       shutdownFlags_ |= SHUT_WRITE;
962
963       // If the write timeout was set, cancel it.
964       writeTimeout_.cancelTimeout();
965
966       // If we are registered for write events, unregister.
967       if (!updateEventRegistration(0, EventHandler::WRITE)) {
968         // We will have been moved into the error state.
969         assert(state_ == StateEnum::ERROR);
970         return;
971       }
972
973       // Shutdown writes on the file descriptor
974       ::shutdown(fd_, SHUT_WR);
975
976       // Immediately fail all write requests
977       failAllWrites(socketShutdownForWritesEx);
978       return;
979     }
980     case StateEnum::CONNECTING:
981     {
982       // Set the SHUT_WRITE_PENDING flag.
983       // When the connection completes, it will check this flag,
984       // shutdown the write half of the socket, and then set SHUT_WRITE.
985       shutdownFlags_ |= SHUT_WRITE_PENDING;
986
987       // Immediately fail all write requests
988       failAllWrites(socketShutdownForWritesEx);
989       return;
990     }
991     case StateEnum::UNINIT:
992       // Callers normally shouldn't call shutdownWriteNow() before the socket
993       // even starts connecting.  Nonetheless, go ahead and set
994       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
995       // immediately shut down the write side of the socket.
996       shutdownFlags_ |= SHUT_WRITE_PENDING;
997       return;
998     case StateEnum::CLOSED:
999     case StateEnum::ERROR:
1000       // We should never get here.  SHUT_WRITE should always be set
1001       // in STATE_CLOSED and STATE_ERROR.
1002       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
1003                  << ", fd=" << fd_ << ") in unexpected state " << state_
1004                  << " with SHUT_WRITE not set ("
1005                  << std::hex << (int) shutdownFlags_ << ")";
1006       assert(false);
1007       return;
1008   }
1009
1010   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
1011               << fd_ << ") called in unknown state " << state_;
1012 }
1013
1014 bool AsyncSocket::readable() const {
1015   if (fd_ == -1) {
1016     return false;
1017   }
1018   struct pollfd fds[1];
1019   fds[0].fd = fd_;
1020   fds[0].events = POLLIN;
1021   fds[0].revents = 0;
1022   int rc = poll(fds, 1, 0);
1023   return rc == 1;
1024 }
1025
1026 bool AsyncSocket::isPending() const {
1027   return ioHandler_.isPending();
1028 }
1029
1030 bool AsyncSocket::hangup() const {
1031   if (fd_ == -1) {
1032     // sanity check, no one should ask for hangup if we are not connected.
1033     assert(false);
1034     return false;
1035   }
1036 #ifdef POLLRDHUP // Linux-only
1037   struct pollfd fds[1];
1038   fds[0].fd = fd_;
1039   fds[0].events = POLLRDHUP|POLLHUP;
1040   fds[0].revents = 0;
1041   poll(fds, 1, 0);
1042   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1043 #else
1044   return false;
1045 #endif
1046 }
1047
1048 bool AsyncSocket::good() const {
1049   return ((state_ == StateEnum::CONNECTING ||
1050           state_ == StateEnum::ESTABLISHED) &&
1051           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1052 }
1053
1054 bool AsyncSocket::error() const {
1055   return (state_ == StateEnum::ERROR);
1056 }
1057
1058 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1059   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1060           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1061           << ", state=" << state_ << ", events="
1062           << std::hex << eventFlags_ << ")";
1063   assert(eventBase_ == nullptr);
1064   assert(eventBase->isInEventBaseThread());
1065
1066   eventBase_ = eventBase;
1067   ioHandler_.attachEventBase(eventBase);
1068   writeTimeout_.attachEventBase(eventBase);
1069 }
1070
1071 void AsyncSocket::detachEventBase() {
1072   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1073           << ", old evb=" << eventBase_ << ", state=" << state_
1074           << ", events=" << std::hex << eventFlags_ << ")";
1075   assert(eventBase_ != nullptr);
1076   assert(eventBase_->isInEventBaseThread());
1077
1078   eventBase_ = nullptr;
1079   ioHandler_.detachEventBase();
1080   writeTimeout_.detachEventBase();
1081 }
1082
1083 bool AsyncSocket::isDetachable() const {
1084   DCHECK(eventBase_ != nullptr);
1085   DCHECK(eventBase_->isInEventBaseThread());
1086
1087   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1088 }
1089
1090 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1091   if (!localAddr_.isInitialized()) {
1092     localAddr_.setFromLocalAddress(fd_);
1093   }
1094   *address = localAddr_;
1095 }
1096
1097 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1098   if (!addr_.isInitialized()) {
1099     addr_.setFromPeerAddress(fd_);
1100   }
1101   *address = addr_;
1102 }
1103
1104 int AsyncSocket::setNoDelay(bool noDelay) {
1105   if (fd_ < 0) {
1106     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1107                << this << "(state=" << state_ << ")";
1108     return EINVAL;
1109
1110   }
1111
1112   int value = noDelay ? 1 : 0;
1113   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1114     int errnoCopy = errno;
1115     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1116             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1117             << strerror(errnoCopy);
1118     return errnoCopy;
1119   }
1120
1121   return 0;
1122 }
1123
1124 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1125
1126   #ifndef TCP_CONGESTION
1127   #define TCP_CONGESTION  13
1128   #endif
1129
1130   if (fd_ < 0) {
1131     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1132                << "socket " << this << "(state=" << state_ << ")";
1133     return EINVAL;
1134
1135   }
1136
1137   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1138         cname.length() + 1) != 0) {
1139     int errnoCopy = errno;
1140     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1141             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1142             << strerror(errnoCopy);
1143     return errnoCopy;
1144   }
1145
1146   return 0;
1147 }
1148
1149 int AsyncSocket::setQuickAck(bool quickack) {
1150   if (fd_ < 0) {
1151     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1152                << this << "(state=" << state_ << ")";
1153     return EINVAL;
1154
1155   }
1156
1157 #ifdef TCP_QUICKACK // Linux-only
1158   int value = quickack ? 1 : 0;
1159   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1160     int errnoCopy = errno;
1161     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1162             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1163             << strerror(errnoCopy);
1164     return errnoCopy;
1165   }
1166
1167   return 0;
1168 #else
1169   return ENOSYS;
1170 #endif
1171 }
1172
1173 int AsyncSocket::setSendBufSize(size_t bufsize) {
1174   if (fd_ < 0) {
1175     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1176                << this << "(state=" << state_ << ")";
1177     return EINVAL;
1178   }
1179
1180   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1181     int errnoCopy = errno;
1182     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1183             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1184             << strerror(errnoCopy);
1185     return errnoCopy;
1186   }
1187
1188   return 0;
1189 }
1190
1191 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1192   if (fd_ < 0) {
1193     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1194                << this << "(state=" << state_ << ")";
1195     return EINVAL;
1196   }
1197
1198   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1199     int errnoCopy = errno;
1200     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1201             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1202             << strerror(errnoCopy);
1203     return errnoCopy;
1204   }
1205
1206   return 0;
1207 }
1208
1209 int AsyncSocket::setTCPProfile(int profd) {
1210   if (fd_ < 0) {
1211     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1212                << this << "(state=" << state_ << ")";
1213     return EINVAL;
1214   }
1215
1216   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1217     int errnoCopy = errno;
1218     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1219             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1220             << strerror(errnoCopy);
1221     return errnoCopy;
1222   }
1223
1224   return 0;
1225 }
1226
1227 void AsyncSocket::ioReady(uint16_t events) noexcept {
1228   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1229           << ", events=" << std::hex << events << ", state=" << state_;
1230   DestructorGuard dg(this);
1231   assert(events & EventHandler::READ_WRITE);
1232   assert(eventBase_->isInEventBaseThread());
1233
1234   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1235   if (relevantEvents == EventHandler::READ) {
1236     handleRead();
1237   } else if (relevantEvents == EventHandler::WRITE) {
1238     handleWrite();
1239   } else if (relevantEvents == EventHandler::READ_WRITE) {
1240     EventBase* originalEventBase = eventBase_;
1241     // If both read and write events are ready, process writes first.
1242     handleWrite();
1243
1244     // Return now if handleWrite() detached us from our EventBase
1245     if (eventBase_ != originalEventBase) {
1246       return;
1247     }
1248
1249     // Only call handleRead() if a read callback is still installed.
1250     // (It's possible that the read callback was uninstalled during
1251     // handleWrite().)
1252     if (readCallback_) {
1253       handleRead();
1254     }
1255   } else {
1256     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1257                << std::hex << events << "(this=" << this << ")";
1258     abort();
1259   }
1260 }
1261
1262 AsyncSocket::ReadResult
1263 AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) {
1264   VLOG(5) << "AsyncSocket::performRead() this=" << this << ", buf=" << *buf
1265           << ", buflen=" << *buflen;
1266
1267   int recvFlags = 0;
1268   if (peek_) {
1269     recvFlags |= MSG_PEEK;
1270   }
1271
1272   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1273   if (bytes < 0) {
1274     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1275       // No more data to read right now.
1276       return ReadResult(READ_BLOCKING);
1277     } else {
1278       return ReadResult(READ_ERROR);
1279     }
1280   } else {
1281     appBytesReceived_ += bytes;
1282     return ReadResult(bytes);
1283   }
1284 }
1285
1286 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1287   // no matter what, buffer should be preapared for non-ssl socket
1288   CHECK(readCallback_);
1289   readCallback_->getReadBuffer(buf, buflen);
1290 }
1291
1292 void AsyncSocket::handleRead() noexcept {
1293   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1294           << ", state=" << state_;
1295   assert(state_ == StateEnum::ESTABLISHED);
1296   assert((shutdownFlags_ & SHUT_READ) == 0);
1297   assert(readCallback_ != nullptr);
1298   assert(eventFlags_ & EventHandler::READ);
1299
1300   // Loop until:
1301   // - a read attempt would block
1302   // - readCallback_ is uninstalled
1303   // - the number of loop iterations exceeds the optional maximum
1304   // - this AsyncSocket is moved to another EventBase
1305   //
1306   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1307   // which is why need to check for it here.
1308   //
1309   // The last bullet point is slightly subtle.  readDataAvailable() may also
1310   // detach this socket from this EventBase.  However, before
1311   // readDataAvailable() returns another thread may pick it up, attach it to
1312   // a different EventBase, and install another readCallback_.  We need to
1313   // exit immediately after readDataAvailable() returns if the eventBase_ has
1314   // changed.  (The caller must perform some sort of locking to transfer the
1315   // AsyncSocket between threads properly.  This will be sufficient to ensure
1316   // that this thread sees the updated eventBase_ variable after
1317   // readDataAvailable() returns.)
1318   uint16_t numReads = 0;
1319   EventBase* originalEventBase = eventBase_;
1320   while (readCallback_ && eventBase_ == originalEventBase) {
1321     // Get the buffer to read into.
1322     void* buf = nullptr;
1323     size_t buflen = 0, offset = 0;
1324     try {
1325       prepareReadBuffer(&buf, &buflen);
1326       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1327     } catch (const AsyncSocketException& ex) {
1328       return failRead(__func__, ex);
1329     } catch (const std::exception& ex) {
1330       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1331                               string("ReadCallback::getReadBuffer() "
1332                                      "threw exception: ") +
1333                               ex.what());
1334       return failRead(__func__, tex);
1335     } catch (...) {
1336       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1337                              "ReadCallback::getReadBuffer() threw "
1338                              "non-exception type");
1339       return failRead(__func__, ex);
1340     }
1341     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1342       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1343                              "ReadCallback::getReadBuffer() returned "
1344                              "empty buffer");
1345       return failRead(__func__, ex);
1346     }
1347
1348     // Perform the read
1349     auto readResult = performRead(&buf, &buflen, &offset);
1350     auto bytesRead = readResult.readReturn;
1351     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1352             << bytesRead << " bytes";
1353     if (bytesRead > 0) {
1354       if (!isBufferMovable_) {
1355         readCallback_->readDataAvailable(bytesRead);
1356       } else {
1357         CHECK(kOpenSslModeMoveBufferOwnership);
1358         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1359                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1360                 << ", offset=" << offset;
1361         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1362         readBuf->trimStart(offset);
1363         readBuf->trimEnd(buflen - offset - bytesRead);
1364         readCallback_->readBufferAvailable(std::move(readBuf));
1365       }
1366
1367       // Fall through and continue around the loop if the read
1368       // completely filled the available buffer.
1369       // Note that readCallback_ may have been uninstalled or changed inside
1370       // readDataAvailable().
1371       if (size_t(bytesRead) < buflen) {
1372         return;
1373       }
1374     } else if (bytesRead == READ_BLOCKING) {
1375         // No more data to read right now.
1376         return;
1377     } else if (bytesRead == READ_ERROR) {
1378       readErr_ = READ_ERROR;
1379       if (readResult.exception) {
1380         return failRead(__func__, *readResult.exception);
1381       }
1382       auto errnoCopy = errno;
1383       AsyncSocketException ex(
1384           AsyncSocketException::INTERNAL_ERROR,
1385           withAddr("recv() failed"),
1386           errnoCopy);
1387       return failRead(__func__, ex);
1388     } else {
1389       assert(bytesRead == READ_EOF);
1390       readErr_ = READ_EOF;
1391       // EOF
1392       shutdownFlags_ |= SHUT_READ;
1393       if (!updateEventRegistration(0, EventHandler::READ)) {
1394         // we've already been moved into STATE_ERROR
1395         assert(state_ == StateEnum::ERROR);
1396         assert(readCallback_ == nullptr);
1397         return;
1398       }
1399
1400       ReadCallback* callback = readCallback_;
1401       readCallback_ = nullptr;
1402       callback->readEOF();
1403       return;
1404     }
1405     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1406       if (readCallback_ != nullptr) {
1407         // We might still have data in the socket.
1408         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1409         scheduleImmediateRead();
1410       }
1411       return;
1412     }
1413   }
1414 }
1415
1416 /**
1417  * This function attempts to write as much data as possible, until no more data
1418  * can be written.
1419  *
1420  * - If it sends all available data, it unregisters for write events, and stops
1421  *   the writeTimeout_.
1422  *
1423  * - If not all of the data can be sent immediately, it reschedules
1424  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1425  *   registered for write events.
1426  */
1427 void AsyncSocket::handleWrite() noexcept {
1428   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1429           << ", state=" << state_;
1430   DestructorGuard dg(this);
1431
1432   if (state_ == StateEnum::CONNECTING) {
1433     handleConnect();
1434     return;
1435   }
1436
1437   // Normal write
1438   assert(state_ == StateEnum::ESTABLISHED);
1439   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1440   assert(writeReqHead_ != nullptr);
1441
1442   // Loop until we run out of write requests,
1443   // or until this socket is moved to another EventBase.
1444   // (See the comment in handleRead() explaining how this can happen.)
1445   EventBase* originalEventBase = eventBase_;
1446   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1447     auto writeResult = writeReqHead_->performWrite();
1448     if (writeResult.writeReturn < 0) {
1449       if (writeResult.exception) {
1450         return failWrite(__func__, *writeResult.exception);
1451       }
1452       auto errnoCopy = errno;
1453       AsyncSocketException ex(
1454           AsyncSocketException::INTERNAL_ERROR,
1455           withAddr("writev() failed"),
1456           errnoCopy);
1457       return failWrite(__func__, ex);
1458     } else if (writeReqHead_->isComplete()) {
1459       // We finished this request
1460       WriteRequest* req = writeReqHead_;
1461       writeReqHead_ = req->getNext();
1462
1463       if (writeReqHead_ == nullptr) {
1464         writeReqTail_ = nullptr;
1465         // This is the last write request.
1466         // Unregister for write events and cancel the send timer
1467         // before we invoke the callback.  We have to update the state properly
1468         // before calling the callback, since it may want to detach us from
1469         // the EventBase.
1470         if (eventFlags_ & EventHandler::WRITE) {
1471           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1472             assert(state_ == StateEnum::ERROR);
1473             return;
1474           }
1475           // Stop the send timeout
1476           writeTimeout_.cancelTimeout();
1477         }
1478         assert(!writeTimeout_.isScheduled());
1479
1480         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1481         // we finish sending the last write request.
1482         //
1483         // We have to do this before invoking writeSuccess(), since
1484         // writeSuccess() may detach us from our EventBase.
1485         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1486           assert(connectCallback_ == nullptr);
1487           shutdownFlags_ |= SHUT_WRITE;
1488
1489           if (shutdownFlags_ & SHUT_READ) {
1490             // Reads have already been shutdown.  Fully close the socket and
1491             // move to STATE_CLOSED.
1492             //
1493             // Note: This code currently moves us to STATE_CLOSED even if
1494             // close() hasn't ever been called.  This can occur if we have
1495             // received EOF from the peer and shutdownWrite() has been called
1496             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1497             // case, until close() is actually called?  I can't think of a
1498             // reason why we would need to do so.  No other operations besides
1499             // calling close() or destroying the socket can be performed at
1500             // this point.
1501             assert(readCallback_ == nullptr);
1502             state_ = StateEnum::CLOSED;
1503             if (fd_ >= 0) {
1504               ioHandler_.changeHandlerFD(-1);
1505               doClose();
1506             }
1507           } else {
1508             // Reads are still enabled, so we are only doing a half-shutdown
1509             ::shutdown(fd_, SHUT_WR);
1510           }
1511         }
1512       }
1513
1514       // Invoke the callback
1515       WriteCallback* callback = req->getCallback();
1516       req->destroy();
1517       if (callback) {
1518         callback->writeSuccess();
1519       }
1520       // We'll continue around the loop, trying to write another request
1521     } else {
1522       // Partial write.
1523       if (bufferCallback_) {
1524         bufferCallback_->onEgressBuffered();
1525       }
1526       writeReqHead_->consume();
1527       // Stop after a partial write; it's highly likely that a subsequent write
1528       // attempt will just return EAGAIN.
1529       //
1530       // Ensure that we are registered for write events.
1531       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1532         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1533           assert(state_ == StateEnum::ERROR);
1534           return;
1535         }
1536       }
1537
1538       // Reschedule the send timeout, since we have made some write progress.
1539       if (sendTimeout_ > 0) {
1540         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1541           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1542               withAddr("failed to reschedule write timeout"));
1543           return failWrite(__func__, ex);
1544         }
1545       }
1546       return;
1547     }
1548   }
1549   if (!writeReqHead_ && bufferCallback_) {
1550     bufferCallback_->onEgressBufferCleared();
1551   }
1552 }
1553
1554 void AsyncSocket::checkForImmediateRead() noexcept {
1555   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1556   // (However, note that some subclasses do override this method.)
1557   //
1558   // Simply calling handleRead() here would be bad, as this would call
1559   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1560   // buffer even though no data may be available.  This would waste lots of
1561   // memory, since the buffer will sit around unused until the socket actually
1562   // becomes readable.
1563   //
1564   // Checking if the socket is readable now also seems like it would probably
1565   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1566   // would just waste an extra system call.  Even if it is readable, waiting to
1567   // find out from libevent on the next event loop doesn't seem that bad.
1568 }
1569
1570 void AsyncSocket::handleInitialReadWrite() noexcept {
1571   // Our callers should already be holding a DestructorGuard, but grab
1572   // one here just to make sure, in case one of our calling code paths ever
1573   // changes.
1574   DestructorGuard dg(this);
1575
1576   // If we have a readCallback_, make sure we enable read events.  We
1577   // may already be registered for reads if connectSuccess() set
1578   // the read calback.
1579   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1580     assert(state_ == StateEnum::ESTABLISHED);
1581     assert((shutdownFlags_ & SHUT_READ) == 0);
1582     if (!updateEventRegistration(EventHandler::READ, 0)) {
1583       assert(state_ == StateEnum::ERROR);
1584       return;
1585     }
1586     checkForImmediateRead();
1587   } else if (readCallback_ == nullptr) {
1588     // Unregister for read events.
1589     updateEventRegistration(0, EventHandler::READ);
1590   }
1591
1592   // If we have write requests pending, try to send them immediately.
1593   // Since we just finished accepting, there is a very good chance that we can
1594   // write without blocking.
1595   //
1596   // However, we only process them if EventHandler::WRITE is not already set,
1597   // which means that we're already blocked on a write attempt.  (This can
1598   // happen if connectSuccess() called write() before returning.)
1599   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1600     // Call handleWrite() to perform write processing.
1601     handleWrite();
1602   } else if (writeReqHead_ == nullptr) {
1603     // Unregister for write event.
1604     updateEventRegistration(0, EventHandler::WRITE);
1605   }
1606 }
1607
1608 void AsyncSocket::handleConnect() noexcept {
1609   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1610           << ", state=" << state_;
1611   assert(state_ == StateEnum::CONNECTING);
1612   // SHUT_WRITE can never be set while we are still connecting;
1613   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1614   // finishes
1615   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1616
1617   // In case we had a connect timeout, cancel the timeout
1618   writeTimeout_.cancelTimeout();
1619   // We don't use a persistent registration when waiting on a connect event,
1620   // so we have been automatically unregistered now.  Update eventFlags_ to
1621   // reflect reality.
1622   assert(eventFlags_ == EventHandler::WRITE);
1623   eventFlags_ = EventHandler::NONE;
1624
1625   // Call getsockopt() to check if the connect succeeded
1626   int error;
1627   socklen_t len = sizeof(error);
1628   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1629   if (rv != 0) {
1630     auto errnoCopy = errno;
1631     AsyncSocketException ex(
1632         AsyncSocketException::INTERNAL_ERROR,
1633         withAddr("error calling getsockopt() after connect"),
1634         errnoCopy);
1635     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1636                << fd_ << " host=" << addr_.describe()
1637                << ") exception:" << ex.what();
1638     return failConnect(__func__, ex);
1639   }
1640
1641   if (error != 0) {
1642     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1643                            "connect failed", error);
1644     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1645             << fd_ << " host=" << addr_.describe()
1646             << ") exception: " << ex.what();
1647     return failConnect(__func__, ex);
1648   }
1649
1650   // Move into STATE_ESTABLISHED
1651   state_ = StateEnum::ESTABLISHED;
1652
1653   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1654   // perform, immediately shutdown the write half of the socket.
1655   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1656     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1657     // are still connecting we just abort the connect rather than waiting for
1658     // it to complete.
1659     assert((shutdownFlags_ & SHUT_READ) == 0);
1660     ::shutdown(fd_, SHUT_WR);
1661     shutdownFlags_ |= SHUT_WRITE;
1662   }
1663
1664   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1665           << "successfully connected; state=" << state_;
1666
1667   // Remember the EventBase we are attached to, before we start invoking any
1668   // callbacks (since the callbacks may call detachEventBase()).
1669   EventBase* originalEventBase = eventBase_;
1670
1671   invokeConnectSuccess();
1672   // Note that the connect callback may have changed our state.
1673   // (set or unset the read callback, called write(), closed the socket, etc.)
1674   // The following code needs to handle these situations correctly.
1675   //
1676   // If the socket has been closed, readCallback_ and writeReqHead_ will
1677   // always be nullptr, so that will prevent us from trying to read or write.
1678   //
1679   // The main thing to check for is if eventBase_ is still originalEventBase.
1680   // If not, we have been detached from this event base, so we shouldn't
1681   // perform any more operations.
1682   if (eventBase_ != originalEventBase) {
1683     return;
1684   }
1685
1686   handleInitialReadWrite();
1687 }
1688
1689 void AsyncSocket::timeoutExpired() noexcept {
1690   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1691           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1692   DestructorGuard dg(this);
1693   assert(eventBase_->isInEventBaseThread());
1694
1695   if (state_ == StateEnum::CONNECTING) {
1696     // connect() timed out
1697     // Unregister for I/O events.
1698     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1699                            "connect timed out");
1700     failConnect(__func__, ex);
1701   } else {
1702     // a normal write operation timed out
1703     assert(state_ == StateEnum::ESTABLISHED);
1704     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1705     failWrite(__func__, ex);
1706   }
1707 }
1708
1709 AsyncSocket::WriteResult AsyncSocket::performWrite(
1710     const iovec* vec,
1711     uint32_t count,
1712     WriteFlags flags,
1713     uint32_t* countWritten,
1714     uint32_t* partialWritten) {
1715   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1716   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1717   // (since it may terminate the program if the main program doesn't explicitly
1718   // ignore it).
1719   struct msghdr msg;
1720   msg.msg_name = nullptr;
1721   msg.msg_namelen = 0;
1722   msg.msg_iov = const_cast<iovec *>(vec);
1723   msg.msg_iovlen = std::min<size_t>(count, kIovMax);
1724   msg.msg_control = nullptr;
1725   msg.msg_controllen = 0;
1726   msg.msg_flags = 0;
1727
1728   int msg_flags = MSG_DONTWAIT;
1729
1730 #ifdef MSG_NOSIGNAL // Linux-only
1731   msg_flags |= MSG_NOSIGNAL;
1732   if (isSet(flags, WriteFlags::CORK)) {
1733     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1734     // give it the rest of the data rather than immediately sending a partial
1735     // frame, even when TCP_NODELAY is enabled.
1736     msg_flags |= MSG_MORE;
1737   }
1738 #endif
1739   if (isSet(flags, WriteFlags::EOR)) {
1740     // marks that this is the last byte of a record (response)
1741     msg_flags |= MSG_EOR;
1742   }
1743   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1744   if (totalWritten < 0) {
1745     if (errno == EAGAIN) {
1746       // TCP buffer is full; we can't write any more data right now.
1747       *countWritten = 0;
1748       *partialWritten = 0;
1749       return WriteResult(0);
1750     }
1751     // error
1752     *countWritten = 0;
1753     *partialWritten = 0;
1754     return WriteResult(WRITE_ERROR);
1755   }
1756
1757   appBytesWritten_ += totalWritten;
1758
1759   uint32_t bytesWritten;
1760   uint32_t n;
1761   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1762     const iovec* v = vec + n;
1763     if (v->iov_len > bytesWritten) {
1764       // Partial write finished in the middle of this iovec
1765       *countWritten = n;
1766       *partialWritten = bytesWritten;
1767       return WriteResult(totalWritten);
1768     }
1769
1770     bytesWritten -= v->iov_len;
1771   }
1772
1773   assert(bytesWritten == 0);
1774   *countWritten = n;
1775   *partialWritten = 0;
1776   return WriteResult(totalWritten);
1777 }
1778
1779 /**
1780  * Re-register the EventHandler after eventFlags_ has changed.
1781  *
1782  * If an error occurs, fail() is called to move the socket into the error state
1783  * and call all currently installed callbacks.  After an error, the
1784  * AsyncSocket is completely unregistered.
1785  *
1786  * @return Returns true on succcess, or false on error.
1787  */
1788 bool AsyncSocket::updateEventRegistration() {
1789   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1790           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1791           << ", events=" << std::hex << eventFlags_;
1792   assert(eventBase_->isInEventBaseThread());
1793   if (eventFlags_ == EventHandler::NONE) {
1794     ioHandler_.unregisterHandler();
1795     return true;
1796   }
1797
1798   // Always register for persistent events, so we don't have to re-register
1799   // after being called back.
1800   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1801     eventFlags_ = EventHandler::NONE; // we're not registered after error
1802     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1803         withAddr("failed to update AsyncSocket event registration"));
1804     fail("updateEventRegistration", ex);
1805     return false;
1806   }
1807
1808   return true;
1809 }
1810
1811 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1812                                            uint16_t disable) {
1813   uint16_t oldFlags = eventFlags_;
1814   eventFlags_ |= enable;
1815   eventFlags_ &= ~disable;
1816   if (eventFlags_ == oldFlags) {
1817     return true;
1818   } else {
1819     return updateEventRegistration();
1820   }
1821 }
1822
1823 void AsyncSocket::startFail() {
1824   // startFail() should only be called once
1825   assert(state_ != StateEnum::ERROR);
1826   assert(getDestructorGuardCount() > 0);
1827   state_ = StateEnum::ERROR;
1828   // Ensure that SHUT_READ and SHUT_WRITE are set,
1829   // so all future attempts to read or write will be rejected
1830   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1831
1832   if (eventFlags_ != EventHandler::NONE) {
1833     eventFlags_ = EventHandler::NONE;
1834     ioHandler_.unregisterHandler();
1835   }
1836   writeTimeout_.cancelTimeout();
1837
1838   if (fd_ >= 0) {
1839     ioHandler_.changeHandlerFD(-1);
1840     doClose();
1841   }
1842 }
1843
1844 void AsyncSocket::finishFail() {
1845   assert(state_ == StateEnum::ERROR);
1846   assert(getDestructorGuardCount() > 0);
1847
1848   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1849                          withAddr("socket closing after error"));
1850   invokeConnectErr(ex);
1851   failAllWrites(ex);
1852
1853   if (readCallback_) {
1854     ReadCallback* callback = readCallback_;
1855     readCallback_ = nullptr;
1856     callback->readErr(ex);
1857   }
1858 }
1859
1860 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1861   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1862              << state_ << " host=" << addr_.describe()
1863              << "): failed in " << fn << "(): "
1864              << ex.what();
1865   startFail();
1866   finishFail();
1867 }
1868
1869 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1870   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1871                << state_ << " host=" << addr_.describe()
1872                << "): failed while connecting in " << fn << "(): "
1873                << ex.what();
1874   startFail();
1875
1876   invokeConnectErr(ex);
1877   finishFail();
1878 }
1879
1880 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1881   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1882                << state_ << " host=" << addr_.describe()
1883                << "): failed while reading in " << fn << "(): "
1884                << ex.what();
1885   startFail();
1886
1887   if (readCallback_ != nullptr) {
1888     ReadCallback* callback = readCallback_;
1889     readCallback_ = nullptr;
1890     callback->readErr(ex);
1891   }
1892
1893   finishFail();
1894 }
1895
1896 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1897   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1898                << state_ << " host=" << addr_.describe()
1899                << "): failed while writing in " << fn << "(): "
1900                << ex.what();
1901   startFail();
1902
1903   // Only invoke the first write callback, since the error occurred while
1904   // writing this request.  Let any other pending write callbacks be invoked in
1905   // finishFail().
1906   if (writeReqHead_ != nullptr) {
1907     WriteRequest* req = writeReqHead_;
1908     writeReqHead_ = req->getNext();
1909     WriteCallback* callback = req->getCallback();
1910     uint32_t bytesWritten = req->getTotalBytesWritten();
1911     req->destroy();
1912     if (callback) {
1913       callback->writeErr(bytesWritten, ex);
1914     }
1915   }
1916
1917   finishFail();
1918 }
1919
1920 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1921                              size_t bytesWritten,
1922                              const AsyncSocketException& ex) {
1923   // This version of failWrite() is used when the failure occurs before
1924   // we've added the callback to writeReqHead_.
1925   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1926              << state_ << " host=" << addr_.describe()
1927              <<"): failed while writing in " << fn << "(): "
1928              << ex.what();
1929   startFail();
1930
1931   if (callback != nullptr) {
1932     callback->writeErr(bytesWritten, ex);
1933   }
1934
1935   finishFail();
1936 }
1937
1938 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1939   // Invoke writeError() on all write callbacks.
1940   // This is used when writes are forcibly shutdown with write requests
1941   // pending, or when an error occurs with writes pending.
1942   while (writeReqHead_ != nullptr) {
1943     WriteRequest* req = writeReqHead_;
1944     writeReqHead_ = req->getNext();
1945     WriteCallback* callback = req->getCallback();
1946     if (callback) {
1947       callback->writeErr(req->getTotalBytesWritten(), ex);
1948     }
1949     req->destroy();
1950   }
1951 }
1952
1953 void AsyncSocket::invalidState(ConnectCallback* callback) {
1954   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1955              << "): connect() called in invalid state " << state_;
1956
1957   /*
1958    * The invalidState() methods don't use the normal failure mechanisms,
1959    * since we don't know what state we are in.  We don't want to call
1960    * startFail()/finishFail() recursively if we are already in the middle of
1961    * cleaning up.
1962    */
1963
1964   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1965                          "connect() called with socket in invalid state");
1966   connectEndTime_ = std::chrono::steady_clock::now();
1967   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1968     if (callback) {
1969       callback->connectErr(ex);
1970     }
1971   } else {
1972     // We can't use failConnect() here since connectCallback_
1973     // may already be set to another callback.  Invoke this ConnectCallback
1974     // here; any other connectCallback_ will be invoked in finishFail()
1975     startFail();
1976     if (callback) {
1977       callback->connectErr(ex);
1978     }
1979     finishFail();
1980   }
1981 }
1982
1983 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
1984   connectEndTime_ = std::chrono::steady_clock::now();
1985   if (connectCallback_) {
1986     ConnectCallback* callback = connectCallback_;
1987     connectCallback_ = nullptr;
1988     callback->connectErr(ex);
1989   }
1990 }
1991
1992 void AsyncSocket::invokeConnectSuccess() {
1993   connectEndTime_ = std::chrono::steady_clock::now();
1994   if (connectCallback_) {
1995     ConnectCallback* callback = connectCallback_;
1996     connectCallback_ = nullptr;
1997     callback->connectSuccess();
1998   }
1999 }
2000
2001 void AsyncSocket::invalidState(ReadCallback* callback) {
2002   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2003              << "): setReadCallback(" << callback
2004              << ") called in invalid state " << state_;
2005
2006   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2007                          "setReadCallback() called with socket in "
2008                          "invalid state");
2009   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2010     if (callback) {
2011       callback->readErr(ex);
2012     }
2013   } else {
2014     startFail();
2015     if (callback) {
2016       callback->readErr(ex);
2017     }
2018     finishFail();
2019   }
2020 }
2021
2022 void AsyncSocket::invalidState(WriteCallback* callback) {
2023   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2024              << "): write() called in invalid state " << state_;
2025
2026   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2027                          withAddr("write() called with socket in invalid state"));
2028   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2029     if (callback) {
2030       callback->writeErr(0, ex);
2031     }
2032   } else {
2033     startFail();
2034     if (callback) {
2035       callback->writeErr(0, ex);
2036     }
2037     finishFail();
2038   }
2039 }
2040
2041 void AsyncSocket::doClose() {
2042   if (fd_ == -1) return;
2043   if (shutdownSocketSet_) {
2044     shutdownSocketSet_->close(fd_);
2045   } else {
2046     ::close(fd_);
2047   }
2048   fd_ = -1;
2049 }
2050
2051 std::ostream& operator << (std::ostream& os,
2052                            const AsyncSocket::StateEnum& state) {
2053   os << static_cast<int>(state);
2054   return os;
2055 }
2056
2057 std::string AsyncSocket::withAddr(const std::string& s) {
2058   // Don't use addr_ directly because it may not be initialized
2059   // e.g. if constructed from fd
2060   folly::SocketAddress peer, local;
2061   try {
2062     getPeerAddress(&peer);
2063     getLocalAddress(&local);
2064   } catch (const std::exception&) {
2065     // ignore
2066   } catch (...) {
2067     // ignore
2068   }
2069   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2070 }
2071
2072 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2073   bufferCallback_ = cb;
2074 }
2075
2076 } // folly