Start fixing implicit truncations
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/ExceptionWrapper.h>
20 #include <folly/SocketAddress.h>
21 #include <folly/io/IOBuf.h>
22 #include <folly/Portability.h>
23 #include <folly/portability/Fcntl.h>
24 #include <folly/portability/Sockets.h>
25 #include <folly/portability/SysUio.h>
26 #include <folly/portability/Unistd.h>
27
28 #include <errno.h>
29 #include <limits.h>
30 #include <thread>
31 #include <sys/types.h>
32 #include <boost/preprocessor/control/if.hpp>
33
34 using std::string;
35 using std::unique_ptr;
36
37 namespace fsp = folly::portability::sockets;
38
39 namespace folly {
40
41 // static members initializers
42 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
43
44 const AsyncSocketException socketClosedLocallyEx(
45     AsyncSocketException::END_OF_FILE, "socket closed locally");
46 const AsyncSocketException socketShutdownForWritesEx(
47     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
48
49 // TODO: It might help performance to provide a version of BytesWriteRequest that
50 // users could derive from, so we can avoid the extra allocation for each call
51 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
52 // protocols are currently templatized for transports.
53 //
54 // We would need the version for external users where they provide the iovec
55 // storage space, and only our internal version would allocate it at the end of
56 // the WriteRequest.
57
58 /* The default WriteRequest implementation, used for write(), writev() and
59  * writeChain()
60  *
61  * A new BytesWriteRequest operation is allocated on the heap for all write
62  * operations that cannot be completed immediately.
63  */
64 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
65  public:
66   static BytesWriteRequest* newRequest(AsyncSocket* socket,
67                                        WriteCallback* callback,
68                                        const iovec* ops,
69                                        uint32_t opCount,
70                                        uint32_t partialWritten,
71                                        uint32_t bytesWritten,
72                                        unique_ptr<IOBuf>&& ioBuf,
73                                        WriteFlags flags) {
74     assert(opCount > 0);
75     // Since we put a variable size iovec array at the end
76     // of each BytesWriteRequest, we have to manually allocate the memory.
77     void* buf = malloc(sizeof(BytesWriteRequest) +
78                        (opCount * sizeof(struct iovec)));
79     if (buf == nullptr) {
80       throw std::bad_alloc();
81     }
82
83     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
84                                       partialWritten, bytesWritten,
85                                       std::move(ioBuf), flags);
86   }
87
88   void destroy() override {
89     this->~BytesWriteRequest();
90     free(this);
91   }
92
93   WriteResult performWrite() override {
94     WriteFlags writeFlags = flags_;
95     if (getNext() != nullptr) {
96       writeFlags = writeFlags | WriteFlags::CORK;
97     }
98     auto writeResult = socket_->performWrite(
99         getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
100     bytesWritten_ = writeResult.writeReturn > 0 ? writeResult.writeReturn : 0;
101     return writeResult;
102   }
103
104   bool isComplete() override {
105     return opsWritten_ == getOpCount();
106   }
107
108   void consume() override {
109     // Advance opIndex_ forward by opsWritten_
110     opIndex_ += opsWritten_;
111     assert(opIndex_ < opCount_);
112
113     // If we've finished writing any IOBufs, release them
114     if (ioBuf_) {
115       for (uint32_t i = opsWritten_; i != 0; --i) {
116         assert(ioBuf_);
117         ioBuf_ = ioBuf_->pop();
118       }
119     }
120
121     // Move partialBytes_ forward into the current iovec buffer
122     struct iovec* currentOp = writeOps_ + opIndex_;
123     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
124     currentOp->iov_base =
125       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
126     currentOp->iov_len -= partialBytes_;
127
128     // Increment the totalBytesWritten_ count by bytesWritten_;
129     assert(bytesWritten_ >= 0);
130     totalBytesWritten_ += uint32_t(bytesWritten_);
131   }
132
133  private:
134   BytesWriteRequest(AsyncSocket* socket,
135                     WriteCallback* callback,
136                     const struct iovec* ops,
137                     uint32_t opCount,
138                     uint32_t partialBytes,
139                     uint32_t bytesWritten,
140                     unique_ptr<IOBuf>&& ioBuf,
141                     WriteFlags flags)
142     : AsyncSocket::WriteRequest(socket, callback)
143     , opCount_(opCount)
144     , opIndex_(0)
145     , flags_(flags)
146     , ioBuf_(std::move(ioBuf))
147     , opsWritten_(0)
148     , partialBytes_(partialBytes)
149     , bytesWritten_(bytesWritten) {
150     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
151   }
152
153   // private destructor, to ensure callers use destroy()
154   ~BytesWriteRequest() override = default;
155
156   const struct iovec* getOps() const {
157     assert(opCount_ > opIndex_);
158     return writeOps_ + opIndex_;
159   }
160
161   uint32_t getOpCount() const {
162     assert(opCount_ > opIndex_);
163     return opCount_ - opIndex_;
164   }
165
166   uint32_t opCount_;            ///< number of entries in writeOps_
167   uint32_t opIndex_;            ///< current index into writeOps_
168   WriteFlags flags_;            ///< set for WriteFlags
169   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
170
171   // for consume(), how much we wrote on the last write
172   uint32_t opsWritten_;         ///< complete ops written
173   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
174   ssize_t bytesWritten_;        ///< bytes written altogether
175
176   struct iovec writeOps_[];     ///< write operation(s) list
177 };
178
179 AsyncSocket::AsyncSocket()
180     : eventBase_(nullptr),
181       writeTimeout_(this, nullptr),
182       ioHandler_(this, nullptr),
183       immediateReadHandler_(this) {
184   VLOG(5) << "new AsyncSocket()";
185   init();
186 }
187
188 AsyncSocket::AsyncSocket(EventBase* evb)
189     : eventBase_(evb),
190       writeTimeout_(this, evb),
191       ioHandler_(this, evb),
192       immediateReadHandler_(this) {
193   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
194   init();
195 }
196
197 AsyncSocket::AsyncSocket(EventBase* evb,
198                            const folly::SocketAddress& address,
199                            uint32_t connectTimeout)
200   : AsyncSocket(evb) {
201   connect(nullptr, address, connectTimeout);
202 }
203
204 AsyncSocket::AsyncSocket(EventBase* evb,
205                            const std::string& ip,
206                            uint16_t port,
207                            uint32_t connectTimeout)
208   : AsyncSocket(evb) {
209   connect(nullptr, ip, port, connectTimeout);
210 }
211
212 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
213     : eventBase_(evb),
214       writeTimeout_(this, evb),
215       ioHandler_(this, evb, fd),
216       immediateReadHandler_(this) {
217   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
218           << fd << ")";
219   init();
220   fd_ = fd;
221   setCloseOnExec();
222   state_ = StateEnum::ESTABLISHED;
223 }
224
225 // init() method, since constructor forwarding isn't supported in most
226 // compilers yet.
227 void AsyncSocket::init() {
228   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
229   shutdownFlags_ = 0;
230   state_ = StateEnum::UNINIT;
231   eventFlags_ = EventHandler::NONE;
232   fd_ = -1;
233   sendTimeout_ = 0;
234   maxReadsPerEvent_ = 16;
235   connectCallback_ = nullptr;
236   readCallback_ = nullptr;
237   writeReqHead_ = nullptr;
238   writeReqTail_ = nullptr;
239   shutdownSocketSet_ = nullptr;
240   appBytesWritten_ = 0;
241   appBytesReceived_ = 0;
242 }
243
244 AsyncSocket::~AsyncSocket() {
245   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
246           << ", evb=" << eventBase_ << ", fd=" << fd_
247           << ", state=" << state_ << ")";
248 }
249
250 void AsyncSocket::destroy() {
251   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
252           << ", fd=" << fd_ << ", state=" << state_;
253   // When destroy is called, close the socket immediately
254   closeNow();
255
256   // Then call DelayedDestruction::destroy() to take care of
257   // whether or not we need immediate or delayed destruction
258   DelayedDestruction::destroy();
259 }
260
261 int AsyncSocket::detachFd() {
262   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
263           << ", evb=" << eventBase_ << ", state=" << state_
264           << ", events=" << std::hex << eventFlags_ << ")";
265   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
266   // actually close the descriptor.
267   if (shutdownSocketSet_) {
268     shutdownSocketSet_->remove(fd_);
269   }
270   int fd = fd_;
271   fd_ = -1;
272   // Call closeNow() to invoke all pending callbacks with an error.
273   closeNow();
274   // Update the EventHandler to stop using this fd.
275   // This can only be done after closeNow() unregisters the handler.
276   ioHandler_.changeHandlerFD(-1);
277   return fd;
278 }
279
280 const folly::SocketAddress& AsyncSocket::anyAddress() {
281   static const folly::SocketAddress anyAddress =
282     folly::SocketAddress("0.0.0.0", 0);
283   return anyAddress;
284 }
285
286 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
287   if (shutdownSocketSet_ == newSS) {
288     return;
289   }
290   if (shutdownSocketSet_ && fd_ != -1) {
291     shutdownSocketSet_->remove(fd_);
292   }
293   shutdownSocketSet_ = newSS;
294   if (shutdownSocketSet_ && fd_ != -1) {
295     shutdownSocketSet_->add(fd_);
296   }
297 }
298
299 void AsyncSocket::setCloseOnExec() {
300   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
301   if (rv != 0) {
302     auto errnoCopy = errno;
303     throw AsyncSocketException(
304         AsyncSocketException::INTERNAL_ERROR,
305         withAddr("failed to set close-on-exec flag"),
306         errnoCopy);
307   }
308 }
309
310 void AsyncSocket::connect(ConnectCallback* callback,
311                            const folly::SocketAddress& address,
312                            int timeout,
313                            const OptionMap &options,
314                            const folly::SocketAddress& bindAddr) noexcept {
315   DestructorGuard dg(this);
316   assert(eventBase_->isInEventBaseThread());
317
318   addr_ = address;
319
320   // Make sure we're in the uninitialized state
321   if (state_ != StateEnum::UNINIT) {
322     return invalidState(callback);
323   }
324
325   connectTimeout_ = std::chrono::milliseconds(timeout);
326   connectStartTime_ = std::chrono::steady_clock::now();
327   // Make connect end time at least >= connectStartTime.
328   connectEndTime_ = connectStartTime_;
329
330   assert(fd_ == -1);
331   state_ = StateEnum::CONNECTING;
332   connectCallback_ = callback;
333
334   sockaddr_storage addrStorage;
335   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
336
337   try {
338     // Create the socket
339     // Technically the first parameter should actually be a protocol family
340     // constant (PF_xxx) rather than an address family (AF_xxx), but the
341     // distinction is mainly just historical.  In pretty much all
342     // implementations the PF_foo and AF_foo constants are identical.
343     fd_ = fsp::socket(address.getFamily(), SOCK_STREAM, 0);
344     if (fd_ < 0) {
345       auto errnoCopy = errno;
346       throw AsyncSocketException(
347           AsyncSocketException::INTERNAL_ERROR,
348           withAddr("failed to create socket"),
349           errnoCopy);
350     }
351     if (shutdownSocketSet_) {
352       shutdownSocketSet_->add(fd_);
353     }
354     ioHandler_.changeHandlerFD(fd_);
355
356     setCloseOnExec();
357
358     // Put the socket in non-blocking mode
359     int flags = fcntl(fd_, F_GETFL, 0);
360     if (flags == -1) {
361       auto errnoCopy = errno;
362       throw AsyncSocketException(
363           AsyncSocketException::INTERNAL_ERROR,
364           withAddr("failed to get socket flags"),
365           errnoCopy);
366     }
367     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
368     if (rv == -1) {
369       auto errnoCopy = errno;
370       throw AsyncSocketException(
371           AsyncSocketException::INTERNAL_ERROR,
372           withAddr("failed to put socket in non-blocking mode"),
373           errnoCopy);
374     }
375
376 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
377     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
378     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
379     if (rv == -1) {
380       auto errnoCopy = errno;
381       throw AsyncSocketException(
382           AsyncSocketException::INTERNAL_ERROR,
383           "failed to enable F_SETNOSIGPIPE on socket",
384           errnoCopy);
385     }
386 #endif
387
388     // By default, turn on TCP_NODELAY
389     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
390     // setNoDelay() will log an error message if it fails.
391     if (address.getFamily() != AF_UNIX) {
392       (void)setNoDelay(true);
393     }
394
395     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
396             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
397
398     // bind the socket
399     if (bindAddr != anyAddress()) {
400       int one = 1;
401       if (setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
402         auto errnoCopy = errno;
403         doClose();
404         throw AsyncSocketException(
405             AsyncSocketException::NOT_OPEN,
406             "failed to setsockopt prior to bind on " + bindAddr.describe(),
407             errnoCopy);
408       }
409
410       bindAddr.getAddress(&addrStorage);
411
412       if (bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
413         auto errnoCopy = errno;
414         doClose();
415         throw AsyncSocketException(
416             AsyncSocketException::NOT_OPEN,
417             "failed to bind to async socket: " + bindAddr.describe(),
418             errnoCopy);
419       }
420     }
421
422     // Apply the additional options if any.
423     for (const auto& opt: options) {
424       rv = opt.first.apply(fd_, opt.second);
425       if (rv != 0) {
426         auto errnoCopy = errno;
427         throw AsyncSocketException(
428             AsyncSocketException::INTERNAL_ERROR,
429             withAddr("failed to set socket option"),
430             errnoCopy);
431       }
432     }
433
434     // Perform the connect()
435     address.getAddress(&addrStorage);
436
437     if (tfoEnabled_) {
438       state_ = StateEnum::FAST_OPEN;
439       tfoAttempted_ = true;
440     } else {
441       if (socketConnect(saddr, addr_.getActualSize()) < 0) {
442         return;
443       }
444     }
445
446     // If we're still here the connect() succeeded immediately.
447     // Fall through to call the callback outside of this try...catch block
448   } catch (const AsyncSocketException& ex) {
449     return failConnect(__func__, ex);
450   } catch (const std::exception& ex) {
451     // shouldn't happen, but handle it just in case
452     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
453                << "): unexpected " << typeid(ex).name() << " exception: "
454                << ex.what();
455     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
456                             withAddr(string("unexpected exception: ") +
457                                      ex.what()));
458     return failConnect(__func__, tex);
459   }
460
461   // The connection succeeded immediately
462   // The read callback may not have been set yet, and no writes may be pending
463   // yet, so we don't have to register for any events at the moment.
464   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
465   assert(readCallback_ == nullptr);
466   assert(writeReqHead_ == nullptr);
467   if (state_ != StateEnum::FAST_OPEN) {
468     state_ = StateEnum::ESTABLISHED;
469   }
470   invokeConnectSuccess();
471 }
472
473 int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) {
474   int rv = fsp::connect(fd_, saddr, len);
475   if (rv < 0) {
476     auto errnoCopy = errno;
477     if (errnoCopy == EINPROGRESS) {
478       scheduleConnectTimeout();
479       registerForConnectEvents();
480     } else {
481       throw AsyncSocketException(
482           AsyncSocketException::NOT_OPEN,
483           "connect failed (immediately)",
484           errnoCopy);
485     }
486   }
487   return rv;
488 }
489
490 void AsyncSocket::scheduleConnectTimeout() {
491   // Connection in progress.
492   int timeout = connectTimeout_.count();
493   if (timeout > 0) {
494     // Start a timer in case the connection takes too long.
495     if (!writeTimeout_.scheduleTimeout(timeout)) {
496       throw AsyncSocketException(
497           AsyncSocketException::INTERNAL_ERROR,
498           withAddr("failed to schedule AsyncSocket connect timeout"));
499     }
500   }
501 }
502
503 void AsyncSocket::registerForConnectEvents() {
504   // Register for write events, so we'll
505   // be notified when the connection finishes/fails.
506   // Note that we don't register for a persistent event here.
507   assert(eventFlags_ == EventHandler::NONE);
508   eventFlags_ = EventHandler::WRITE;
509   if (!ioHandler_.registerHandler(eventFlags_)) {
510     throw AsyncSocketException(
511         AsyncSocketException::INTERNAL_ERROR,
512         withAddr("failed to register AsyncSocket connect handler"));
513   }
514 }
515
516 void AsyncSocket::connect(ConnectCallback* callback,
517                            const string& ip, uint16_t port,
518                            int timeout,
519                            const OptionMap &options) noexcept {
520   DestructorGuard dg(this);
521   try {
522     connectCallback_ = callback;
523     connect(callback, folly::SocketAddress(ip, port), timeout, options);
524   } catch (const std::exception& ex) {
525     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
526                             ex.what());
527     return failConnect(__func__, tex);
528   }
529 }
530
531 void AsyncSocket::cancelConnect() {
532   connectCallback_ = nullptr;
533   if (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN) {
534     closeNow();
535   }
536 }
537
538 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
539   sendTimeout_ = milliseconds;
540   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
541
542   // If we are currently pending on write requests, immediately update
543   // writeTimeout_ with the new value.
544   if ((eventFlags_ & EventHandler::WRITE) &&
545       (state_ != StateEnum::CONNECTING && state_ != StateEnum::FAST_OPEN)) {
546     assert(state_ == StateEnum::ESTABLISHED);
547     assert((shutdownFlags_ & SHUT_WRITE) == 0);
548     if (sendTimeout_ > 0) {
549       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
550         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
551             withAddr("failed to reschedule send timeout in setSendTimeout"));
552         return failWrite(__func__, ex);
553       }
554     } else {
555       writeTimeout_.cancelTimeout();
556     }
557   }
558 }
559
560 void AsyncSocket::setReadCB(ReadCallback *callback) {
561   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
562           << ", callback=" << callback << ", state=" << state_;
563
564   // Short circuit if callback is the same as the existing readCallback_.
565   //
566   // Note that this is needed for proper functioning during some cleanup cases.
567   // During cleanup we allow setReadCallback(nullptr) to be called even if the
568   // read callback is already unset and we have been detached from an event
569   // base.  This check prevents us from asserting
570   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
571   if (callback == readCallback_) {
572     return;
573   }
574
575   /* We are removing a read callback */
576   if (callback == nullptr &&
577       immediateReadHandler_.isLoopCallbackScheduled()) {
578     immediateReadHandler_.cancelLoopCallback();
579   }
580
581   if (shutdownFlags_ & SHUT_READ) {
582     // Reads have already been shut down on this socket.
583     //
584     // Allow setReadCallback(nullptr) to be called in this case, but don't
585     // allow a new callback to be set.
586     //
587     // For example, setReadCallback(nullptr) can happen after an error if we
588     // invoke some other error callback before invoking readError().  The other
589     // error callback that is invoked first may go ahead and clear the read
590     // callback before we get a chance to invoke readError().
591     if (callback != nullptr) {
592       return invalidState(callback);
593     }
594     assert((eventFlags_ & EventHandler::READ) == 0);
595     readCallback_ = nullptr;
596     return;
597   }
598
599   DestructorGuard dg(this);
600   assert(eventBase_->isInEventBaseThread());
601
602   switch ((StateEnum)state_) {
603     case StateEnum::CONNECTING:
604     case StateEnum::FAST_OPEN:
605       // For convenience, we allow the read callback to be set while we are
606       // still connecting.  We just store the callback for now.  Once the
607       // connection completes we'll register for read events.
608       readCallback_ = callback;
609       return;
610     case StateEnum::ESTABLISHED:
611     {
612       readCallback_ = callback;
613       uint16_t oldFlags = eventFlags_;
614       if (readCallback_) {
615         eventFlags_ |= EventHandler::READ;
616       } else {
617         eventFlags_ &= ~EventHandler::READ;
618       }
619
620       // Update our registration if our flags have changed
621       if (eventFlags_ != oldFlags) {
622         // We intentionally ignore the return value here.
623         // updateEventRegistration() will move us into the error state if it
624         // fails, and we don't need to do anything else here afterwards.
625         (void)updateEventRegistration();
626       }
627
628       if (readCallback_) {
629         checkForImmediateRead();
630       }
631       return;
632     }
633     case StateEnum::CLOSED:
634     case StateEnum::ERROR:
635       // We should never reach here.  SHUT_READ should always be set
636       // if we are in STATE_CLOSED or STATE_ERROR.
637       assert(false);
638       return invalidState(callback);
639     case StateEnum::UNINIT:
640       // We do not allow setReadCallback() to be called before we start
641       // connecting.
642       return invalidState(callback);
643   }
644
645   // We don't put a default case in the switch statement, so that the compiler
646   // will warn us to update the switch statement if a new state is added.
647   return invalidState(callback);
648 }
649
650 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
651   return readCallback_;
652 }
653
654 void AsyncSocket::write(WriteCallback* callback,
655                          const void* buf, size_t bytes, WriteFlags flags) {
656   iovec op;
657   op.iov_base = const_cast<void*>(buf);
658   op.iov_len = bytes;
659   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
660 }
661
662 void AsyncSocket::writev(WriteCallback* callback,
663                           const iovec* vec,
664                           size_t count,
665                           WriteFlags flags) {
666   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
667 }
668
669 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
670                               WriteFlags flags) {
671   constexpr size_t kSmallSizeMax = 64;
672   size_t count = buf->countChainElements();
673   if (count <= kSmallSizeMax) {
674     // suppress "warning: variable length array 'vec' is used [-Wvla]"
675     FOLLY_PUSH_WARNING;
676     FOLLY_GCC_DISABLE_WARNING(vla);
677     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
678     FOLLY_POP_WARNING;
679
680     writeChainImpl(callback, vec, count, std::move(buf), flags);
681   } else {
682     iovec* vec = new iovec[count];
683     writeChainImpl(callback, vec, count, std::move(buf), flags);
684     delete[] vec;
685   }
686 }
687
688 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
689     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
690   size_t veclen = buf->fillIov(vec, count);
691   writeImpl(callback, vec, veclen, std::move(buf), flags);
692 }
693
694 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
695                              size_t count, unique_ptr<IOBuf>&& buf,
696                              WriteFlags flags) {
697   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
698           << ", callback=" << callback << ", count=" << count
699           << ", state=" << state_;
700   DestructorGuard dg(this);
701   unique_ptr<IOBuf>ioBuf(std::move(buf));
702   assert(eventBase_->isInEventBaseThread());
703
704   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
705     // No new writes may be performed after the write side of the socket has
706     // been shutdown.
707     //
708     // We could just call callback->writeError() here to fail just this write.
709     // However, fail hard and use invalidState() to fail all outstanding
710     // callbacks and move the socket into the error state.  There's most likely
711     // a bug in the caller's code, so we abort everything rather than trying to
712     // proceed as best we can.
713     return invalidState(callback);
714   }
715
716   uint32_t countWritten = 0;
717   uint32_t partialWritten = 0;
718   int bytesWritten = 0;
719   bool mustRegister = false;
720   if ((state_ == StateEnum::ESTABLISHED || state_ == StateEnum::FAST_OPEN) &&
721       !connecting()) {
722     if (writeReqHead_ == nullptr) {
723       // If we are established and there are no other writes pending,
724       // we can attempt to perform the write immediately.
725       assert(writeReqTail_ == nullptr);
726       assert((eventFlags_ & EventHandler::WRITE) == 0);
727
728       auto writeResult =
729           performWrite(vec, count, flags, &countWritten, &partialWritten);
730       bytesWritten = writeResult.writeReturn;
731       if (bytesWritten < 0) {
732         auto errnoCopy = errno;
733         if (writeResult.exception) {
734           return failWrite(__func__, callback, 0, *writeResult.exception);
735         }
736         AsyncSocketException ex(
737             AsyncSocketException::INTERNAL_ERROR,
738             withAddr("writev failed"),
739             errnoCopy);
740         return failWrite(__func__, callback, 0, ex);
741       } else if (countWritten == count) {
742         // We successfully wrote everything.
743         // Invoke the callback and return.
744         if (callback) {
745           callback->writeSuccess();
746         }
747         return;
748       } else { // continue writing the next writeReq
749         if (bufferCallback_) {
750           bufferCallback_->onEgressBuffered();
751         }
752       }
753       if (!connecting()) {
754         // Writes might put the socket back into connecting state
755         // if TFO is enabled, and using TFO fails.
756         // This means that write timeouts would not be active, however
757         // connect timeouts would affect this stage.
758         mustRegister = true;
759       }
760     }
761   } else if (!connecting()) {
762     // Invalid state for writing
763     return invalidState(callback);
764   }
765
766   // Create a new WriteRequest to add to the queue
767   WriteRequest* req;
768   try {
769     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
770                                         count - countWritten, partialWritten,
771                                         bytesWritten, std::move(ioBuf), flags);
772   } catch (const std::exception& ex) {
773     // we mainly expect to catch std::bad_alloc here
774     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
775         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
776     return failWrite(__func__, callback, bytesWritten, tex);
777   }
778   req->consume();
779   if (writeReqTail_ == nullptr) {
780     assert(writeReqHead_ == nullptr);
781     writeReqHead_ = writeReqTail_ = req;
782   } else {
783     writeReqTail_->append(req);
784     writeReqTail_ = req;
785   }
786
787   // Register for write events if are established and not currently
788   // waiting on write events
789   if (mustRegister) {
790     assert(state_ == StateEnum::ESTABLISHED);
791     assert((eventFlags_ & EventHandler::WRITE) == 0);
792     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
793       assert(state_ == StateEnum::ERROR);
794       return;
795     }
796     if (sendTimeout_ > 0) {
797       // Schedule a timeout to fire if the write takes too long.
798       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
799         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
800                                withAddr("failed to schedule send timeout"));
801         return failWrite(__func__, ex);
802       }
803     }
804   }
805 }
806
807 void AsyncSocket::writeRequest(WriteRequest* req) {
808   if (writeReqTail_ == nullptr) {
809     assert(writeReqHead_ == nullptr);
810     writeReqHead_ = writeReqTail_ = req;
811     req->start();
812   } else {
813     writeReqTail_->append(req);
814     writeReqTail_ = req;
815   }
816 }
817
818 void AsyncSocket::close() {
819   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
820           << ", state=" << state_ << ", shutdownFlags="
821           << std::hex << (int) shutdownFlags_;
822
823   // close() is only different from closeNow() when there are pending writes
824   // that need to drain before we can close.  In all other cases, just call
825   // closeNow().
826   //
827   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
828   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
829   // is still running.  (e.g., If there are multiple pending writes, and we
830   // call writeError() on the first one, it may call close().  In this case we
831   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
832   // writes will still be in the queue.)
833   //
834   // We only need to drain pending writes if we are still in STATE_CONNECTING
835   // or STATE_ESTABLISHED
836   if ((writeReqHead_ == nullptr) ||
837       !(state_ == StateEnum::CONNECTING ||
838       state_ == StateEnum::ESTABLISHED)) {
839     closeNow();
840     return;
841   }
842
843   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
844   // destroyed until close() returns.
845   DestructorGuard dg(this);
846   assert(eventBase_->isInEventBaseThread());
847
848   // Since there are write requests pending, we have to set the
849   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
850   // connect finishes and we finish writing these requests.
851   //
852   // Set SHUT_READ to indicate that reads are shut down, and set the
853   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
854   // pending writes complete.
855   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
856
857   // If a read callback is set, invoke readEOF() immediately to inform it that
858   // the socket has been closed and no more data can be read.
859   if (readCallback_) {
860     // Disable reads if they are enabled
861     if (!updateEventRegistration(0, EventHandler::READ)) {
862       // We're now in the error state; callbacks have been cleaned up
863       assert(state_ == StateEnum::ERROR);
864       assert(readCallback_ == nullptr);
865     } else {
866       ReadCallback* callback = readCallback_;
867       readCallback_ = nullptr;
868       callback->readEOF();
869     }
870   }
871 }
872
873 void AsyncSocket::closeNow() {
874   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
875           << ", state=" << state_ << ", shutdownFlags="
876           << std::hex << (int) shutdownFlags_;
877   DestructorGuard dg(this);
878   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
879
880   switch (state_) {
881     case StateEnum::ESTABLISHED:
882     case StateEnum::CONNECTING:
883     case StateEnum::FAST_OPEN: {
884       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
885       state_ = StateEnum::CLOSED;
886
887       // If the write timeout was set, cancel it.
888       writeTimeout_.cancelTimeout();
889
890       // If we are registered for I/O events, unregister.
891       if (eventFlags_ != EventHandler::NONE) {
892         eventFlags_ = EventHandler::NONE;
893         if (!updateEventRegistration()) {
894           // We will have been moved into the error state.
895           assert(state_ == StateEnum::ERROR);
896           return;
897         }
898       }
899
900       if (immediateReadHandler_.isLoopCallbackScheduled()) {
901         immediateReadHandler_.cancelLoopCallback();
902       }
903
904       if (fd_ >= 0) {
905         ioHandler_.changeHandlerFD(-1);
906         doClose();
907       }
908
909       invokeConnectErr(socketClosedLocallyEx);
910
911       failAllWrites(socketClosedLocallyEx);
912
913       if (readCallback_) {
914         ReadCallback* callback = readCallback_;
915         readCallback_ = nullptr;
916         callback->readEOF();
917       }
918       return;
919     }
920     case StateEnum::CLOSED:
921       // Do nothing.  It's possible that we are being called recursively
922       // from inside a callback that we invoked inside another call to close()
923       // that is still running.
924       return;
925     case StateEnum::ERROR:
926       // Do nothing.  The error handling code has performed (or is performing)
927       // cleanup.
928       return;
929     case StateEnum::UNINIT:
930       assert(eventFlags_ == EventHandler::NONE);
931       assert(connectCallback_ == nullptr);
932       assert(readCallback_ == nullptr);
933       assert(writeReqHead_ == nullptr);
934       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
935       state_ = StateEnum::CLOSED;
936       return;
937   }
938
939   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
940               << ") called in unknown state " << state_;
941 }
942
943 void AsyncSocket::closeWithReset() {
944   // Enable SO_LINGER, with the linger timeout set to 0.
945   // This will trigger a TCP reset when we close the socket.
946   if (fd_ >= 0) {
947     struct linger optLinger = {1, 0};
948     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
949       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
950               << "on " << fd_ << ": errno=" << errno;
951     }
952   }
953
954   // Then let closeNow() take care of the rest
955   closeNow();
956 }
957
958 void AsyncSocket::shutdownWrite() {
959   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
960           << ", state=" << state_ << ", shutdownFlags="
961           << std::hex << (int) shutdownFlags_;
962
963   // If there are no pending writes, shutdownWrite() is identical to
964   // shutdownWriteNow().
965   if (writeReqHead_ == nullptr) {
966     shutdownWriteNow();
967     return;
968   }
969
970   assert(eventBase_->isInEventBaseThread());
971
972   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
973   // shutdown will be performed once all writes complete.
974   shutdownFlags_ |= SHUT_WRITE_PENDING;
975 }
976
977 void AsyncSocket::shutdownWriteNow() {
978   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
979           << ", fd=" << fd_ << ", state=" << state_
980           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
981
982   if (shutdownFlags_ & SHUT_WRITE) {
983     // Writes are already shutdown; nothing else to do.
984     return;
985   }
986
987   // If SHUT_READ is already set, just call closeNow() to completely
988   // close the socket.  This can happen if close() was called with writes
989   // pending, and then shutdownWriteNow() is called before all pending writes
990   // complete.
991   if (shutdownFlags_ & SHUT_READ) {
992     closeNow();
993     return;
994   }
995
996   DestructorGuard dg(this);
997   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
998
999   switch (static_cast<StateEnum>(state_)) {
1000     case StateEnum::ESTABLISHED:
1001     {
1002       shutdownFlags_ |= SHUT_WRITE;
1003
1004       // If the write timeout was set, cancel it.
1005       writeTimeout_.cancelTimeout();
1006
1007       // If we are registered for write events, unregister.
1008       if (!updateEventRegistration(0, EventHandler::WRITE)) {
1009         // We will have been moved into the error state.
1010         assert(state_ == StateEnum::ERROR);
1011         return;
1012       }
1013
1014       // Shutdown writes on the file descriptor
1015       shutdown(fd_, SHUT_WR);
1016
1017       // Immediately fail all write requests
1018       failAllWrites(socketShutdownForWritesEx);
1019       return;
1020     }
1021     case StateEnum::CONNECTING:
1022     {
1023       // Set the SHUT_WRITE_PENDING flag.
1024       // When the connection completes, it will check this flag,
1025       // shutdown the write half of the socket, and then set SHUT_WRITE.
1026       shutdownFlags_ |= SHUT_WRITE_PENDING;
1027
1028       // Immediately fail all write requests
1029       failAllWrites(socketShutdownForWritesEx);
1030       return;
1031     }
1032     case StateEnum::UNINIT:
1033       // Callers normally shouldn't call shutdownWriteNow() before the socket
1034       // even starts connecting.  Nonetheless, go ahead and set
1035       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
1036       // immediately shut down the write side of the socket.
1037       shutdownFlags_ |= SHUT_WRITE_PENDING;
1038       return;
1039     case StateEnum::FAST_OPEN:
1040       // In fast open state we haven't call connected yet, and if we shutdown
1041       // the writes, we will never try to call connect, so shut everything down
1042       shutdownFlags_ |= SHUT_WRITE;
1043       // Immediately fail all write requests
1044       failAllWrites(socketShutdownForWritesEx);
1045       return;
1046     case StateEnum::CLOSED:
1047     case StateEnum::ERROR:
1048       // We should never get here.  SHUT_WRITE should always be set
1049       // in STATE_CLOSED and STATE_ERROR.
1050       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
1051                  << ", fd=" << fd_ << ") in unexpected state " << state_
1052                  << " with SHUT_WRITE not set ("
1053                  << std::hex << (int) shutdownFlags_ << ")";
1054       assert(false);
1055       return;
1056   }
1057
1058   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
1059               << fd_ << ") called in unknown state " << state_;
1060 }
1061
1062 bool AsyncSocket::readable() const {
1063   if (fd_ == -1) {
1064     return false;
1065   }
1066   struct pollfd fds[1];
1067   fds[0].fd = fd_;
1068   fds[0].events = POLLIN;
1069   fds[0].revents = 0;
1070   int rc = poll(fds, 1, 0);
1071   return rc == 1;
1072 }
1073
1074 bool AsyncSocket::isPending() const {
1075   return ioHandler_.isPending();
1076 }
1077
1078 bool AsyncSocket::hangup() const {
1079   if (fd_ == -1) {
1080     // sanity check, no one should ask for hangup if we are not connected.
1081     assert(false);
1082     return false;
1083   }
1084 #ifdef POLLRDHUP // Linux-only
1085   struct pollfd fds[1];
1086   fds[0].fd = fd_;
1087   fds[0].events = POLLRDHUP|POLLHUP;
1088   fds[0].revents = 0;
1089   poll(fds, 1, 0);
1090   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1091 #else
1092   return false;
1093 #endif
1094 }
1095
1096 bool AsyncSocket::good() const {
1097   return (
1098       (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN ||
1099        state_ == StateEnum::ESTABLISHED) &&
1100       (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1101 }
1102
1103 bool AsyncSocket::error() const {
1104   return (state_ == StateEnum::ERROR);
1105 }
1106
1107 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1108   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1109           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1110           << ", state=" << state_ << ", events="
1111           << std::hex << eventFlags_ << ")";
1112   assert(eventBase_ == nullptr);
1113   assert(eventBase->isInEventBaseThread());
1114
1115   eventBase_ = eventBase;
1116   ioHandler_.attachEventBase(eventBase);
1117   writeTimeout_.attachEventBase(eventBase);
1118 }
1119
1120 void AsyncSocket::detachEventBase() {
1121   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1122           << ", old evb=" << eventBase_ << ", state=" << state_
1123           << ", events=" << std::hex << eventFlags_ << ")";
1124   assert(eventBase_ != nullptr);
1125   assert(eventBase_->isInEventBaseThread());
1126
1127   eventBase_ = nullptr;
1128   ioHandler_.detachEventBase();
1129   writeTimeout_.detachEventBase();
1130 }
1131
1132 bool AsyncSocket::isDetachable() const {
1133   DCHECK(eventBase_ != nullptr);
1134   DCHECK(eventBase_->isInEventBaseThread());
1135
1136   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1137 }
1138
1139 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1140   if (!localAddr_.isInitialized()) {
1141     localAddr_.setFromLocalAddress(fd_);
1142   }
1143   *address = localAddr_;
1144 }
1145
1146 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1147   if (!addr_.isInitialized()) {
1148     addr_.setFromPeerAddress(fd_);
1149   }
1150   *address = addr_;
1151 }
1152
1153 bool AsyncSocket::getTFOSucceded() const {
1154   return detail::tfo_succeeded(fd_);
1155 }
1156
1157 int AsyncSocket::setNoDelay(bool noDelay) {
1158   if (fd_ < 0) {
1159     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1160                << this << "(state=" << state_ << ")";
1161     return EINVAL;
1162
1163   }
1164
1165   int value = noDelay ? 1 : 0;
1166   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1167     int errnoCopy = errno;
1168     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1169             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1170             << strerror(errnoCopy);
1171     return errnoCopy;
1172   }
1173
1174   return 0;
1175 }
1176
1177 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1178
1179   #ifndef TCP_CONGESTION
1180   #define TCP_CONGESTION  13
1181   #endif
1182
1183   if (fd_ < 0) {
1184     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1185                << "socket " << this << "(state=" << state_ << ")";
1186     return EINVAL;
1187
1188   }
1189
1190   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1191         cname.length() + 1) != 0) {
1192     int errnoCopy = errno;
1193     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1194             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1195             << strerror(errnoCopy);
1196     return errnoCopy;
1197   }
1198
1199   return 0;
1200 }
1201
1202 int AsyncSocket::setQuickAck(bool quickack) {
1203   if (fd_ < 0) {
1204     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1205                << this << "(state=" << state_ << ")";
1206     return EINVAL;
1207
1208   }
1209
1210 #ifdef TCP_QUICKACK // Linux-only
1211   int value = quickack ? 1 : 0;
1212   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1213     int errnoCopy = errno;
1214     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1215             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1216             << strerror(errnoCopy);
1217     return errnoCopy;
1218   }
1219
1220   return 0;
1221 #else
1222   return ENOSYS;
1223 #endif
1224 }
1225
1226 int AsyncSocket::setSendBufSize(size_t bufsize) {
1227   if (fd_ < 0) {
1228     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1229                << this << "(state=" << state_ << ")";
1230     return EINVAL;
1231   }
1232
1233   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1234     int errnoCopy = errno;
1235     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1236             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1237             << strerror(errnoCopy);
1238     return errnoCopy;
1239   }
1240
1241   return 0;
1242 }
1243
1244 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1245   if (fd_ < 0) {
1246     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1247                << this << "(state=" << state_ << ")";
1248     return EINVAL;
1249   }
1250
1251   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1252     int errnoCopy = errno;
1253     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1254             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1255             << strerror(errnoCopy);
1256     return errnoCopy;
1257   }
1258
1259   return 0;
1260 }
1261
1262 int AsyncSocket::setTCPProfile(int profd) {
1263   if (fd_ < 0) {
1264     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1265                << this << "(state=" << state_ << ")";
1266     return EINVAL;
1267   }
1268
1269   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1270     int errnoCopy = errno;
1271     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1272             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1273             << strerror(errnoCopy);
1274     return errnoCopy;
1275   }
1276
1277   return 0;
1278 }
1279
1280 void AsyncSocket::ioReady(uint16_t events) noexcept {
1281   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1282           << ", events=" << std::hex << events << ", state=" << state_;
1283   DestructorGuard dg(this);
1284   assert(events & EventHandler::READ_WRITE);
1285   assert(eventBase_->isInEventBaseThread());
1286
1287   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1288   if (relevantEvents == EventHandler::READ) {
1289     handleRead();
1290   } else if (relevantEvents == EventHandler::WRITE) {
1291     handleWrite();
1292   } else if (relevantEvents == EventHandler::READ_WRITE) {
1293     EventBase* originalEventBase = eventBase_;
1294     // If both read and write events are ready, process writes first.
1295     handleWrite();
1296
1297     // Return now if handleWrite() detached us from our EventBase
1298     if (eventBase_ != originalEventBase) {
1299       return;
1300     }
1301
1302     // Only call handleRead() if a read callback is still installed.
1303     // (It's possible that the read callback was uninstalled during
1304     // handleWrite().)
1305     if (readCallback_) {
1306       handleRead();
1307     }
1308   } else {
1309     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1310                << std::hex << events << "(this=" << this << ")";
1311     abort();
1312   }
1313 }
1314
1315 AsyncSocket::ReadResult
1316 AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) {
1317   VLOG(5) << "AsyncSocket::performRead() this=" << this << ", buf=" << *buf
1318           << ", buflen=" << *buflen;
1319
1320   int recvFlags = 0;
1321   if (peek_) {
1322     recvFlags |= MSG_PEEK;
1323   }
1324
1325   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1326   if (bytes < 0) {
1327     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1328       // No more data to read right now.
1329       return ReadResult(READ_BLOCKING);
1330     } else {
1331       return ReadResult(READ_ERROR);
1332     }
1333   } else {
1334     appBytesReceived_ += bytes;
1335     return ReadResult(bytes);
1336   }
1337 }
1338
1339 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) {
1340   // no matter what, buffer should be preapared for non-ssl socket
1341   CHECK(readCallback_);
1342   readCallback_->getReadBuffer(buf, buflen);
1343 }
1344
1345 void AsyncSocket::handleRead() noexcept {
1346   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1347           << ", state=" << state_;
1348   assert(state_ == StateEnum::ESTABLISHED);
1349   assert((shutdownFlags_ & SHUT_READ) == 0);
1350   assert(readCallback_ != nullptr);
1351   assert(eventFlags_ & EventHandler::READ);
1352
1353   // Loop until:
1354   // - a read attempt would block
1355   // - readCallback_ is uninstalled
1356   // - the number of loop iterations exceeds the optional maximum
1357   // - this AsyncSocket is moved to another EventBase
1358   //
1359   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1360   // which is why need to check for it here.
1361   //
1362   // The last bullet point is slightly subtle.  readDataAvailable() may also
1363   // detach this socket from this EventBase.  However, before
1364   // readDataAvailable() returns another thread may pick it up, attach it to
1365   // a different EventBase, and install another readCallback_.  We need to
1366   // exit immediately after readDataAvailable() returns if the eventBase_ has
1367   // changed.  (The caller must perform some sort of locking to transfer the
1368   // AsyncSocket between threads properly.  This will be sufficient to ensure
1369   // that this thread sees the updated eventBase_ variable after
1370   // readDataAvailable() returns.)
1371   uint16_t numReads = 0;
1372   EventBase* originalEventBase = eventBase_;
1373   while (readCallback_ && eventBase_ == originalEventBase) {
1374     // Get the buffer to read into.
1375     void* buf = nullptr;
1376     size_t buflen = 0, offset = 0;
1377     try {
1378       prepareReadBuffer(&buf, &buflen);
1379       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1380     } catch (const AsyncSocketException& ex) {
1381       return failRead(__func__, ex);
1382     } catch (const std::exception& ex) {
1383       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1384                               string("ReadCallback::getReadBuffer() "
1385                                      "threw exception: ") +
1386                               ex.what());
1387       return failRead(__func__, tex);
1388     } catch (...) {
1389       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1390                              "ReadCallback::getReadBuffer() threw "
1391                              "non-exception type");
1392       return failRead(__func__, ex);
1393     }
1394     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1395       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1396                              "ReadCallback::getReadBuffer() returned "
1397                              "empty buffer");
1398       return failRead(__func__, ex);
1399     }
1400
1401     // Perform the read
1402     auto readResult = performRead(&buf, &buflen, &offset);
1403     auto bytesRead = readResult.readReturn;
1404     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1405             << bytesRead << " bytes";
1406     if (bytesRead > 0) {
1407       if (!isBufferMovable_) {
1408         readCallback_->readDataAvailable(bytesRead);
1409       } else {
1410         CHECK(kOpenSslModeMoveBufferOwnership);
1411         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1412                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1413                 << ", offset=" << offset;
1414         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1415         readBuf->trimStart(offset);
1416         readBuf->trimEnd(buflen - offset - bytesRead);
1417         readCallback_->readBufferAvailable(std::move(readBuf));
1418       }
1419
1420       // Fall through and continue around the loop if the read
1421       // completely filled the available buffer.
1422       // Note that readCallback_ may have been uninstalled or changed inside
1423       // readDataAvailable().
1424       if (size_t(bytesRead) < buflen) {
1425         return;
1426       }
1427     } else if (bytesRead == READ_BLOCKING) {
1428         // No more data to read right now.
1429         return;
1430     } else if (bytesRead == READ_ERROR) {
1431       readErr_ = READ_ERROR;
1432       if (readResult.exception) {
1433         return failRead(__func__, *readResult.exception);
1434       }
1435       auto errnoCopy = errno;
1436       AsyncSocketException ex(
1437           AsyncSocketException::INTERNAL_ERROR,
1438           withAddr("recv() failed"),
1439           errnoCopy);
1440       return failRead(__func__, ex);
1441     } else {
1442       assert(bytesRead == READ_EOF);
1443       readErr_ = READ_EOF;
1444       // EOF
1445       shutdownFlags_ |= SHUT_READ;
1446       if (!updateEventRegistration(0, EventHandler::READ)) {
1447         // we've already been moved into STATE_ERROR
1448         assert(state_ == StateEnum::ERROR);
1449         assert(readCallback_ == nullptr);
1450         return;
1451       }
1452
1453       ReadCallback* callback = readCallback_;
1454       readCallback_ = nullptr;
1455       callback->readEOF();
1456       return;
1457     }
1458     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1459       if (readCallback_ != nullptr) {
1460         // We might still have data in the socket.
1461         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1462         scheduleImmediateRead();
1463       }
1464       return;
1465     }
1466   }
1467 }
1468
1469 /**
1470  * This function attempts to write as much data as possible, until no more data
1471  * can be written.
1472  *
1473  * - If it sends all available data, it unregisters for write events, and stops
1474  *   the writeTimeout_.
1475  *
1476  * - If not all of the data can be sent immediately, it reschedules
1477  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1478  *   registered for write events.
1479  */
1480 void AsyncSocket::handleWrite() noexcept {
1481   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1482           << ", state=" << state_;
1483   DestructorGuard dg(this);
1484
1485   if (state_ == StateEnum::CONNECTING) {
1486     handleConnect();
1487     return;
1488   }
1489
1490   // Normal write
1491   assert(state_ == StateEnum::ESTABLISHED);
1492   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1493   assert(writeReqHead_ != nullptr);
1494
1495   // Loop until we run out of write requests,
1496   // or until this socket is moved to another EventBase.
1497   // (See the comment in handleRead() explaining how this can happen.)
1498   EventBase* originalEventBase = eventBase_;
1499   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1500     auto writeResult = writeReqHead_->performWrite();
1501     if (writeResult.writeReturn < 0) {
1502       if (writeResult.exception) {
1503         return failWrite(__func__, *writeResult.exception);
1504       }
1505       auto errnoCopy = errno;
1506       AsyncSocketException ex(
1507           AsyncSocketException::INTERNAL_ERROR,
1508           withAddr("writev() failed"),
1509           errnoCopy);
1510       return failWrite(__func__, ex);
1511     } else if (writeReqHead_->isComplete()) {
1512       // We finished this request
1513       WriteRequest* req = writeReqHead_;
1514       writeReqHead_ = req->getNext();
1515
1516       if (writeReqHead_ == nullptr) {
1517         writeReqTail_ = nullptr;
1518         // This is the last write request.
1519         // Unregister for write events and cancel the send timer
1520         // before we invoke the callback.  We have to update the state properly
1521         // before calling the callback, since it may want to detach us from
1522         // the EventBase.
1523         if (eventFlags_ & EventHandler::WRITE) {
1524           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1525             assert(state_ == StateEnum::ERROR);
1526             return;
1527           }
1528           // Stop the send timeout
1529           writeTimeout_.cancelTimeout();
1530         }
1531         assert(!writeTimeout_.isScheduled());
1532
1533         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1534         // we finish sending the last write request.
1535         //
1536         // We have to do this before invoking writeSuccess(), since
1537         // writeSuccess() may detach us from our EventBase.
1538         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1539           assert(connectCallback_ == nullptr);
1540           shutdownFlags_ |= SHUT_WRITE;
1541
1542           if (shutdownFlags_ & SHUT_READ) {
1543             // Reads have already been shutdown.  Fully close the socket and
1544             // move to STATE_CLOSED.
1545             //
1546             // Note: This code currently moves us to STATE_CLOSED even if
1547             // close() hasn't ever been called.  This can occur if we have
1548             // received EOF from the peer and shutdownWrite() has been called
1549             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1550             // case, until close() is actually called?  I can't think of a
1551             // reason why we would need to do so.  No other operations besides
1552             // calling close() or destroying the socket can be performed at
1553             // this point.
1554             assert(readCallback_ == nullptr);
1555             state_ = StateEnum::CLOSED;
1556             if (fd_ >= 0) {
1557               ioHandler_.changeHandlerFD(-1);
1558               doClose();
1559             }
1560           } else {
1561             // Reads are still enabled, so we are only doing a half-shutdown
1562             shutdown(fd_, SHUT_WR);
1563           }
1564         }
1565       }
1566
1567       // Invoke the callback
1568       WriteCallback* callback = req->getCallback();
1569       req->destroy();
1570       if (callback) {
1571         callback->writeSuccess();
1572       }
1573       // We'll continue around the loop, trying to write another request
1574     } else {
1575       // Partial write.
1576       if (bufferCallback_) {
1577         bufferCallback_->onEgressBuffered();
1578       }
1579       writeReqHead_->consume();
1580       // Stop after a partial write; it's highly likely that a subsequent write
1581       // attempt will just return EAGAIN.
1582       //
1583       // Ensure that we are registered for write events.
1584       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1585         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1586           assert(state_ == StateEnum::ERROR);
1587           return;
1588         }
1589       }
1590
1591       // Reschedule the send timeout, since we have made some write progress.
1592       if (sendTimeout_ > 0) {
1593         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1594           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1595               withAddr("failed to reschedule write timeout"));
1596           return failWrite(__func__, ex);
1597         }
1598       }
1599       return;
1600     }
1601   }
1602   if (!writeReqHead_ && bufferCallback_) {
1603     bufferCallback_->onEgressBufferCleared();
1604   }
1605 }
1606
1607 void AsyncSocket::checkForImmediateRead() noexcept {
1608   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1609   // (However, note that some subclasses do override this method.)
1610   //
1611   // Simply calling handleRead() here would be bad, as this would call
1612   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1613   // buffer even though no data may be available.  This would waste lots of
1614   // memory, since the buffer will sit around unused until the socket actually
1615   // becomes readable.
1616   //
1617   // Checking if the socket is readable now also seems like it would probably
1618   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1619   // would just waste an extra system call.  Even if it is readable, waiting to
1620   // find out from libevent on the next event loop doesn't seem that bad.
1621 }
1622
1623 void AsyncSocket::handleInitialReadWrite() noexcept {
1624   // Our callers should already be holding a DestructorGuard, but grab
1625   // one here just to make sure, in case one of our calling code paths ever
1626   // changes.
1627   DestructorGuard dg(this);
1628   // If we have a readCallback_, make sure we enable read events.  We
1629   // may already be registered for reads if connectSuccess() set
1630   // the read calback.
1631   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1632     assert(state_ == StateEnum::ESTABLISHED);
1633     assert((shutdownFlags_ & SHUT_READ) == 0);
1634     if (!updateEventRegistration(EventHandler::READ, 0)) {
1635       assert(state_ == StateEnum::ERROR);
1636       return;
1637     }
1638     checkForImmediateRead();
1639   } else if (readCallback_ == nullptr) {
1640     // Unregister for read events.
1641     updateEventRegistration(0, EventHandler::READ);
1642   }
1643
1644   // If we have write requests pending, try to send them immediately.
1645   // Since we just finished accepting, there is a very good chance that we can
1646   // write without blocking.
1647   //
1648   // However, we only process them if EventHandler::WRITE is not already set,
1649   // which means that we're already blocked on a write attempt.  (This can
1650   // happen if connectSuccess() called write() before returning.)
1651   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1652     // Call handleWrite() to perform write processing.
1653     handleWrite();
1654   } else if (writeReqHead_ == nullptr) {
1655     // Unregister for write event.
1656     updateEventRegistration(0, EventHandler::WRITE);
1657   }
1658 }
1659
1660 void AsyncSocket::handleConnect() noexcept {
1661   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1662           << ", state=" << state_;
1663   assert(state_ == StateEnum::CONNECTING);
1664   // SHUT_WRITE can never be set while we are still connecting;
1665   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1666   // finishes
1667   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1668
1669   // In case we had a connect timeout, cancel the timeout
1670   writeTimeout_.cancelTimeout();
1671   // We don't use a persistent registration when waiting on a connect event,
1672   // so we have been automatically unregistered now.  Update eventFlags_ to
1673   // reflect reality.
1674   assert(eventFlags_ == EventHandler::WRITE);
1675   eventFlags_ = EventHandler::NONE;
1676
1677   // Call getsockopt() to check if the connect succeeded
1678   int error;
1679   socklen_t len = sizeof(error);
1680   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1681   if (rv != 0) {
1682     auto errnoCopy = errno;
1683     AsyncSocketException ex(
1684         AsyncSocketException::INTERNAL_ERROR,
1685         withAddr("error calling getsockopt() after connect"),
1686         errnoCopy);
1687     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1688                << fd_ << " host=" << addr_.describe()
1689                << ") exception:" << ex.what();
1690     return failConnect(__func__, ex);
1691   }
1692
1693   if (error != 0) {
1694     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1695                            "connect failed", error);
1696     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1697             << fd_ << " host=" << addr_.describe()
1698             << ") exception: " << ex.what();
1699     return failConnect(__func__, ex);
1700   }
1701
1702   // Move into STATE_ESTABLISHED
1703   state_ = StateEnum::ESTABLISHED;
1704
1705   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1706   // perform, immediately shutdown the write half of the socket.
1707   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1708     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1709     // are still connecting we just abort the connect rather than waiting for
1710     // it to complete.
1711     assert((shutdownFlags_ & SHUT_READ) == 0);
1712     shutdown(fd_, SHUT_WR);
1713     shutdownFlags_ |= SHUT_WRITE;
1714   }
1715
1716   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1717           << "successfully connected; state=" << state_;
1718
1719   // Remember the EventBase we are attached to, before we start invoking any
1720   // callbacks (since the callbacks may call detachEventBase()).
1721   EventBase* originalEventBase = eventBase_;
1722
1723   invokeConnectSuccess();
1724   // Note that the connect callback may have changed our state.
1725   // (set or unset the read callback, called write(), closed the socket, etc.)
1726   // The following code needs to handle these situations correctly.
1727   //
1728   // If the socket has been closed, readCallback_ and writeReqHead_ will
1729   // always be nullptr, so that will prevent us from trying to read or write.
1730   //
1731   // The main thing to check for is if eventBase_ is still originalEventBase.
1732   // If not, we have been detached from this event base, so we shouldn't
1733   // perform any more operations.
1734   if (eventBase_ != originalEventBase) {
1735     return;
1736   }
1737
1738   handleInitialReadWrite();
1739 }
1740
1741 void AsyncSocket::timeoutExpired() noexcept {
1742   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1743           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1744   DestructorGuard dg(this);
1745   assert(eventBase_->isInEventBaseThread());
1746
1747   if (state_ == StateEnum::CONNECTING) {
1748     // connect() timed out
1749     // Unregister for I/O events.
1750     if (connectCallback_) {
1751       AsyncSocketException ex(
1752           AsyncSocketException::TIMED_OUT, "connect timed out");
1753       failConnect(__func__, ex);
1754     } else {
1755       // we faced a connect error without a connect callback, which could
1756       // happen due to TFO.
1757       AsyncSocketException ex(
1758           AsyncSocketException::TIMED_OUT, "write timed out during connection");
1759       failWrite(__func__, ex);
1760     }
1761   } else {
1762     // a normal write operation timed out
1763     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1764     failWrite(__func__, ex);
1765   }
1766 }
1767
1768 ssize_t AsyncSocket::tfoSendMsg(int fd, struct msghdr* msg, int msg_flags) {
1769   return detail::tfo_sendmsg(fd, msg, msg_flags);
1770 }
1771
1772 AsyncSocket::WriteResult
1773 AsyncSocket::sendSocketMessage(int fd, struct msghdr* msg, int msg_flags) {
1774   ssize_t totalWritten = 0;
1775   if (state_ == StateEnum::FAST_OPEN) {
1776     sockaddr_storage addr;
1777     auto len = addr_.getAddress(&addr);
1778     msg->msg_name = &addr;
1779     msg->msg_namelen = len;
1780     totalWritten = tfoSendMsg(fd_, msg, msg_flags);
1781     if (totalWritten >= 0) {
1782       tfoFinished_ = true;
1783       state_ = StateEnum::ESTABLISHED;
1784       // We schedule this asynchrously so that we don't end up
1785       // invoking initial read or write while a write is in progress.
1786       scheduleInitialReadWrite();
1787     } else if (errno == EINPROGRESS) {
1788       VLOG(4) << "TFO falling back to connecting";
1789       // A normal sendmsg doesn't return EINPROGRESS, however
1790       // TFO might fallback to connecting if there is no
1791       // cookie.
1792       state_ = StateEnum::CONNECTING;
1793       try {
1794         scheduleConnectTimeout();
1795         registerForConnectEvents();
1796       } catch (const AsyncSocketException& ex) {
1797         return WriteResult(
1798             WRITE_ERROR, folly::make_unique<AsyncSocketException>(ex));
1799       }
1800       // Let's fake it that no bytes were written and return an errno.
1801       errno = EAGAIN;
1802       totalWritten = -1;
1803     } else if (errno == EOPNOTSUPP) {
1804       // Try falling back to connecting.
1805       VLOG(4) << "TFO not supported";
1806       state_ = StateEnum::CONNECTING;
1807       try {
1808         int ret = socketConnect((const sockaddr*)&addr, len);
1809         if (ret == 0) {
1810           // connect succeeded immediately
1811           // Treat this like no data was written.
1812           state_ = StateEnum::ESTABLISHED;
1813           scheduleInitialReadWrite();
1814         }
1815         // If there was no exception during connections,
1816         // we would return that no bytes were written.
1817         errno = EAGAIN;
1818         totalWritten = -1;
1819       } catch (const AsyncSocketException& ex) {
1820         return WriteResult(
1821             WRITE_ERROR, folly::make_unique<AsyncSocketException>(ex));
1822       }
1823     } else if (errno == EAGAIN) {
1824       // Normally sendmsg would indicate that the write would block.
1825       // However in the fast open case, it would indicate that sendmsg
1826       // fell back to a connect. This is a return code from connect()
1827       // instead, and is an error condition indicating no fds available.
1828       return WriteResult(
1829           WRITE_ERROR,
1830           folly::make_unique<AsyncSocketException>(
1831               AsyncSocketException::UNKNOWN, "No more free local ports"));
1832     }
1833   } else {
1834     totalWritten = ::sendmsg(fd, msg, msg_flags);
1835   }
1836   return WriteResult(totalWritten);
1837 }
1838
1839 AsyncSocket::WriteResult AsyncSocket::performWrite(
1840     const iovec* vec,
1841     uint32_t count,
1842     WriteFlags flags,
1843     uint32_t* countWritten,
1844     uint32_t* partialWritten) {
1845   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1846   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1847   // (since it may terminate the program if the main program doesn't explicitly
1848   // ignore it).
1849   struct msghdr msg;
1850   msg.msg_name = nullptr;
1851   msg.msg_namelen = 0;
1852   msg.msg_iov = const_cast<iovec *>(vec);
1853   msg.msg_iovlen = std::min<size_t>(count, kIovMax);
1854   msg.msg_control = nullptr;
1855   msg.msg_controllen = 0;
1856   msg.msg_flags = 0;
1857
1858   int msg_flags = MSG_DONTWAIT;
1859
1860 #ifdef MSG_NOSIGNAL // Linux-only
1861   msg_flags |= MSG_NOSIGNAL;
1862   if (isSet(flags, WriteFlags::CORK)) {
1863     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1864     // give it the rest of the data rather than immediately sending a partial
1865     // frame, even when TCP_NODELAY is enabled.
1866     msg_flags |= MSG_MORE;
1867   }
1868 #endif
1869   if (isSet(flags, WriteFlags::EOR)) {
1870     // marks that this is the last byte of a record (response)
1871     msg_flags |= MSG_EOR;
1872   }
1873   auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
1874   auto totalWritten = writeResult.writeReturn;
1875   if (totalWritten < 0) {
1876     bool tryAgain = (errno == EAGAIN);
1877 #ifdef __APPLE__
1878     // Apple has a bug where doing a second write on a socket which we
1879     // have opened with TFO causes an ENOTCONN to be thrown. However the
1880     // socket is really connected, so treat ENOTCONN as a EAGAIN until
1881     // this bug is fixed.
1882     tryAgain |= (errno == ENOTCONN);
1883 #endif
1884     if (!writeResult.exception && tryAgain) {
1885       // TCP buffer is full; we can't write any more data right now.
1886       *countWritten = 0;
1887       *partialWritten = 0;
1888       return WriteResult(0);
1889     }
1890     // error
1891     *countWritten = 0;
1892     *partialWritten = 0;
1893     return writeResult;
1894   }
1895
1896   appBytesWritten_ += totalWritten;
1897
1898   uint32_t bytesWritten;
1899   uint32_t n;
1900   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1901     const iovec* v = vec + n;
1902     if (v->iov_len > bytesWritten) {
1903       // Partial write finished in the middle of this iovec
1904       *countWritten = n;
1905       *partialWritten = bytesWritten;
1906       return WriteResult(totalWritten);
1907     }
1908
1909     bytesWritten -= v->iov_len;
1910   }
1911
1912   assert(bytesWritten == 0);
1913   *countWritten = n;
1914   *partialWritten = 0;
1915   return WriteResult(totalWritten);
1916 }
1917
1918 /**
1919  * Re-register the EventHandler after eventFlags_ has changed.
1920  *
1921  * If an error occurs, fail() is called to move the socket into the error state
1922  * and call all currently installed callbacks.  After an error, the
1923  * AsyncSocket is completely unregistered.
1924  *
1925  * @return Returns true on succcess, or false on error.
1926  */
1927 bool AsyncSocket::updateEventRegistration() {
1928   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1929           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1930           << ", events=" << std::hex << eventFlags_;
1931   assert(eventBase_->isInEventBaseThread());
1932   if (eventFlags_ == EventHandler::NONE) {
1933     ioHandler_.unregisterHandler();
1934     return true;
1935   }
1936
1937   // Always register for persistent events, so we don't have to re-register
1938   // after being called back.
1939   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1940     eventFlags_ = EventHandler::NONE; // we're not registered after error
1941     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1942         withAddr("failed to update AsyncSocket event registration"));
1943     fail("updateEventRegistration", ex);
1944     return false;
1945   }
1946
1947   return true;
1948 }
1949
1950 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1951                                            uint16_t disable) {
1952   uint16_t oldFlags = eventFlags_;
1953   eventFlags_ |= enable;
1954   eventFlags_ &= ~disable;
1955   if (eventFlags_ == oldFlags) {
1956     return true;
1957   } else {
1958     return updateEventRegistration();
1959   }
1960 }
1961
1962 void AsyncSocket::startFail() {
1963   // startFail() should only be called once
1964   assert(state_ != StateEnum::ERROR);
1965   assert(getDestructorGuardCount() > 0);
1966   state_ = StateEnum::ERROR;
1967   // Ensure that SHUT_READ and SHUT_WRITE are set,
1968   // so all future attempts to read or write will be rejected
1969   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1970
1971   if (eventFlags_ != EventHandler::NONE) {
1972     eventFlags_ = EventHandler::NONE;
1973     ioHandler_.unregisterHandler();
1974   }
1975   writeTimeout_.cancelTimeout();
1976
1977   if (fd_ >= 0) {
1978     ioHandler_.changeHandlerFD(-1);
1979     doClose();
1980   }
1981 }
1982
1983 void AsyncSocket::invokeAllErrors(const AsyncSocketException& ex) {
1984   invokeConnectErr(ex);
1985   failAllWrites(ex);
1986
1987   if (readCallback_) {
1988     ReadCallback* callback = readCallback_;
1989     readCallback_ = nullptr;
1990     callback->readErr(ex);
1991   }
1992 }
1993
1994 void AsyncSocket::finishFail() {
1995   assert(state_ == StateEnum::ERROR);
1996   assert(getDestructorGuardCount() > 0);
1997
1998   AsyncSocketException ex(
1999       AsyncSocketException::INTERNAL_ERROR,
2000       withAddr("socket closing after error"));
2001   invokeAllErrors(ex);
2002 }
2003
2004 void AsyncSocket::finishFail(const AsyncSocketException& ex) {
2005   assert(state_ == StateEnum::ERROR);
2006   assert(getDestructorGuardCount() > 0);
2007   invokeAllErrors(ex);
2008 }
2009
2010 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
2011   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2012              << state_ << " host=" << addr_.describe()
2013              << "): failed in " << fn << "(): "
2014              << ex.what();
2015   startFail();
2016   finishFail();
2017 }
2018
2019 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
2020   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2021                << state_ << " host=" << addr_.describe()
2022                << "): failed while connecting in " << fn << "(): "
2023                << ex.what();
2024   startFail();
2025
2026   invokeConnectErr(ex);
2027   finishFail(ex);
2028 }
2029
2030 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
2031   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2032                << state_ << " host=" << addr_.describe()
2033                << "): failed while reading in " << fn << "(): "
2034                << ex.what();
2035   startFail();
2036
2037   if (readCallback_ != nullptr) {
2038     ReadCallback* callback = readCallback_;
2039     readCallback_ = nullptr;
2040     callback->readErr(ex);
2041   }
2042
2043   finishFail();
2044 }
2045
2046 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
2047   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2048                << state_ << " host=" << addr_.describe()
2049                << "): failed while writing in " << fn << "(): "
2050                << ex.what();
2051   startFail();
2052
2053   // Only invoke the first write callback, since the error occurred while
2054   // writing this request.  Let any other pending write callbacks be invoked in
2055   // finishFail().
2056   if (writeReqHead_ != nullptr) {
2057     WriteRequest* req = writeReqHead_;
2058     writeReqHead_ = req->getNext();
2059     WriteCallback* callback = req->getCallback();
2060     uint32_t bytesWritten = req->getTotalBytesWritten();
2061     req->destroy();
2062     if (callback) {
2063       callback->writeErr(bytesWritten, ex);
2064     }
2065   }
2066
2067   finishFail();
2068 }
2069
2070 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
2071                              size_t bytesWritten,
2072                              const AsyncSocketException& ex) {
2073   // This version of failWrite() is used when the failure occurs before
2074   // we've added the callback to writeReqHead_.
2075   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2076              << state_ << " host=" << addr_.describe()
2077              <<"): failed while writing in " << fn << "(): "
2078              << ex.what();
2079   startFail();
2080
2081   if (callback != nullptr) {
2082     callback->writeErr(bytesWritten, ex);
2083   }
2084
2085   finishFail();
2086 }
2087
2088 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
2089   // Invoke writeError() on all write callbacks.
2090   // This is used when writes are forcibly shutdown with write requests
2091   // pending, or when an error occurs with writes pending.
2092   while (writeReqHead_ != nullptr) {
2093     WriteRequest* req = writeReqHead_;
2094     writeReqHead_ = req->getNext();
2095     WriteCallback* callback = req->getCallback();
2096     if (callback) {
2097       callback->writeErr(req->getTotalBytesWritten(), ex);
2098     }
2099     req->destroy();
2100   }
2101 }
2102
2103 void AsyncSocket::invalidState(ConnectCallback* callback) {
2104   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
2105              << "): connect() called in invalid state " << state_;
2106
2107   /*
2108    * The invalidState() methods don't use the normal failure mechanisms,
2109    * since we don't know what state we are in.  We don't want to call
2110    * startFail()/finishFail() recursively if we are already in the middle of
2111    * cleaning up.
2112    */
2113
2114   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
2115                          "connect() called with socket in invalid state");
2116   connectEndTime_ = std::chrono::steady_clock::now();
2117   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2118     if (callback) {
2119       callback->connectErr(ex);
2120     }
2121   } else {
2122     // We can't use failConnect() here since connectCallback_
2123     // may already be set to another callback.  Invoke this ConnectCallback
2124     // here; any other connectCallback_ will be invoked in finishFail()
2125     startFail();
2126     if (callback) {
2127       callback->connectErr(ex);
2128     }
2129     finishFail();
2130   }
2131 }
2132
2133 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
2134   connectEndTime_ = std::chrono::steady_clock::now();
2135   if (connectCallback_) {
2136     ConnectCallback* callback = connectCallback_;
2137     connectCallback_ = nullptr;
2138     callback->connectErr(ex);
2139   }
2140 }
2141
2142 void AsyncSocket::invokeConnectSuccess() {
2143   connectEndTime_ = std::chrono::steady_clock::now();
2144   if (connectCallback_) {
2145     ConnectCallback* callback = connectCallback_;
2146     connectCallback_ = nullptr;
2147     callback->connectSuccess();
2148   }
2149 }
2150
2151 void AsyncSocket::invalidState(ReadCallback* callback) {
2152   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2153              << "): setReadCallback(" << callback
2154              << ") called in invalid state " << state_;
2155
2156   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2157                          "setReadCallback() called with socket in "
2158                          "invalid state");
2159   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2160     if (callback) {
2161       callback->readErr(ex);
2162     }
2163   } else {
2164     startFail();
2165     if (callback) {
2166       callback->readErr(ex);
2167     }
2168     finishFail();
2169   }
2170 }
2171
2172 void AsyncSocket::invalidState(WriteCallback* callback) {
2173   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2174              << "): write() called in invalid state " << state_;
2175
2176   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2177                          withAddr("write() called with socket in invalid state"));
2178   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2179     if (callback) {
2180       callback->writeErr(0, ex);
2181     }
2182   } else {
2183     startFail();
2184     if (callback) {
2185       callback->writeErr(0, ex);
2186     }
2187     finishFail();
2188   }
2189 }
2190
2191 void AsyncSocket::doClose() {
2192   if (fd_ == -1) return;
2193   if (shutdownSocketSet_) {
2194     shutdownSocketSet_->close(fd_);
2195   } else {
2196     ::close(fd_);
2197   }
2198   fd_ = -1;
2199 }
2200
2201 std::ostream& operator << (std::ostream& os,
2202                            const AsyncSocket::StateEnum& state) {
2203   os << static_cast<int>(state);
2204   return os;
2205 }
2206
2207 std::string AsyncSocket::withAddr(const std::string& s) {
2208   // Don't use addr_ directly because it may not be initialized
2209   // e.g. if constructed from fd
2210   folly::SocketAddress peer, local;
2211   try {
2212     getPeerAddress(&peer);
2213     getLocalAddress(&local);
2214   } catch (const std::exception&) {
2215     // ignore
2216   } catch (...) {
2217     // ignore
2218   }
2219   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2220 }
2221
2222 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2223   bufferCallback_ = cb;
2224 }
2225
2226 } // folly