Return if we handle any error messages to avoid unnecessarily calling recv/send
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/io/async/AsyncSocket.h>
17
18 #include <folly/ExceptionWrapper.h>
19 #include <folly/Format.h>
20 #include <folly/Portability.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/Cursor.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/IOBufQueue.h>
25 #include <folly/portability/Fcntl.h>
26 #include <folly/portability/Sockets.h>
27 #include <folly/portability/SysUio.h>
28 #include <folly/portability/Unistd.h>
29
30 #include <boost/preprocessor/control/if.hpp>
31 #include <errno.h>
32 #include <limits.h>
33 #include <sys/types.h>
34 #include <thread>
35
36 using std::string;
37 using std::unique_ptr;
38
39 namespace fsp = folly::portability::sockets;
40
41 namespace folly {
42
43 static constexpr bool msgErrQueueSupported =
44 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
45     true;
46 #else
47     false;
48 #endif // FOLLY_HAVE_MSG_ERRQUEUE
49
50 // static members initializers
51 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
52
53 const AsyncSocketException socketClosedLocallyEx(
54     AsyncSocketException::END_OF_FILE, "socket closed locally");
55 const AsyncSocketException socketShutdownForWritesEx(
56     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
57
58 // TODO: It might help performance to provide a version of BytesWriteRequest that
59 // users could derive from, so we can avoid the extra allocation for each call
60 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
61 // protocols are currently templatized for transports.
62 //
63 // We would need the version for external users where they provide the iovec
64 // storage space, and only our internal version would allocate it at the end of
65 // the WriteRequest.
66
67 /* The default WriteRequest implementation, used for write(), writev() and
68  * writeChain()
69  *
70  * A new BytesWriteRequest operation is allocated on the heap for all write
71  * operations that cannot be completed immediately.
72  */
73 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
74  public:
75   static BytesWriteRequest* newRequest(AsyncSocket* socket,
76                                        WriteCallback* callback,
77                                        const iovec* ops,
78                                        uint32_t opCount,
79                                        uint32_t partialWritten,
80                                        uint32_t bytesWritten,
81                                        unique_ptr<IOBuf>&& ioBuf,
82                                        WriteFlags flags) {
83     assert(opCount > 0);
84     // Since we put a variable size iovec array at the end
85     // of each BytesWriteRequest, we have to manually allocate the memory.
86     void* buf = malloc(sizeof(BytesWriteRequest) +
87                        (opCount * sizeof(struct iovec)));
88     if (buf == nullptr) {
89       throw std::bad_alloc();
90     }
91
92     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
93                                       partialWritten, bytesWritten,
94                                       std::move(ioBuf), flags);
95   }
96
97   void destroy() override {
98     this->~BytesWriteRequest();
99     free(this);
100   }
101
102   WriteResult performWrite() override {
103     WriteFlags writeFlags = flags_;
104     if (getNext() != nullptr) {
105       writeFlags |= WriteFlags::CORK;
106     }
107
108     socket_->adjustZeroCopyFlags(writeFlags);
109
110     auto writeResult = socket_->performWrite(
111         getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
112     bytesWritten_ = writeResult.writeReturn > 0 ? writeResult.writeReturn : 0;
113     if (bytesWritten_) {
114       if (socket_->isZeroCopyRequest(writeFlags)) {
115         if (isComplete()) {
116           socket_->addZeroCopyBuf(std::move(ioBuf_));
117         } else {
118           socket_->addZeroCopyBuf(ioBuf_.get());
119         }
120       } else {
121         // this happens if at least one of the prev requests were sent
122         // with zero copy but not the last one
123         if (isComplete() && socket_->getZeroCopy() &&
124             socket_->containsZeroCopyBuf(ioBuf_.get())) {
125           socket_->setZeroCopyBuf(std::move(ioBuf_));
126         }
127       }
128     }
129     return writeResult;
130   }
131
132   bool isComplete() override {
133     return opsWritten_ == getOpCount();
134   }
135
136   void consume() override {
137     // Advance opIndex_ forward by opsWritten_
138     opIndex_ += opsWritten_;
139     assert(opIndex_ < opCount_);
140
141     if (!socket_->isZeroCopyRequest(flags_)) {
142       // If we've finished writing any IOBufs, release them
143       if (ioBuf_) {
144         for (uint32_t i = opsWritten_; i != 0; --i) {
145           assert(ioBuf_);
146           ioBuf_ = ioBuf_->pop();
147         }
148       }
149     }
150
151     // Move partialBytes_ forward into the current iovec buffer
152     struct iovec* currentOp = writeOps_ + opIndex_;
153     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
154     currentOp->iov_base =
155       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
156     currentOp->iov_len -= partialBytes_;
157
158     // Increment the totalBytesWritten_ count by bytesWritten_;
159     assert(bytesWritten_ >= 0);
160     totalBytesWritten_ += uint32_t(bytesWritten_);
161   }
162
163  private:
164   BytesWriteRequest(AsyncSocket* socket,
165                     WriteCallback* callback,
166                     const struct iovec* ops,
167                     uint32_t opCount,
168                     uint32_t partialBytes,
169                     uint32_t bytesWritten,
170                     unique_ptr<IOBuf>&& ioBuf,
171                     WriteFlags flags)
172     : AsyncSocket::WriteRequest(socket, callback)
173     , opCount_(opCount)
174     , opIndex_(0)
175     , flags_(flags)
176     , ioBuf_(std::move(ioBuf))
177     , opsWritten_(0)
178     , partialBytes_(partialBytes)
179     , bytesWritten_(bytesWritten) {
180     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
181   }
182
183   // private destructor, to ensure callers use destroy()
184   ~BytesWriteRequest() override = default;
185
186   const struct iovec* getOps() const {
187     assert(opCount_ > opIndex_);
188     return writeOps_ + opIndex_;
189   }
190
191   uint32_t getOpCount() const {
192     assert(opCount_ > opIndex_);
193     return opCount_ - opIndex_;
194   }
195
196   uint32_t opCount_;            ///< number of entries in writeOps_
197   uint32_t opIndex_;            ///< current index into writeOps_
198   WriteFlags flags_;            ///< set for WriteFlags
199   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
200
201   // for consume(), how much we wrote on the last write
202   uint32_t opsWritten_;         ///< complete ops written
203   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
204   ssize_t bytesWritten_;        ///< bytes written altogether
205
206   struct iovec writeOps_[];     ///< write operation(s) list
207 };
208
209 int AsyncSocket::SendMsgParamsCallback::getDefaultFlags(
210     folly::WriteFlags flags,
211     bool zeroCopyEnabled) noexcept {
212   int msg_flags = MSG_DONTWAIT;
213
214 #ifdef MSG_NOSIGNAL // Linux-only
215   msg_flags |= MSG_NOSIGNAL;
216 #ifdef MSG_MORE
217   if (isSet(flags, WriteFlags::CORK)) {
218     // MSG_MORE tells the kernel we have more data to send, so wait for us to
219     // give it the rest of the data rather than immediately sending a partial
220     // frame, even when TCP_NODELAY is enabled.
221     msg_flags |= MSG_MORE;
222   }
223 #endif // MSG_MORE
224 #endif // MSG_NOSIGNAL
225   if (isSet(flags, WriteFlags::EOR)) {
226     // marks that this is the last byte of a record (response)
227     msg_flags |= MSG_EOR;
228   }
229
230   if (zeroCopyEnabled && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY)) {
231     msg_flags |= MSG_ZEROCOPY;
232   }
233
234   return msg_flags;
235 }
236
237 namespace {
238 static AsyncSocket::SendMsgParamsCallback defaultSendMsgParamsCallback;
239 } // namespace
240
241 AsyncSocket::AsyncSocket()
242     : eventBase_(nullptr),
243       writeTimeout_(this, nullptr),
244       ioHandler_(this, nullptr),
245       immediateReadHandler_(this) {
246   VLOG(5) << "new AsyncSocket()";
247   init();
248 }
249
250 AsyncSocket::AsyncSocket(EventBase* evb)
251     : eventBase_(evb),
252       writeTimeout_(this, evb),
253       ioHandler_(this, evb),
254       immediateReadHandler_(this) {
255   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
256   init();
257 }
258
259 AsyncSocket::AsyncSocket(EventBase* evb,
260                            const folly::SocketAddress& address,
261                            uint32_t connectTimeout)
262   : AsyncSocket(evb) {
263   connect(nullptr, address, connectTimeout);
264 }
265
266 AsyncSocket::AsyncSocket(EventBase* evb,
267                            const std::string& ip,
268                            uint16_t port,
269                            uint32_t connectTimeout)
270   : AsyncSocket(evb) {
271   connect(nullptr, ip, port, connectTimeout);
272 }
273
274 AsyncSocket::AsyncSocket(EventBase* evb, int fd, uint32_t zeroCopyBufId)
275     : zeroCopyBufId_(zeroCopyBufId),
276       eventBase_(evb),
277       writeTimeout_(this, evb),
278       ioHandler_(this, evb, fd),
279       immediateReadHandler_(this) {
280   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd=" << fd
281           << ", zeroCopyBufId=" << zeroCopyBufId << ")";
282   init();
283   fd_ = fd;
284   setCloseOnExec();
285   state_ = StateEnum::ESTABLISHED;
286 }
287
288 AsyncSocket::AsyncSocket(AsyncSocket::UniquePtr oldAsyncSocket)
289     : AsyncSocket(
290           oldAsyncSocket->getEventBase(),
291           oldAsyncSocket->detachFd(),
292           oldAsyncSocket->getZeroCopyBufId()) {
293   preReceivedData_ = std::move(oldAsyncSocket->preReceivedData_);
294 }
295
296 // init() method, since constructor forwarding isn't supported in most
297 // compilers yet.
298 void AsyncSocket::init() {
299   if (eventBase_) {
300     eventBase_->dcheckIsInEventBaseThread();
301   }
302   shutdownFlags_ = 0;
303   state_ = StateEnum::UNINIT;
304   eventFlags_ = EventHandler::NONE;
305   fd_ = -1;
306   sendTimeout_ = 0;
307   maxReadsPerEvent_ = 16;
308   connectCallback_ = nullptr;
309   errMessageCallback_ = nullptr;
310   readCallback_ = nullptr;
311   writeReqHead_ = nullptr;
312   writeReqTail_ = nullptr;
313   wShutdownSocketSet_.reset();
314   appBytesWritten_ = 0;
315   appBytesReceived_ = 0;
316   sendMsgParamCallback_ = &defaultSendMsgParamsCallback;
317 }
318
319 AsyncSocket::~AsyncSocket() {
320   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
321           << ", evb=" << eventBase_ << ", fd=" << fd_
322           << ", state=" << state_ << ")";
323 }
324
325 void AsyncSocket::destroy() {
326   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
327           << ", fd=" << fd_ << ", state=" << state_;
328   // When destroy is called, close the socket immediately
329   closeNow();
330
331   // Then call DelayedDestruction::destroy() to take care of
332   // whether or not we need immediate or delayed destruction
333   DelayedDestruction::destroy();
334 }
335
336 int AsyncSocket::detachFd() {
337   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
338           << ", evb=" << eventBase_ << ", state=" << state_
339           << ", events=" << std::hex << eventFlags_ << ")";
340   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
341   // actually close the descriptor.
342   if (const auto socketSet = wShutdownSocketSet_.lock()) {
343     socketSet->remove(fd_);
344   }
345   int fd = fd_;
346   fd_ = -1;
347   // Call closeNow() to invoke all pending callbacks with an error.
348   closeNow();
349   // Update the EventHandler to stop using this fd.
350   // This can only be done after closeNow() unregisters the handler.
351   ioHandler_.changeHandlerFD(-1);
352   return fd;
353 }
354
355 const folly::SocketAddress& AsyncSocket::anyAddress() {
356   static const folly::SocketAddress anyAddress =
357     folly::SocketAddress("0.0.0.0", 0);
358   return anyAddress;
359 }
360
361 void AsyncSocket::setShutdownSocketSet(
362     const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
363   const auto newSS = wNewSS.lock();
364   const auto shutdownSocketSet = wShutdownSocketSet_.lock();
365
366   if (newSS == shutdownSocketSet) {
367     return;
368   }
369
370   if (shutdownSocketSet && fd_ != -1) {
371     shutdownSocketSet->remove(fd_);
372   }
373
374   if (newSS && fd_ != -1) {
375     newSS->add(fd_);
376   }
377
378   wShutdownSocketSet_ = wNewSS;
379 }
380
381 void AsyncSocket::setCloseOnExec() {
382   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
383   if (rv != 0) {
384     auto errnoCopy = errno;
385     throw AsyncSocketException(
386         AsyncSocketException::INTERNAL_ERROR,
387         withAddr("failed to set close-on-exec flag"),
388         errnoCopy);
389   }
390 }
391
392 void AsyncSocket::connect(ConnectCallback* callback,
393                            const folly::SocketAddress& address,
394                            int timeout,
395                            const OptionMap &options,
396                            const folly::SocketAddress& bindAddr) noexcept {
397   DestructorGuard dg(this);
398   eventBase_->dcheckIsInEventBaseThread();
399
400   addr_ = address;
401
402   // Make sure we're in the uninitialized state
403   if (state_ != StateEnum::UNINIT) {
404     return invalidState(callback);
405   }
406
407   connectTimeout_ = std::chrono::milliseconds(timeout);
408   connectStartTime_ = std::chrono::steady_clock::now();
409   // Make connect end time at least >= connectStartTime.
410   connectEndTime_ = connectStartTime_;
411
412   assert(fd_ == -1);
413   state_ = StateEnum::CONNECTING;
414   connectCallback_ = callback;
415
416   sockaddr_storage addrStorage;
417   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
418
419   try {
420     // Create the socket
421     // Technically the first parameter should actually be a protocol family
422     // constant (PF_xxx) rather than an address family (AF_xxx), but the
423     // distinction is mainly just historical.  In pretty much all
424     // implementations the PF_foo and AF_foo constants are identical.
425     fd_ = fsp::socket(address.getFamily(), SOCK_STREAM, 0);
426     if (fd_ < 0) {
427       auto errnoCopy = errno;
428       throw AsyncSocketException(
429           AsyncSocketException::INTERNAL_ERROR,
430           withAddr("failed to create socket"),
431           errnoCopy);
432     }
433     if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
434       shutdownSocketSet->add(fd_);
435     }
436     ioHandler_.changeHandlerFD(fd_);
437
438     setCloseOnExec();
439
440     // Put the socket in non-blocking mode
441     int flags = fcntl(fd_, F_GETFL, 0);
442     if (flags == -1) {
443       auto errnoCopy = errno;
444       throw AsyncSocketException(
445           AsyncSocketException::INTERNAL_ERROR,
446           withAddr("failed to get socket flags"),
447           errnoCopy);
448     }
449     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
450     if (rv == -1) {
451       auto errnoCopy = errno;
452       throw AsyncSocketException(
453           AsyncSocketException::INTERNAL_ERROR,
454           withAddr("failed to put socket in non-blocking mode"),
455           errnoCopy);
456     }
457
458 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
459     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
460     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
461     if (rv == -1) {
462       auto errnoCopy = errno;
463       throw AsyncSocketException(
464           AsyncSocketException::INTERNAL_ERROR,
465           "failed to enable F_SETNOSIGPIPE on socket",
466           errnoCopy);
467     }
468 #endif
469
470     // By default, turn on TCP_NODELAY
471     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
472     // setNoDelay() will log an error message if it fails.
473     // Also set the cached zeroCopyVal_ since it cannot be set earlier if the fd
474     // is not created
475     if (address.getFamily() != AF_UNIX) {
476       (void)setNoDelay(true);
477       setZeroCopy(zeroCopyVal_);
478     }
479
480     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
481             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
482
483     // bind the socket
484     if (bindAddr != anyAddress()) {
485       int one = 1;
486       if (setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
487         auto errnoCopy = errno;
488         doClose();
489         throw AsyncSocketException(
490             AsyncSocketException::NOT_OPEN,
491             "failed to setsockopt prior to bind on " + bindAddr.describe(),
492             errnoCopy);
493       }
494
495       bindAddr.getAddress(&addrStorage);
496
497       if (bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
498         auto errnoCopy = errno;
499         doClose();
500         throw AsyncSocketException(
501             AsyncSocketException::NOT_OPEN,
502             "failed to bind to async socket: " + bindAddr.describe(),
503             errnoCopy);
504       }
505     }
506
507     // Apply the additional options if any.
508     for (const auto& opt: options) {
509       rv = opt.first.apply(fd_, opt.second);
510       if (rv != 0) {
511         auto errnoCopy = errno;
512         throw AsyncSocketException(
513             AsyncSocketException::INTERNAL_ERROR,
514             withAddr("failed to set socket option"),
515             errnoCopy);
516       }
517     }
518
519     // Perform the connect()
520     address.getAddress(&addrStorage);
521
522     if (tfoEnabled_) {
523       state_ = StateEnum::FAST_OPEN;
524       tfoAttempted_ = true;
525     } else {
526       if (socketConnect(saddr, addr_.getActualSize()) < 0) {
527         return;
528       }
529     }
530
531     // If we're still here the connect() succeeded immediately.
532     // Fall through to call the callback outside of this try...catch block
533   } catch (const AsyncSocketException& ex) {
534     return failConnect(__func__, ex);
535   } catch (const std::exception& ex) {
536     // shouldn't happen, but handle it just in case
537     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
538                << "): unexpected " << typeid(ex).name() << " exception: "
539                << ex.what();
540     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
541                             withAddr(string("unexpected exception: ") +
542                                      ex.what()));
543     return failConnect(__func__, tex);
544   }
545
546   // The connection succeeded immediately
547   // The read callback may not have been set yet, and no writes may be pending
548   // yet, so we don't have to register for any events at the moment.
549   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
550   assert(errMessageCallback_ == nullptr);
551   assert(readCallback_ == nullptr);
552   assert(writeReqHead_ == nullptr);
553   if (state_ != StateEnum::FAST_OPEN) {
554     state_ = StateEnum::ESTABLISHED;
555   }
556   invokeConnectSuccess();
557 }
558
559 int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) {
560 #if __linux__
561   if (noTransparentTls_) {
562     // Ignore return value, errors are ok
563     setsockopt(fd_, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
564   }
565   if (noTSocks_) {
566     VLOG(4) << "Disabling TSOCKS for fd " << fd_;
567     // Ignore return value, errors are ok
568     setsockopt(fd_, SOL_SOCKET, SO_NO_TSOCKS, nullptr, 0);
569   }
570 #endif
571   int rv = fsp::connect(fd_, saddr, len);
572   if (rv < 0) {
573     auto errnoCopy = errno;
574     if (errnoCopy == EINPROGRESS) {
575       scheduleConnectTimeout();
576       registerForConnectEvents();
577     } else {
578       throw AsyncSocketException(
579           AsyncSocketException::NOT_OPEN,
580           "connect failed (immediately)",
581           errnoCopy);
582     }
583   }
584   return rv;
585 }
586
587 void AsyncSocket::scheduleConnectTimeout() {
588   // Connection in progress.
589   auto timeout = connectTimeout_.count();
590   if (timeout > 0) {
591     // Start a timer in case the connection takes too long.
592     if (!writeTimeout_.scheduleTimeout(uint32_t(timeout))) {
593       throw AsyncSocketException(
594           AsyncSocketException::INTERNAL_ERROR,
595           withAddr("failed to schedule AsyncSocket connect timeout"));
596     }
597   }
598 }
599
600 void AsyncSocket::registerForConnectEvents() {
601   // Register for write events, so we'll
602   // be notified when the connection finishes/fails.
603   // Note that we don't register for a persistent event here.
604   assert(eventFlags_ == EventHandler::NONE);
605   eventFlags_ = EventHandler::WRITE;
606   if (!ioHandler_.registerHandler(eventFlags_)) {
607     throw AsyncSocketException(
608         AsyncSocketException::INTERNAL_ERROR,
609         withAddr("failed to register AsyncSocket connect handler"));
610   }
611 }
612
613 void AsyncSocket::connect(ConnectCallback* callback,
614                            const string& ip, uint16_t port,
615                            int timeout,
616                            const OptionMap &options) noexcept {
617   DestructorGuard dg(this);
618   try {
619     connectCallback_ = callback;
620     connect(callback, folly::SocketAddress(ip, port), timeout, options);
621   } catch (const std::exception& ex) {
622     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
623                             ex.what());
624     return failConnect(__func__, tex);
625   }
626 }
627
628 void AsyncSocket::cancelConnect() {
629   connectCallback_ = nullptr;
630   if (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN) {
631     closeNow();
632   }
633 }
634
635 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
636   sendTimeout_ = milliseconds;
637   if (eventBase_) {
638     eventBase_->dcheckIsInEventBaseThread();
639   }
640
641   // If we are currently pending on write requests, immediately update
642   // writeTimeout_ with the new value.
643   if ((eventFlags_ & EventHandler::WRITE) &&
644       (state_ != StateEnum::CONNECTING && state_ != StateEnum::FAST_OPEN)) {
645     assert(state_ == StateEnum::ESTABLISHED);
646     assert((shutdownFlags_ & SHUT_WRITE) == 0);
647     if (sendTimeout_ > 0) {
648       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
649         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
650             withAddr("failed to reschedule send timeout in setSendTimeout"));
651         return failWrite(__func__, ex);
652       }
653     } else {
654       writeTimeout_.cancelTimeout();
655     }
656   }
657 }
658
659 void AsyncSocket::setErrMessageCB(ErrMessageCallback* callback) {
660   VLOG(6) << "AsyncSocket::setErrMessageCB() this=" << this
661           << ", fd=" << fd_ << ", callback=" << callback
662           << ", state=" << state_;
663
664   // In the latest stable kernel 4.14.3 as of 2017-12-04, unix domain
665   // socket does not support MSG_ERRQUEUE. So recvmsg(MSG_ERRQUEUE)
666   // will read application data from unix doamin socket as error
667   // message, which breaks the message flow in application.  Feel free
668   // to remove the next code block if MSG_ERRQUEUE is added for unix
669   // domain socket in the future.
670   if (callback != nullptr) {
671     cacheLocalAddress();
672     if (localAddr_.getFamily() == AF_UNIX) {
673       LOG(ERROR) << "Failed to set ErrMessageCallback=" << callback
674                  << " for Unix Doamin Socket where MSG_ERRQUEUE is unsupported,"
675                  << " fd=" << fd_;
676       return;
677     }
678   }
679
680   // Short circuit if callback is the same as the existing errMessageCallback_.
681   if (callback == errMessageCallback_) {
682     return;
683   }
684
685   if (!msgErrQueueSupported) {
686       // Per-socket error message queue is not supported on this platform.
687       return invalidState(callback);
688   }
689
690   DestructorGuard dg(this);
691   eventBase_->dcheckIsInEventBaseThread();
692
693   if (callback == nullptr) {
694     // We should be able to reset the callback regardless of the
695     // socket state. It's important to have a reliable callback
696     // cancellation mechanism.
697     errMessageCallback_ = callback;
698     return;
699   }
700
701   switch ((StateEnum)state_) {
702     case StateEnum::CONNECTING:
703     case StateEnum::FAST_OPEN:
704     case StateEnum::ESTABLISHED: {
705       errMessageCallback_ = callback;
706       return;
707     }
708     case StateEnum::CLOSED:
709     case StateEnum::ERROR:
710       // We should never reach here.  SHUT_READ should always be set
711       // if we are in STATE_CLOSED or STATE_ERROR.
712       assert(false);
713       return invalidState(callback);
714     case StateEnum::UNINIT:
715       // We do not allow setReadCallback() to be called before we start
716       // connecting.
717       return invalidState(callback);
718   }
719
720   // We don't put a default case in the switch statement, so that the compiler
721   // will warn us to update the switch statement if a new state is added.
722   return invalidState(callback);
723 }
724
725 AsyncSocket::ErrMessageCallback* AsyncSocket::getErrMessageCallback() const {
726   return errMessageCallback_;
727 }
728
729 void AsyncSocket::setSendMsgParamCB(SendMsgParamsCallback* callback) {
730   sendMsgParamCallback_ = callback;
731 }
732
733 AsyncSocket::SendMsgParamsCallback* AsyncSocket::getSendMsgParamsCB() const {
734   return sendMsgParamCallback_;
735 }
736
737 void AsyncSocket::setReadCB(ReadCallback *callback) {
738   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
739           << ", callback=" << callback << ", state=" << state_;
740
741   // Short circuit if callback is the same as the existing readCallback_.
742   //
743   // Note that this is needed for proper functioning during some cleanup cases.
744   // During cleanup we allow setReadCallback(nullptr) to be called even if the
745   // read callback is already unset and we have been detached from an event
746   // base.  This check prevents us from asserting
747   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
748   if (callback == readCallback_) {
749     return;
750   }
751
752   /* We are removing a read callback */
753   if (callback == nullptr &&
754       immediateReadHandler_.isLoopCallbackScheduled()) {
755     immediateReadHandler_.cancelLoopCallback();
756   }
757
758   if (shutdownFlags_ & SHUT_READ) {
759     // Reads have already been shut down on this socket.
760     //
761     // Allow setReadCallback(nullptr) to be called in this case, but don't
762     // allow a new callback to be set.
763     //
764     // For example, setReadCallback(nullptr) can happen after an error if we
765     // invoke some other error callback before invoking readError().  The other
766     // error callback that is invoked first may go ahead and clear the read
767     // callback before we get a chance to invoke readError().
768     if (callback != nullptr) {
769       return invalidState(callback);
770     }
771     assert((eventFlags_ & EventHandler::READ) == 0);
772     readCallback_ = nullptr;
773     return;
774   }
775
776   DestructorGuard dg(this);
777   eventBase_->dcheckIsInEventBaseThread();
778
779   switch ((StateEnum)state_) {
780     case StateEnum::CONNECTING:
781     case StateEnum::FAST_OPEN:
782       // For convenience, we allow the read callback to be set while we are
783       // still connecting.  We just store the callback for now.  Once the
784       // connection completes we'll register for read events.
785       readCallback_ = callback;
786       return;
787     case StateEnum::ESTABLISHED:
788     {
789       readCallback_ = callback;
790       uint16_t oldFlags = eventFlags_;
791       if (readCallback_) {
792         eventFlags_ |= EventHandler::READ;
793       } else {
794         eventFlags_ &= ~EventHandler::READ;
795       }
796
797       // Update our registration if our flags have changed
798       if (eventFlags_ != oldFlags) {
799         // We intentionally ignore the return value here.
800         // updateEventRegistration() will move us into the error state if it
801         // fails, and we don't need to do anything else here afterwards.
802         (void)updateEventRegistration();
803       }
804
805       if (readCallback_) {
806         checkForImmediateRead();
807       }
808       return;
809     }
810     case StateEnum::CLOSED:
811     case StateEnum::ERROR:
812       // We should never reach here.  SHUT_READ should always be set
813       // if we are in STATE_CLOSED or STATE_ERROR.
814       assert(false);
815       return invalidState(callback);
816     case StateEnum::UNINIT:
817       // We do not allow setReadCallback() to be called before we start
818       // connecting.
819       return invalidState(callback);
820   }
821
822   // We don't put a default case in the switch statement, so that the compiler
823   // will warn us to update the switch statement if a new state is added.
824   return invalidState(callback);
825 }
826
827 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
828   return readCallback_;
829 }
830
831 bool AsyncSocket::setZeroCopy(bool enable) {
832   if (msgErrQueueSupported) {
833     zeroCopyVal_ = enable;
834
835     if (fd_ < 0) {
836       return false;
837     }
838
839     int val = enable ? 1 : 0;
840     int ret = setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
841
842     // if enable == false, set zeroCopyEnabled_ = false regardless
843     // if SO_ZEROCOPY is set or not
844     if (!enable) {
845       zeroCopyEnabled_ = enable;
846       return true;
847     }
848
849     /* if the setsockopt failed, try to see if the socket inherited the flag
850      * since we cannot set SO_ZEROCOPY on a socket s = accept
851      */
852     if (ret) {
853       val = 0;
854       socklen_t optlen = sizeof(val);
855       ret = getsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, &optlen);
856
857       if (!ret) {
858         enable = val ? true : false;
859       }
860     }
861
862     if (!ret) {
863       zeroCopyEnabled_ = enable;
864
865       return true;
866     }
867   }
868
869   return false;
870 }
871
872 bool AsyncSocket::isZeroCopyRequest(WriteFlags flags) {
873   return (zeroCopyEnabled_ && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY));
874 }
875
876 void AsyncSocket::adjustZeroCopyFlags(folly::WriteFlags& flags) {
877   if (!zeroCopyEnabled_) {
878     flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY);
879   }
880 }
881
882 void AsyncSocket::addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
883   uint32_t id = getNextZeroCopyBufId();
884   folly::IOBuf* ptr = buf.get();
885
886   idZeroCopyBufPtrMap_[id] = ptr;
887   auto& p = idZeroCopyBufInfoMap_[ptr];
888   p.count_++;
889   CHECK(p.buf_.get() == nullptr);
890   p.buf_ = std::move(buf);
891 }
892
893 void AsyncSocket::addZeroCopyBuf(folly::IOBuf* ptr) {
894   uint32_t id = getNextZeroCopyBufId();
895   idZeroCopyBufPtrMap_[id] = ptr;
896
897   idZeroCopyBufInfoMap_[ptr].count_++;
898 }
899
900 void AsyncSocket::releaseZeroCopyBuf(uint32_t id) {
901   auto iter = idZeroCopyBufPtrMap_.find(id);
902   CHECK(iter != idZeroCopyBufPtrMap_.end());
903   auto ptr = iter->second;
904   auto iter1 = idZeroCopyBufInfoMap_.find(ptr);
905   CHECK(iter1 != idZeroCopyBufInfoMap_.end());
906   if (0 == --iter1->second.count_) {
907     idZeroCopyBufInfoMap_.erase(iter1);
908   }
909
910   idZeroCopyBufPtrMap_.erase(iter);
911 }
912
913 void AsyncSocket::setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
914   folly::IOBuf* ptr = buf.get();
915   auto& p = idZeroCopyBufInfoMap_[ptr];
916   CHECK(p.buf_.get() == nullptr);
917
918   p.buf_ = std::move(buf);
919 }
920
921 bool AsyncSocket::containsZeroCopyBuf(folly::IOBuf* ptr) {
922   return (idZeroCopyBufInfoMap_.find(ptr) != idZeroCopyBufInfoMap_.end());
923 }
924
925 bool AsyncSocket::isZeroCopyMsg(const cmsghdr& cmsg) const {
926 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
927   if (zeroCopyEnabled_ &&
928       ((cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
929        (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR))) {
930     const struct sock_extended_err* serr =
931         reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
932     return (
933         (serr->ee_errno == 0) && (serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY));
934   }
935 #endif
936   return false;
937 }
938
939 void AsyncSocket::processZeroCopyMsg(const cmsghdr& cmsg) {
940 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
941   const struct sock_extended_err* serr =
942       reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
943   uint32_t hi = serr->ee_data;
944   uint32_t lo = serr->ee_info;
945   // disable zero copy if the buffer was actually copied
946   if ((serr->ee_code & SO_EE_CODE_ZEROCOPY_COPIED) && zeroCopyEnabled_) {
947     VLOG(2) << "AsyncSocket::processZeroCopyMsg(): setting "
948             << "zeroCopyEnabled_ = false due to SO_EE_CODE_ZEROCOPY_COPIED "
949             << "on " << fd_;
950     zeroCopyEnabled_ = false;
951   }
952
953   for (uint32_t i = lo; i <= hi; i++) {
954     releaseZeroCopyBuf(i);
955   }
956 #endif
957 }
958
959 void AsyncSocket::write(WriteCallback* callback,
960                          const void* buf, size_t bytes, WriteFlags flags) {
961   iovec op;
962   op.iov_base = const_cast<void*>(buf);
963   op.iov_len = bytes;
964   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
965 }
966
967 void AsyncSocket::writev(WriteCallback* callback,
968                           const iovec* vec,
969                           size_t count,
970                           WriteFlags flags) {
971   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
972 }
973
974 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
975                               WriteFlags flags) {
976   adjustZeroCopyFlags(flags);
977
978   constexpr size_t kSmallSizeMax = 64;
979   size_t count = buf->countChainElements();
980   if (count <= kSmallSizeMax) {
981     // suppress "warning: variable length array 'vec' is used [-Wvla]"
982     FOLLY_PUSH_WARNING
983     FOLLY_GCC_DISABLE_WARNING("-Wvla")
984     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
985     FOLLY_POP_WARNING
986
987     writeChainImpl(callback, vec, count, std::move(buf), flags);
988   } else {
989     iovec* vec = new iovec[count];
990     writeChainImpl(callback, vec, count, std::move(buf), flags);
991     delete[] vec;
992   }
993 }
994
995 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
996     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
997   size_t veclen = buf->fillIov(vec, count);
998   writeImpl(callback, vec, veclen, std::move(buf), flags);
999 }
1000
1001 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
1002                              size_t count, unique_ptr<IOBuf>&& buf,
1003                              WriteFlags flags) {
1004   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
1005           << ", callback=" << callback << ", count=" << count
1006           << ", state=" << state_;
1007   DestructorGuard dg(this);
1008   unique_ptr<IOBuf>ioBuf(std::move(buf));
1009   eventBase_->dcheckIsInEventBaseThread();
1010
1011   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
1012     // No new writes may be performed after the write side of the socket has
1013     // been shutdown.
1014     //
1015     // We could just call callback->writeError() here to fail just this write.
1016     // However, fail hard and use invalidState() to fail all outstanding
1017     // callbacks and move the socket into the error state.  There's most likely
1018     // a bug in the caller's code, so we abort everything rather than trying to
1019     // proceed as best we can.
1020     return invalidState(callback);
1021   }
1022
1023   uint32_t countWritten = 0;
1024   uint32_t partialWritten = 0;
1025   ssize_t bytesWritten = 0;
1026   bool mustRegister = false;
1027   if ((state_ == StateEnum::ESTABLISHED || state_ == StateEnum::FAST_OPEN) &&
1028       !connecting()) {
1029     if (writeReqHead_ == nullptr) {
1030       // If we are established and there are no other writes pending,
1031       // we can attempt to perform the write immediately.
1032       assert(writeReqTail_ == nullptr);
1033       assert((eventFlags_ & EventHandler::WRITE) == 0);
1034
1035       auto writeResult = performWrite(
1036           vec, uint32_t(count), flags, &countWritten, &partialWritten);
1037       bytesWritten = writeResult.writeReturn;
1038       if (bytesWritten < 0) {
1039         auto errnoCopy = errno;
1040         if (writeResult.exception) {
1041           return failWrite(__func__, callback, 0, *writeResult.exception);
1042         }
1043         AsyncSocketException ex(
1044             AsyncSocketException::INTERNAL_ERROR,
1045             withAddr("writev failed"),
1046             errnoCopy);
1047         return failWrite(__func__, callback, 0, ex);
1048       } else if (countWritten == count) {
1049         // done, add the whole buffer
1050         if (countWritten && isZeroCopyRequest(flags)) {
1051           addZeroCopyBuf(std::move(ioBuf));
1052         }
1053         // We successfully wrote everything.
1054         // Invoke the callback and return.
1055         if (callback) {
1056           callback->writeSuccess();
1057         }
1058         return;
1059       } else { // continue writing the next writeReq
1060         // add just the ptr
1061         if (bytesWritten && isZeroCopyRequest(flags)) {
1062           addZeroCopyBuf(ioBuf.get());
1063         }
1064         if (bufferCallback_) {
1065           bufferCallback_->onEgressBuffered();
1066         }
1067       }
1068       if (!connecting()) {
1069         // Writes might put the socket back into connecting state
1070         // if TFO is enabled, and using TFO fails.
1071         // This means that write timeouts would not be active, however
1072         // connect timeouts would affect this stage.
1073         mustRegister = true;
1074       }
1075     }
1076   } else if (!connecting()) {
1077     // Invalid state for writing
1078     return invalidState(callback);
1079   }
1080
1081   // Create a new WriteRequest to add to the queue
1082   WriteRequest* req;
1083   try {
1084     req = BytesWriteRequest::newRequest(
1085         this,
1086         callback,
1087         vec + countWritten,
1088         uint32_t(count - countWritten),
1089         partialWritten,
1090         uint32_t(bytesWritten),
1091         std::move(ioBuf),
1092         flags);
1093   } catch (const std::exception& ex) {
1094     // we mainly expect to catch std::bad_alloc here
1095     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
1096         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
1097     return failWrite(__func__, callback, size_t(bytesWritten), tex);
1098   }
1099   req->consume();
1100   if (writeReqTail_ == nullptr) {
1101     assert(writeReqHead_ == nullptr);
1102     writeReqHead_ = writeReqTail_ = req;
1103   } else {
1104     writeReqTail_->append(req);
1105     writeReqTail_ = req;
1106   }
1107
1108   // Register for write events if are established and not currently
1109   // waiting on write events
1110   if (mustRegister) {
1111     assert(state_ == StateEnum::ESTABLISHED);
1112     assert((eventFlags_ & EventHandler::WRITE) == 0);
1113     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1114       assert(state_ == StateEnum::ERROR);
1115       return;
1116     }
1117     if (sendTimeout_ > 0) {
1118       // Schedule a timeout to fire if the write takes too long.
1119       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1120         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1121                                withAddr("failed to schedule send timeout"));
1122         return failWrite(__func__, ex);
1123       }
1124     }
1125   }
1126 }
1127
1128 void AsyncSocket::writeRequest(WriteRequest* req) {
1129   if (writeReqTail_ == nullptr) {
1130     assert(writeReqHead_ == nullptr);
1131     writeReqHead_ = writeReqTail_ = req;
1132     req->start();
1133   } else {
1134     writeReqTail_->append(req);
1135     writeReqTail_ = req;
1136   }
1137 }
1138
1139 void AsyncSocket::close() {
1140   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
1141           << ", state=" << state_ << ", shutdownFlags="
1142           << std::hex << (int) shutdownFlags_;
1143
1144   // close() is only different from closeNow() when there are pending writes
1145   // that need to drain before we can close.  In all other cases, just call
1146   // closeNow().
1147   //
1148   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
1149   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
1150   // is still running.  (e.g., If there are multiple pending writes, and we
1151   // call writeError() on the first one, it may call close().  In this case we
1152   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
1153   // writes will still be in the queue.)
1154   //
1155   // We only need to drain pending writes if we are still in STATE_CONNECTING
1156   // or STATE_ESTABLISHED
1157   if ((writeReqHead_ == nullptr) ||
1158       !(state_ == StateEnum::CONNECTING ||
1159       state_ == StateEnum::ESTABLISHED)) {
1160     closeNow();
1161     return;
1162   }
1163
1164   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
1165   // destroyed until close() returns.
1166   DestructorGuard dg(this);
1167   eventBase_->dcheckIsInEventBaseThread();
1168
1169   // Since there are write requests pending, we have to set the
1170   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
1171   // connect finishes and we finish writing these requests.
1172   //
1173   // Set SHUT_READ to indicate that reads are shut down, and set the
1174   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
1175   // pending writes complete.
1176   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
1177
1178   // If a read callback is set, invoke readEOF() immediately to inform it that
1179   // the socket has been closed and no more data can be read.
1180   if (readCallback_) {
1181     // Disable reads if they are enabled
1182     if (!updateEventRegistration(0, EventHandler::READ)) {
1183       // We're now in the error state; callbacks have been cleaned up
1184       assert(state_ == StateEnum::ERROR);
1185       assert(readCallback_ == nullptr);
1186     } else {
1187       ReadCallback* callback = readCallback_;
1188       readCallback_ = nullptr;
1189       callback->readEOF();
1190     }
1191   }
1192 }
1193
1194 void AsyncSocket::closeNow() {
1195   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
1196           << ", state=" << state_ << ", shutdownFlags="
1197           << std::hex << (int) shutdownFlags_;
1198   DestructorGuard dg(this);
1199   if (eventBase_) {
1200     eventBase_->dcheckIsInEventBaseThread();
1201   }
1202
1203   switch (state_) {
1204     case StateEnum::ESTABLISHED:
1205     case StateEnum::CONNECTING:
1206     case StateEnum::FAST_OPEN: {
1207       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1208       state_ = StateEnum::CLOSED;
1209
1210       // If the write timeout was set, cancel it.
1211       writeTimeout_.cancelTimeout();
1212
1213       // If we are registered for I/O events, unregister.
1214       if (eventFlags_ != EventHandler::NONE) {
1215         eventFlags_ = EventHandler::NONE;
1216         if (!updateEventRegistration()) {
1217           // We will have been moved into the error state.
1218           assert(state_ == StateEnum::ERROR);
1219           return;
1220         }
1221       }
1222
1223       if (immediateReadHandler_.isLoopCallbackScheduled()) {
1224         immediateReadHandler_.cancelLoopCallback();
1225       }
1226
1227       if (fd_ >= 0) {
1228         ioHandler_.changeHandlerFD(-1);
1229         doClose();
1230       }
1231
1232       invokeConnectErr(socketClosedLocallyEx);
1233
1234       failAllWrites(socketClosedLocallyEx);
1235
1236       if (readCallback_) {
1237         ReadCallback* callback = readCallback_;
1238         readCallback_ = nullptr;
1239         callback->readEOF();
1240       }
1241       return;
1242     }
1243     case StateEnum::CLOSED:
1244       // Do nothing.  It's possible that we are being called recursively
1245       // from inside a callback that we invoked inside another call to close()
1246       // that is still running.
1247       return;
1248     case StateEnum::ERROR:
1249       // Do nothing.  The error handling code has performed (or is performing)
1250       // cleanup.
1251       return;
1252     case StateEnum::UNINIT:
1253       assert(eventFlags_ == EventHandler::NONE);
1254       assert(connectCallback_ == nullptr);
1255       assert(readCallback_ == nullptr);
1256       assert(writeReqHead_ == nullptr);
1257       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1258       state_ = StateEnum::CLOSED;
1259       return;
1260   }
1261
1262   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
1263               << ") called in unknown state " << state_;
1264 }
1265
1266 void AsyncSocket::closeWithReset() {
1267   // Enable SO_LINGER, with the linger timeout set to 0.
1268   // This will trigger a TCP reset when we close the socket.
1269   if (fd_ >= 0) {
1270     struct linger optLinger = {1, 0};
1271     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
1272       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
1273               << "on " << fd_ << ": errno=" << errno;
1274     }
1275   }
1276
1277   // Then let closeNow() take care of the rest
1278   closeNow();
1279 }
1280
1281 void AsyncSocket::shutdownWrite() {
1282   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
1283           << ", state=" << state_ << ", shutdownFlags="
1284           << std::hex << (int) shutdownFlags_;
1285
1286   // If there are no pending writes, shutdownWrite() is identical to
1287   // shutdownWriteNow().
1288   if (writeReqHead_ == nullptr) {
1289     shutdownWriteNow();
1290     return;
1291   }
1292
1293   eventBase_->dcheckIsInEventBaseThread();
1294
1295   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
1296   // shutdown will be performed once all writes complete.
1297   shutdownFlags_ |= SHUT_WRITE_PENDING;
1298 }
1299
1300 void AsyncSocket::shutdownWriteNow() {
1301   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
1302           << ", fd=" << fd_ << ", state=" << state_
1303           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
1304
1305   if (shutdownFlags_ & SHUT_WRITE) {
1306     // Writes are already shutdown; nothing else to do.
1307     return;
1308   }
1309
1310   // If SHUT_READ is already set, just call closeNow() to completely
1311   // close the socket.  This can happen if close() was called with writes
1312   // pending, and then shutdownWriteNow() is called before all pending writes
1313   // complete.
1314   if (shutdownFlags_ & SHUT_READ) {
1315     closeNow();
1316     return;
1317   }
1318
1319   DestructorGuard dg(this);
1320   if (eventBase_) {
1321     eventBase_->dcheckIsInEventBaseThread();
1322   }
1323
1324   switch (static_cast<StateEnum>(state_)) {
1325     case StateEnum::ESTABLISHED:
1326     {
1327       shutdownFlags_ |= SHUT_WRITE;
1328
1329       // If the write timeout was set, cancel it.
1330       writeTimeout_.cancelTimeout();
1331
1332       // If we are registered for write events, unregister.
1333       if (!updateEventRegistration(0, EventHandler::WRITE)) {
1334         // We will have been moved into the error state.
1335         assert(state_ == StateEnum::ERROR);
1336         return;
1337       }
1338
1339       // Shutdown writes on the file descriptor
1340       shutdown(fd_, SHUT_WR);
1341
1342       // Immediately fail all write requests
1343       failAllWrites(socketShutdownForWritesEx);
1344       return;
1345     }
1346     case StateEnum::CONNECTING:
1347     {
1348       // Set the SHUT_WRITE_PENDING flag.
1349       // When the connection completes, it will check this flag,
1350       // shutdown the write half of the socket, and then set SHUT_WRITE.
1351       shutdownFlags_ |= SHUT_WRITE_PENDING;
1352
1353       // Immediately fail all write requests
1354       failAllWrites(socketShutdownForWritesEx);
1355       return;
1356     }
1357     case StateEnum::UNINIT:
1358       // Callers normally shouldn't call shutdownWriteNow() before the socket
1359       // even starts connecting.  Nonetheless, go ahead and set
1360       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
1361       // immediately shut down the write side of the socket.
1362       shutdownFlags_ |= SHUT_WRITE_PENDING;
1363       return;
1364     case StateEnum::FAST_OPEN:
1365       // In fast open state we haven't call connected yet, and if we shutdown
1366       // the writes, we will never try to call connect, so shut everything down
1367       shutdownFlags_ |= SHUT_WRITE;
1368       // Immediately fail all write requests
1369       failAllWrites(socketShutdownForWritesEx);
1370       return;
1371     case StateEnum::CLOSED:
1372     case StateEnum::ERROR:
1373       // We should never get here.  SHUT_WRITE should always be set
1374       // in STATE_CLOSED and STATE_ERROR.
1375       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
1376                  << ", fd=" << fd_ << ") in unexpected state " << state_
1377                  << " with SHUT_WRITE not set ("
1378                  << std::hex << (int) shutdownFlags_ << ")";
1379       assert(false);
1380       return;
1381   }
1382
1383   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
1384               << fd_ << ") called in unknown state " << state_;
1385 }
1386
1387 bool AsyncSocket::readable() const {
1388   if (fd_ == -1) {
1389     return false;
1390   }
1391   struct pollfd fds[1];
1392   fds[0].fd = fd_;
1393   fds[0].events = POLLIN;
1394   fds[0].revents = 0;
1395   int rc = poll(fds, 1, 0);
1396   return rc == 1;
1397 }
1398
1399 bool AsyncSocket::writable() const {
1400   if (fd_ == -1) {
1401     return false;
1402   }
1403   struct pollfd fds[1];
1404   fds[0].fd = fd_;
1405   fds[0].events = POLLOUT;
1406   fds[0].revents = 0;
1407   int rc = poll(fds, 1, 0);
1408   return rc == 1;
1409 }
1410
1411 bool AsyncSocket::isPending() const {
1412   return ioHandler_.isPending();
1413 }
1414
1415 bool AsyncSocket::hangup() const {
1416   if (fd_ == -1) {
1417     // sanity check, no one should ask for hangup if we are not connected.
1418     assert(false);
1419     return false;
1420   }
1421 #ifdef POLLRDHUP // Linux-only
1422   struct pollfd fds[1];
1423   fds[0].fd = fd_;
1424   fds[0].events = POLLRDHUP|POLLHUP;
1425   fds[0].revents = 0;
1426   poll(fds, 1, 0);
1427   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1428 #else
1429   return false;
1430 #endif
1431 }
1432
1433 bool AsyncSocket::good() const {
1434   return (
1435       (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN ||
1436        state_ == StateEnum::ESTABLISHED) &&
1437       (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1438 }
1439
1440 bool AsyncSocket::error() const {
1441   return (state_ == StateEnum::ERROR);
1442 }
1443
1444 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1445   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1446           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1447           << ", state=" << state_ << ", events="
1448           << std::hex << eventFlags_ << ")";
1449   assert(eventBase_ == nullptr);
1450   eventBase->dcheckIsInEventBaseThread();
1451
1452   eventBase_ = eventBase;
1453   ioHandler_.attachEventBase(eventBase);
1454
1455   updateEventRegistration();
1456
1457   writeTimeout_.attachEventBase(eventBase);
1458   if (evbChangeCb_) {
1459     evbChangeCb_->evbAttached(this);
1460   }
1461 }
1462
1463 void AsyncSocket::detachEventBase() {
1464   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1465           << ", old evb=" << eventBase_ << ", state=" << state_
1466           << ", events=" << std::hex << eventFlags_ << ")";
1467   assert(eventBase_ != nullptr);
1468   eventBase_->dcheckIsInEventBaseThread();
1469
1470   eventBase_ = nullptr;
1471
1472   ioHandler_.unregisterHandler();
1473
1474   ioHandler_.detachEventBase();
1475   writeTimeout_.detachEventBase();
1476   if (evbChangeCb_) {
1477     evbChangeCb_->evbDetached(this);
1478   }
1479 }
1480
1481 bool AsyncSocket::isDetachable() const {
1482   DCHECK(eventBase_ != nullptr);
1483   eventBase_->dcheckIsInEventBaseThread();
1484
1485   return !writeTimeout_.isScheduled();
1486 }
1487
1488 void AsyncSocket::cacheAddresses() {
1489   if (fd_ >= 0) {
1490     try {
1491       cacheLocalAddress();
1492       cachePeerAddress();
1493     } catch (const std::system_error& e) {
1494       if (e.code() != std::error_code(ENOTCONN, std::system_category())) {
1495         VLOG(1) << "Error caching addresses: " << e.code().value() << ", "
1496                 << e.code().message();
1497       }
1498     }
1499   }
1500 }
1501
1502 void AsyncSocket::cacheLocalAddress() const {
1503   if (!localAddr_.isInitialized()) {
1504     localAddr_.setFromLocalAddress(fd_);
1505   }
1506 }
1507
1508 void AsyncSocket::cachePeerAddress() const {
1509   if (!addr_.isInitialized()) {
1510     addr_.setFromPeerAddress(fd_);
1511   }
1512 }
1513
1514 bool AsyncSocket::isZeroCopyWriteInProgress() const noexcept {
1515   eventBase_->dcheckIsInEventBaseThread();
1516   return (!idZeroCopyBufPtrMap_.empty());
1517 }
1518
1519 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1520   cacheLocalAddress();
1521   *address = localAddr_;
1522 }
1523
1524 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1525   cachePeerAddress();
1526   *address = addr_;
1527 }
1528
1529 bool AsyncSocket::getTFOSucceded() const {
1530   return detail::tfo_succeeded(fd_);
1531 }
1532
1533 int AsyncSocket::setNoDelay(bool noDelay) {
1534   if (fd_ < 0) {
1535     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1536                << this << "(state=" << state_ << ")";
1537     return EINVAL;
1538
1539   }
1540
1541   int value = noDelay ? 1 : 0;
1542   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1543     int errnoCopy = errno;
1544     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1545             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1546             << strerror(errnoCopy);
1547     return errnoCopy;
1548   }
1549
1550   return 0;
1551 }
1552
1553 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1554
1555   #ifndef TCP_CONGESTION
1556   #define TCP_CONGESTION  13
1557   #endif
1558
1559   if (fd_ < 0) {
1560     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1561                << "socket " << this << "(state=" << state_ << ")";
1562     return EINVAL;
1563
1564   }
1565
1566   if (setsockopt(
1567           fd_,
1568           IPPROTO_TCP,
1569           TCP_CONGESTION,
1570           cname.c_str(),
1571           socklen_t(cname.length() + 1)) != 0) {
1572     int errnoCopy = errno;
1573     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1574             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1575             << strerror(errnoCopy);
1576     return errnoCopy;
1577   }
1578
1579   return 0;
1580 }
1581
1582 int AsyncSocket::setQuickAck(bool quickack) {
1583   (void)quickack;
1584   if (fd_ < 0) {
1585     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1586                << this << "(state=" << state_ << ")";
1587     return EINVAL;
1588
1589   }
1590
1591 #ifdef TCP_QUICKACK // Linux-only
1592   int value = quickack ? 1 : 0;
1593   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1594     int errnoCopy = errno;
1595     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1596             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1597             << strerror(errnoCopy);
1598     return errnoCopy;
1599   }
1600
1601   return 0;
1602 #else
1603   return ENOSYS;
1604 #endif
1605 }
1606
1607 int AsyncSocket::setSendBufSize(size_t bufsize) {
1608   if (fd_ < 0) {
1609     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1610                << this << "(state=" << state_ << ")";
1611     return EINVAL;
1612   }
1613
1614   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1615     int errnoCopy = errno;
1616     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1617             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1618             << strerror(errnoCopy);
1619     return errnoCopy;
1620   }
1621
1622   return 0;
1623 }
1624
1625 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1626   if (fd_ < 0) {
1627     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1628                << this << "(state=" << state_ << ")";
1629     return EINVAL;
1630   }
1631
1632   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1633     int errnoCopy = errno;
1634     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1635             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1636             << strerror(errnoCopy);
1637     return errnoCopy;
1638   }
1639
1640   return 0;
1641 }
1642
1643 int AsyncSocket::setTCPProfile(int profd) {
1644   if (fd_ < 0) {
1645     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1646                << this << "(state=" << state_ << ")";
1647     return EINVAL;
1648   }
1649
1650   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1651     int errnoCopy = errno;
1652     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1653             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1654             << strerror(errnoCopy);
1655     return errnoCopy;
1656   }
1657
1658   return 0;
1659 }
1660
1661 void AsyncSocket::ioReady(uint16_t events) noexcept {
1662   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd=" << fd_
1663           << ", events=" << std::hex << events << ", state=" << state_;
1664   DestructorGuard dg(this);
1665   assert(events & EventHandler::READ_WRITE);
1666   eventBase_->dcheckIsInEventBaseThread();
1667
1668   uint16_t relevantEvents = uint16_t(events & EventHandler::READ_WRITE);
1669   EventBase* originalEventBase = eventBase_;
1670   // If we got there it means that either EventHandler::READ or
1671   // EventHandler::WRITE is set. Any of these flags can
1672   // indicate that there are messages available in the socket
1673   // error message queue.
1674   // Return if we handle any error messages - this is to avoid
1675   // unnecessary read/write calls
1676   if (handleErrMessages()) {
1677     return;
1678   }
1679
1680   // Return now if handleErrMessages() detached us from our EventBase
1681   if (eventBase_ != originalEventBase) {
1682     return;
1683   }
1684
1685   if (relevantEvents == EventHandler::READ) {
1686     handleRead();
1687   } else if (relevantEvents == EventHandler::WRITE) {
1688     handleWrite();
1689   } else if (relevantEvents == EventHandler::READ_WRITE) {
1690     // If both read and write events are ready, process writes first.
1691     handleWrite();
1692
1693     // Return now if handleWrite() detached us from our EventBase
1694     if (eventBase_ != originalEventBase) {
1695       return;
1696     }
1697
1698     // Only call handleRead() if a read callback is still installed.
1699     // (It's possible that the read callback was uninstalled during
1700     // handleWrite().)
1701     if (readCallback_) {
1702       handleRead();
1703     }
1704   } else {
1705     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1706                << std::hex << events << "(this=" << this << ")";
1707     abort();
1708   }
1709 }
1710
1711 AsyncSocket::ReadResult
1712 AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) {
1713   VLOG(5) << "AsyncSocket::performRead() this=" << this << ", buf=" << *buf
1714           << ", buflen=" << *buflen;
1715
1716   if (preReceivedData_ && !preReceivedData_->empty()) {
1717     VLOG(5) << "AsyncSocket::performRead() this=" << this
1718             << ", reading pre-received data";
1719
1720     io::Cursor cursor(preReceivedData_.get());
1721     auto len = cursor.pullAtMost(*buf, *buflen);
1722
1723     IOBufQueue queue;
1724     queue.append(std::move(preReceivedData_));
1725     queue.trimStart(len);
1726     preReceivedData_ = queue.move();
1727
1728     appBytesReceived_ += len;
1729     return ReadResult(len);
1730   }
1731
1732   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT);
1733   if (bytes < 0) {
1734     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1735       // No more data to read right now.
1736       return ReadResult(READ_BLOCKING);
1737     } else {
1738       return ReadResult(READ_ERROR);
1739     }
1740   } else {
1741     appBytesReceived_ += bytes;
1742     return ReadResult(bytes);
1743   }
1744 }
1745
1746 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) {
1747   // no matter what, buffer should be preapared for non-ssl socket
1748   CHECK(readCallback_);
1749   readCallback_->getReadBuffer(buf, buflen);
1750 }
1751
1752 size_t AsyncSocket::handleErrMessages() noexcept {
1753   // This method has non-empty implementation only for platforms
1754   // supporting per-socket error queues.
1755   VLOG(5) << "AsyncSocket::handleErrMessages() this=" << this << ", fd=" << fd_
1756           << ", state=" << state_;
1757   if (errMessageCallback_ == nullptr && idZeroCopyBufPtrMap_.empty()) {
1758     VLOG(7) << "AsyncSocket::handleErrMessages(): "
1759             << "no callback installed - exiting.";
1760     return 0;
1761   }
1762
1763 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
1764   uint8_t ctrl[1024];
1765   unsigned char data;
1766   struct msghdr msg;
1767   iovec entry;
1768
1769   entry.iov_base = &data;
1770   entry.iov_len = sizeof(data);
1771   msg.msg_iov = &entry;
1772   msg.msg_iovlen = 1;
1773   msg.msg_name = nullptr;
1774   msg.msg_namelen = 0;
1775   msg.msg_control = ctrl;
1776   msg.msg_controllen = sizeof(ctrl);
1777   msg.msg_flags = 0;
1778
1779   int ret;
1780   size_t num = 0;
1781   while (true) {
1782     ret = recvmsg(fd_, &msg, MSG_ERRQUEUE);
1783     VLOG(5) << "AsyncSocket::handleErrMessages(): recvmsg returned " << ret;
1784
1785     if (ret < 0) {
1786       if (errno != EAGAIN) {
1787         auto errnoCopy = errno;
1788         LOG(ERROR) << "::recvmsg exited with code " << ret
1789                    << ", errno: " << errnoCopy;
1790         AsyncSocketException ex(
1791           AsyncSocketException::INTERNAL_ERROR,
1792           withAddr("recvmsg() failed"),
1793           errnoCopy);
1794         failErrMessageRead(__func__, ex);
1795       }
1796
1797       return num;
1798     }
1799
1800     for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
1801          cmsg != nullptr && cmsg->cmsg_len != 0;
1802          cmsg = CMSG_NXTHDR(&msg, cmsg)) {
1803       ++num;
1804       if (isZeroCopyMsg(*cmsg)) {
1805         processZeroCopyMsg(*cmsg);
1806       } else {
1807         if (errMessageCallback_) {
1808           errMessageCallback_->errMessage(*cmsg);
1809         }
1810       }
1811     }
1812   }
1813 #else
1814   return 0;
1815 #endif // FOLLY_HAVE_MSG_ERRQUEUE
1816 }
1817
1818 bool AsyncSocket::processZeroCopyWriteInProgress() noexcept {
1819   eventBase_->dcheckIsInEventBaseThread();
1820   if (idZeroCopyBufPtrMap_.empty()) {
1821     return true;
1822   }
1823
1824   handleErrMessages();
1825
1826   return idZeroCopyBufPtrMap_.empty();
1827 }
1828
1829 void AsyncSocket::handleRead() noexcept {
1830   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1831           << ", state=" << state_;
1832   assert(state_ == StateEnum::ESTABLISHED);
1833   assert((shutdownFlags_ & SHUT_READ) == 0);
1834   assert(readCallback_ != nullptr);
1835   assert(eventFlags_ & EventHandler::READ);
1836
1837   // Loop until:
1838   // - a read attempt would block
1839   // - readCallback_ is uninstalled
1840   // - the number of loop iterations exceeds the optional maximum
1841   // - this AsyncSocket is moved to another EventBase
1842   //
1843   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1844   // which is why need to check for it here.
1845   //
1846   // The last bullet point is slightly subtle.  readDataAvailable() may also
1847   // detach this socket from this EventBase.  However, before
1848   // readDataAvailable() returns another thread may pick it up, attach it to
1849   // a different EventBase, and install another readCallback_.  We need to
1850   // exit immediately after readDataAvailable() returns if the eventBase_ has
1851   // changed.  (The caller must perform some sort of locking to transfer the
1852   // AsyncSocket between threads properly.  This will be sufficient to ensure
1853   // that this thread sees the updated eventBase_ variable after
1854   // readDataAvailable() returns.)
1855   uint16_t numReads = 0;
1856   EventBase* originalEventBase = eventBase_;
1857   while (readCallback_ && eventBase_ == originalEventBase) {
1858     // Get the buffer to read into.
1859     void* buf = nullptr;
1860     size_t buflen = 0, offset = 0;
1861     try {
1862       prepareReadBuffer(&buf, &buflen);
1863       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1864     } catch (const AsyncSocketException& ex) {
1865       return failRead(__func__, ex);
1866     } catch (const std::exception& ex) {
1867       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1868                               string("ReadCallback::getReadBuffer() "
1869                                      "threw exception: ") +
1870                               ex.what());
1871       return failRead(__func__, tex);
1872     } catch (...) {
1873       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1874                              "ReadCallback::getReadBuffer() threw "
1875                              "non-exception type");
1876       return failRead(__func__, ex);
1877     }
1878     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1879       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1880                              "ReadCallback::getReadBuffer() returned "
1881                              "empty buffer");
1882       return failRead(__func__, ex);
1883     }
1884
1885     // Perform the read
1886     auto readResult = performRead(&buf, &buflen, &offset);
1887     auto bytesRead = readResult.readReturn;
1888     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1889             << bytesRead << " bytes";
1890     if (bytesRead > 0) {
1891       if (!isBufferMovable_) {
1892         readCallback_->readDataAvailable(size_t(bytesRead));
1893       } else {
1894         CHECK(kOpenSslModeMoveBufferOwnership);
1895         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1896                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1897                 << ", offset=" << offset;
1898         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1899         readBuf->trimStart(offset);
1900         readBuf->trimEnd(buflen - offset - bytesRead);
1901         readCallback_->readBufferAvailable(std::move(readBuf));
1902       }
1903
1904       // Fall through and continue around the loop if the read
1905       // completely filled the available buffer.
1906       // Note that readCallback_ may have been uninstalled or changed inside
1907       // readDataAvailable().
1908       if (size_t(bytesRead) < buflen) {
1909         return;
1910       }
1911     } else if (bytesRead == READ_BLOCKING) {
1912         // No more data to read right now.
1913         return;
1914     } else if (bytesRead == READ_ERROR) {
1915       readErr_ = READ_ERROR;
1916       if (readResult.exception) {
1917         return failRead(__func__, *readResult.exception);
1918       }
1919       auto errnoCopy = errno;
1920       AsyncSocketException ex(
1921           AsyncSocketException::INTERNAL_ERROR,
1922           withAddr("recv() failed"),
1923           errnoCopy);
1924       return failRead(__func__, ex);
1925     } else {
1926       assert(bytesRead == READ_EOF);
1927       readErr_ = READ_EOF;
1928       // EOF
1929       shutdownFlags_ |= SHUT_READ;
1930       if (!updateEventRegistration(0, EventHandler::READ)) {
1931         // we've already been moved into STATE_ERROR
1932         assert(state_ == StateEnum::ERROR);
1933         assert(readCallback_ == nullptr);
1934         return;
1935       }
1936
1937       ReadCallback* callback = readCallback_;
1938       readCallback_ = nullptr;
1939       callback->readEOF();
1940       return;
1941     }
1942     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1943       if (readCallback_ != nullptr) {
1944         // We might still have data in the socket.
1945         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1946         scheduleImmediateRead();
1947       }
1948       return;
1949     }
1950   }
1951 }
1952
1953 /**
1954  * This function attempts to write as much data as possible, until no more data
1955  * can be written.
1956  *
1957  * - If it sends all available data, it unregisters for write events, and stops
1958  *   the writeTimeout_.
1959  *
1960  * - If not all of the data can be sent immediately, it reschedules
1961  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1962  *   registered for write events.
1963  */
1964 void AsyncSocket::handleWrite() noexcept {
1965   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1966           << ", state=" << state_;
1967   DestructorGuard dg(this);
1968
1969   if (state_ == StateEnum::CONNECTING) {
1970     handleConnect();
1971     return;
1972   }
1973
1974   // Normal write
1975   assert(state_ == StateEnum::ESTABLISHED);
1976   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1977   assert(writeReqHead_ != nullptr);
1978
1979   // Loop until we run out of write requests,
1980   // or until this socket is moved to another EventBase.
1981   // (See the comment in handleRead() explaining how this can happen.)
1982   EventBase* originalEventBase = eventBase_;
1983   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1984     auto writeResult = writeReqHead_->performWrite();
1985     if (writeResult.writeReturn < 0) {
1986       if (writeResult.exception) {
1987         return failWrite(__func__, *writeResult.exception);
1988       }
1989       auto errnoCopy = errno;
1990       AsyncSocketException ex(
1991           AsyncSocketException::INTERNAL_ERROR,
1992           withAddr("writev() failed"),
1993           errnoCopy);
1994       return failWrite(__func__, ex);
1995     } else if (writeReqHead_->isComplete()) {
1996       // We finished this request
1997       WriteRequest* req = writeReqHead_;
1998       writeReqHead_ = req->getNext();
1999
2000       if (writeReqHead_ == nullptr) {
2001         writeReqTail_ = nullptr;
2002         // This is the last write request.
2003         // Unregister for write events and cancel the send timer
2004         // before we invoke the callback.  We have to update the state properly
2005         // before calling the callback, since it may want to detach us from
2006         // the EventBase.
2007         if (eventFlags_ & EventHandler::WRITE) {
2008           if (!updateEventRegistration(0, EventHandler::WRITE)) {
2009             assert(state_ == StateEnum::ERROR);
2010             return;
2011           }
2012           // Stop the send timeout
2013           writeTimeout_.cancelTimeout();
2014         }
2015         assert(!writeTimeout_.isScheduled());
2016
2017         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
2018         // we finish sending the last write request.
2019         //
2020         // We have to do this before invoking writeSuccess(), since
2021         // writeSuccess() may detach us from our EventBase.
2022         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
2023           assert(connectCallback_ == nullptr);
2024           shutdownFlags_ |= SHUT_WRITE;
2025
2026           if (shutdownFlags_ & SHUT_READ) {
2027             // Reads have already been shutdown.  Fully close the socket and
2028             // move to STATE_CLOSED.
2029             //
2030             // Note: This code currently moves us to STATE_CLOSED even if
2031             // close() hasn't ever been called.  This can occur if we have
2032             // received EOF from the peer and shutdownWrite() has been called
2033             // locally.  Should we bother staying in STATE_ESTABLISHED in this
2034             // case, until close() is actually called?  I can't think of a
2035             // reason why we would need to do so.  No other operations besides
2036             // calling close() or destroying the socket can be performed at
2037             // this point.
2038             assert(readCallback_ == nullptr);
2039             state_ = StateEnum::CLOSED;
2040             if (fd_ >= 0) {
2041               ioHandler_.changeHandlerFD(-1);
2042               doClose();
2043             }
2044           } else {
2045             // Reads are still enabled, so we are only doing a half-shutdown
2046             shutdown(fd_, SHUT_WR);
2047           }
2048         }
2049       }
2050
2051       // Invoke the callback
2052       WriteCallback* callback = req->getCallback();
2053       req->destroy();
2054       if (callback) {
2055         callback->writeSuccess();
2056       }
2057       // We'll continue around the loop, trying to write another request
2058     } else {
2059       // Partial write.
2060       if (bufferCallback_) {
2061         bufferCallback_->onEgressBuffered();
2062       }
2063       writeReqHead_->consume();
2064       // Stop after a partial write; it's highly likely that a subsequent write
2065       // attempt will just return EAGAIN.
2066       //
2067       // Ensure that we are registered for write events.
2068       if ((eventFlags_ & EventHandler::WRITE) == 0) {
2069         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
2070           assert(state_ == StateEnum::ERROR);
2071           return;
2072         }
2073       }
2074
2075       // Reschedule the send timeout, since we have made some write progress.
2076       if (sendTimeout_ > 0) {
2077         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
2078           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
2079               withAddr("failed to reschedule write timeout"));
2080           return failWrite(__func__, ex);
2081         }
2082       }
2083       return;
2084     }
2085   }
2086   if (!writeReqHead_ && bufferCallback_) {
2087     bufferCallback_->onEgressBufferCleared();
2088   }
2089 }
2090
2091 void AsyncSocket::checkForImmediateRead() noexcept {
2092   // We currently don't attempt to perform optimistic reads in AsyncSocket.
2093   // (However, note that some subclasses do override this method.)
2094   //
2095   // Simply calling handleRead() here would be bad, as this would call
2096   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
2097   // buffer even though no data may be available.  This would waste lots of
2098   // memory, since the buffer will sit around unused until the socket actually
2099   // becomes readable.
2100   //
2101   // Checking if the socket is readable now also seems like it would probably
2102   // be a pessimism.  In most cases it probably wouldn't be readable, and we
2103   // would just waste an extra system call.  Even if it is readable, waiting to
2104   // find out from libevent on the next event loop doesn't seem that bad.
2105   //
2106   // The exception to this is if we have pre-received data. In that case there
2107   // is definitely data available immediately.
2108   if (preReceivedData_ && !preReceivedData_->empty()) {
2109     handleRead();
2110   }
2111 }
2112
2113 void AsyncSocket::handleInitialReadWrite() noexcept {
2114   // Our callers should already be holding a DestructorGuard, but grab
2115   // one here just to make sure, in case one of our calling code paths ever
2116   // changes.
2117   DestructorGuard dg(this);
2118   // If we have a readCallback_, make sure we enable read events.  We
2119   // may already be registered for reads if connectSuccess() set
2120   // the read calback.
2121   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
2122     assert(state_ == StateEnum::ESTABLISHED);
2123     assert((shutdownFlags_ & SHUT_READ) == 0);
2124     if (!updateEventRegistration(EventHandler::READ, 0)) {
2125       assert(state_ == StateEnum::ERROR);
2126       return;
2127     }
2128     checkForImmediateRead();
2129   } else if (readCallback_ == nullptr) {
2130     // Unregister for read events.
2131     updateEventRegistration(0, EventHandler::READ);
2132   }
2133
2134   // If we have write requests pending, try to send them immediately.
2135   // Since we just finished accepting, there is a very good chance that we can
2136   // write without blocking.
2137   //
2138   // However, we only process them if EventHandler::WRITE is not already set,
2139   // which means that we're already blocked on a write attempt.  (This can
2140   // happen if connectSuccess() called write() before returning.)
2141   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
2142     // Call handleWrite() to perform write processing.
2143     handleWrite();
2144   } else if (writeReqHead_ == nullptr) {
2145     // Unregister for write event.
2146     updateEventRegistration(0, EventHandler::WRITE);
2147   }
2148 }
2149
2150 void AsyncSocket::handleConnect() noexcept {
2151   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
2152           << ", state=" << state_;
2153   assert(state_ == StateEnum::CONNECTING);
2154   // SHUT_WRITE can never be set while we are still connecting;
2155   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
2156   // finishes
2157   assert((shutdownFlags_ & SHUT_WRITE) == 0);
2158
2159   // In case we had a connect timeout, cancel the timeout
2160   writeTimeout_.cancelTimeout();
2161   // We don't use a persistent registration when waiting on a connect event,
2162   // so we have been automatically unregistered now.  Update eventFlags_ to
2163   // reflect reality.
2164   assert(eventFlags_ == EventHandler::WRITE);
2165   eventFlags_ = EventHandler::NONE;
2166
2167   // Call getsockopt() to check if the connect succeeded
2168   int error;
2169   socklen_t len = sizeof(error);
2170   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
2171   if (rv != 0) {
2172     auto errnoCopy = errno;
2173     AsyncSocketException ex(
2174         AsyncSocketException::INTERNAL_ERROR,
2175         withAddr("error calling getsockopt() after connect"),
2176         errnoCopy);
2177     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
2178                << fd_ << " host=" << addr_.describe()
2179                << ") exception:" << ex.what();
2180     return failConnect(__func__, ex);
2181   }
2182
2183   if (error != 0) {
2184     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2185                            "connect failed", error);
2186     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
2187             << fd_ << " host=" << addr_.describe()
2188             << ") exception: " << ex.what();
2189     return failConnect(__func__, ex);
2190   }
2191
2192   // Move into STATE_ESTABLISHED
2193   state_ = StateEnum::ESTABLISHED;
2194
2195   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
2196   // perform, immediately shutdown the write half of the socket.
2197   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
2198     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
2199     // are still connecting we just abort the connect rather than waiting for
2200     // it to complete.
2201     assert((shutdownFlags_ & SHUT_READ) == 0);
2202     shutdown(fd_, SHUT_WR);
2203     shutdownFlags_ |= SHUT_WRITE;
2204   }
2205
2206   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
2207           << "successfully connected; state=" << state_;
2208
2209   // Remember the EventBase we are attached to, before we start invoking any
2210   // callbacks (since the callbacks may call detachEventBase()).
2211   EventBase* originalEventBase = eventBase_;
2212
2213   invokeConnectSuccess();
2214   // Note that the connect callback may have changed our state.
2215   // (set or unset the read callback, called write(), closed the socket, etc.)
2216   // The following code needs to handle these situations correctly.
2217   //
2218   // If the socket has been closed, readCallback_ and writeReqHead_ will
2219   // always be nullptr, so that will prevent us from trying to read or write.
2220   //
2221   // The main thing to check for is if eventBase_ is still originalEventBase.
2222   // If not, we have been detached from this event base, so we shouldn't
2223   // perform any more operations.
2224   if (eventBase_ != originalEventBase) {
2225     return;
2226   }
2227
2228   handleInitialReadWrite();
2229 }
2230
2231 void AsyncSocket::timeoutExpired() noexcept {
2232   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
2233           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
2234   DestructorGuard dg(this);
2235   eventBase_->dcheckIsInEventBaseThread();
2236
2237   if (state_ == StateEnum::CONNECTING) {
2238     // connect() timed out
2239     // Unregister for I/O events.
2240     if (connectCallback_) {
2241       AsyncSocketException ex(
2242           AsyncSocketException::TIMED_OUT,
2243           folly::sformat(
2244               "connect timed out after {}ms", connectTimeout_.count()));
2245       failConnect(__func__, ex);
2246     } else {
2247       // we faced a connect error without a connect callback, which could
2248       // happen due to TFO.
2249       AsyncSocketException ex(
2250           AsyncSocketException::TIMED_OUT, "write timed out during connection");
2251       failWrite(__func__, ex);
2252     }
2253   } else {
2254     // a normal write operation timed out
2255     AsyncSocketException ex(
2256         AsyncSocketException::TIMED_OUT,
2257         folly::sformat("write timed out after {}ms", sendTimeout_));
2258     failWrite(__func__, ex);
2259   }
2260 }
2261
2262 ssize_t AsyncSocket::tfoSendMsg(int fd, struct msghdr* msg, int msg_flags) {
2263   return detail::tfo_sendmsg(fd, msg, msg_flags);
2264 }
2265
2266 AsyncSocket::WriteResult
2267 AsyncSocket::sendSocketMessage(int fd, struct msghdr* msg, int msg_flags) {
2268   ssize_t totalWritten = 0;
2269   if (state_ == StateEnum::FAST_OPEN) {
2270     sockaddr_storage addr;
2271     auto len = addr_.getAddress(&addr);
2272     msg->msg_name = &addr;
2273     msg->msg_namelen = len;
2274     totalWritten = tfoSendMsg(fd_, msg, msg_flags);
2275     if (totalWritten >= 0) {
2276       tfoFinished_ = true;
2277       state_ = StateEnum::ESTABLISHED;
2278       // We schedule this asynchrously so that we don't end up
2279       // invoking initial read or write while a write is in progress.
2280       scheduleInitialReadWrite();
2281     } else if (errno == EINPROGRESS) {
2282       VLOG(4) << "TFO falling back to connecting";
2283       // A normal sendmsg doesn't return EINPROGRESS, however
2284       // TFO might fallback to connecting if there is no
2285       // cookie.
2286       state_ = StateEnum::CONNECTING;
2287       try {
2288         scheduleConnectTimeout();
2289         registerForConnectEvents();
2290       } catch (const AsyncSocketException& ex) {
2291         return WriteResult(
2292             WRITE_ERROR, std::make_unique<AsyncSocketException>(ex));
2293       }
2294       // Let's fake it that no bytes were written and return an errno.
2295       errno = EAGAIN;
2296       totalWritten = -1;
2297     } else if (errno == EOPNOTSUPP) {
2298       // Try falling back to connecting.
2299       VLOG(4) << "TFO not supported";
2300       state_ = StateEnum::CONNECTING;
2301       try {
2302         int ret = socketConnect((const sockaddr*)&addr, len);
2303         if (ret == 0) {
2304           // connect succeeded immediately
2305           // Treat this like no data was written.
2306           state_ = StateEnum::ESTABLISHED;
2307           scheduleInitialReadWrite();
2308         }
2309         // If there was no exception during connections,
2310         // we would return that no bytes were written.
2311         errno = EAGAIN;
2312         totalWritten = -1;
2313       } catch (const AsyncSocketException& ex) {
2314         return WriteResult(
2315             WRITE_ERROR, std::make_unique<AsyncSocketException>(ex));
2316       }
2317     } else if (errno == EAGAIN) {
2318       // Normally sendmsg would indicate that the write would block.
2319       // However in the fast open case, it would indicate that sendmsg
2320       // fell back to a connect. This is a return code from connect()
2321       // instead, and is an error condition indicating no fds available.
2322       return WriteResult(
2323           WRITE_ERROR,
2324           std::make_unique<AsyncSocketException>(
2325               AsyncSocketException::UNKNOWN, "No more free local ports"));
2326     }
2327   } else {
2328     totalWritten = ::sendmsg(fd, msg, msg_flags);
2329   }
2330   return WriteResult(totalWritten);
2331 }
2332
2333 AsyncSocket::WriteResult AsyncSocket::performWrite(
2334     const iovec* vec,
2335     uint32_t count,
2336     WriteFlags flags,
2337     uint32_t* countWritten,
2338     uint32_t* partialWritten) {
2339   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
2340   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
2341   // (since it may terminate the program if the main program doesn't explicitly
2342   // ignore it).
2343   struct msghdr msg;
2344   msg.msg_name = nullptr;
2345   msg.msg_namelen = 0;
2346   msg.msg_iov = const_cast<iovec *>(vec);
2347   msg.msg_iovlen = std::min<size_t>(count, kIovMax);
2348   msg.msg_flags = 0;
2349   msg.msg_controllen = sendMsgParamCallback_->getAncillaryDataSize(flags);
2350   CHECK_GE(AsyncSocket::SendMsgParamsCallback::maxAncillaryDataSize,
2351            msg.msg_controllen);
2352
2353   if (msg.msg_controllen != 0) {
2354     msg.msg_control = reinterpret_cast<char*>(alloca(msg.msg_controllen));
2355     sendMsgParamCallback_->getAncillaryData(flags, msg.msg_control);
2356   } else {
2357     msg.msg_control = nullptr;
2358   }
2359   int msg_flags = sendMsgParamCallback_->getFlags(flags, zeroCopyEnabled_);
2360
2361   auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
2362   auto totalWritten = writeResult.writeReturn;
2363   if (totalWritten < 0) {
2364     bool tryAgain = (errno == EAGAIN);
2365 #ifdef __APPLE__
2366     // Apple has a bug where doing a second write on a socket which we
2367     // have opened with TFO causes an ENOTCONN to be thrown. However the
2368     // socket is really connected, so treat ENOTCONN as a EAGAIN until
2369     // this bug is fixed.
2370     tryAgain |= (errno == ENOTCONN);
2371 #endif
2372
2373     // workaround for running with zerocopy enabled but without a proper
2374     // memlock value - see ulimit -l
2375     if (zeroCopyEnabled_ && (errno == ENOBUFS)) {
2376       tryAgain = true;
2377       zeroCopyEnabled_ = false;
2378     }
2379
2380     if (!writeResult.exception && tryAgain) {
2381       // TCP buffer is full; we can't write any more data right now.
2382       *countWritten = 0;
2383       *partialWritten = 0;
2384       return WriteResult(0);
2385     }
2386     // error
2387     *countWritten = 0;
2388     *partialWritten = 0;
2389     return writeResult;
2390   }
2391
2392   appBytesWritten_ += totalWritten;
2393
2394   uint32_t bytesWritten;
2395   uint32_t n;
2396   for (bytesWritten = uint32_t(totalWritten), n = 0; n < count; ++n) {
2397     const iovec* v = vec + n;
2398     if (v->iov_len > bytesWritten) {
2399       // Partial write finished in the middle of this iovec
2400       *countWritten = n;
2401       *partialWritten = bytesWritten;
2402       return WriteResult(totalWritten);
2403     }
2404
2405     bytesWritten -= uint32_t(v->iov_len);
2406   }
2407
2408   assert(bytesWritten == 0);
2409   *countWritten = n;
2410   *partialWritten = 0;
2411   return WriteResult(totalWritten);
2412 }
2413
2414 /**
2415  * Re-register the EventHandler after eventFlags_ has changed.
2416  *
2417  * If an error occurs, fail() is called to move the socket into the error state
2418  * and call all currently installed callbacks.  After an error, the
2419  * AsyncSocket is completely unregistered.
2420  *
2421  * @return Returns true on success, or false on error.
2422  */
2423 bool AsyncSocket::updateEventRegistration() {
2424   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
2425           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
2426           << ", events=" << std::hex << eventFlags_;
2427   eventBase_->dcheckIsInEventBaseThread();
2428   if (eventFlags_ == EventHandler::NONE) {
2429     ioHandler_.unregisterHandler();
2430     return true;
2431   }
2432
2433   // Always register for persistent events, so we don't have to re-register
2434   // after being called back.
2435   if (!ioHandler_.registerHandler(
2436           uint16_t(eventFlags_ | EventHandler::PERSIST))) {
2437     eventFlags_ = EventHandler::NONE; // we're not registered after error
2438     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
2439         withAddr("failed to update AsyncSocket event registration"));
2440     fail("updateEventRegistration", ex);
2441     return false;
2442   }
2443
2444   return true;
2445 }
2446
2447 bool AsyncSocket::updateEventRegistration(uint16_t enable,
2448                                            uint16_t disable) {
2449   uint16_t oldFlags = eventFlags_;
2450   eventFlags_ |= enable;
2451   eventFlags_ &= ~disable;
2452   if (eventFlags_ == oldFlags) {
2453     return true;
2454   } else {
2455     return updateEventRegistration();
2456   }
2457 }
2458
2459 void AsyncSocket::startFail() {
2460   // startFail() should only be called once
2461   assert(state_ != StateEnum::ERROR);
2462   assert(getDestructorGuardCount() > 0);
2463   state_ = StateEnum::ERROR;
2464   // Ensure that SHUT_READ and SHUT_WRITE are set,
2465   // so all future attempts to read or write will be rejected
2466   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
2467
2468   if (eventFlags_ != EventHandler::NONE) {
2469     eventFlags_ = EventHandler::NONE;
2470     ioHandler_.unregisterHandler();
2471   }
2472   writeTimeout_.cancelTimeout();
2473
2474   if (fd_ >= 0) {
2475     ioHandler_.changeHandlerFD(-1);
2476     doClose();
2477   }
2478 }
2479
2480 void AsyncSocket::invokeAllErrors(const AsyncSocketException& ex) {
2481   invokeConnectErr(ex);
2482   failAllWrites(ex);
2483
2484   if (readCallback_) {
2485     ReadCallback* callback = readCallback_;
2486     readCallback_ = nullptr;
2487     callback->readErr(ex);
2488   }
2489 }
2490
2491 void AsyncSocket::finishFail() {
2492   assert(state_ == StateEnum::ERROR);
2493   assert(getDestructorGuardCount() > 0);
2494
2495   AsyncSocketException ex(
2496       AsyncSocketException::INTERNAL_ERROR,
2497       withAddr("socket closing after error"));
2498   invokeAllErrors(ex);
2499 }
2500
2501 void AsyncSocket::finishFail(const AsyncSocketException& ex) {
2502   assert(state_ == StateEnum::ERROR);
2503   assert(getDestructorGuardCount() > 0);
2504   invokeAllErrors(ex);
2505 }
2506
2507 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
2508   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2509              << state_ << " host=" << addr_.describe()
2510              << "): failed in " << fn << "(): "
2511              << ex.what();
2512   startFail();
2513   finishFail();
2514 }
2515
2516 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
2517   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2518                << state_ << " host=" << addr_.describe()
2519                << "): failed while connecting in " << fn << "(): "
2520                << ex.what();
2521   startFail();
2522
2523   invokeConnectErr(ex);
2524   finishFail(ex);
2525 }
2526
2527 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
2528   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2529                << state_ << " host=" << addr_.describe()
2530                << "): failed while reading in " << fn << "(): "
2531                << ex.what();
2532   startFail();
2533
2534   if (readCallback_ != nullptr) {
2535     ReadCallback* callback = readCallback_;
2536     readCallback_ = nullptr;
2537     callback->readErr(ex);
2538   }
2539
2540   finishFail();
2541 }
2542
2543 void AsyncSocket::failErrMessageRead(const char* fn,
2544                                      const AsyncSocketException& ex) {
2545   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2546                << state_ << " host=" << addr_.describe()
2547                << "): failed while reading message in " << fn << "(): "
2548                << ex.what();
2549   startFail();
2550
2551   if (errMessageCallback_ != nullptr) {
2552     ErrMessageCallback* callback = errMessageCallback_;
2553     errMessageCallback_ = nullptr;
2554     callback->errMessageError(ex);
2555   }
2556
2557   finishFail();
2558 }
2559
2560 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
2561   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2562                << state_ << " host=" << addr_.describe()
2563                << "): failed while writing in " << fn << "(): "
2564                << ex.what();
2565   startFail();
2566
2567   // Only invoke the first write callback, since the error occurred while
2568   // writing this request.  Let any other pending write callbacks be invoked in
2569   // finishFail().
2570   if (writeReqHead_ != nullptr) {
2571     WriteRequest* req = writeReqHead_;
2572     writeReqHead_ = req->getNext();
2573     WriteCallback* callback = req->getCallback();
2574     uint32_t bytesWritten = req->getTotalBytesWritten();
2575     req->destroy();
2576     if (callback) {
2577       callback->writeErr(bytesWritten, ex);
2578     }
2579   }
2580
2581   finishFail();
2582 }
2583
2584 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
2585                              size_t bytesWritten,
2586                              const AsyncSocketException& ex) {
2587   // This version of failWrite() is used when the failure occurs before
2588   // we've added the callback to writeReqHead_.
2589   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2590              << state_ << " host=" << addr_.describe()
2591              <<"): failed while writing in " << fn << "(): "
2592              << ex.what();
2593   startFail();
2594
2595   if (callback != nullptr) {
2596     callback->writeErr(bytesWritten, ex);
2597   }
2598
2599   finishFail();
2600 }
2601
2602 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
2603   // Invoke writeError() on all write callbacks.
2604   // This is used when writes are forcibly shutdown with write requests
2605   // pending, or when an error occurs with writes pending.
2606   while (writeReqHead_ != nullptr) {
2607     WriteRequest* req = writeReqHead_;
2608     writeReqHead_ = req->getNext();
2609     WriteCallback* callback = req->getCallback();
2610     if (callback) {
2611       callback->writeErr(req->getTotalBytesWritten(), ex);
2612     }
2613     req->destroy();
2614   }
2615 }
2616
2617 void AsyncSocket::invalidState(ConnectCallback* callback) {
2618   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
2619           << "): connect() called in invalid state " << state_;
2620
2621   /*
2622    * The invalidState() methods don't use the normal failure mechanisms,
2623    * since we don't know what state we are in.  We don't want to call
2624    * startFail()/finishFail() recursively if we are already in the middle of
2625    * cleaning up.
2626    */
2627
2628   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
2629                          "connect() called with socket in invalid state");
2630   connectEndTime_ = std::chrono::steady_clock::now();
2631   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2632     if (callback) {
2633       callback->connectErr(ex);
2634     }
2635   } else {
2636     // We can't use failConnect() here since connectCallback_
2637     // may already be set to another callback.  Invoke this ConnectCallback
2638     // here; any other connectCallback_ will be invoked in finishFail()
2639     startFail();
2640     if (callback) {
2641       callback->connectErr(ex);
2642     }
2643     finishFail();
2644   }
2645 }
2646
2647 void AsyncSocket::invalidState(ErrMessageCallback* callback) {
2648   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2649           << "): setErrMessageCB(" << callback
2650           << ") called in invalid state " << state_;
2651
2652   AsyncSocketException ex(
2653       AsyncSocketException::NOT_OPEN,
2654       msgErrQueueSupported
2655       ? "setErrMessageCB() called with socket in invalid state"
2656       : "This platform does not support socket error message notifications");
2657   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2658     if (callback) {
2659       callback->errMessageError(ex);
2660     }
2661   } else {
2662     startFail();
2663     if (callback) {
2664       callback->errMessageError(ex);
2665     }
2666     finishFail();
2667   }
2668 }
2669
2670 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
2671   connectEndTime_ = std::chrono::steady_clock::now();
2672   if (connectCallback_) {
2673     ConnectCallback* callback = connectCallback_;
2674     connectCallback_ = nullptr;
2675     callback->connectErr(ex);
2676   }
2677 }
2678
2679 void AsyncSocket::invokeConnectSuccess() {
2680   connectEndTime_ = std::chrono::steady_clock::now();
2681   if (connectCallback_) {
2682     ConnectCallback* callback = connectCallback_;
2683     connectCallback_ = nullptr;
2684     callback->connectSuccess();
2685   }
2686 }
2687
2688 void AsyncSocket::invalidState(ReadCallback* callback) {
2689   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2690              << "): setReadCallback(" << callback
2691              << ") called in invalid state " << state_;
2692
2693   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2694                          "setReadCallback() called with socket in "
2695                          "invalid state");
2696   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2697     if (callback) {
2698       callback->readErr(ex);
2699     }
2700   } else {
2701     startFail();
2702     if (callback) {
2703       callback->readErr(ex);
2704     }
2705     finishFail();
2706   }
2707 }
2708
2709 void AsyncSocket::invalidState(WriteCallback* callback) {
2710   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2711              << "): write() called in invalid state " << state_;
2712
2713   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2714                          withAddr("write() called with socket in invalid state"));
2715   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2716     if (callback) {
2717       callback->writeErr(0, ex);
2718     }
2719   } else {
2720     startFail();
2721     if (callback) {
2722       callback->writeErr(0, ex);
2723     }
2724     finishFail();
2725   }
2726 }
2727
2728 void AsyncSocket::doClose() {
2729   if (fd_ == -1) {
2730     return;
2731   }
2732   if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
2733     shutdownSocketSet->close(fd_);
2734   } else {
2735     ::close(fd_);
2736   }
2737   fd_ = -1;
2738 }
2739
2740 std::ostream& operator << (std::ostream& os,
2741                            const AsyncSocket::StateEnum& state) {
2742   os << static_cast<int>(state);
2743   return os;
2744 }
2745
2746 std::string AsyncSocket::withAddr(const std::string& s) {
2747   // Don't use addr_ directly because it may not be initialized
2748   // e.g. if constructed from fd
2749   folly::SocketAddress peer, local;
2750   try {
2751     getPeerAddress(&peer);
2752     getLocalAddress(&local);
2753   } catch (const std::exception&) {
2754     // ignore
2755   } catch (...) {
2756     // ignore
2757   }
2758   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2759 }
2760
2761 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2762   bufferCallback_ = cb;
2763 }
2764
2765 } // namespace folly