Fix compiler warnings
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/ExceptionWrapper.h>
20 #include <folly/SocketAddress.h>
21 #include <folly/io/IOBuf.h>
22 #include <folly/Portability.h>
23 #include <folly/portability/Fcntl.h>
24 #include <folly/portability/Sockets.h>
25 #include <folly/portability/SysUio.h>
26 #include <folly/portability/Unistd.h>
27
28 #include <errno.h>
29 #include <limits.h>
30 #include <thread>
31 #include <sys/types.h>
32 #include <boost/preprocessor/control/if.hpp>
33
34 using std::string;
35 using std::unique_ptr;
36
37 namespace folly {
38
39 // static members initializers
40 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
41
42 const AsyncSocketException socketClosedLocallyEx(
43     AsyncSocketException::END_OF_FILE, "socket closed locally");
44 const AsyncSocketException socketShutdownForWritesEx(
45     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
46
47 // TODO: It might help performance to provide a version of BytesWriteRequest that
48 // users could derive from, so we can avoid the extra allocation for each call
49 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
50 // protocols are currently templatized for transports.
51 //
52 // We would need the version for external users where they provide the iovec
53 // storage space, and only our internal version would allocate it at the end of
54 // the WriteRequest.
55
56 /* The default WriteRequest implementation, used for write(), writev() and
57  * writeChain()
58  *
59  * A new BytesWriteRequest operation is allocated on the heap for all write
60  * operations that cannot be completed immediately.
61  */
62 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
63  public:
64   static BytesWriteRequest* newRequest(AsyncSocket* socket,
65                                        WriteCallback* callback,
66                                        const iovec* ops,
67                                        uint32_t opCount,
68                                        uint32_t partialWritten,
69                                        uint32_t bytesWritten,
70                                        unique_ptr<IOBuf>&& ioBuf,
71                                        WriteFlags flags) {
72     assert(opCount > 0);
73     // Since we put a variable size iovec array at the end
74     // of each BytesWriteRequest, we have to manually allocate the memory.
75     void* buf = malloc(sizeof(BytesWriteRequest) +
76                        (opCount * sizeof(struct iovec)));
77     if (buf == nullptr) {
78       throw std::bad_alloc();
79     }
80
81     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
82                                       partialWritten, bytesWritten,
83                                       std::move(ioBuf), flags);
84   }
85
86   void destroy() override {
87     this->~BytesWriteRequest();
88     free(this);
89   }
90
91   WriteResult performWrite() override {
92     WriteFlags writeFlags = flags_;
93     if (getNext() != nullptr) {
94       writeFlags = writeFlags | WriteFlags::CORK;
95     }
96     return socket_->performWrite(
97         getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
98   }
99
100   bool isComplete() override {
101     return opsWritten_ == getOpCount();
102   }
103
104   void consume() override {
105     // Advance opIndex_ forward by opsWritten_
106     opIndex_ += opsWritten_;
107     assert(opIndex_ < opCount_);
108
109     // If we've finished writing any IOBufs, release them
110     if (ioBuf_) {
111       for (uint32_t i = opsWritten_; i != 0; --i) {
112         assert(ioBuf_);
113         ioBuf_ = ioBuf_->pop();
114       }
115     }
116
117     // Move partialBytes_ forward into the current iovec buffer
118     struct iovec* currentOp = writeOps_ + opIndex_;
119     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
120     currentOp->iov_base =
121       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
122     currentOp->iov_len -= partialBytes_;
123
124     // Increment the totalBytesWritten_ count by bytesWritten_;
125     totalBytesWritten_ += bytesWritten_;
126   }
127
128  private:
129   BytesWriteRequest(AsyncSocket* socket,
130                     WriteCallback* callback,
131                     const struct iovec* ops,
132                     uint32_t opCount,
133                     uint32_t partialBytes,
134                     uint32_t bytesWritten,
135                     unique_ptr<IOBuf>&& ioBuf,
136                     WriteFlags flags)
137     : AsyncSocket::WriteRequest(socket, callback)
138     , opCount_(opCount)
139     , opIndex_(0)
140     , flags_(flags)
141     , ioBuf_(std::move(ioBuf))
142     , opsWritten_(0)
143     , partialBytes_(partialBytes)
144     , bytesWritten_(bytesWritten) {
145     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
146   }
147
148   // private destructor, to ensure callers use destroy()
149   ~BytesWriteRequest() override = default;
150
151   const struct iovec* getOps() const {
152     assert(opCount_ > opIndex_);
153     return writeOps_ + opIndex_;
154   }
155
156   uint32_t getOpCount() const {
157     assert(opCount_ > opIndex_);
158     return opCount_ - opIndex_;
159   }
160
161   uint32_t opCount_;            ///< number of entries in writeOps_
162   uint32_t opIndex_;            ///< current index into writeOps_
163   WriteFlags flags_;            ///< set for WriteFlags
164   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
165
166   // for consume(), how much we wrote on the last write
167   uint32_t opsWritten_;         ///< complete ops written
168   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
169   ssize_t bytesWritten_;        ///< bytes written altogether
170
171   struct iovec writeOps_[];     ///< write operation(s) list
172 };
173
174 AsyncSocket::AsyncSocket()
175   : eventBase_(nullptr)
176   , writeTimeout_(this, nullptr)
177   , ioHandler_(this, nullptr)
178   , immediateReadHandler_(this) {
179   VLOG(5) << "new AsyncSocket()";
180   init();
181 }
182
183 AsyncSocket::AsyncSocket(EventBase* evb)
184   : eventBase_(evb)
185   , writeTimeout_(this, evb)
186   , ioHandler_(this, evb)
187   , immediateReadHandler_(this) {
188   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
189   init();
190 }
191
192 AsyncSocket::AsyncSocket(EventBase* evb,
193                            const folly::SocketAddress& address,
194                            uint32_t connectTimeout)
195   : AsyncSocket(evb) {
196   connect(nullptr, address, connectTimeout);
197 }
198
199 AsyncSocket::AsyncSocket(EventBase* evb,
200                            const std::string& ip,
201                            uint16_t port,
202                            uint32_t connectTimeout)
203   : AsyncSocket(evb) {
204   connect(nullptr, ip, port, connectTimeout);
205 }
206
207 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
208   : eventBase_(evb)
209   , writeTimeout_(this, evb)
210   , ioHandler_(this, evb, fd)
211   , immediateReadHandler_(this) {
212   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
213           << fd << ")";
214   init();
215   fd_ = fd;
216   setCloseOnExec();
217   state_ = StateEnum::ESTABLISHED;
218 }
219
220 // init() method, since constructor forwarding isn't supported in most
221 // compilers yet.
222 void AsyncSocket::init() {
223   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
224   shutdownFlags_ = 0;
225   state_ = StateEnum::UNINIT;
226   eventFlags_ = EventHandler::NONE;
227   fd_ = -1;
228   sendTimeout_ = 0;
229   maxReadsPerEvent_ = 16;
230   connectCallback_ = nullptr;
231   readCallback_ = nullptr;
232   writeReqHead_ = nullptr;
233   writeReqTail_ = nullptr;
234   shutdownSocketSet_ = nullptr;
235   appBytesWritten_ = 0;
236   appBytesReceived_ = 0;
237 }
238
239 AsyncSocket::~AsyncSocket() {
240   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
241           << ", evb=" << eventBase_ << ", fd=" << fd_
242           << ", state=" << state_ << ")";
243 }
244
245 void AsyncSocket::destroy() {
246   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
247           << ", fd=" << fd_ << ", state=" << state_;
248   // When destroy is called, close the socket immediately
249   closeNow();
250
251   // Then call DelayedDestruction::destroy() to take care of
252   // whether or not we need immediate or delayed destruction
253   DelayedDestruction::destroy();
254 }
255
256 int AsyncSocket::detachFd() {
257   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
258           << ", evb=" << eventBase_ << ", state=" << state_
259           << ", events=" << std::hex << eventFlags_ << ")";
260   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
261   // actually close the descriptor.
262   if (shutdownSocketSet_) {
263     shutdownSocketSet_->remove(fd_);
264   }
265   int fd = fd_;
266   fd_ = -1;
267   // Call closeNow() to invoke all pending callbacks with an error.
268   closeNow();
269   // Update the EventHandler to stop using this fd.
270   // This can only be done after closeNow() unregisters the handler.
271   ioHandler_.changeHandlerFD(-1);
272   return fd;
273 }
274
275 const folly::SocketAddress& AsyncSocket::anyAddress() {
276   static const folly::SocketAddress anyAddress =
277     folly::SocketAddress("0.0.0.0", 0);
278   return anyAddress;
279 }
280
281 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
282   if (shutdownSocketSet_ == newSS) {
283     return;
284   }
285   if (shutdownSocketSet_ && fd_ != -1) {
286     shutdownSocketSet_->remove(fd_);
287   }
288   shutdownSocketSet_ = newSS;
289   if (shutdownSocketSet_ && fd_ != -1) {
290     shutdownSocketSet_->add(fd_);
291   }
292 }
293
294 void AsyncSocket::setCloseOnExec() {
295   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
296   if (rv != 0) {
297     auto errnoCopy = errno;
298     throw AsyncSocketException(
299         AsyncSocketException::INTERNAL_ERROR,
300         withAddr("failed to set close-on-exec flag"),
301         errnoCopy);
302   }
303 }
304
305 void AsyncSocket::connect(ConnectCallback* callback,
306                            const folly::SocketAddress& address,
307                            int timeout,
308                            const OptionMap &options,
309                            const folly::SocketAddress& bindAddr) noexcept {
310   DestructorGuard dg(this);
311   assert(eventBase_->isInEventBaseThread());
312
313   addr_ = address;
314
315   // Make sure we're in the uninitialized state
316   if (state_ != StateEnum::UNINIT) {
317     return invalidState(callback);
318   }
319
320   connectTimeout_ = std::chrono::milliseconds(timeout);
321   connectStartTime_ = std::chrono::steady_clock::now();
322   // Make connect end time at least >= connectStartTime.
323   connectEndTime_ = connectStartTime_;
324
325   assert(fd_ == -1);
326   state_ = StateEnum::CONNECTING;
327   connectCallback_ = callback;
328
329   sockaddr_storage addrStorage;
330   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
331
332   try {
333     // Create the socket
334     // Technically the first parameter should actually be a protocol family
335     // constant (PF_xxx) rather than an address family (AF_xxx), but the
336     // distinction is mainly just historical.  In pretty much all
337     // implementations the PF_foo and AF_foo constants are identical.
338     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
339     if (fd_ < 0) {
340       auto errnoCopy = errno;
341       throw AsyncSocketException(
342           AsyncSocketException::INTERNAL_ERROR,
343           withAddr("failed to create socket"),
344           errnoCopy);
345     }
346     if (shutdownSocketSet_) {
347       shutdownSocketSet_->add(fd_);
348     }
349     ioHandler_.changeHandlerFD(fd_);
350
351     setCloseOnExec();
352
353     // Put the socket in non-blocking mode
354     int flags = fcntl(fd_, F_GETFL, 0);
355     if (flags == -1) {
356       auto errnoCopy = errno;
357       throw AsyncSocketException(
358           AsyncSocketException::INTERNAL_ERROR,
359           withAddr("failed to get socket flags"),
360           errnoCopy);
361     }
362     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
363     if (rv == -1) {
364       auto errnoCopy = errno;
365       throw AsyncSocketException(
366           AsyncSocketException::INTERNAL_ERROR,
367           withAddr("failed to put socket in non-blocking mode"),
368           errnoCopy);
369     }
370
371 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
372     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
373     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
374     if (rv == -1) {
375       auto errnoCopy = errno;
376       throw AsyncSocketException(
377           AsyncSocketException::INTERNAL_ERROR,
378           "failed to enable F_SETNOSIGPIPE on socket",
379           errnoCopy);
380     }
381 #endif
382
383     // By default, turn on TCP_NODELAY
384     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
385     // setNoDelay() will log an error message if it fails.
386     if (address.getFamily() != AF_UNIX) {
387       (void)setNoDelay(true);
388     }
389
390     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
391             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
392
393     // bind the socket
394     if (bindAddr != anyAddress()) {
395       int one = 1;
396       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
397         auto errnoCopy = errno;
398         doClose();
399         throw AsyncSocketException(
400             AsyncSocketException::NOT_OPEN,
401             "failed to setsockopt prior to bind on " + bindAddr.describe(),
402             errnoCopy);
403       }
404
405       bindAddr.getAddress(&addrStorage);
406
407       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
408         auto errnoCopy = errno;
409         doClose();
410         throw AsyncSocketException(
411             AsyncSocketException::NOT_OPEN,
412             "failed to bind to async socket: " + bindAddr.describe(),
413             errnoCopy);
414       }
415     }
416
417     // Apply the additional options if any.
418     for (const auto& opt: options) {
419       int rv = opt.first.apply(fd_, opt.second);
420       if (rv != 0) {
421         auto errnoCopy = errno;
422         throw AsyncSocketException(
423             AsyncSocketException::INTERNAL_ERROR,
424             withAddr("failed to set socket option"),
425             errnoCopy);
426       }
427     }
428
429     // Perform the connect()
430     address.getAddress(&addrStorage);
431
432     if (tfoEnabled_) {
433       state_ = StateEnum::FAST_OPEN;
434       tfoAttempted_ = true;
435     } else {
436       if (socketConnect(saddr, addr_.getActualSize()) < 0) {
437         return;
438       }
439     }
440
441     // If we're still here the connect() succeeded immediately.
442     // Fall through to call the callback outside of this try...catch block
443   } catch (const AsyncSocketException& ex) {
444     return failConnect(__func__, ex);
445   } catch (const std::exception& ex) {
446     // shouldn't happen, but handle it just in case
447     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
448                << "): unexpected " << typeid(ex).name() << " exception: "
449                << ex.what();
450     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
451                             withAddr(string("unexpected exception: ") +
452                                      ex.what()));
453     return failConnect(__func__, tex);
454   }
455
456   // The connection succeeded immediately
457   // The read callback may not have been set yet, and no writes may be pending
458   // yet, so we don't have to register for any events at the moment.
459   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
460   assert(readCallback_ == nullptr);
461   assert(writeReqHead_ == nullptr);
462   if (state_ != StateEnum::FAST_OPEN) {
463     state_ = StateEnum::ESTABLISHED;
464   }
465   invokeConnectSuccess();
466 }
467
468 int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) {
469   int rv = ::connect(fd_, saddr, len);
470   if (rv < 0) {
471     auto errnoCopy = errno;
472     if (errnoCopy == EINPROGRESS) {
473       scheduleConnectTimeoutAndRegisterForEvents();
474     } else {
475       throw AsyncSocketException(
476           AsyncSocketException::NOT_OPEN,
477           "connect failed (immediately)",
478           errnoCopy);
479     }
480   }
481   return rv;
482 }
483
484 void AsyncSocket::scheduleConnectTimeoutAndRegisterForEvents() {
485   // Connection in progress.
486   int timeout = connectTimeout_.count();
487   if (timeout > 0) {
488     // Start a timer in case the connection takes too long.
489     if (!writeTimeout_.scheduleTimeout(timeout)) {
490       throw AsyncSocketException(
491           AsyncSocketException::INTERNAL_ERROR,
492           withAddr("failed to schedule AsyncSocket connect timeout"));
493     }
494   }
495
496   // Register for write events, so we'll
497   // be notified when the connection finishes/fails.
498   // Note that we don't register for a persistent event here.
499   assert(eventFlags_ == EventHandler::NONE);
500   eventFlags_ = EventHandler::WRITE;
501   if (!ioHandler_.registerHandler(eventFlags_)) {
502     throw AsyncSocketException(
503         AsyncSocketException::INTERNAL_ERROR,
504         withAddr("failed to register AsyncSocket connect handler"));
505   }
506 }
507
508 void AsyncSocket::connect(ConnectCallback* callback,
509                            const string& ip, uint16_t port,
510                            int timeout,
511                            const OptionMap &options) noexcept {
512   DestructorGuard dg(this);
513   try {
514     connectCallback_ = callback;
515     connect(callback, folly::SocketAddress(ip, port), timeout, options);
516   } catch (const std::exception& ex) {
517     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
518                             ex.what());
519     return failConnect(__func__, tex);
520   }
521 }
522
523 void AsyncSocket::cancelConnect() {
524   connectCallback_ = nullptr;
525   if (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN) {
526     closeNow();
527   }
528 }
529
530 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
531   sendTimeout_ = milliseconds;
532   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
533
534   // If we are currently pending on write requests, immediately update
535   // writeTimeout_ with the new value.
536   if ((eventFlags_ & EventHandler::WRITE) &&
537       (state_ != StateEnum::CONNECTING && state_ != StateEnum::FAST_OPEN)) {
538     assert(state_ == StateEnum::ESTABLISHED);
539     assert((shutdownFlags_ & SHUT_WRITE) == 0);
540     if (sendTimeout_ > 0) {
541       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
542         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
543             withAddr("failed to reschedule send timeout in setSendTimeout"));
544         return failWrite(__func__, ex);
545       }
546     } else {
547       writeTimeout_.cancelTimeout();
548     }
549   }
550 }
551
552 void AsyncSocket::setReadCB(ReadCallback *callback) {
553   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
554           << ", callback=" << callback << ", state=" << state_;
555
556   // Short circuit if callback is the same as the existing readCallback_.
557   //
558   // Note that this is needed for proper functioning during some cleanup cases.
559   // During cleanup we allow setReadCallback(nullptr) to be called even if the
560   // read callback is already unset and we have been detached from an event
561   // base.  This check prevents us from asserting
562   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
563   if (callback == readCallback_) {
564     return;
565   }
566
567   /* We are removing a read callback */
568   if (callback == nullptr &&
569       immediateReadHandler_.isLoopCallbackScheduled()) {
570     immediateReadHandler_.cancelLoopCallback();
571   }
572
573   if (shutdownFlags_ & SHUT_READ) {
574     // Reads have already been shut down on this socket.
575     //
576     // Allow setReadCallback(nullptr) to be called in this case, but don't
577     // allow a new callback to be set.
578     //
579     // For example, setReadCallback(nullptr) can happen after an error if we
580     // invoke some other error callback before invoking readError().  The other
581     // error callback that is invoked first may go ahead and clear the read
582     // callback before we get a chance to invoke readError().
583     if (callback != nullptr) {
584       return invalidState(callback);
585     }
586     assert((eventFlags_ & EventHandler::READ) == 0);
587     readCallback_ = nullptr;
588     return;
589   }
590
591   DestructorGuard dg(this);
592   assert(eventBase_->isInEventBaseThread());
593
594   switch ((StateEnum)state_) {
595     case StateEnum::CONNECTING:
596     case StateEnum::FAST_OPEN:
597       // For convenience, we allow the read callback to be set while we are
598       // still connecting.  We just store the callback for now.  Once the
599       // connection completes we'll register for read events.
600       readCallback_ = callback;
601       return;
602     case StateEnum::ESTABLISHED:
603     {
604       readCallback_ = callback;
605       uint16_t oldFlags = eventFlags_;
606       if (readCallback_) {
607         eventFlags_ |= EventHandler::READ;
608       } else {
609         eventFlags_ &= ~EventHandler::READ;
610       }
611
612       // Update our registration if our flags have changed
613       if (eventFlags_ != oldFlags) {
614         // We intentionally ignore the return value here.
615         // updateEventRegistration() will move us into the error state if it
616         // fails, and we don't need to do anything else here afterwards.
617         (void)updateEventRegistration();
618       }
619
620       if (readCallback_) {
621         checkForImmediateRead();
622       }
623       return;
624     }
625     case StateEnum::CLOSED:
626     case StateEnum::ERROR:
627       // We should never reach here.  SHUT_READ should always be set
628       // if we are in STATE_CLOSED or STATE_ERROR.
629       assert(false);
630       return invalidState(callback);
631     case StateEnum::UNINIT:
632       // We do not allow setReadCallback() to be called before we start
633       // connecting.
634       return invalidState(callback);
635   }
636
637   // We don't put a default case in the switch statement, so that the compiler
638   // will warn us to update the switch statement if a new state is added.
639   return invalidState(callback);
640 }
641
642 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
643   return readCallback_;
644 }
645
646 void AsyncSocket::write(WriteCallback* callback,
647                          const void* buf, size_t bytes, WriteFlags flags) {
648   iovec op;
649   op.iov_base = const_cast<void*>(buf);
650   op.iov_len = bytes;
651   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
652 }
653
654 void AsyncSocket::writev(WriteCallback* callback,
655                           const iovec* vec,
656                           size_t count,
657                           WriteFlags flags) {
658   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
659 }
660
661 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
662                               WriteFlags flags) {
663   constexpr size_t kSmallSizeMax = 64;
664   size_t count = buf->countChainElements();
665   if (count <= kSmallSizeMax) {
666
667     // suppress "warning: variable length array â€˜vec’ is used [-Wvla]"
668     FOLLY_PUSH_WARNING;
669     FOLLY_GCC_DISABLE_WARNING(vla);
670     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
671     FOLLY_POP_WARNING;
672
673     writeChainImpl(callback, vec, count, std::move(buf), flags);
674   } else {
675     iovec* vec = new iovec[count];
676     writeChainImpl(callback, vec, count, std::move(buf), flags);
677     delete[] vec;
678   }
679 }
680
681 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
682     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
683   size_t veclen = buf->fillIov(vec, count);
684   writeImpl(callback, vec, veclen, std::move(buf), flags);
685 }
686
687 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
688                              size_t count, unique_ptr<IOBuf>&& buf,
689                              WriteFlags flags) {
690   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
691           << ", callback=" << callback << ", count=" << count
692           << ", state=" << state_;
693   DestructorGuard dg(this);
694   unique_ptr<IOBuf>ioBuf(std::move(buf));
695   assert(eventBase_->isInEventBaseThread());
696
697   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
698     // No new writes may be performed after the write side of the socket has
699     // been shutdown.
700     //
701     // We could just call callback->writeError() here to fail just this write.
702     // However, fail hard and use invalidState() to fail all outstanding
703     // callbacks and move the socket into the error state.  There's most likely
704     // a bug in the caller's code, so we abort everything rather than trying to
705     // proceed as best we can.
706     return invalidState(callback);
707   }
708
709   uint32_t countWritten = 0;
710   uint32_t partialWritten = 0;
711   int bytesWritten = 0;
712   bool mustRegister = false;
713   if ((state_ == StateEnum::ESTABLISHED || state_ == StateEnum::FAST_OPEN) &&
714       !connecting()) {
715     if (writeReqHead_ == nullptr) {
716       // If we are established and there are no other writes pending,
717       // we can attempt to perform the write immediately.
718       assert(writeReqTail_ == nullptr);
719       assert((eventFlags_ & EventHandler::WRITE) == 0);
720
721       auto writeResult =
722           performWrite(vec, count, flags, &countWritten, &partialWritten);
723       bytesWritten = writeResult.writeReturn;
724       if (bytesWritten < 0) {
725         auto errnoCopy = errno;
726         if (writeResult.exception) {
727           return failWrite(__func__, callback, 0, *writeResult.exception);
728         }
729         AsyncSocketException ex(
730             AsyncSocketException::INTERNAL_ERROR,
731             withAddr("writev failed"),
732             errnoCopy);
733         return failWrite(__func__, callback, 0, ex);
734       } else if (countWritten == count) {
735         // We successfully wrote everything.
736         // Invoke the callback and return.
737         if (callback) {
738           callback->writeSuccess();
739         }
740         return;
741       } else { // continue writing the next writeReq
742         if (bufferCallback_) {
743           bufferCallback_->onEgressBuffered();
744         }
745       }
746       if (!connecting()) {
747         // Writes might put the socket back into connecting state
748         // if TFO is enabled, and using TFO fails.
749         // This means that write timeouts would not be active, however
750         // connect timeouts would affect this stage.
751         mustRegister = true;
752       }
753     }
754   } else if (!connecting()) {
755     // Invalid state for writing
756     return invalidState(callback);
757   }
758
759   // Create a new WriteRequest to add to the queue
760   WriteRequest* req;
761   try {
762     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
763                                         count - countWritten, partialWritten,
764                                         bytesWritten, std::move(ioBuf), flags);
765   } catch (const std::exception& ex) {
766     // we mainly expect to catch std::bad_alloc here
767     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
768         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
769     return failWrite(__func__, callback, bytesWritten, tex);
770   }
771   req->consume();
772   if (writeReqTail_ == nullptr) {
773     assert(writeReqHead_ == nullptr);
774     writeReqHead_ = writeReqTail_ = req;
775   } else {
776     writeReqTail_->append(req);
777     writeReqTail_ = req;
778   }
779
780   // Register for write events if are established and not currently
781   // waiting on write events
782   if (mustRegister) {
783     assert(state_ == StateEnum::ESTABLISHED);
784     assert((eventFlags_ & EventHandler::WRITE) == 0);
785     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
786       assert(state_ == StateEnum::ERROR);
787       return;
788     }
789     if (sendTimeout_ > 0) {
790       // Schedule a timeout to fire if the write takes too long.
791       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
792         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
793                                withAddr("failed to schedule send timeout"));
794         return failWrite(__func__, ex);
795       }
796     }
797   }
798 }
799
800 void AsyncSocket::writeRequest(WriteRequest* req) {
801   if (writeReqTail_ == nullptr) {
802     assert(writeReqHead_ == nullptr);
803     writeReqHead_ = writeReqTail_ = req;
804     req->start();
805   } else {
806     writeReqTail_->append(req);
807     writeReqTail_ = req;
808   }
809 }
810
811 void AsyncSocket::close() {
812   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
813           << ", state=" << state_ << ", shutdownFlags="
814           << std::hex << (int) shutdownFlags_;
815
816   // close() is only different from closeNow() when there are pending writes
817   // that need to drain before we can close.  In all other cases, just call
818   // closeNow().
819   //
820   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
821   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
822   // is still running.  (e.g., If there are multiple pending writes, and we
823   // call writeError() on the first one, it may call close().  In this case we
824   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
825   // writes will still be in the queue.)
826   //
827   // We only need to drain pending writes if we are still in STATE_CONNECTING
828   // or STATE_ESTABLISHED
829   if ((writeReqHead_ == nullptr) ||
830       !(state_ == StateEnum::CONNECTING ||
831       state_ == StateEnum::ESTABLISHED)) {
832     closeNow();
833     return;
834   }
835
836   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
837   // destroyed until close() returns.
838   DestructorGuard dg(this);
839   assert(eventBase_->isInEventBaseThread());
840
841   // Since there are write requests pending, we have to set the
842   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
843   // connect finishes and we finish writing these requests.
844   //
845   // Set SHUT_READ to indicate that reads are shut down, and set the
846   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
847   // pending writes complete.
848   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
849
850   // If a read callback is set, invoke readEOF() immediately to inform it that
851   // the socket has been closed and no more data can be read.
852   if (readCallback_) {
853     // Disable reads if they are enabled
854     if (!updateEventRegistration(0, EventHandler::READ)) {
855       // We're now in the error state; callbacks have been cleaned up
856       assert(state_ == StateEnum::ERROR);
857       assert(readCallback_ == nullptr);
858     } else {
859       ReadCallback* callback = readCallback_;
860       readCallback_ = nullptr;
861       callback->readEOF();
862     }
863   }
864 }
865
866 void AsyncSocket::closeNow() {
867   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
868           << ", state=" << state_ << ", shutdownFlags="
869           << std::hex << (int) shutdownFlags_;
870   DestructorGuard dg(this);
871   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
872
873   switch (state_) {
874     case StateEnum::ESTABLISHED:
875     case StateEnum::CONNECTING:
876     case StateEnum::FAST_OPEN: {
877       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
878       state_ = StateEnum::CLOSED;
879
880       // If the write timeout was set, cancel it.
881       writeTimeout_.cancelTimeout();
882
883       // If we are registered for I/O events, unregister.
884       if (eventFlags_ != EventHandler::NONE) {
885         eventFlags_ = EventHandler::NONE;
886         if (!updateEventRegistration()) {
887           // We will have been moved into the error state.
888           assert(state_ == StateEnum::ERROR);
889           return;
890         }
891       }
892
893       if (immediateReadHandler_.isLoopCallbackScheduled()) {
894         immediateReadHandler_.cancelLoopCallback();
895       }
896
897       if (fd_ >= 0) {
898         ioHandler_.changeHandlerFD(-1);
899         doClose();
900       }
901
902       invokeConnectErr(socketClosedLocallyEx);
903
904       failAllWrites(socketClosedLocallyEx);
905
906       if (readCallback_) {
907         ReadCallback* callback = readCallback_;
908         readCallback_ = nullptr;
909         callback->readEOF();
910       }
911       return;
912     }
913     case StateEnum::CLOSED:
914       // Do nothing.  It's possible that we are being called recursively
915       // from inside a callback that we invoked inside another call to close()
916       // that is still running.
917       return;
918     case StateEnum::ERROR:
919       // Do nothing.  The error handling code has performed (or is performing)
920       // cleanup.
921       return;
922     case StateEnum::UNINIT:
923       assert(eventFlags_ == EventHandler::NONE);
924       assert(connectCallback_ == nullptr);
925       assert(readCallback_ == nullptr);
926       assert(writeReqHead_ == nullptr);
927       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
928       state_ = StateEnum::CLOSED;
929       return;
930   }
931
932   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
933               << ") called in unknown state " << state_;
934 }
935
936 void AsyncSocket::closeWithReset() {
937   // Enable SO_LINGER, with the linger timeout set to 0.
938   // This will trigger a TCP reset when we close the socket.
939   if (fd_ >= 0) {
940     struct linger optLinger = {1, 0};
941     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
942       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
943               << "on " << fd_ << ": errno=" << errno;
944     }
945   }
946
947   // Then let closeNow() take care of the rest
948   closeNow();
949 }
950
951 void AsyncSocket::shutdownWrite() {
952   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
953           << ", state=" << state_ << ", shutdownFlags="
954           << std::hex << (int) shutdownFlags_;
955
956   // If there are no pending writes, shutdownWrite() is identical to
957   // shutdownWriteNow().
958   if (writeReqHead_ == nullptr) {
959     shutdownWriteNow();
960     return;
961   }
962
963   assert(eventBase_->isInEventBaseThread());
964
965   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
966   // shutdown will be performed once all writes complete.
967   shutdownFlags_ |= SHUT_WRITE_PENDING;
968 }
969
970 void AsyncSocket::shutdownWriteNow() {
971   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
972           << ", fd=" << fd_ << ", state=" << state_
973           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
974
975   if (shutdownFlags_ & SHUT_WRITE) {
976     // Writes are already shutdown; nothing else to do.
977     return;
978   }
979
980   // If SHUT_READ is already set, just call closeNow() to completely
981   // close the socket.  This can happen if close() was called with writes
982   // pending, and then shutdownWriteNow() is called before all pending writes
983   // complete.
984   if (shutdownFlags_ & SHUT_READ) {
985     closeNow();
986     return;
987   }
988
989   DestructorGuard dg(this);
990   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
991
992   switch (static_cast<StateEnum>(state_)) {
993     case StateEnum::ESTABLISHED:
994     {
995       shutdownFlags_ |= SHUT_WRITE;
996
997       // If the write timeout was set, cancel it.
998       writeTimeout_.cancelTimeout();
999
1000       // If we are registered for write events, unregister.
1001       if (!updateEventRegistration(0, EventHandler::WRITE)) {
1002         // We will have been moved into the error state.
1003         assert(state_ == StateEnum::ERROR);
1004         return;
1005       }
1006
1007       // Shutdown writes on the file descriptor
1008       ::shutdown(fd_, SHUT_WR);
1009
1010       // Immediately fail all write requests
1011       failAllWrites(socketShutdownForWritesEx);
1012       return;
1013     }
1014     case StateEnum::CONNECTING:
1015     {
1016       // Set the SHUT_WRITE_PENDING flag.
1017       // When the connection completes, it will check this flag,
1018       // shutdown the write half of the socket, and then set SHUT_WRITE.
1019       shutdownFlags_ |= SHUT_WRITE_PENDING;
1020
1021       // Immediately fail all write requests
1022       failAllWrites(socketShutdownForWritesEx);
1023       return;
1024     }
1025     case StateEnum::UNINIT:
1026       // Callers normally shouldn't call shutdownWriteNow() before the socket
1027       // even starts connecting.  Nonetheless, go ahead and set
1028       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
1029       // immediately shut down the write side of the socket.
1030       shutdownFlags_ |= SHUT_WRITE_PENDING;
1031       return;
1032     case StateEnum::FAST_OPEN:
1033       // In fast open state we haven't call connected yet, and if we shutdown
1034       // the writes, we will never try to call connect, so shut everything down
1035       shutdownFlags_ |= SHUT_WRITE;
1036       // Immediately fail all write requests
1037       failAllWrites(socketShutdownForWritesEx);
1038       return;
1039     case StateEnum::CLOSED:
1040     case StateEnum::ERROR:
1041       // We should never get here.  SHUT_WRITE should always be set
1042       // in STATE_CLOSED and STATE_ERROR.
1043       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
1044                  << ", fd=" << fd_ << ") in unexpected state " << state_
1045                  << " with SHUT_WRITE not set ("
1046                  << std::hex << (int) shutdownFlags_ << ")";
1047       assert(false);
1048       return;
1049   }
1050
1051   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
1052               << fd_ << ") called in unknown state " << state_;
1053 }
1054
1055 bool AsyncSocket::readable() const {
1056   if (fd_ == -1) {
1057     return false;
1058   }
1059   struct pollfd fds[1];
1060   fds[0].fd = fd_;
1061   fds[0].events = POLLIN;
1062   fds[0].revents = 0;
1063   int rc = poll(fds, 1, 0);
1064   return rc == 1;
1065 }
1066
1067 bool AsyncSocket::isPending() const {
1068   return ioHandler_.isPending();
1069 }
1070
1071 bool AsyncSocket::hangup() const {
1072   if (fd_ == -1) {
1073     // sanity check, no one should ask for hangup if we are not connected.
1074     assert(false);
1075     return false;
1076   }
1077 #ifdef POLLRDHUP // Linux-only
1078   struct pollfd fds[1];
1079   fds[0].fd = fd_;
1080   fds[0].events = POLLRDHUP|POLLHUP;
1081   fds[0].revents = 0;
1082   poll(fds, 1, 0);
1083   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1084 #else
1085   return false;
1086 #endif
1087 }
1088
1089 bool AsyncSocket::good() const {
1090   return (
1091       (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN ||
1092        state_ == StateEnum::ESTABLISHED) &&
1093       (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1094 }
1095
1096 bool AsyncSocket::error() const {
1097   return (state_ == StateEnum::ERROR);
1098 }
1099
1100 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1101   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1102           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1103           << ", state=" << state_ << ", events="
1104           << std::hex << eventFlags_ << ")";
1105   assert(eventBase_ == nullptr);
1106   assert(eventBase->isInEventBaseThread());
1107
1108   eventBase_ = eventBase;
1109   ioHandler_.attachEventBase(eventBase);
1110   writeTimeout_.attachEventBase(eventBase);
1111 }
1112
1113 void AsyncSocket::detachEventBase() {
1114   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1115           << ", old evb=" << eventBase_ << ", state=" << state_
1116           << ", events=" << std::hex << eventFlags_ << ")";
1117   assert(eventBase_ != nullptr);
1118   assert(eventBase_->isInEventBaseThread());
1119
1120   eventBase_ = nullptr;
1121   ioHandler_.detachEventBase();
1122   writeTimeout_.detachEventBase();
1123 }
1124
1125 bool AsyncSocket::isDetachable() const {
1126   DCHECK(eventBase_ != nullptr);
1127   DCHECK(eventBase_->isInEventBaseThread());
1128
1129   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1130 }
1131
1132 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1133   if (!localAddr_.isInitialized()) {
1134     localAddr_.setFromLocalAddress(fd_);
1135   }
1136   *address = localAddr_;
1137 }
1138
1139 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1140   if (!addr_.isInitialized()) {
1141     addr_.setFromPeerAddress(fd_);
1142   }
1143   *address = addr_;
1144 }
1145
1146 int AsyncSocket::setNoDelay(bool noDelay) {
1147   if (fd_ < 0) {
1148     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1149                << this << "(state=" << state_ << ")";
1150     return EINVAL;
1151
1152   }
1153
1154   int value = noDelay ? 1 : 0;
1155   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1156     int errnoCopy = errno;
1157     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1158             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1159             << strerror(errnoCopy);
1160     return errnoCopy;
1161   }
1162
1163   return 0;
1164 }
1165
1166 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1167
1168   #ifndef TCP_CONGESTION
1169   #define TCP_CONGESTION  13
1170   #endif
1171
1172   if (fd_ < 0) {
1173     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1174                << "socket " << this << "(state=" << state_ << ")";
1175     return EINVAL;
1176
1177   }
1178
1179   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1180         cname.length() + 1) != 0) {
1181     int errnoCopy = errno;
1182     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1183             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1184             << strerror(errnoCopy);
1185     return errnoCopy;
1186   }
1187
1188   return 0;
1189 }
1190
1191 int AsyncSocket::setQuickAck(bool quickack) {
1192   if (fd_ < 0) {
1193     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1194                << this << "(state=" << state_ << ")";
1195     return EINVAL;
1196
1197   }
1198
1199 #ifdef TCP_QUICKACK // Linux-only
1200   int value = quickack ? 1 : 0;
1201   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1202     int errnoCopy = errno;
1203     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1204             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1205             << strerror(errnoCopy);
1206     return errnoCopy;
1207   }
1208
1209   return 0;
1210 #else
1211   return ENOSYS;
1212 #endif
1213 }
1214
1215 int AsyncSocket::setSendBufSize(size_t bufsize) {
1216   if (fd_ < 0) {
1217     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1218                << this << "(state=" << state_ << ")";
1219     return EINVAL;
1220   }
1221
1222   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1223     int errnoCopy = errno;
1224     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1225             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1226             << strerror(errnoCopy);
1227     return errnoCopy;
1228   }
1229
1230   return 0;
1231 }
1232
1233 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1234   if (fd_ < 0) {
1235     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1236                << this << "(state=" << state_ << ")";
1237     return EINVAL;
1238   }
1239
1240   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1241     int errnoCopy = errno;
1242     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1243             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1244             << strerror(errnoCopy);
1245     return errnoCopy;
1246   }
1247
1248   return 0;
1249 }
1250
1251 int AsyncSocket::setTCPProfile(int profd) {
1252   if (fd_ < 0) {
1253     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1254                << this << "(state=" << state_ << ")";
1255     return EINVAL;
1256   }
1257
1258   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1259     int errnoCopy = errno;
1260     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1261             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1262             << strerror(errnoCopy);
1263     return errnoCopy;
1264   }
1265
1266   return 0;
1267 }
1268
1269 void AsyncSocket::ioReady(uint16_t events) noexcept {
1270   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1271           << ", events=" << std::hex << events << ", state=" << state_;
1272   DestructorGuard dg(this);
1273   assert(events & EventHandler::READ_WRITE);
1274   assert(eventBase_->isInEventBaseThread());
1275
1276   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1277   if (relevantEvents == EventHandler::READ) {
1278     handleRead();
1279   } else if (relevantEvents == EventHandler::WRITE) {
1280     handleWrite();
1281   } else if (relevantEvents == EventHandler::READ_WRITE) {
1282     EventBase* originalEventBase = eventBase_;
1283     // If both read and write events are ready, process writes first.
1284     handleWrite();
1285
1286     // Return now if handleWrite() detached us from our EventBase
1287     if (eventBase_ != originalEventBase) {
1288       return;
1289     }
1290
1291     // Only call handleRead() if a read callback is still installed.
1292     // (It's possible that the read callback was uninstalled during
1293     // handleWrite().)
1294     if (readCallback_) {
1295       handleRead();
1296     }
1297   } else {
1298     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1299                << std::hex << events << "(this=" << this << ")";
1300     abort();
1301   }
1302 }
1303
1304 AsyncSocket::ReadResult
1305 AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) {
1306   VLOG(5) << "AsyncSocket::performRead() this=" << this << ", buf=" << *buf
1307           << ", buflen=" << *buflen;
1308
1309   int recvFlags = 0;
1310   if (peek_) {
1311     recvFlags |= MSG_PEEK;
1312   }
1313
1314   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1315   if (bytes < 0) {
1316     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1317       // No more data to read right now.
1318       return ReadResult(READ_BLOCKING);
1319     } else {
1320       return ReadResult(READ_ERROR);
1321     }
1322   } else {
1323     appBytesReceived_ += bytes;
1324     return ReadResult(bytes);
1325   }
1326 }
1327
1328 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1329   // no matter what, buffer should be preapared for non-ssl socket
1330   CHECK(readCallback_);
1331   readCallback_->getReadBuffer(buf, buflen);
1332 }
1333
1334 void AsyncSocket::handleRead() noexcept {
1335   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1336           << ", state=" << state_;
1337   assert(state_ == StateEnum::ESTABLISHED);
1338   assert((shutdownFlags_ & SHUT_READ) == 0);
1339   assert(readCallback_ != nullptr);
1340   assert(eventFlags_ & EventHandler::READ);
1341
1342   // Loop until:
1343   // - a read attempt would block
1344   // - readCallback_ is uninstalled
1345   // - the number of loop iterations exceeds the optional maximum
1346   // - this AsyncSocket is moved to another EventBase
1347   //
1348   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1349   // which is why need to check for it here.
1350   //
1351   // The last bullet point is slightly subtle.  readDataAvailable() may also
1352   // detach this socket from this EventBase.  However, before
1353   // readDataAvailable() returns another thread may pick it up, attach it to
1354   // a different EventBase, and install another readCallback_.  We need to
1355   // exit immediately after readDataAvailable() returns if the eventBase_ has
1356   // changed.  (The caller must perform some sort of locking to transfer the
1357   // AsyncSocket between threads properly.  This will be sufficient to ensure
1358   // that this thread sees the updated eventBase_ variable after
1359   // readDataAvailable() returns.)
1360   uint16_t numReads = 0;
1361   EventBase* originalEventBase = eventBase_;
1362   while (readCallback_ && eventBase_ == originalEventBase) {
1363     // Get the buffer to read into.
1364     void* buf = nullptr;
1365     size_t buflen = 0, offset = 0;
1366     try {
1367       prepareReadBuffer(&buf, &buflen);
1368       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1369     } catch (const AsyncSocketException& ex) {
1370       return failRead(__func__, ex);
1371     } catch (const std::exception& ex) {
1372       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1373                               string("ReadCallback::getReadBuffer() "
1374                                      "threw exception: ") +
1375                               ex.what());
1376       return failRead(__func__, tex);
1377     } catch (...) {
1378       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1379                              "ReadCallback::getReadBuffer() threw "
1380                              "non-exception type");
1381       return failRead(__func__, ex);
1382     }
1383     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1384       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1385                              "ReadCallback::getReadBuffer() returned "
1386                              "empty buffer");
1387       return failRead(__func__, ex);
1388     }
1389
1390     // Perform the read
1391     auto readResult = performRead(&buf, &buflen, &offset);
1392     auto bytesRead = readResult.readReturn;
1393     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1394             << bytesRead << " bytes";
1395     if (bytesRead > 0) {
1396       if (!isBufferMovable_) {
1397         readCallback_->readDataAvailable(bytesRead);
1398       } else {
1399         CHECK(kOpenSslModeMoveBufferOwnership);
1400         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1401                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1402                 << ", offset=" << offset;
1403         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1404         readBuf->trimStart(offset);
1405         readBuf->trimEnd(buflen - offset - bytesRead);
1406         readCallback_->readBufferAvailable(std::move(readBuf));
1407       }
1408
1409       // Fall through and continue around the loop if the read
1410       // completely filled the available buffer.
1411       // Note that readCallback_ may have been uninstalled or changed inside
1412       // readDataAvailable().
1413       if (size_t(bytesRead) < buflen) {
1414         return;
1415       }
1416     } else if (bytesRead == READ_BLOCKING) {
1417         // No more data to read right now.
1418         return;
1419     } else if (bytesRead == READ_ERROR) {
1420       readErr_ = READ_ERROR;
1421       if (readResult.exception) {
1422         return failRead(__func__, *readResult.exception);
1423       }
1424       auto errnoCopy = errno;
1425       AsyncSocketException ex(
1426           AsyncSocketException::INTERNAL_ERROR,
1427           withAddr("recv() failed"),
1428           errnoCopy);
1429       return failRead(__func__, ex);
1430     } else {
1431       assert(bytesRead == READ_EOF);
1432       readErr_ = READ_EOF;
1433       // EOF
1434       shutdownFlags_ |= SHUT_READ;
1435       if (!updateEventRegistration(0, EventHandler::READ)) {
1436         // we've already been moved into STATE_ERROR
1437         assert(state_ == StateEnum::ERROR);
1438         assert(readCallback_ == nullptr);
1439         return;
1440       }
1441
1442       ReadCallback* callback = readCallback_;
1443       readCallback_ = nullptr;
1444       callback->readEOF();
1445       return;
1446     }
1447     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1448       if (readCallback_ != nullptr) {
1449         // We might still have data in the socket.
1450         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1451         scheduleImmediateRead();
1452       }
1453       return;
1454     }
1455   }
1456 }
1457
1458 /**
1459  * This function attempts to write as much data as possible, until no more data
1460  * can be written.
1461  *
1462  * - If it sends all available data, it unregisters for write events, and stops
1463  *   the writeTimeout_.
1464  *
1465  * - If not all of the data can be sent immediately, it reschedules
1466  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1467  *   registered for write events.
1468  */
1469 void AsyncSocket::handleWrite() noexcept {
1470   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1471           << ", state=" << state_;
1472   DestructorGuard dg(this);
1473
1474   if (state_ == StateEnum::CONNECTING) {
1475     handleConnect();
1476     return;
1477   }
1478
1479   // Normal write
1480   assert(state_ == StateEnum::ESTABLISHED);
1481   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1482   assert(writeReqHead_ != nullptr);
1483
1484   // Loop until we run out of write requests,
1485   // or until this socket is moved to another EventBase.
1486   // (See the comment in handleRead() explaining how this can happen.)
1487   EventBase* originalEventBase = eventBase_;
1488   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1489     auto writeResult = writeReqHead_->performWrite();
1490     if (writeResult.writeReturn < 0) {
1491       if (writeResult.exception) {
1492         return failWrite(__func__, *writeResult.exception);
1493       }
1494       auto errnoCopy = errno;
1495       AsyncSocketException ex(
1496           AsyncSocketException::INTERNAL_ERROR,
1497           withAddr("writev() failed"),
1498           errnoCopy);
1499       return failWrite(__func__, ex);
1500     } else if (writeReqHead_->isComplete()) {
1501       // We finished this request
1502       WriteRequest* req = writeReqHead_;
1503       writeReqHead_ = req->getNext();
1504
1505       if (writeReqHead_ == nullptr) {
1506         writeReqTail_ = nullptr;
1507         // This is the last write request.
1508         // Unregister for write events and cancel the send timer
1509         // before we invoke the callback.  We have to update the state properly
1510         // before calling the callback, since it may want to detach us from
1511         // the EventBase.
1512         if (eventFlags_ & EventHandler::WRITE) {
1513           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1514             assert(state_ == StateEnum::ERROR);
1515             return;
1516           }
1517           // Stop the send timeout
1518           writeTimeout_.cancelTimeout();
1519         }
1520         assert(!writeTimeout_.isScheduled());
1521
1522         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1523         // we finish sending the last write request.
1524         //
1525         // We have to do this before invoking writeSuccess(), since
1526         // writeSuccess() may detach us from our EventBase.
1527         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1528           assert(connectCallback_ == nullptr);
1529           shutdownFlags_ |= SHUT_WRITE;
1530
1531           if (shutdownFlags_ & SHUT_READ) {
1532             // Reads have already been shutdown.  Fully close the socket and
1533             // move to STATE_CLOSED.
1534             //
1535             // Note: This code currently moves us to STATE_CLOSED even if
1536             // close() hasn't ever been called.  This can occur if we have
1537             // received EOF from the peer and shutdownWrite() has been called
1538             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1539             // case, until close() is actually called?  I can't think of a
1540             // reason why we would need to do so.  No other operations besides
1541             // calling close() or destroying the socket can be performed at
1542             // this point.
1543             assert(readCallback_ == nullptr);
1544             state_ = StateEnum::CLOSED;
1545             if (fd_ >= 0) {
1546               ioHandler_.changeHandlerFD(-1);
1547               doClose();
1548             }
1549           } else {
1550             // Reads are still enabled, so we are only doing a half-shutdown
1551             ::shutdown(fd_, SHUT_WR);
1552           }
1553         }
1554       }
1555
1556       // Invoke the callback
1557       WriteCallback* callback = req->getCallback();
1558       req->destroy();
1559       if (callback) {
1560         callback->writeSuccess();
1561       }
1562       // We'll continue around the loop, trying to write another request
1563     } else {
1564       // Partial write.
1565       if (bufferCallback_) {
1566         bufferCallback_->onEgressBuffered();
1567       }
1568       writeReqHead_->consume();
1569       // Stop after a partial write; it's highly likely that a subsequent write
1570       // attempt will just return EAGAIN.
1571       //
1572       // Ensure that we are registered for write events.
1573       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1574         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1575           assert(state_ == StateEnum::ERROR);
1576           return;
1577         }
1578       }
1579
1580       // Reschedule the send timeout, since we have made some write progress.
1581       if (sendTimeout_ > 0) {
1582         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1583           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1584               withAddr("failed to reschedule write timeout"));
1585           return failWrite(__func__, ex);
1586         }
1587       }
1588       return;
1589     }
1590   }
1591   if (!writeReqHead_ && bufferCallback_) {
1592     bufferCallback_->onEgressBufferCleared();
1593   }
1594 }
1595
1596 void AsyncSocket::checkForImmediateRead() noexcept {
1597   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1598   // (However, note that some subclasses do override this method.)
1599   //
1600   // Simply calling handleRead() here would be bad, as this would call
1601   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1602   // buffer even though no data may be available.  This would waste lots of
1603   // memory, since the buffer will sit around unused until the socket actually
1604   // becomes readable.
1605   //
1606   // Checking if the socket is readable now also seems like it would probably
1607   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1608   // would just waste an extra system call.  Even if it is readable, waiting to
1609   // find out from libevent on the next event loop doesn't seem that bad.
1610 }
1611
1612 void AsyncSocket::handleInitialReadWrite() noexcept {
1613   // Our callers should already be holding a DestructorGuard, but grab
1614   // one here just to make sure, in case one of our calling code paths ever
1615   // changes.
1616   DestructorGuard dg(this);
1617
1618   // If we have a readCallback_, make sure we enable read events.  We
1619   // may already be registered for reads if connectSuccess() set
1620   // the read calback.
1621   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1622     assert(state_ == StateEnum::ESTABLISHED);
1623     assert((shutdownFlags_ & SHUT_READ) == 0);
1624     if (!updateEventRegistration(EventHandler::READ, 0)) {
1625       assert(state_ == StateEnum::ERROR);
1626       return;
1627     }
1628     checkForImmediateRead();
1629   } else if (readCallback_ == nullptr) {
1630     // Unregister for read events.
1631     updateEventRegistration(0, EventHandler::READ);
1632   }
1633
1634   // If we have write requests pending, try to send them immediately.
1635   // Since we just finished accepting, there is a very good chance that we can
1636   // write without blocking.
1637   //
1638   // However, we only process them if EventHandler::WRITE is not already set,
1639   // which means that we're already blocked on a write attempt.  (This can
1640   // happen if connectSuccess() called write() before returning.)
1641   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1642     // Call handleWrite() to perform write processing.
1643     handleWrite();
1644   } else if (writeReqHead_ == nullptr) {
1645     // Unregister for write event.
1646     updateEventRegistration(0, EventHandler::WRITE);
1647   }
1648 }
1649
1650 void AsyncSocket::handleConnect() noexcept {
1651   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1652           << ", state=" << state_;
1653   assert(state_ == StateEnum::CONNECTING);
1654   // SHUT_WRITE can never be set while we are still connecting;
1655   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1656   // finishes
1657   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1658
1659   // In case we had a connect timeout, cancel the timeout
1660   writeTimeout_.cancelTimeout();
1661   // We don't use a persistent registration when waiting on a connect event,
1662   // so we have been automatically unregistered now.  Update eventFlags_ to
1663   // reflect reality.
1664   assert(eventFlags_ == EventHandler::WRITE);
1665   eventFlags_ = EventHandler::NONE;
1666
1667   // Call getsockopt() to check if the connect succeeded
1668   int error;
1669   socklen_t len = sizeof(error);
1670   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1671   if (rv != 0) {
1672     auto errnoCopy = errno;
1673     AsyncSocketException ex(
1674         AsyncSocketException::INTERNAL_ERROR,
1675         withAddr("error calling getsockopt() after connect"),
1676         errnoCopy);
1677     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1678                << fd_ << " host=" << addr_.describe()
1679                << ") exception:" << ex.what();
1680     return failConnect(__func__, ex);
1681   }
1682
1683   if (error != 0) {
1684     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1685                            "connect failed", error);
1686     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1687             << fd_ << " host=" << addr_.describe()
1688             << ") exception: " << ex.what();
1689     return failConnect(__func__, ex);
1690   }
1691
1692   // Move into STATE_ESTABLISHED
1693   state_ = StateEnum::ESTABLISHED;
1694
1695   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1696   // perform, immediately shutdown the write half of the socket.
1697   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1698     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1699     // are still connecting we just abort the connect rather than waiting for
1700     // it to complete.
1701     assert((shutdownFlags_ & SHUT_READ) == 0);
1702     ::shutdown(fd_, SHUT_WR);
1703     shutdownFlags_ |= SHUT_WRITE;
1704   }
1705
1706   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1707           << "successfully connected; state=" << state_;
1708
1709   // Remember the EventBase we are attached to, before we start invoking any
1710   // callbacks (since the callbacks may call detachEventBase()).
1711   EventBase* originalEventBase = eventBase_;
1712
1713   invokeConnectSuccess();
1714   // Note that the connect callback may have changed our state.
1715   // (set or unset the read callback, called write(), closed the socket, etc.)
1716   // The following code needs to handle these situations correctly.
1717   //
1718   // If the socket has been closed, readCallback_ and writeReqHead_ will
1719   // always be nullptr, so that will prevent us from trying to read or write.
1720   //
1721   // The main thing to check for is if eventBase_ is still originalEventBase.
1722   // If not, we have been detached from this event base, so we shouldn't
1723   // perform any more operations.
1724   if (eventBase_ != originalEventBase) {
1725     return;
1726   }
1727
1728   handleInitialReadWrite();
1729 }
1730
1731 void AsyncSocket::timeoutExpired() noexcept {
1732   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1733           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1734   DestructorGuard dg(this);
1735   assert(eventBase_->isInEventBaseThread());
1736
1737   if (state_ == StateEnum::CONNECTING) {
1738     // connect() timed out
1739     // Unregister for I/O events.
1740     if (connectCallback_) {
1741       AsyncSocketException ex(
1742           AsyncSocketException::TIMED_OUT, "connect timed out");
1743       failConnect(__func__, ex);
1744     } else {
1745       // we faced a connect error without a connect callback, which could
1746       // happen due to TFO.
1747       AsyncSocketException ex(
1748           AsyncSocketException::TIMED_OUT, "write timed out during connection");
1749       failWrite(__func__, ex);
1750     }
1751   } else {
1752     // a normal write operation timed out
1753     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1754     failWrite(__func__, ex);
1755   }
1756 }
1757
1758 ssize_t AsyncSocket::tfoSendMsg(int fd, struct msghdr* msg, int msg_flags) {
1759   return detail::tfo_sendmsg(fd, msg, msg_flags);
1760 }
1761
1762 AsyncSocket::WriteResult
1763 AsyncSocket::sendSocketMessage(int fd, struct msghdr* msg, int msg_flags) {
1764   ssize_t totalWritten = 0;
1765   if (state_ == StateEnum::FAST_OPEN) {
1766     sockaddr_storage addr;
1767     auto len = addr_.getAddress(&addr);
1768     msg->msg_name = &addr;
1769     msg->msg_namelen = len;
1770     totalWritten = tfoSendMsg(fd_, msg, msg_flags);
1771     if (totalWritten >= 0) {
1772       // Call tfo_succeeded to check if TFO was used.
1773       tfoSucceeded_ = detail::tfo_succeeded(fd_);
1774       if (errno != 0) {
1775         auto errnoCopy = errno;
1776         AsyncSocketException ex(
1777             AsyncSocketException::INTERNAL_ERROR,
1778             withAddr("error calling tfo_succeeded"),
1779             errnoCopy);
1780         return WriteResult(
1781             WRITE_ERROR, folly::make_unique<AsyncSocketException>(ex));
1782       }
1783
1784       tfoFinished_ = true;
1785       state_ = StateEnum::ESTABLISHED;
1786       handleInitialReadWrite();
1787     } else if (errno == EINPROGRESS) {
1788       VLOG(4) << "TFO falling back to connecting";
1789       // A normal sendmsg doesn't return EINPROGRESS, however
1790       // TFO might fallback to connecting if there is no
1791       // cookie.
1792       state_ = StateEnum::CONNECTING;
1793       try {
1794         scheduleConnectTimeoutAndRegisterForEvents();
1795       } catch (const AsyncSocketException& ex) {
1796         return WriteResult(
1797             WRITE_ERROR, folly::make_unique<AsyncSocketException>(ex));
1798       }
1799       // Let's fake it that no bytes were written and return an errno.
1800       errno = EAGAIN;
1801       totalWritten = -1;
1802     } else if (errno == EOPNOTSUPP) {
1803       VLOG(4) << "TFO not supported";
1804       // Try falling back to connecting.
1805       state_ = StateEnum::CONNECTING;
1806       try {
1807         int ret = socketConnect((const sockaddr*)&addr, len);
1808         if (ret == 0) {
1809           // connect succeeded immediately
1810           // Treat this like no data was written.
1811           state_ = StateEnum::ESTABLISHED;
1812           handleInitialReadWrite();
1813         }
1814         // If there was no exception during connections,
1815         // we would return that no bytes were written.
1816         errno = EAGAIN;
1817         totalWritten = -1;
1818       } catch (const AsyncSocketException& ex) {
1819         return WriteResult(
1820             WRITE_ERROR, folly::make_unique<AsyncSocketException>(ex));
1821       }
1822     } else if (errno == EAGAIN) {
1823       // Normally sendmsg would indicate that the write would block.
1824       // However in the fast open case, it would indicate that sendmsg
1825       // fell back to a connect. This is a return code from connect()
1826       // instead, and is an error condition indicating no fds available.
1827       return WriteResult(
1828           WRITE_ERROR,
1829           folly::make_unique<AsyncSocketException>(
1830               AsyncSocketException::UNKNOWN, "No more free local ports"));
1831     }
1832   } else {
1833     totalWritten = ::sendmsg(fd, msg, msg_flags);
1834   }
1835   return WriteResult(totalWritten);
1836 }
1837
1838 AsyncSocket::WriteResult AsyncSocket::performWrite(
1839     const iovec* vec,
1840     uint32_t count,
1841     WriteFlags flags,
1842     uint32_t* countWritten,
1843     uint32_t* partialWritten) {
1844   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1845   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1846   // (since it may terminate the program if the main program doesn't explicitly
1847   // ignore it).
1848   struct msghdr msg;
1849   msg.msg_name = nullptr;
1850   msg.msg_namelen = 0;
1851   msg.msg_iov = const_cast<iovec *>(vec);
1852   msg.msg_iovlen = std::min<size_t>(count, kIovMax);
1853   msg.msg_control = nullptr;
1854   msg.msg_controllen = 0;
1855   msg.msg_flags = 0;
1856
1857   int msg_flags = MSG_DONTWAIT;
1858
1859 #ifdef MSG_NOSIGNAL // Linux-only
1860   msg_flags |= MSG_NOSIGNAL;
1861   if (isSet(flags, WriteFlags::CORK)) {
1862     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1863     // give it the rest of the data rather than immediately sending a partial
1864     // frame, even when TCP_NODELAY is enabled.
1865     msg_flags |= MSG_MORE;
1866   }
1867 #endif
1868   if (isSet(flags, WriteFlags::EOR)) {
1869     // marks that this is the last byte of a record (response)
1870     msg_flags |= MSG_EOR;
1871   }
1872   auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
1873   auto totalWritten = writeResult.writeReturn;
1874   if (totalWritten < 0) {
1875     if (!writeResult.exception && errno == EAGAIN) {
1876       // TCP buffer is full; we can't write any more data right now.
1877       *countWritten = 0;
1878       *partialWritten = 0;
1879       return WriteResult(0);
1880     }
1881     // error
1882     *countWritten = 0;
1883     *partialWritten = 0;
1884     return writeResult;
1885   }
1886
1887   appBytesWritten_ += totalWritten;
1888
1889   uint32_t bytesWritten;
1890   uint32_t n;
1891   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1892     const iovec* v = vec + n;
1893     if (v->iov_len > bytesWritten) {
1894       // Partial write finished in the middle of this iovec
1895       *countWritten = n;
1896       *partialWritten = bytesWritten;
1897       return WriteResult(totalWritten);
1898     }
1899
1900     bytesWritten -= v->iov_len;
1901   }
1902
1903   assert(bytesWritten == 0);
1904   *countWritten = n;
1905   *partialWritten = 0;
1906   return WriteResult(totalWritten);
1907 }
1908
1909 /**
1910  * Re-register the EventHandler after eventFlags_ has changed.
1911  *
1912  * If an error occurs, fail() is called to move the socket into the error state
1913  * and call all currently installed callbacks.  After an error, the
1914  * AsyncSocket is completely unregistered.
1915  *
1916  * @return Returns true on succcess, or false on error.
1917  */
1918 bool AsyncSocket::updateEventRegistration() {
1919   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1920           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1921           << ", events=" << std::hex << eventFlags_;
1922   assert(eventBase_->isInEventBaseThread());
1923   if (eventFlags_ == EventHandler::NONE) {
1924     ioHandler_.unregisterHandler();
1925     return true;
1926   }
1927
1928   // Always register for persistent events, so we don't have to re-register
1929   // after being called back.
1930   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1931     eventFlags_ = EventHandler::NONE; // we're not registered after error
1932     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1933         withAddr("failed to update AsyncSocket event registration"));
1934     fail("updateEventRegistration", ex);
1935     return false;
1936   }
1937
1938   return true;
1939 }
1940
1941 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1942                                            uint16_t disable) {
1943   uint16_t oldFlags = eventFlags_;
1944   eventFlags_ |= enable;
1945   eventFlags_ &= ~disable;
1946   if (eventFlags_ == oldFlags) {
1947     return true;
1948   } else {
1949     return updateEventRegistration();
1950   }
1951 }
1952
1953 void AsyncSocket::startFail() {
1954   // startFail() should only be called once
1955   assert(state_ != StateEnum::ERROR);
1956   assert(getDestructorGuardCount() > 0);
1957   state_ = StateEnum::ERROR;
1958   // Ensure that SHUT_READ and SHUT_WRITE are set,
1959   // so all future attempts to read or write will be rejected
1960   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1961
1962   if (eventFlags_ != EventHandler::NONE) {
1963     eventFlags_ = EventHandler::NONE;
1964     ioHandler_.unregisterHandler();
1965   }
1966   writeTimeout_.cancelTimeout();
1967
1968   if (fd_ >= 0) {
1969     ioHandler_.changeHandlerFD(-1);
1970     doClose();
1971   }
1972 }
1973
1974 void AsyncSocket::finishFail() {
1975   assert(state_ == StateEnum::ERROR);
1976   assert(getDestructorGuardCount() > 0);
1977
1978   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1979                          withAddr("socket closing after error"));
1980   invokeConnectErr(ex);
1981   failAllWrites(ex);
1982
1983   if (readCallback_) {
1984     ReadCallback* callback = readCallback_;
1985     readCallback_ = nullptr;
1986     callback->readErr(ex);
1987   }
1988 }
1989
1990 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1991   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1992              << state_ << " host=" << addr_.describe()
1993              << "): failed in " << fn << "(): "
1994              << ex.what();
1995   startFail();
1996   finishFail();
1997 }
1998
1999 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
2000   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2001                << state_ << " host=" << addr_.describe()
2002                << "): failed while connecting in " << fn << "(): "
2003                << ex.what();
2004   startFail();
2005
2006   invokeConnectErr(ex);
2007   finishFail();
2008 }
2009
2010 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
2011   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2012                << state_ << " host=" << addr_.describe()
2013                << "): failed while reading in " << fn << "(): "
2014                << ex.what();
2015   startFail();
2016
2017   if (readCallback_ != nullptr) {
2018     ReadCallback* callback = readCallback_;
2019     readCallback_ = nullptr;
2020     callback->readErr(ex);
2021   }
2022
2023   finishFail();
2024 }
2025
2026 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
2027   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2028                << state_ << " host=" << addr_.describe()
2029                << "): failed while writing in " << fn << "(): "
2030                << ex.what();
2031   startFail();
2032
2033   // Only invoke the first write callback, since the error occurred while
2034   // writing this request.  Let any other pending write callbacks be invoked in
2035   // finishFail().
2036   if (writeReqHead_ != nullptr) {
2037     WriteRequest* req = writeReqHead_;
2038     writeReqHead_ = req->getNext();
2039     WriteCallback* callback = req->getCallback();
2040     uint32_t bytesWritten = req->getTotalBytesWritten();
2041     req->destroy();
2042     if (callback) {
2043       callback->writeErr(bytesWritten, ex);
2044     }
2045   }
2046
2047   finishFail();
2048 }
2049
2050 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
2051                              size_t bytesWritten,
2052                              const AsyncSocketException& ex) {
2053   // This version of failWrite() is used when the failure occurs before
2054   // we've added the callback to writeReqHead_.
2055   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2056              << state_ << " host=" << addr_.describe()
2057              <<"): failed while writing in " << fn << "(): "
2058              << ex.what();
2059   startFail();
2060
2061   if (callback != nullptr) {
2062     callback->writeErr(bytesWritten, ex);
2063   }
2064
2065   finishFail();
2066 }
2067
2068 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
2069   // Invoke writeError() on all write callbacks.
2070   // This is used when writes are forcibly shutdown with write requests
2071   // pending, or when an error occurs with writes pending.
2072   while (writeReqHead_ != nullptr) {
2073     WriteRequest* req = writeReqHead_;
2074     writeReqHead_ = req->getNext();
2075     WriteCallback* callback = req->getCallback();
2076     if (callback) {
2077       callback->writeErr(req->getTotalBytesWritten(), ex);
2078     }
2079     req->destroy();
2080   }
2081 }
2082
2083 void AsyncSocket::invalidState(ConnectCallback* callback) {
2084   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
2085              << "): connect() called in invalid state " << state_;
2086
2087   /*
2088    * The invalidState() methods don't use the normal failure mechanisms,
2089    * since we don't know what state we are in.  We don't want to call
2090    * startFail()/finishFail() recursively if we are already in the middle of
2091    * cleaning up.
2092    */
2093
2094   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
2095                          "connect() called with socket in invalid state");
2096   connectEndTime_ = std::chrono::steady_clock::now();
2097   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2098     if (callback) {
2099       callback->connectErr(ex);
2100     }
2101   } else {
2102     // We can't use failConnect() here since connectCallback_
2103     // may already be set to another callback.  Invoke this ConnectCallback
2104     // here; any other connectCallback_ will be invoked in finishFail()
2105     startFail();
2106     if (callback) {
2107       callback->connectErr(ex);
2108     }
2109     finishFail();
2110   }
2111 }
2112
2113 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
2114   connectEndTime_ = std::chrono::steady_clock::now();
2115   if (connectCallback_) {
2116     ConnectCallback* callback = connectCallback_;
2117     connectCallback_ = nullptr;
2118     callback->connectErr(ex);
2119   }
2120 }
2121
2122 void AsyncSocket::invokeConnectSuccess() {
2123   connectEndTime_ = std::chrono::steady_clock::now();
2124   if (connectCallback_) {
2125     ConnectCallback* callback = connectCallback_;
2126     connectCallback_ = nullptr;
2127     callback->connectSuccess();
2128   }
2129 }
2130
2131 void AsyncSocket::invalidState(ReadCallback* callback) {
2132   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2133              << "): setReadCallback(" << callback
2134              << ") called in invalid state " << state_;
2135
2136   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2137                          "setReadCallback() called with socket in "
2138                          "invalid state");
2139   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2140     if (callback) {
2141       callback->readErr(ex);
2142     }
2143   } else {
2144     startFail();
2145     if (callback) {
2146       callback->readErr(ex);
2147     }
2148     finishFail();
2149   }
2150 }
2151
2152 void AsyncSocket::invalidState(WriteCallback* callback) {
2153   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2154              << "): write() called in invalid state " << state_;
2155
2156   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2157                          withAddr("write() called with socket in invalid state"));
2158   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2159     if (callback) {
2160       callback->writeErr(0, ex);
2161     }
2162   } else {
2163     startFail();
2164     if (callback) {
2165       callback->writeErr(0, ex);
2166     }
2167     finishFail();
2168   }
2169 }
2170
2171 void AsyncSocket::doClose() {
2172   if (fd_ == -1) return;
2173   if (shutdownSocketSet_) {
2174     shutdownSocketSet_->close(fd_);
2175   } else {
2176     ::close(fd_);
2177   }
2178   fd_ = -1;
2179 }
2180
2181 std::ostream& operator << (std::ostream& os,
2182                            const AsyncSocket::StateEnum& state) {
2183   os << static_cast<int>(state);
2184   return os;
2185 }
2186
2187 std::string AsyncSocket::withAddr(const std::string& s) {
2188   // Don't use addr_ directly because it may not be initialized
2189   // e.g. if constructed from fd
2190   folly::SocketAddress peer, local;
2191   try {
2192     getPeerAddress(&peer);
2193     getLocalAddress(&local);
2194   } catch (const std::exception&) {
2195     // ignore
2196   } catch (...) {
2197     // ignore
2198   }
2199   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2200 }
2201
2202 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2203   bufferCallback_ = cb;
2204 }
2205
2206 } // folly