Support MSVC, which does not have VLAs, in folly/io/async/AsyncSocket.cpp
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23
24 #include <poll.h>
25 #include <errno.h>
26 #include <limits.h>
27 #include <unistd.h>
28 #include <thread>
29 #include <fcntl.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34 #include <boost/preprocessor/control/if.hpp>
35
36 using std::string;
37 using std::unique_ptr;
38
39 namespace folly {
40
41 // static members initializers
42 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
43
44 const AsyncSocketException socketClosedLocallyEx(
45     AsyncSocketException::END_OF_FILE, "socket closed locally");
46 const AsyncSocketException socketShutdownForWritesEx(
47     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
48
49 // TODO: It might help performance to provide a version of BytesWriteRequest that
50 // users could derive from, so we can avoid the extra allocation for each call
51 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
52 // protocols are currently templatized for transports.
53 //
54 // We would need the version for external users where they provide the iovec
55 // storage space, and only our internal version would allocate it at the end of
56 // the WriteRequest.
57
58 /* The default WriteRequest implementation, used for write(), writev() and
59  * writeChain()
60  *
61  * A new BytesWriteRequest operation is allocated on the heap for all write
62  * operations that cannot be completed immediately.
63  */
64 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
65  public:
66   static BytesWriteRequest* newRequest(AsyncSocket* socket,
67                                        WriteCallback* callback,
68                                        const iovec* ops,
69                                        uint32_t opCount,
70                                        uint32_t partialWritten,
71                                        uint32_t bytesWritten,
72                                        unique_ptr<IOBuf>&& ioBuf,
73                                        WriteFlags flags) {
74     assert(opCount > 0);
75     // Since we put a variable size iovec array at the end
76     // of each BytesWriteRequest, we have to manually allocate the memory.
77     void* buf = malloc(sizeof(BytesWriteRequest) +
78                        (opCount * sizeof(struct iovec)));
79     if (buf == nullptr) {
80       throw std::bad_alloc();
81     }
82
83     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
84                                       partialWritten, bytesWritten,
85                                       std::move(ioBuf), flags);
86   }
87
88   void destroy() override {
89     this->~BytesWriteRequest();
90     free(this);
91   }
92
93   bool performWrite() override {
94     WriteFlags writeFlags = flags_;
95     if (getNext() != nullptr) {
96       writeFlags = writeFlags | WriteFlags::CORK;
97     }
98     bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
99                                           &opsWritten_, &partialBytes_);
100     return bytesWritten_ >= 0;
101   }
102
103   bool isComplete() override {
104     return opsWritten_ == getOpCount();
105   }
106
107   void consume() override {
108     // Advance opIndex_ forward by opsWritten_
109     opIndex_ += opsWritten_;
110     assert(opIndex_ < opCount_);
111
112     // If we've finished writing any IOBufs, release them
113     if (ioBuf_) {
114       for (uint32_t i = opsWritten_; i != 0; --i) {
115         assert(ioBuf_);
116         ioBuf_ = ioBuf_->pop();
117       }
118     }
119
120     // Move partialBytes_ forward into the current iovec buffer
121     struct iovec* currentOp = writeOps_ + opIndex_;
122     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
123     currentOp->iov_base =
124       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
125     currentOp->iov_len -= partialBytes_;
126
127     // Increment the totalBytesWritten_ count by bytesWritten_;
128     totalBytesWritten_ += bytesWritten_;
129   }
130
131  private:
132   BytesWriteRequest(AsyncSocket* socket,
133                     WriteCallback* callback,
134                     const struct iovec* ops,
135                     uint32_t opCount,
136                     uint32_t partialBytes,
137                     uint32_t bytesWritten,
138                     unique_ptr<IOBuf>&& ioBuf,
139                     WriteFlags flags)
140     : AsyncSocket::WriteRequest(socket, callback)
141     , opCount_(opCount)
142     , opIndex_(0)
143     , flags_(flags)
144     , ioBuf_(std::move(ioBuf))
145     , opsWritten_(0)
146     , partialBytes_(partialBytes)
147     , bytesWritten_(bytesWritten) {
148     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
149   }
150
151   // private destructor, to ensure callers use destroy()
152   ~BytesWriteRequest() override = default;
153
154   const struct iovec* getOps() const {
155     assert(opCount_ > opIndex_);
156     return writeOps_ + opIndex_;
157   }
158
159   uint32_t getOpCount() const {
160     assert(opCount_ > opIndex_);
161     return opCount_ - opIndex_;
162   }
163
164   uint32_t opCount_;            ///< number of entries in writeOps_
165   uint32_t opIndex_;            ///< current index into writeOps_
166   WriteFlags flags_;            ///< set for WriteFlags
167   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
168
169   // for consume(), how much we wrote on the last write
170   uint32_t opsWritten_;         ///< complete ops written
171   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
172   ssize_t bytesWritten_;        ///< bytes written altogether
173
174   struct iovec writeOps_[];     ///< write operation(s) list
175 };
176
177 AsyncSocket::AsyncSocket()
178   : eventBase_(nullptr)
179   , writeTimeout_(this, nullptr)
180   , ioHandler_(this, nullptr)
181   , immediateReadHandler_(this) {
182   VLOG(5) << "new AsyncSocket()";
183   init();
184 }
185
186 AsyncSocket::AsyncSocket(EventBase* evb)
187   : eventBase_(evb)
188   , writeTimeout_(this, evb)
189   , ioHandler_(this, evb)
190   , immediateReadHandler_(this) {
191   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
192   init();
193 }
194
195 AsyncSocket::AsyncSocket(EventBase* evb,
196                            const folly::SocketAddress& address,
197                            uint32_t connectTimeout)
198   : AsyncSocket(evb) {
199   connect(nullptr, address, connectTimeout);
200 }
201
202 AsyncSocket::AsyncSocket(EventBase* evb,
203                            const std::string& ip,
204                            uint16_t port,
205                            uint32_t connectTimeout)
206   : AsyncSocket(evb) {
207   connect(nullptr, ip, port, connectTimeout);
208 }
209
210 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
211   : eventBase_(evb)
212   , writeTimeout_(this, evb)
213   , ioHandler_(this, evb, fd)
214   , immediateReadHandler_(this) {
215   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
216           << fd << ")";
217   init();
218   fd_ = fd;
219   setCloseOnExec();
220   state_ = StateEnum::ESTABLISHED;
221 }
222
223 // init() method, since constructor forwarding isn't supported in most
224 // compilers yet.
225 void AsyncSocket::init() {
226   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
227   shutdownFlags_ = 0;
228   state_ = StateEnum::UNINIT;
229   eventFlags_ = EventHandler::NONE;
230   fd_ = -1;
231   sendTimeout_ = 0;
232   maxReadsPerEvent_ = 16;
233   connectCallback_ = nullptr;
234   readCallback_ = nullptr;
235   writeReqHead_ = nullptr;
236   writeReqTail_ = nullptr;
237   shutdownSocketSet_ = nullptr;
238   appBytesWritten_ = 0;
239   appBytesReceived_ = 0;
240 }
241
242 AsyncSocket::~AsyncSocket() {
243   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
244           << ", evb=" << eventBase_ << ", fd=" << fd_
245           << ", state=" << state_ << ")";
246 }
247
248 void AsyncSocket::destroy() {
249   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
250           << ", fd=" << fd_ << ", state=" << state_;
251   // When destroy is called, close the socket immediately
252   closeNow();
253
254   // Then call DelayedDestruction::destroy() to take care of
255   // whether or not we need immediate or delayed destruction
256   DelayedDestruction::destroy();
257 }
258
259 int AsyncSocket::detachFd() {
260   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
261           << ", evb=" << eventBase_ << ", state=" << state_
262           << ", events=" << std::hex << eventFlags_ << ")";
263   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
264   // actually close the descriptor.
265   if (shutdownSocketSet_) {
266     shutdownSocketSet_->remove(fd_);
267   }
268   int fd = fd_;
269   fd_ = -1;
270   // Call closeNow() to invoke all pending callbacks with an error.
271   closeNow();
272   // Update the EventHandler to stop using this fd.
273   // This can only be done after closeNow() unregisters the handler.
274   ioHandler_.changeHandlerFD(-1);
275   return fd;
276 }
277
278 const folly::SocketAddress& AsyncSocket::anyAddress() {
279   static const folly::SocketAddress anyAddress =
280     folly::SocketAddress("0.0.0.0", 0);
281   return anyAddress;
282 }
283
284 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
285   if (shutdownSocketSet_ == newSS) {
286     return;
287   }
288   if (shutdownSocketSet_ && fd_ != -1) {
289     shutdownSocketSet_->remove(fd_);
290   }
291   shutdownSocketSet_ = newSS;
292   if (shutdownSocketSet_ && fd_ != -1) {
293     shutdownSocketSet_->add(fd_);
294   }
295 }
296
297 void AsyncSocket::setCloseOnExec() {
298   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
299   if (rv != 0) {
300     throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
301                                withAddr("failed to set close-on-exec flag"),
302                                errno);
303   }
304 }
305
306 void AsyncSocket::connect(ConnectCallback* callback,
307                            const folly::SocketAddress& address,
308                            int timeout,
309                            const OptionMap &options,
310                            const folly::SocketAddress& bindAddr) noexcept {
311   DestructorGuard dg(this);
312   assert(eventBase_->isInEventBaseThread());
313
314   addr_ = address;
315
316   // Make sure we're in the uninitialized state
317   if (state_ != StateEnum::UNINIT) {
318     return invalidState(callback);
319   }
320
321   assert(fd_ == -1);
322   state_ = StateEnum::CONNECTING;
323   connectCallback_ = callback;
324
325   sockaddr_storage addrStorage;
326   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
327
328   try {
329     // Create the socket
330     // Technically the first parameter should actually be a protocol family
331     // constant (PF_xxx) rather than an address family (AF_xxx), but the
332     // distinction is mainly just historical.  In pretty much all
333     // implementations the PF_foo and AF_foo constants are identical.
334     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
335     if (fd_ < 0) {
336       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
337                                 withAddr("failed to create socket"), errno);
338     }
339     if (shutdownSocketSet_) {
340       shutdownSocketSet_->add(fd_);
341     }
342     ioHandler_.changeHandlerFD(fd_);
343
344     setCloseOnExec();
345
346     // Put the socket in non-blocking mode
347     int flags = fcntl(fd_, F_GETFL, 0);
348     if (flags == -1) {
349       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
350                                 withAddr("failed to get socket flags"), errno);
351     }
352     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
353     if (rv == -1) {
354       throw AsyncSocketException(
355           AsyncSocketException::INTERNAL_ERROR,
356           withAddr("failed to put socket in non-blocking mode"),
357           errno);
358     }
359
360 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
361     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
362     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
363     if (rv == -1) {
364       throw AsyncSocketException(
365           AsyncSocketException::INTERNAL_ERROR,
366           "failed to enable F_SETNOSIGPIPE on socket",
367           errno);
368     }
369 #endif
370
371     // By default, turn on TCP_NODELAY
372     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
373     // setNoDelay() will log an error message if it fails.
374     if (address.getFamily() != AF_UNIX) {
375       (void)setNoDelay(true);
376     }
377
378     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
379             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
380
381     // bind the socket
382     if (bindAddr != anyAddress()) {
383       int one = 1;
384       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
385         doClose();
386         throw AsyncSocketException(
387           AsyncSocketException::NOT_OPEN,
388           "failed to setsockopt prior to bind on " + bindAddr.describe(),
389           errno);
390       }
391
392       bindAddr.getAddress(&addrStorage);
393
394       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
395         doClose();
396         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
397                                   "failed to bind to async socket: " +
398                                   bindAddr.describe(),
399                                   errno);
400       }
401     }
402
403     // Apply the additional options if any.
404     for (const auto& opt: options) {
405       int rv = opt.first.apply(fd_, opt.second);
406       if (rv != 0) {
407         throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
408                                   withAddr("failed to set socket option"),
409                                   errno);
410       }
411     }
412
413     // Perform the connect()
414     address.getAddress(&addrStorage);
415
416     rv = ::connect(fd_, saddr, address.getActualSize());
417     if (rv < 0) {
418       if (errno == EINPROGRESS) {
419         // Connection in progress.
420         if (timeout > 0) {
421           // Start a timer in case the connection takes too long.
422           if (!writeTimeout_.scheduleTimeout(timeout)) {
423             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
424                 withAddr("failed to schedule AsyncSocket connect timeout"));
425           }
426         }
427
428         // Register for write events, so we'll
429         // be notified when the connection finishes/fails.
430         // Note that we don't register for a persistent event here.
431         assert(eventFlags_ == EventHandler::NONE);
432         eventFlags_ = EventHandler::WRITE;
433         if (!ioHandler_.registerHandler(eventFlags_)) {
434           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
435               withAddr("failed to register AsyncSocket connect handler"));
436         }
437         return;
438       } else {
439         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
440                                   "connect failed (immediately)", errno);
441       }
442     }
443
444     // If we're still here the connect() succeeded immediately.
445     // Fall through to call the callback outside of this try...catch block
446   } catch (const AsyncSocketException& ex) {
447     return failConnect(__func__, ex);
448   } catch (const std::exception& ex) {
449     // shouldn't happen, but handle it just in case
450     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
451                << "): unexpected " << typeid(ex).name() << " exception: "
452                << ex.what();
453     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
454                             withAddr(string("unexpected exception: ") +
455                                      ex.what()));
456     return failConnect(__func__, tex);
457   }
458
459   // The connection succeeded immediately
460   // The read callback may not have been set yet, and no writes may be pending
461   // yet, so we don't have to register for any events at the moment.
462   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
463   assert(readCallback_ == nullptr);
464   assert(writeReqHead_ == nullptr);
465   state_ = StateEnum::ESTABLISHED;
466   if (callback) {
467     connectCallback_ = nullptr;
468     callback->connectSuccess();
469   }
470 }
471
472 void AsyncSocket::connect(ConnectCallback* callback,
473                            const string& ip, uint16_t port,
474                            int timeout,
475                            const OptionMap &options) noexcept {
476   DestructorGuard dg(this);
477   try {
478     connectCallback_ = callback;
479     connect(callback, folly::SocketAddress(ip, port), timeout, options);
480   } catch (const std::exception& ex) {
481     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
482                             ex.what());
483     return failConnect(__func__, tex);
484   }
485 }
486
487 void AsyncSocket::cancelConnect() {
488   connectCallback_ = nullptr;
489   if (state_ == StateEnum::CONNECTING) {
490     closeNow();
491   }
492 }
493
494 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
495   sendTimeout_ = milliseconds;
496   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
497
498   // If we are currently pending on write requests, immediately update
499   // writeTimeout_ with the new value.
500   if ((eventFlags_ & EventHandler::WRITE) &&
501       (state_ != StateEnum::CONNECTING)) {
502     assert(state_ == StateEnum::ESTABLISHED);
503     assert((shutdownFlags_ & SHUT_WRITE) == 0);
504     if (sendTimeout_ > 0) {
505       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
506         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
507             withAddr("failed to reschedule send timeout in setSendTimeout"));
508         return failWrite(__func__, ex);
509       }
510     } else {
511       writeTimeout_.cancelTimeout();
512     }
513   }
514 }
515
516 void AsyncSocket::setReadCB(ReadCallback *callback) {
517   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
518           << ", callback=" << callback << ", state=" << state_;
519
520   // Short circuit if callback is the same as the existing readCallback_.
521   //
522   // Note that this is needed for proper functioning during some cleanup cases.
523   // During cleanup we allow setReadCallback(nullptr) to be called even if the
524   // read callback is already unset and we have been detached from an event
525   // base.  This check prevents us from asserting
526   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
527   if (callback == readCallback_) {
528     return;
529   }
530
531   /* We are removing a read callback */
532   if (callback == nullptr &&
533       immediateReadHandler_.isLoopCallbackScheduled()) {
534     immediateReadHandler_.cancelLoopCallback();
535   }
536
537   if (shutdownFlags_ & SHUT_READ) {
538     // Reads have already been shut down on this socket.
539     //
540     // Allow setReadCallback(nullptr) to be called in this case, but don't
541     // allow a new callback to be set.
542     //
543     // For example, setReadCallback(nullptr) can happen after an error if we
544     // invoke some other error callback before invoking readError().  The other
545     // error callback that is invoked first may go ahead and clear the read
546     // callback before we get a chance to invoke readError().
547     if (callback != nullptr) {
548       return invalidState(callback);
549     }
550     assert((eventFlags_ & EventHandler::READ) == 0);
551     readCallback_ = nullptr;
552     return;
553   }
554
555   DestructorGuard dg(this);
556   assert(eventBase_->isInEventBaseThread());
557
558   switch ((StateEnum)state_) {
559     case StateEnum::CONNECTING:
560       // For convenience, we allow the read callback to be set while we are
561       // still connecting.  We just store the callback for now.  Once the
562       // connection completes we'll register for read events.
563       readCallback_ = callback;
564       return;
565     case StateEnum::ESTABLISHED:
566     {
567       readCallback_ = callback;
568       uint16_t oldFlags = eventFlags_;
569       if (readCallback_) {
570         eventFlags_ |= EventHandler::READ;
571       } else {
572         eventFlags_ &= ~EventHandler::READ;
573       }
574
575       // Update our registration if our flags have changed
576       if (eventFlags_ != oldFlags) {
577         // We intentionally ignore the return value here.
578         // updateEventRegistration() will move us into the error state if it
579         // fails, and we don't need to do anything else here afterwards.
580         (void)updateEventRegistration();
581       }
582
583       if (readCallback_) {
584         checkForImmediateRead();
585       }
586       return;
587     }
588     case StateEnum::CLOSED:
589     case StateEnum::ERROR:
590       // We should never reach here.  SHUT_READ should always be set
591       // if we are in STATE_CLOSED or STATE_ERROR.
592       assert(false);
593       return invalidState(callback);
594     case StateEnum::UNINIT:
595       // We do not allow setReadCallback() to be called before we start
596       // connecting.
597       return invalidState(callback);
598   }
599
600   // We don't put a default case in the switch statement, so that the compiler
601   // will warn us to update the switch statement if a new state is added.
602   return invalidState(callback);
603 }
604
605 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
606   return readCallback_;
607 }
608
609 void AsyncSocket::write(WriteCallback* callback,
610                          const void* buf, size_t bytes, WriteFlags flags) {
611   iovec op;
612   op.iov_base = const_cast<void*>(buf);
613   op.iov_len = bytes;
614   writeImpl(callback, &op, 1, std::move(unique_ptr<IOBuf>()), flags);
615 }
616
617 void AsyncSocket::writev(WriteCallback* callback,
618                           const iovec* vec,
619                           size_t count,
620                           WriteFlags flags) {
621   writeImpl(callback, vec, count, std::move(unique_ptr<IOBuf>()), flags);
622 }
623
624 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
625                               WriteFlags flags) {
626   constexpr size_t kSmallSizeMax = 64;
627   size_t count = buf->countChainElements();
628   if (count <= kSmallSizeMax) {
629     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
630     writeChainImpl(callback, vec, count, std::move(buf), flags);
631   } else {
632     iovec* vec = new iovec[count];
633     writeChainImpl(callback, vec, count, std::move(buf), flags);
634     delete[] vec;
635   }
636 }
637
638 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
639     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
640   size_t veclen = buf->fillIov(vec, count);
641   writeImpl(callback, vec, veclen, std::move(buf), flags);
642 }
643
644 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
645                              size_t count, unique_ptr<IOBuf>&& buf,
646                              WriteFlags flags) {
647   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
648           << ", callback=" << callback << ", count=" << count
649           << ", state=" << state_;
650   DestructorGuard dg(this);
651   unique_ptr<IOBuf>ioBuf(std::move(buf));
652   assert(eventBase_->isInEventBaseThread());
653
654   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
655     // No new writes may be performed after the write side of the socket has
656     // been shutdown.
657     //
658     // We could just call callback->writeError() here to fail just this write.
659     // However, fail hard and use invalidState() to fail all outstanding
660     // callbacks and move the socket into the error state.  There's most likely
661     // a bug in the caller's code, so we abort everything rather than trying to
662     // proceed as best we can.
663     return invalidState(callback);
664   }
665
666   uint32_t countWritten = 0;
667   uint32_t partialWritten = 0;
668   int bytesWritten = 0;
669   bool mustRegister = false;
670   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
671     if (writeReqHead_ == nullptr) {
672       // If we are established and there are no other writes pending,
673       // we can attempt to perform the write immediately.
674       assert(writeReqTail_ == nullptr);
675       assert((eventFlags_ & EventHandler::WRITE) == 0);
676
677       bytesWritten = performWrite(vec, count, flags,
678                                   &countWritten, &partialWritten);
679       if (bytesWritten < 0) {
680         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
681                                withAddr("writev failed"), errno);
682         return failWrite(__func__, callback, 0, ex);
683       } else if (countWritten == count) {
684         // We successfully wrote everything.
685         // Invoke the callback and return.
686         if (callback) {
687           callback->writeSuccess();
688         }
689         return;
690       } // else { continue writing the next writeReq }
691       mustRegister = true;
692     }
693   } else if (!connecting()) {
694     // Invalid state for writing
695     return invalidState(callback);
696   }
697
698   // Create a new WriteRequest to add to the queue
699   WriteRequest* req;
700   try {
701     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
702                                         count - countWritten, partialWritten,
703                                         bytesWritten, std::move(ioBuf), flags);
704   } catch (const std::exception& ex) {
705     // we mainly expect to catch std::bad_alloc here
706     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
707         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
708     return failWrite(__func__, callback, bytesWritten, tex);
709   }
710   req->consume();
711   if (writeReqTail_ == nullptr) {
712     assert(writeReqHead_ == nullptr);
713     writeReqHead_ = writeReqTail_ = req;
714   } else {
715     writeReqTail_->append(req);
716     writeReqTail_ = req;
717   }
718
719   // Register for write events if are established and not currently
720   // waiting on write events
721   if (mustRegister) {
722     assert(state_ == StateEnum::ESTABLISHED);
723     assert((eventFlags_ & EventHandler::WRITE) == 0);
724     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
725       assert(state_ == StateEnum::ERROR);
726       return;
727     }
728     if (sendTimeout_ > 0) {
729       // Schedule a timeout to fire if the write takes too long.
730       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
731         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
732                                withAddr("failed to schedule send timeout"));
733         return failWrite(__func__, ex);
734       }
735     }
736   }
737 }
738
739 void AsyncSocket::writeRequest(WriteRequest* req) {
740   if (writeReqTail_ == nullptr) {
741     assert(writeReqHead_ == nullptr);
742     writeReqHead_ = writeReqTail_ = req;
743     req->start();
744   } else {
745     writeReqTail_->append(req);
746     writeReqTail_ = req;
747   }
748 }
749
750 void AsyncSocket::close() {
751   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
752           << ", state=" << state_ << ", shutdownFlags="
753           << std::hex << (int) shutdownFlags_;
754
755   // close() is only different from closeNow() when there are pending writes
756   // that need to drain before we can close.  In all other cases, just call
757   // closeNow().
758   //
759   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
760   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
761   // is still running.  (e.g., If there are multiple pending writes, and we
762   // call writeError() on the first one, it may call close().  In this case we
763   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
764   // writes will still be in the queue.)
765   //
766   // We only need to drain pending writes if we are still in STATE_CONNECTING
767   // or STATE_ESTABLISHED
768   if ((writeReqHead_ == nullptr) ||
769       !(state_ == StateEnum::CONNECTING ||
770       state_ == StateEnum::ESTABLISHED)) {
771     closeNow();
772     return;
773   }
774
775   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
776   // destroyed until close() returns.
777   DestructorGuard dg(this);
778   assert(eventBase_->isInEventBaseThread());
779
780   // Since there are write requests pending, we have to set the
781   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
782   // connect finishes and we finish writing these requests.
783   //
784   // Set SHUT_READ to indicate that reads are shut down, and set the
785   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
786   // pending writes complete.
787   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
788
789   // If a read callback is set, invoke readEOF() immediately to inform it that
790   // the socket has been closed and no more data can be read.
791   if (readCallback_) {
792     // Disable reads if they are enabled
793     if (!updateEventRegistration(0, EventHandler::READ)) {
794       // We're now in the error state; callbacks have been cleaned up
795       assert(state_ == StateEnum::ERROR);
796       assert(readCallback_ == nullptr);
797     } else {
798       ReadCallback* callback = readCallback_;
799       readCallback_ = nullptr;
800       callback->readEOF();
801     }
802   }
803 }
804
805 void AsyncSocket::closeNow() {
806   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
807           << ", state=" << state_ << ", shutdownFlags="
808           << std::hex << (int) shutdownFlags_;
809   DestructorGuard dg(this);
810   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
811
812   switch (state_) {
813     case StateEnum::ESTABLISHED:
814     case StateEnum::CONNECTING:
815     {
816       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
817       state_ = StateEnum::CLOSED;
818
819       // If the write timeout was set, cancel it.
820       writeTimeout_.cancelTimeout();
821
822       // If we are registered for I/O events, unregister.
823       if (eventFlags_ != EventHandler::NONE) {
824         eventFlags_ = EventHandler::NONE;
825         if (!updateEventRegistration()) {
826           // We will have been moved into the error state.
827           assert(state_ == StateEnum::ERROR);
828           return;
829         }
830       }
831
832       if (immediateReadHandler_.isLoopCallbackScheduled()) {
833         immediateReadHandler_.cancelLoopCallback();
834       }
835
836       if (fd_ >= 0) {
837         ioHandler_.changeHandlerFD(-1);
838         doClose();
839       }
840
841       if (connectCallback_) {
842         ConnectCallback* callback = connectCallback_;
843         connectCallback_ = nullptr;
844         callback->connectErr(socketClosedLocallyEx);
845       }
846
847       failAllWrites(socketClosedLocallyEx);
848
849       if (readCallback_) {
850         ReadCallback* callback = readCallback_;
851         readCallback_ = nullptr;
852         callback->readEOF();
853       }
854       return;
855     }
856     case StateEnum::CLOSED:
857       // Do nothing.  It's possible that we are being called recursively
858       // from inside a callback that we invoked inside another call to close()
859       // that is still running.
860       return;
861     case StateEnum::ERROR:
862       // Do nothing.  The error handling code has performed (or is performing)
863       // cleanup.
864       return;
865     case StateEnum::UNINIT:
866       assert(eventFlags_ == EventHandler::NONE);
867       assert(connectCallback_ == nullptr);
868       assert(readCallback_ == nullptr);
869       assert(writeReqHead_ == nullptr);
870       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
871       state_ = StateEnum::CLOSED;
872       return;
873   }
874
875   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
876               << ") called in unknown state " << state_;
877 }
878
879 void AsyncSocket::closeWithReset() {
880   // Enable SO_LINGER, with the linger timeout set to 0.
881   // This will trigger a TCP reset when we close the socket.
882   if (fd_ >= 0) {
883     struct linger optLinger = {1, 0};
884     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
885       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
886               << "on " << fd_ << ": errno=" << errno;
887     }
888   }
889
890   // Then let closeNow() take care of the rest
891   closeNow();
892 }
893
894 void AsyncSocket::shutdownWrite() {
895   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
896           << ", state=" << state_ << ", shutdownFlags="
897           << std::hex << (int) shutdownFlags_;
898
899   // If there are no pending writes, shutdownWrite() is identical to
900   // shutdownWriteNow().
901   if (writeReqHead_ == nullptr) {
902     shutdownWriteNow();
903     return;
904   }
905
906   assert(eventBase_->isInEventBaseThread());
907
908   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
909   // shutdown will be performed once all writes complete.
910   shutdownFlags_ |= SHUT_WRITE_PENDING;
911 }
912
913 void AsyncSocket::shutdownWriteNow() {
914   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
915           << ", fd=" << fd_ << ", state=" << state_
916           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
917
918   if (shutdownFlags_ & SHUT_WRITE) {
919     // Writes are already shutdown; nothing else to do.
920     return;
921   }
922
923   // If SHUT_READ is already set, just call closeNow() to completely
924   // close the socket.  This can happen if close() was called with writes
925   // pending, and then shutdownWriteNow() is called before all pending writes
926   // complete.
927   if (shutdownFlags_ & SHUT_READ) {
928     closeNow();
929     return;
930   }
931
932   DestructorGuard dg(this);
933   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
934
935   switch (static_cast<StateEnum>(state_)) {
936     case StateEnum::ESTABLISHED:
937     {
938       shutdownFlags_ |= SHUT_WRITE;
939
940       // If the write timeout was set, cancel it.
941       writeTimeout_.cancelTimeout();
942
943       // If we are registered for write events, unregister.
944       if (!updateEventRegistration(0, EventHandler::WRITE)) {
945         // We will have been moved into the error state.
946         assert(state_ == StateEnum::ERROR);
947         return;
948       }
949
950       // Shutdown writes on the file descriptor
951       ::shutdown(fd_, SHUT_WR);
952
953       // Immediately fail all write requests
954       failAllWrites(socketShutdownForWritesEx);
955       return;
956     }
957     case StateEnum::CONNECTING:
958     {
959       // Set the SHUT_WRITE_PENDING flag.
960       // When the connection completes, it will check this flag,
961       // shutdown the write half of the socket, and then set SHUT_WRITE.
962       shutdownFlags_ |= SHUT_WRITE_PENDING;
963
964       // Immediately fail all write requests
965       failAllWrites(socketShutdownForWritesEx);
966       return;
967     }
968     case StateEnum::UNINIT:
969       // Callers normally shouldn't call shutdownWriteNow() before the socket
970       // even starts connecting.  Nonetheless, go ahead and set
971       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
972       // immediately shut down the write side of the socket.
973       shutdownFlags_ |= SHUT_WRITE_PENDING;
974       return;
975     case StateEnum::CLOSED:
976     case StateEnum::ERROR:
977       // We should never get here.  SHUT_WRITE should always be set
978       // in STATE_CLOSED and STATE_ERROR.
979       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
980                  << ", fd=" << fd_ << ") in unexpected state " << state_
981                  << " with SHUT_WRITE not set ("
982                  << std::hex << (int) shutdownFlags_ << ")";
983       assert(false);
984       return;
985   }
986
987   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
988               << fd_ << ") called in unknown state " << state_;
989 }
990
991 bool AsyncSocket::readable() const {
992   if (fd_ == -1) {
993     return false;
994   }
995   struct pollfd fds[1];
996   fds[0].fd = fd_;
997   fds[0].events = POLLIN;
998   fds[0].revents = 0;
999   int rc = poll(fds, 1, 0);
1000   return rc == 1;
1001 }
1002
1003 bool AsyncSocket::isPending() const {
1004   return ioHandler_.isPending();
1005 }
1006
1007 bool AsyncSocket::hangup() const {
1008   if (fd_ == -1) {
1009     // sanity check, no one should ask for hangup if we are not connected.
1010     assert(false);
1011     return false;
1012   }
1013 #ifdef POLLRDHUP // Linux-only
1014   struct pollfd fds[1];
1015   fds[0].fd = fd_;
1016   fds[0].events = POLLRDHUP|POLLHUP;
1017   fds[0].revents = 0;
1018   poll(fds, 1, 0);
1019   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1020 #else
1021   return false;
1022 #endif
1023 }
1024
1025 bool AsyncSocket::good() const {
1026   return ((state_ == StateEnum::CONNECTING ||
1027           state_ == StateEnum::ESTABLISHED) &&
1028           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1029 }
1030
1031 bool AsyncSocket::error() const {
1032   return (state_ == StateEnum::ERROR);
1033 }
1034
1035 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1036   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1037           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1038           << ", state=" << state_ << ", events="
1039           << std::hex << eventFlags_ << ")";
1040   assert(eventBase_ == nullptr);
1041   assert(eventBase->isInEventBaseThread());
1042
1043   eventBase_ = eventBase;
1044   ioHandler_.attachEventBase(eventBase);
1045   writeTimeout_.attachEventBase(eventBase);
1046 }
1047
1048 void AsyncSocket::detachEventBase() {
1049   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1050           << ", old evb=" << eventBase_ << ", state=" << state_
1051           << ", events=" << std::hex << eventFlags_ << ")";
1052   assert(eventBase_ != nullptr);
1053   assert(eventBase_->isInEventBaseThread());
1054
1055   eventBase_ = nullptr;
1056   ioHandler_.detachEventBase();
1057   writeTimeout_.detachEventBase();
1058 }
1059
1060 bool AsyncSocket::isDetachable() const {
1061   DCHECK(eventBase_ != nullptr);
1062   DCHECK(eventBase_->isInEventBaseThread());
1063
1064   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1065 }
1066
1067 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1068   address->setFromLocalAddress(fd_);
1069 }
1070
1071 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1072   if (!addr_.isInitialized()) {
1073     addr_.setFromPeerAddress(fd_);
1074   }
1075   *address = addr_;
1076 }
1077
1078 int AsyncSocket::setNoDelay(bool noDelay) {
1079   if (fd_ < 0) {
1080     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1081                << this << "(state=" << state_ << ")";
1082     return EINVAL;
1083
1084   }
1085
1086   int value = noDelay ? 1 : 0;
1087   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1088     int errnoCopy = errno;
1089     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1090             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1091             << strerror(errnoCopy);
1092     return errnoCopy;
1093   }
1094
1095   return 0;
1096 }
1097
1098 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1099
1100   #ifndef TCP_CONGESTION
1101   #define TCP_CONGESTION  13
1102   #endif
1103
1104   if (fd_ < 0) {
1105     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1106                << "socket " << this << "(state=" << state_ << ")";
1107     return EINVAL;
1108
1109   }
1110
1111   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1112         cname.length() + 1) != 0) {
1113     int errnoCopy = errno;
1114     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1115             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1116             << strerror(errnoCopy);
1117     return errnoCopy;
1118   }
1119
1120   return 0;
1121 }
1122
1123 int AsyncSocket::setQuickAck(bool quickack) {
1124   if (fd_ < 0) {
1125     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1126                << this << "(state=" << state_ << ")";
1127     return EINVAL;
1128
1129   }
1130
1131 #ifdef TCP_QUICKACK // Linux-only
1132   int value = quickack ? 1 : 0;
1133   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1134     int errnoCopy = errno;
1135     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1136             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1137             << strerror(errnoCopy);
1138     return errnoCopy;
1139   }
1140
1141   return 0;
1142 #else
1143   return ENOSYS;
1144 #endif
1145 }
1146
1147 int AsyncSocket::setSendBufSize(size_t bufsize) {
1148   if (fd_ < 0) {
1149     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1150                << this << "(state=" << state_ << ")";
1151     return EINVAL;
1152   }
1153
1154   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1155     int errnoCopy = errno;
1156     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1157             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1158             << strerror(errnoCopy);
1159     return errnoCopy;
1160   }
1161
1162   return 0;
1163 }
1164
1165 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1166   if (fd_ < 0) {
1167     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1168                << this << "(state=" << state_ << ")";
1169     return EINVAL;
1170   }
1171
1172   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1173     int errnoCopy = errno;
1174     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1175             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1176             << strerror(errnoCopy);
1177     return errnoCopy;
1178   }
1179
1180   return 0;
1181 }
1182
1183 int AsyncSocket::setTCPProfile(int profd) {
1184   if (fd_ < 0) {
1185     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1186                << this << "(state=" << state_ << ")";
1187     return EINVAL;
1188   }
1189
1190   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1191     int errnoCopy = errno;
1192     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1193             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1194             << strerror(errnoCopy);
1195     return errnoCopy;
1196   }
1197
1198   return 0;
1199 }
1200
1201 void AsyncSocket::ioReady(uint16_t events) noexcept {
1202   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1203           << ", events=" << std::hex << events << ", state=" << state_;
1204   DestructorGuard dg(this);
1205   assert(events & EventHandler::READ_WRITE);
1206   assert(eventBase_->isInEventBaseThread());
1207
1208   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1209   if (relevantEvents == EventHandler::READ) {
1210     handleRead();
1211   } else if (relevantEvents == EventHandler::WRITE) {
1212     handleWrite();
1213   } else if (relevantEvents == EventHandler::READ_WRITE) {
1214     EventBase* originalEventBase = eventBase_;
1215     // If both read and write events are ready, process writes first.
1216     handleWrite();
1217
1218     // Return now if handleWrite() detached us from our EventBase
1219     if (eventBase_ != originalEventBase) {
1220       return;
1221     }
1222
1223     // Only call handleRead() if a read callback is still installed.
1224     // (It's possible that the read callback was uninstalled during
1225     // handleWrite().)
1226     if (readCallback_) {
1227       handleRead();
1228     }
1229   } else {
1230     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1231                << std::hex << events << "(this=" << this << ")";
1232     abort();
1233   }
1234 }
1235
1236 ssize_t AsyncSocket::performRead(void** buf, size_t* buflen, size_t* offset) {
1237   VLOG(5) << "AsyncSocket::performRead() this=" << this
1238           << ", buf=" << *buf << ", buflen=" << *buflen;
1239
1240   int recvFlags = 0;
1241   if (peek_) {
1242     recvFlags |= MSG_PEEK;
1243   }
1244
1245   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1246   if (bytes < 0) {
1247     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1248       // No more data to read right now.
1249       return READ_BLOCKING;
1250     } else {
1251       return READ_ERROR;
1252     }
1253   } else {
1254     appBytesReceived_ += bytes;
1255     return bytes;
1256   }
1257 }
1258
1259 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1260   // no matter what, buffer should be preapared for non-ssl socket
1261   CHECK(readCallback_);
1262   readCallback_->getReadBuffer(buf, buflen);
1263 }
1264
1265 void AsyncSocket::handleRead() noexcept {
1266   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1267           << ", state=" << state_;
1268   assert(state_ == StateEnum::ESTABLISHED);
1269   assert((shutdownFlags_ & SHUT_READ) == 0);
1270   assert(readCallback_ != nullptr);
1271   assert(eventFlags_ & EventHandler::READ);
1272
1273   // Loop until:
1274   // - a read attempt would block
1275   // - readCallback_ is uninstalled
1276   // - the number of loop iterations exceeds the optional maximum
1277   // - this AsyncSocket is moved to another EventBase
1278   //
1279   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1280   // which is why need to check for it here.
1281   //
1282   // The last bullet point is slightly subtle.  readDataAvailable() may also
1283   // detach this socket from this EventBase.  However, before
1284   // readDataAvailable() returns another thread may pick it up, attach it to
1285   // a different EventBase, and install another readCallback_.  We need to
1286   // exit immediately after readDataAvailable() returns if the eventBase_ has
1287   // changed.  (The caller must perform some sort of locking to transfer the
1288   // AsyncSocket between threads properly.  This will be sufficient to ensure
1289   // that this thread sees the updated eventBase_ variable after
1290   // readDataAvailable() returns.)
1291   uint16_t numReads = 0;
1292   EventBase* originalEventBase = eventBase_;
1293   while (readCallback_ && eventBase_ == originalEventBase) {
1294     // Get the buffer to read into.
1295     void* buf = nullptr;
1296     size_t buflen = 0, offset = 0;
1297     try {
1298       prepareReadBuffer(&buf, &buflen);
1299       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1300     } catch (const AsyncSocketException& ex) {
1301       return failRead(__func__, ex);
1302     } catch (const std::exception& ex) {
1303       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1304                               string("ReadCallback::getReadBuffer() "
1305                                      "threw exception: ") +
1306                               ex.what());
1307       return failRead(__func__, tex);
1308     } catch (...) {
1309       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1310                              "ReadCallback::getReadBuffer() threw "
1311                              "non-exception type");
1312       return failRead(__func__, ex);
1313     }
1314     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1315       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1316                              "ReadCallback::getReadBuffer() returned "
1317                              "empty buffer");
1318       return failRead(__func__, ex);
1319     }
1320
1321     // Perform the read
1322     ssize_t bytesRead = performRead(&buf, &buflen, &offset);
1323     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1324             << bytesRead << " bytes";
1325     if (bytesRead > 0) {
1326       if (!isBufferMovable_) {
1327         readCallback_->readDataAvailable(bytesRead);
1328       } else {
1329         CHECK(kOpenSslModeMoveBufferOwnership);
1330         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1331                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1332                 << ", offset=" << offset;
1333         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1334         readBuf->trimStart(offset);
1335         readBuf->trimEnd(buflen - offset - bytesRead);
1336         readCallback_->readBufferAvailable(std::move(readBuf));
1337       }
1338
1339       // Fall through and continue around the loop if the read
1340       // completely filled the available buffer.
1341       // Note that readCallback_ may have been uninstalled or changed inside
1342       // readDataAvailable().
1343       if (size_t(bytesRead) < buflen) {
1344         return;
1345       }
1346     } else if (bytesRead == READ_BLOCKING) {
1347         // No more data to read right now.
1348         return;
1349     } else if (bytesRead == READ_ERROR) {
1350       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1351                              withAddr("recv() failed"), errno);
1352       return failRead(__func__, ex);
1353     } else {
1354       assert(bytesRead == READ_EOF);
1355       // EOF
1356       shutdownFlags_ |= SHUT_READ;
1357       if (!updateEventRegistration(0, EventHandler::READ)) {
1358         // we've already been moved into STATE_ERROR
1359         assert(state_ == StateEnum::ERROR);
1360         assert(readCallback_ == nullptr);
1361         return;
1362       }
1363
1364       ReadCallback* callback = readCallback_;
1365       readCallback_ = nullptr;
1366       callback->readEOF();
1367       return;
1368     }
1369     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1370       if (readCallback_ != nullptr) {
1371         // We might still have data in the socket.
1372         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1373         scheduleImmediateRead();
1374       }
1375       return;
1376     }
1377   }
1378 }
1379
1380 /**
1381  * This function attempts to write as much data as possible, until no more data
1382  * can be written.
1383  *
1384  * - If it sends all available data, it unregisters for write events, and stops
1385  *   the writeTimeout_.
1386  *
1387  * - If not all of the data can be sent immediately, it reschedules
1388  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1389  *   registered for write events.
1390  */
1391 void AsyncSocket::handleWrite() noexcept {
1392   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1393           << ", state=" << state_;
1394   if (state_ == StateEnum::CONNECTING) {
1395     handleConnect();
1396     return;
1397   }
1398
1399   // Normal write
1400   assert(state_ == StateEnum::ESTABLISHED);
1401   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1402   assert(writeReqHead_ != nullptr);
1403
1404   // Loop until we run out of write requests,
1405   // or until this socket is moved to another EventBase.
1406   // (See the comment in handleRead() explaining how this can happen.)
1407   EventBase* originalEventBase = eventBase_;
1408   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1409     if (!writeReqHead_->performWrite()) {
1410       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1411                              withAddr("writev() failed"), errno);
1412       return failWrite(__func__, ex);
1413     } else if (writeReqHead_->isComplete()) {
1414       // We finished this request
1415       WriteRequest* req = writeReqHead_;
1416       writeReqHead_ = req->getNext();
1417
1418       if (writeReqHead_ == nullptr) {
1419         writeReqTail_ = nullptr;
1420         // This is the last write request.
1421         // Unregister for write events and cancel the send timer
1422         // before we invoke the callback.  We have to update the state properly
1423         // before calling the callback, since it may want to detach us from
1424         // the EventBase.
1425         if (eventFlags_ & EventHandler::WRITE) {
1426           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1427             assert(state_ == StateEnum::ERROR);
1428             return;
1429           }
1430           // Stop the send timeout
1431           writeTimeout_.cancelTimeout();
1432         }
1433         assert(!writeTimeout_.isScheduled());
1434
1435         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1436         // we finish sending the last write request.
1437         //
1438         // We have to do this before invoking writeSuccess(), since
1439         // writeSuccess() may detach us from our EventBase.
1440         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1441           assert(connectCallback_ == nullptr);
1442           shutdownFlags_ |= SHUT_WRITE;
1443
1444           if (shutdownFlags_ & SHUT_READ) {
1445             // Reads have already been shutdown.  Fully close the socket and
1446             // move to STATE_CLOSED.
1447             //
1448             // Note: This code currently moves us to STATE_CLOSED even if
1449             // close() hasn't ever been called.  This can occur if we have
1450             // received EOF from the peer and shutdownWrite() has been called
1451             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1452             // case, until close() is actually called?  I can't think of a
1453             // reason why we would need to do so.  No other operations besides
1454             // calling close() or destroying the socket can be performed at
1455             // this point.
1456             assert(readCallback_ == nullptr);
1457             state_ = StateEnum::CLOSED;
1458             if (fd_ >= 0) {
1459               ioHandler_.changeHandlerFD(-1);
1460               doClose();
1461             }
1462           } else {
1463             // Reads are still enabled, so we are only doing a half-shutdown
1464             ::shutdown(fd_, SHUT_WR);
1465           }
1466         }
1467       }
1468
1469       // Invoke the callback
1470       WriteCallback* callback = req->getCallback();
1471       req->destroy();
1472       if (callback) {
1473         callback->writeSuccess();
1474       }
1475       // We'll continue around the loop, trying to write another request
1476     } else {
1477       // Partial write.
1478       writeReqHead_->consume();
1479       // Stop after a partial write; it's highly likely that a subsequent write
1480       // attempt will just return EAGAIN.
1481       //
1482       // Ensure that we are registered for write events.
1483       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1484         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1485           assert(state_ == StateEnum::ERROR);
1486           return;
1487         }
1488       }
1489
1490       // Reschedule the send timeout, since we have made some write progress.
1491       if (sendTimeout_ > 0) {
1492         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1493           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1494               withAddr("failed to reschedule write timeout"));
1495           return failWrite(__func__, ex);
1496         }
1497       }
1498       return;
1499     }
1500   }
1501 }
1502
1503 void AsyncSocket::checkForImmediateRead() noexcept {
1504   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1505   // (However, note that some subclasses do override this method.)
1506   //
1507   // Simply calling handleRead() here would be bad, as this would call
1508   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1509   // buffer even though no data may be available.  This would waste lots of
1510   // memory, since the buffer will sit around unused until the socket actually
1511   // becomes readable.
1512   //
1513   // Checking if the socket is readable now also seems like it would probably
1514   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1515   // would just waste an extra system call.  Even if it is readable, waiting to
1516   // find out from libevent on the next event loop doesn't seem that bad.
1517 }
1518
1519 void AsyncSocket::handleInitialReadWrite() noexcept {
1520   // Our callers should already be holding a DestructorGuard, but grab
1521   // one here just to make sure, in case one of our calling code paths ever
1522   // changes.
1523   DestructorGuard dg(this);
1524
1525   // If we have a readCallback_, make sure we enable read events.  We
1526   // may already be registered for reads if connectSuccess() set
1527   // the read calback.
1528   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1529     assert(state_ == StateEnum::ESTABLISHED);
1530     assert((shutdownFlags_ & SHUT_READ) == 0);
1531     if (!updateEventRegistration(EventHandler::READ, 0)) {
1532       assert(state_ == StateEnum::ERROR);
1533       return;
1534     }
1535     checkForImmediateRead();
1536   } else if (readCallback_ == nullptr) {
1537     // Unregister for read events.
1538     updateEventRegistration(0, EventHandler::READ);
1539   }
1540
1541   // If we have write requests pending, try to send them immediately.
1542   // Since we just finished accepting, there is a very good chance that we can
1543   // write without blocking.
1544   //
1545   // However, we only process them if EventHandler::WRITE is not already set,
1546   // which means that we're already blocked on a write attempt.  (This can
1547   // happen if connectSuccess() called write() before returning.)
1548   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1549     // Call handleWrite() to perform write processing.
1550     handleWrite();
1551   } else if (writeReqHead_ == nullptr) {
1552     // Unregister for write event.
1553     updateEventRegistration(0, EventHandler::WRITE);
1554   }
1555 }
1556
1557 void AsyncSocket::handleConnect() noexcept {
1558   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1559           << ", state=" << state_;
1560   assert(state_ == StateEnum::CONNECTING);
1561   // SHUT_WRITE can never be set while we are still connecting;
1562   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1563   // finishes
1564   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1565
1566   // In case we had a connect timeout, cancel the timeout
1567   writeTimeout_.cancelTimeout();
1568   // We don't use a persistent registration when waiting on a connect event,
1569   // so we have been automatically unregistered now.  Update eventFlags_ to
1570   // reflect reality.
1571   assert(eventFlags_ == EventHandler::WRITE);
1572   eventFlags_ = EventHandler::NONE;
1573
1574   // Call getsockopt() to check if the connect succeeded
1575   int error;
1576   socklen_t len = sizeof(error);
1577   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1578   if (rv != 0) {
1579     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1580                            withAddr("error calling getsockopt() after connect"),
1581                            errno);
1582     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1583                << fd_ << " host=" << addr_.describe()
1584                << ") exception:" << ex.what();
1585     return failConnect(__func__, ex);
1586   }
1587
1588   if (error != 0) {
1589     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1590                            "connect failed", error);
1591     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1592             << fd_ << " host=" << addr_.describe()
1593             << ") exception: " << ex.what();
1594     return failConnect(__func__, ex);
1595   }
1596
1597   // Move into STATE_ESTABLISHED
1598   state_ = StateEnum::ESTABLISHED;
1599
1600   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1601   // perform, immediately shutdown the write half of the socket.
1602   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1603     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1604     // are still connecting we just abort the connect rather than waiting for
1605     // it to complete.
1606     assert((shutdownFlags_ & SHUT_READ) == 0);
1607     ::shutdown(fd_, SHUT_WR);
1608     shutdownFlags_ |= SHUT_WRITE;
1609   }
1610
1611   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1612           << "successfully connected; state=" << state_;
1613
1614   // Remember the EventBase we are attached to, before we start invoking any
1615   // callbacks (since the callbacks may call detachEventBase()).
1616   EventBase* originalEventBase = eventBase_;
1617
1618   // Call the connect callback.
1619   if (connectCallback_) {
1620     ConnectCallback* callback = connectCallback_;
1621     connectCallback_ = nullptr;
1622     callback->connectSuccess();
1623   }
1624
1625   // Note that the connect callback may have changed our state.
1626   // (set or unset the read callback, called write(), closed the socket, etc.)
1627   // The following code needs to handle these situations correctly.
1628   //
1629   // If the socket has been closed, readCallback_ and writeReqHead_ will
1630   // always be nullptr, so that will prevent us from trying to read or write.
1631   //
1632   // The main thing to check for is if eventBase_ is still originalEventBase.
1633   // If not, we have been detached from this event base, so we shouldn't
1634   // perform any more operations.
1635   if (eventBase_ != originalEventBase) {
1636     return;
1637   }
1638
1639   handleInitialReadWrite();
1640 }
1641
1642 void AsyncSocket::timeoutExpired() noexcept {
1643   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1644           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1645   DestructorGuard dg(this);
1646   assert(eventBase_->isInEventBaseThread());
1647
1648   if (state_ == StateEnum::CONNECTING) {
1649     // connect() timed out
1650     // Unregister for I/O events.
1651     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1652                            "connect timed out");
1653     failConnect(__func__, ex);
1654   } else {
1655     // a normal write operation timed out
1656     assert(state_ == StateEnum::ESTABLISHED);
1657     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1658     failWrite(__func__, ex);
1659   }
1660 }
1661
1662 ssize_t AsyncSocket::performWrite(const iovec* vec,
1663                                    uint32_t count,
1664                                    WriteFlags flags,
1665                                    uint32_t* countWritten,
1666                                    uint32_t* partialWritten) {
1667   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1668   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1669   // (since it may terminate the program if the main program doesn't explicitly
1670   // ignore it).
1671   struct msghdr msg;
1672   msg.msg_name = nullptr;
1673   msg.msg_namelen = 0;
1674   msg.msg_iov = const_cast<iovec *>(vec);
1675 #ifdef IOV_MAX // not defined on Android
1676   msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX);
1677 #else
1678   msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV);
1679 #endif
1680   msg.msg_control = nullptr;
1681   msg.msg_controllen = 0;
1682   msg.msg_flags = 0;
1683
1684   int msg_flags = MSG_DONTWAIT;
1685
1686 #ifdef MSG_NOSIGNAL // Linux-only
1687   msg_flags |= MSG_NOSIGNAL;
1688   if (isSet(flags, WriteFlags::CORK)) {
1689     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1690     // give it the rest of the data rather than immediately sending a partial
1691     // frame, even when TCP_NODELAY is enabled.
1692     msg_flags |= MSG_MORE;
1693   }
1694 #endif
1695   if (isSet(flags, WriteFlags::EOR)) {
1696     // marks that this is the last byte of a record (response)
1697     msg_flags |= MSG_EOR;
1698   }
1699   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1700   if (totalWritten < 0) {
1701     if (errno == EAGAIN) {
1702       // TCP buffer is full; we can't write any more data right now.
1703       *countWritten = 0;
1704       *partialWritten = 0;
1705       return 0;
1706     }
1707     // error
1708     *countWritten = 0;
1709     *partialWritten = 0;
1710     return -1;
1711   }
1712
1713   appBytesWritten_ += totalWritten;
1714
1715   uint32_t bytesWritten;
1716   uint32_t n;
1717   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1718     const iovec* v = vec + n;
1719     if (v->iov_len > bytesWritten) {
1720       // Partial write finished in the middle of this iovec
1721       *countWritten = n;
1722       *partialWritten = bytesWritten;
1723       return totalWritten;
1724     }
1725
1726     bytesWritten -= v->iov_len;
1727   }
1728
1729   assert(bytesWritten == 0);
1730   *countWritten = n;
1731   *partialWritten = 0;
1732   return totalWritten;
1733 }
1734
1735 /**
1736  * Re-register the EventHandler after eventFlags_ has changed.
1737  *
1738  * If an error occurs, fail() is called to move the socket into the error state
1739  * and call all currently installed callbacks.  After an error, the
1740  * AsyncSocket is completely unregistered.
1741  *
1742  * @return Returns true on succcess, or false on error.
1743  */
1744 bool AsyncSocket::updateEventRegistration() {
1745   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1746           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1747           << ", events=" << std::hex << eventFlags_;
1748   assert(eventBase_->isInEventBaseThread());
1749   if (eventFlags_ == EventHandler::NONE) {
1750     ioHandler_.unregisterHandler();
1751     return true;
1752   }
1753
1754   // Always register for persistent events, so we don't have to re-register
1755   // after being called back.
1756   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1757     eventFlags_ = EventHandler::NONE; // we're not registered after error
1758     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1759         withAddr("failed to update AsyncSocket event registration"));
1760     fail("updateEventRegistration", ex);
1761     return false;
1762   }
1763
1764   return true;
1765 }
1766
1767 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1768                                            uint16_t disable) {
1769   uint16_t oldFlags = eventFlags_;
1770   eventFlags_ |= enable;
1771   eventFlags_ &= ~disable;
1772   if (eventFlags_ == oldFlags) {
1773     return true;
1774   } else {
1775     return updateEventRegistration();
1776   }
1777 }
1778
1779 void AsyncSocket::startFail() {
1780   // startFail() should only be called once
1781   assert(state_ != StateEnum::ERROR);
1782   assert(getDestructorGuardCount() > 0);
1783   state_ = StateEnum::ERROR;
1784   // Ensure that SHUT_READ and SHUT_WRITE are set,
1785   // so all future attempts to read or write will be rejected
1786   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1787
1788   if (eventFlags_ != EventHandler::NONE) {
1789     eventFlags_ = EventHandler::NONE;
1790     ioHandler_.unregisterHandler();
1791   }
1792   writeTimeout_.cancelTimeout();
1793
1794   if (fd_ >= 0) {
1795     ioHandler_.changeHandlerFD(-1);
1796     doClose();
1797   }
1798 }
1799
1800 void AsyncSocket::finishFail() {
1801   assert(state_ == StateEnum::ERROR);
1802   assert(getDestructorGuardCount() > 0);
1803
1804   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1805                          withAddr("socket closing after error"));
1806   if (connectCallback_) {
1807     ConnectCallback* callback = connectCallback_;
1808     connectCallback_ = nullptr;
1809     callback->connectErr(ex);
1810   }
1811
1812   failAllWrites(ex);
1813
1814   if (readCallback_) {
1815     ReadCallback* callback = readCallback_;
1816     readCallback_ = nullptr;
1817     callback->readErr(ex);
1818   }
1819 }
1820
1821 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1822   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1823              << state_ << " host=" << addr_.describe()
1824              << "): failed in " << fn << "(): "
1825              << ex.what();
1826   startFail();
1827   finishFail();
1828 }
1829
1830 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1831   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1832                << state_ << " host=" << addr_.describe()
1833                << "): failed while connecting in " << fn << "(): "
1834                << ex.what();
1835   startFail();
1836
1837   if (connectCallback_ != nullptr) {
1838     ConnectCallback* callback = connectCallback_;
1839     connectCallback_ = nullptr;
1840     callback->connectErr(ex);
1841   }
1842
1843   finishFail();
1844 }
1845
1846 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1847   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1848                << state_ << " host=" << addr_.describe()
1849                << "): failed while reading in " << fn << "(): "
1850                << ex.what();
1851   startFail();
1852
1853   if (readCallback_ != nullptr) {
1854     ReadCallback* callback = readCallback_;
1855     readCallback_ = nullptr;
1856     callback->readErr(ex);
1857   }
1858
1859   finishFail();
1860 }
1861
1862 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1863   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1864                << state_ << " host=" << addr_.describe()
1865                << "): failed while writing in " << fn << "(): "
1866                << ex.what();
1867   startFail();
1868
1869   // Only invoke the first write callback, since the error occurred while
1870   // writing this request.  Let any other pending write callbacks be invoked in
1871   // finishFail().
1872   if (writeReqHead_ != nullptr) {
1873     WriteRequest* req = writeReqHead_;
1874     writeReqHead_ = req->getNext();
1875     WriteCallback* callback = req->getCallback();
1876     uint32_t bytesWritten = req->getTotalBytesWritten();
1877     req->destroy();
1878     if (callback) {
1879       callback->writeErr(bytesWritten, ex);
1880     }
1881   }
1882
1883   finishFail();
1884 }
1885
1886 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1887                              size_t bytesWritten,
1888                              const AsyncSocketException& ex) {
1889   // This version of failWrite() is used when the failure occurs before
1890   // we've added the callback to writeReqHead_.
1891   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1892              << state_ << " host=" << addr_.describe()
1893              <<"): failed while writing in " << fn << "(): "
1894              << ex.what();
1895   startFail();
1896
1897   if (callback != nullptr) {
1898     callback->writeErr(bytesWritten, ex);
1899   }
1900
1901   finishFail();
1902 }
1903
1904 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1905   // Invoke writeError() on all write callbacks.
1906   // This is used when writes are forcibly shutdown with write requests
1907   // pending, or when an error occurs with writes pending.
1908   while (writeReqHead_ != nullptr) {
1909     WriteRequest* req = writeReqHead_;
1910     writeReqHead_ = req->getNext();
1911     WriteCallback* callback = req->getCallback();
1912     if (callback) {
1913       callback->writeErr(req->getTotalBytesWritten(), ex);
1914     }
1915     req->destroy();
1916   }
1917 }
1918
1919 void AsyncSocket::invalidState(ConnectCallback* callback) {
1920   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1921              << "): connect() called in invalid state " << state_;
1922
1923   /*
1924    * The invalidState() methods don't use the normal failure mechanisms,
1925    * since we don't know what state we are in.  We don't want to call
1926    * startFail()/finishFail() recursively if we are already in the middle of
1927    * cleaning up.
1928    */
1929
1930   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1931                          "connect() called with socket in invalid state");
1932   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1933     if (callback) {
1934       callback->connectErr(ex);
1935     }
1936   } else {
1937     // We can't use failConnect() here since connectCallback_
1938     // may already be set to another callback.  Invoke this ConnectCallback
1939     // here; any other connectCallback_ will be invoked in finishFail()
1940     startFail();
1941     if (callback) {
1942       callback->connectErr(ex);
1943     }
1944     finishFail();
1945   }
1946 }
1947
1948 void AsyncSocket::invalidState(ReadCallback* callback) {
1949   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1950              << "): setReadCallback(" << callback
1951              << ") called in invalid state " << state_;
1952
1953   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1954                          "setReadCallback() called with socket in "
1955                          "invalid state");
1956   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1957     if (callback) {
1958       callback->readErr(ex);
1959     }
1960   } else {
1961     startFail();
1962     if (callback) {
1963       callback->readErr(ex);
1964     }
1965     finishFail();
1966   }
1967 }
1968
1969 void AsyncSocket::invalidState(WriteCallback* callback) {
1970   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1971              << "): write() called in invalid state " << state_;
1972
1973   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1974                          withAddr("write() called with socket in invalid state"));
1975   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1976     if (callback) {
1977       callback->writeErr(0, ex);
1978     }
1979   } else {
1980     startFail();
1981     if (callback) {
1982       callback->writeErr(0, ex);
1983     }
1984     finishFail();
1985   }
1986 }
1987
1988 void AsyncSocket::doClose() {
1989   if (fd_ == -1) return;
1990   if (shutdownSocketSet_) {
1991     shutdownSocketSet_->close(fd_);
1992   } else {
1993     ::close(fd_);
1994   }
1995   fd_ = -1;
1996 }
1997
1998 std::ostream& operator << (std::ostream& os,
1999                            const AsyncSocket::StateEnum& state) {
2000   os << static_cast<int>(state);
2001   return os;
2002 }
2003
2004 std::string AsyncSocket::withAddr(const std::string& s) {
2005   // Don't use addr_ directly because it may not be initialized
2006   // e.g. if constructed from fd
2007   folly::SocketAddress peer, local;
2008   try {
2009     getPeerAddress(&peer);
2010     getLocalAddress(&local);
2011   } catch (const std::exception&) {
2012     // ignore
2013   } catch (...) {
2014     // ignore
2015   }
2016   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2017 }
2018
2019 } // folly