make AsyncSocket::WriteRequest an interface
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/SocketAddress.h>
21 #include <folly/io/IOBuf.h>
22
23 #include <poll.h>
24 #include <errno.h>
25 #include <limits.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <netinet/in.h>
31 #include <netinet/tcp.h>
32
33 using std::string;
34 using std::unique_ptr;
35
36 namespace folly {
37
38 // static members initializers
39 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
40
41 const AsyncSocketException socketClosedLocallyEx(
42     AsyncSocketException::END_OF_FILE, "socket closed locally");
43 const AsyncSocketException socketShutdownForWritesEx(
44     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
45
46 // TODO: It might help performance to provide a version of WriteRequest that
47 // users could derive from, so we can avoid the extra allocation for each call
48 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
49 // protocols are currently templatized for transports.
50 //
51 // We would need the version for external users where they provide the iovec
52 // storage space, and only our internal version would allocate it at the end of
53 // the WriteRequest.
54
55 /**
56  * A WriteRequest object tracks information about a pending write operation.
57  */
58 class AsyncSocket::WriteRequest {
59  public:
60   WriteRequest(AsyncSocket* socket,
61                WriteRequest* next,
62                WriteCallback* callback,
63                uint32_t totalBytesWritten) :
64     socket_(socket), next_(next), callback_(callback),
65     totalBytesWritten_(totalBytesWritten) {}
66
67   virtual void destroy() = 0;
68
69   virtual bool performWrite() = 0;
70
71   virtual void consume() = 0;
72
73   virtual bool isComplete() = 0;
74
75   WriteRequest* getNext() const {
76     return next_;
77   }
78
79   WriteCallback* getCallback() const {
80     return callback_;
81   }
82
83   uint32_t getTotalBytesWritten() const {
84     return totalBytesWritten_;
85   }
86
87   void append(WriteRequest* next) {
88     assert(next_ == nullptr);
89     next_ = next;
90   }
91
92  protected:
93   // protected destructor, to ensure callers use destroy()
94   virtual ~WriteRequest() {}
95
96   AsyncSocket* socket_;         ///< parent socket
97   WriteRequest* next_;          ///< pointer to next WriteRequest
98   WriteCallback* callback_;     ///< completion callback
99   uint32_t totalBytesWritten_;  ///< total bytes written
100 };
101
102 /* The default WriteRequest implementation, used for write(), writev() and
103  * writeChain()
104  *
105  * A new BytesWriteRequest operation is allocated on the heap for all write
106  * operations that cannot be completed immediately.
107  */
108 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
109  public:
110   static BytesWriteRequest* newRequest(AsyncSocket* socket,
111                                        WriteCallback* callback,
112                                        const iovec* ops,
113                                        uint32_t opCount,
114                                        uint32_t partialWritten,
115                                        uint32_t bytesWritten,
116                                        unique_ptr<IOBuf>&& ioBuf,
117                                        WriteFlags flags) {
118     assert(opCount > 0);
119     // Since we put a variable size iovec array at the end
120     // of each BytesWriteRequest, we have to manually allocate the memory.
121     void* buf = malloc(sizeof(BytesWriteRequest) +
122                        (opCount * sizeof(struct iovec)));
123     if (buf == nullptr) {
124       throw std::bad_alloc();
125     }
126
127     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
128                                       partialWritten, bytesWritten,
129                                       std::move(ioBuf), flags);
130   }
131
132   void destroy() override {
133     this->~BytesWriteRequest();
134     free(this);
135   }
136
137   bool performWrite() override {
138     WriteFlags writeFlags = flags_;
139     if (getNext() != nullptr) {
140       writeFlags = writeFlags | WriteFlags::CORK;
141     }
142     bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
143                                           &opsWritten_, &partialBytes_);
144     return bytesWritten_ >= 0;
145   }
146
147   bool isComplete() override {
148     return opsWritten_ == getOpCount();
149   }
150
151   void consume() override {
152     // Advance opIndex_ forward by opsWritten_
153     opIndex_ += opsWritten_;
154     assert(opIndex_ < opCount_);
155
156     // If we've finished writing any IOBufs, release them
157     if (ioBuf_) {
158       for (uint32_t i = opsWritten_; i != 0; --i) {
159         assert(ioBuf_);
160         ioBuf_ = ioBuf_->pop();
161       }
162     }
163
164     // Move partialBytes_ forward into the current iovec buffer
165     struct iovec* currentOp = writeOps_ + opIndex_;
166     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
167     currentOp->iov_base =
168       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
169     currentOp->iov_len -= partialBytes_;
170
171     // Increment the totalBytesWritten_ count by bytesWritten_;
172     totalBytesWritten_ += bytesWritten_;
173   }
174
175  private:
176   BytesWriteRequest(AsyncSocket* socket,
177                     WriteCallback* callback,
178                     const struct iovec* ops,
179                     uint32_t opCount,
180                     uint32_t partialBytes,
181                     uint32_t bytesWritten,
182                     unique_ptr<IOBuf>&& ioBuf,
183                     WriteFlags flags)
184     : AsyncSocket::WriteRequest(socket, nullptr, callback, 0)
185     , opCount_(opCount)
186     , opIndex_(0)
187     , flags_(flags)
188     , ioBuf_(std::move(ioBuf))
189     , opsWritten_(0)
190     , partialBytes_(partialBytes)
191     , bytesWritten_(bytesWritten) {
192     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
193   }
194
195   // private destructor, to ensure callers use destroy()
196   virtual ~BytesWriteRequest() {}
197
198   const struct iovec* getOps() const {
199     assert(opCount_ > opIndex_);
200     return writeOps_ + opIndex_;
201   }
202
203   uint32_t getOpCount() const {
204     assert(opCount_ > opIndex_);
205     return opCount_ - opIndex_;
206   }
207
208   uint32_t opCount_;            ///< number of entries in writeOps_
209   uint32_t opIndex_;            ///< current index into writeOps_
210   WriteFlags flags_;            ///< set for WriteFlags
211   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
212
213   // for consume(), how much we wrote on the last write
214   uint32_t opsWritten_;         ///< complete ops written
215   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
216   ssize_t bytesWritten_;        ///< bytes written altogether
217
218   struct iovec writeOps_[];     ///< write operation(s) list
219 };
220
221 AsyncSocket::AsyncSocket()
222   : eventBase_(nullptr)
223   , writeTimeout_(this, nullptr)
224   , ioHandler_(this, nullptr) {
225   VLOG(5) << "new AsyncSocket()";
226   init();
227 }
228
229 AsyncSocket::AsyncSocket(EventBase* evb)
230   : eventBase_(evb)
231   , writeTimeout_(this, evb)
232   , ioHandler_(this, evb) {
233   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
234   init();
235 }
236
237 AsyncSocket::AsyncSocket(EventBase* evb,
238                            const folly::SocketAddress& address,
239                            uint32_t connectTimeout)
240   : AsyncSocket(evb) {
241   connect(nullptr, address, connectTimeout);
242 }
243
244 AsyncSocket::AsyncSocket(EventBase* evb,
245                            const std::string& ip,
246                            uint16_t port,
247                            uint32_t connectTimeout)
248   : AsyncSocket(evb) {
249   connect(nullptr, ip, port, connectTimeout);
250 }
251
252 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
253   : eventBase_(evb)
254   , writeTimeout_(this, evb)
255   , ioHandler_(this, evb, fd) {
256   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
257           << fd << ")";
258   init();
259   fd_ = fd;
260   setCloseOnExec();
261   state_ = StateEnum::ESTABLISHED;
262 }
263
264 // init() method, since constructor forwarding isn't supported in most
265 // compilers yet.
266 void AsyncSocket::init() {
267   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
268   shutdownFlags_ = 0;
269   state_ = StateEnum::UNINIT;
270   eventFlags_ = EventHandler::NONE;
271   fd_ = -1;
272   sendTimeout_ = 0;
273   maxReadsPerEvent_ = 16;
274   connectCallback_ = nullptr;
275   readCallback_ = nullptr;
276   writeReqHead_ = nullptr;
277   writeReqTail_ = nullptr;
278   shutdownSocketSet_ = nullptr;
279   appBytesWritten_ = 0;
280   appBytesReceived_ = 0;
281 }
282
283 AsyncSocket::~AsyncSocket() {
284   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
285           << ", evb=" << eventBase_ << ", fd=" << fd_
286           << ", state=" << state_ << ")";
287 }
288
289 void AsyncSocket::destroy() {
290   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
291           << ", fd=" << fd_ << ", state=" << state_;
292   // When destroy is called, close the socket immediately
293   closeNow();
294
295   // Then call DelayedDestruction::destroy() to take care of
296   // whether or not we need immediate or delayed destruction
297   DelayedDestruction::destroy();
298 }
299
300 int AsyncSocket::detachFd() {
301   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
302           << ", evb=" << eventBase_ << ", state=" << state_
303           << ", events=" << std::hex << eventFlags_ << ")";
304   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
305   // actually close the descriptor.
306   if (shutdownSocketSet_) {
307     shutdownSocketSet_->remove(fd_);
308   }
309   int fd = fd_;
310   fd_ = -1;
311   // Call closeNow() to invoke all pending callbacks with an error.
312   closeNow();
313   // Update the EventHandler to stop using this fd.
314   // This can only be done after closeNow() unregisters the handler.
315   ioHandler_.changeHandlerFD(-1);
316   return fd;
317 }
318
319 const folly::SocketAddress& AsyncSocket::anyAddress() {
320   static const folly::SocketAddress anyAddress =
321     folly::SocketAddress("0.0.0.0", 0);
322   return anyAddress;
323 }
324
325 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
326   if (shutdownSocketSet_ == newSS) {
327     return;
328   }
329   if (shutdownSocketSet_ && fd_ != -1) {
330     shutdownSocketSet_->remove(fd_);
331   }
332   shutdownSocketSet_ = newSS;
333   if (shutdownSocketSet_ && fd_ != -1) {
334     shutdownSocketSet_->add(fd_);
335   }
336 }
337
338 void AsyncSocket::setCloseOnExec() {
339   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
340   if (rv != 0) {
341     throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
342                                withAddr("failed to set close-on-exec flag"),
343                                errno);
344   }
345 }
346
347 void AsyncSocket::connect(ConnectCallback* callback,
348                            const folly::SocketAddress& address,
349                            int timeout,
350                            const OptionMap &options,
351                            const folly::SocketAddress& bindAddr) noexcept {
352   DestructorGuard dg(this);
353   assert(eventBase_->isInEventBaseThread());
354
355   addr_ = address;
356
357   // Make sure we're in the uninitialized state
358   if (state_ != StateEnum::UNINIT) {
359     return invalidState(callback);
360   }
361
362   assert(fd_ == -1);
363   state_ = StateEnum::CONNECTING;
364   connectCallback_ = callback;
365
366   sockaddr_storage addrStorage;
367   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
368
369   try {
370     // Create the socket
371     // Technically the first parameter should actually be a protocol family
372     // constant (PF_xxx) rather than an address family (AF_xxx), but the
373     // distinction is mainly just historical.  In pretty much all
374     // implementations the PF_foo and AF_foo constants are identical.
375     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
376     if (fd_ < 0) {
377       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
378                                 withAddr("failed to create socket"), errno);
379     }
380     if (shutdownSocketSet_) {
381       shutdownSocketSet_->add(fd_);
382     }
383     ioHandler_.changeHandlerFD(fd_);
384
385     setCloseOnExec();
386
387     // Put the socket in non-blocking mode
388     int flags = fcntl(fd_, F_GETFL, 0);
389     if (flags == -1) {
390       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
391                                 withAddr("failed to get socket flags"), errno);
392     }
393     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
394     if (rv == -1) {
395       throw AsyncSocketException(
396           AsyncSocketException::INTERNAL_ERROR,
397           withAddr("failed to put socket in non-blocking mode"),
398           errno);
399     }
400
401 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
402     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
403     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
404     if (rv == -1) {
405       throw AsyncSocketException(
406           AsyncSocketException::INTERNAL_ERROR,
407           "failed to enable F_SETNOSIGPIPE on socket",
408           errno);
409     }
410 #endif
411
412     // By default, turn on TCP_NODELAY
413     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
414     // setNoDelay() will log an error message if it fails.
415     if (address.getFamily() != AF_UNIX) {
416       (void)setNoDelay(true);
417     }
418
419     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
420             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
421
422     // bind the socket
423     if (bindAddr != anyAddress()) {
424       int one = 1;
425       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
426         doClose();
427         throw AsyncSocketException(
428           AsyncSocketException::NOT_OPEN,
429           "failed to setsockopt prior to bind on " + bindAddr.describe(),
430           errno);
431       }
432
433       bindAddr.getAddress(&addrStorage);
434
435       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
436         doClose();
437         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
438                                   "failed to bind to async socket: " +
439                                   bindAddr.describe(),
440                                   errno);
441       }
442     }
443
444     // Apply the additional options if any.
445     for (const auto& opt: options) {
446       int rv = opt.first.apply(fd_, opt.second);
447       if (rv != 0) {
448         throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
449                                   withAddr("failed to set socket option"),
450                                   errno);
451       }
452     }
453
454     // Perform the connect()
455     address.getAddress(&addrStorage);
456
457     rv = ::connect(fd_, saddr, address.getActualSize());
458     if (rv < 0) {
459       if (errno == EINPROGRESS) {
460         // Connection in progress.
461         if (timeout > 0) {
462           // Start a timer in case the connection takes too long.
463           if (!writeTimeout_.scheduleTimeout(timeout)) {
464             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
465                 withAddr("failed to schedule AsyncSocket connect timeout"));
466           }
467         }
468
469         // Register for write events, so we'll
470         // be notified when the connection finishes/fails.
471         // Note that we don't register for a persistent event here.
472         assert(eventFlags_ == EventHandler::NONE);
473         eventFlags_ = EventHandler::WRITE;
474         if (!ioHandler_.registerHandler(eventFlags_)) {
475           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
476               withAddr("failed to register AsyncSocket connect handler"));
477         }
478         return;
479       } else {
480         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
481                                   "connect failed (immediately)", errno);
482       }
483     }
484
485     // If we're still here the connect() succeeded immediately.
486     // Fall through to call the callback outside of this try...catch block
487   } catch (const AsyncSocketException& ex) {
488     return failConnect(__func__, ex);
489   } catch (const std::exception& ex) {
490     // shouldn't happen, but handle it just in case
491     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
492                << "): unexpected " << typeid(ex).name() << " exception: "
493                << ex.what();
494     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
495                             withAddr(string("unexpected exception: ") +
496                                      ex.what()));
497     return failConnect(__func__, tex);
498   }
499
500   // The connection succeeded immediately
501   // The read callback may not have been set yet, and no writes may be pending
502   // yet, so we don't have to register for any events at the moment.
503   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
504   assert(readCallback_ == nullptr);
505   assert(writeReqHead_ == nullptr);
506   state_ = StateEnum::ESTABLISHED;
507   if (callback) {
508     connectCallback_ = nullptr;
509     callback->connectSuccess();
510   }
511 }
512
513 void AsyncSocket::connect(ConnectCallback* callback,
514                            const string& ip, uint16_t port,
515                            int timeout,
516                            const OptionMap &options) noexcept {
517   DestructorGuard dg(this);
518   try {
519     connectCallback_ = callback;
520     connect(callback, folly::SocketAddress(ip, port), timeout, options);
521   } catch (const std::exception& ex) {
522     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
523                             ex.what());
524     return failConnect(__func__, tex);
525   }
526 }
527
528 void AsyncSocket::cancelConnect() {
529   connectCallback_ = nullptr;
530   if (state_ == StateEnum::CONNECTING) {
531     closeNow();
532   }
533 }
534
535 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
536   sendTimeout_ = milliseconds;
537   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
538
539   // If we are currently pending on write requests, immediately update
540   // writeTimeout_ with the new value.
541   if ((eventFlags_ & EventHandler::WRITE) &&
542       (state_ != StateEnum::CONNECTING)) {
543     assert(state_ == StateEnum::ESTABLISHED);
544     assert((shutdownFlags_ & SHUT_WRITE) == 0);
545     if (sendTimeout_ > 0) {
546       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
547         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
548             withAddr("failed to reschedule send timeout in setSendTimeout"));
549         return failWrite(__func__, ex);
550       }
551     } else {
552       writeTimeout_.cancelTimeout();
553     }
554   }
555 }
556
557 void AsyncSocket::setReadCB(ReadCallback *callback) {
558   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
559           << ", callback=" << callback << ", state=" << state_;
560
561   // Short circuit if callback is the same as the existing readCallback_.
562   //
563   // Note that this is needed for proper functioning during some cleanup cases.
564   // During cleanup we allow setReadCallback(nullptr) to be called even if the
565   // read callback is already unset and we have been detached from an event
566   // base.  This check prevents us from asserting
567   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
568   if (callback == readCallback_) {
569     return;
570   }
571
572   if (shutdownFlags_ & SHUT_READ) {
573     // Reads have already been shut down on this socket.
574     //
575     // Allow setReadCallback(nullptr) to be called in this case, but don't
576     // allow a new callback to be set.
577     //
578     // For example, setReadCallback(nullptr) can happen after an error if we
579     // invoke some other error callback before invoking readError().  The other
580     // error callback that is invoked first may go ahead and clear the read
581     // callback before we get a chance to invoke readError().
582     if (callback != nullptr) {
583       return invalidState(callback);
584     }
585     assert((eventFlags_ & EventHandler::READ) == 0);
586     readCallback_ = nullptr;
587     return;
588   }
589
590   DestructorGuard dg(this);
591   assert(eventBase_->isInEventBaseThread());
592
593   switch ((StateEnum)state_) {
594     case StateEnum::CONNECTING:
595       // For convenience, we allow the read callback to be set while we are
596       // still connecting.  We just store the callback for now.  Once the
597       // connection completes we'll register for read events.
598       readCallback_ = callback;
599       return;
600     case StateEnum::ESTABLISHED:
601     {
602       readCallback_ = callback;
603       uint16_t oldFlags = eventFlags_;
604       if (readCallback_) {
605         eventFlags_ |= EventHandler::READ;
606       } else {
607         eventFlags_ &= ~EventHandler::READ;
608       }
609
610       // Update our registration if our flags have changed
611       if (eventFlags_ != oldFlags) {
612         // We intentionally ignore the return value here.
613         // updateEventRegistration() will move us into the error state if it
614         // fails, and we don't need to do anything else here afterwards.
615         (void)updateEventRegistration();
616       }
617
618       if (readCallback_) {
619         checkForImmediateRead();
620       }
621       return;
622     }
623     case StateEnum::CLOSED:
624     case StateEnum::ERROR:
625       // We should never reach here.  SHUT_READ should always be set
626       // if we are in STATE_CLOSED or STATE_ERROR.
627       assert(false);
628       return invalidState(callback);
629     case StateEnum::UNINIT:
630       // We do not allow setReadCallback() to be called before we start
631       // connecting.
632       return invalidState(callback);
633   }
634
635   // We don't put a default case in the switch statement, so that the compiler
636   // will warn us to update the switch statement if a new state is added.
637   return invalidState(callback);
638 }
639
640 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
641   return readCallback_;
642 }
643
644 void AsyncSocket::write(WriteCallback* callback,
645                          const void* buf, size_t bytes, WriteFlags flags) {
646   iovec op;
647   op.iov_base = const_cast<void*>(buf);
648   op.iov_len = bytes;
649   writeImpl(callback, &op, 1, std::move(unique_ptr<IOBuf>()), flags);
650 }
651
652 void AsyncSocket::writev(WriteCallback* callback,
653                           const iovec* vec,
654                           size_t count,
655                           WriteFlags flags) {
656   writeImpl(callback, vec, count, std::move(unique_ptr<IOBuf>()), flags);
657 }
658
659 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
660                               WriteFlags flags) {
661   size_t count = buf->countChainElements();
662   if (count <= 64) {
663     iovec vec[count];
664     writeChainImpl(callback, vec, count, std::move(buf), flags);
665   } else {
666     iovec* vec = new iovec[count];
667     writeChainImpl(callback, vec, count, std::move(buf), flags);
668     delete[] vec;
669   }
670 }
671
672 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
673     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
674   size_t veclen = buf->fillIov(vec, count);
675   writeImpl(callback, vec, veclen, std::move(buf), flags);
676 }
677
678 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
679                              size_t count, unique_ptr<IOBuf>&& buf,
680                              WriteFlags flags) {
681   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
682           << ", callback=" << callback << ", count=" << count
683           << ", state=" << state_;
684   DestructorGuard dg(this);
685   unique_ptr<IOBuf>ioBuf(std::move(buf));
686   assert(eventBase_->isInEventBaseThread());
687
688   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
689     // No new writes may be performed after the write side of the socket has
690     // been shutdown.
691     //
692     // We could just call callback->writeError() here to fail just this write.
693     // However, fail hard and use invalidState() to fail all outstanding
694     // callbacks and move the socket into the error state.  There's most likely
695     // a bug in the caller's code, so we abort everything rather than trying to
696     // proceed as best we can.
697     return invalidState(callback);
698   }
699
700   uint32_t countWritten = 0;
701   uint32_t partialWritten = 0;
702   int bytesWritten = 0;
703   bool mustRegister = false;
704   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
705     if (writeReqHead_ == nullptr) {
706       // If we are established and there are no other writes pending,
707       // we can attempt to perform the write immediately.
708       assert(writeReqTail_ == nullptr);
709       assert((eventFlags_ & EventHandler::WRITE) == 0);
710
711       bytesWritten = performWrite(vec, count, flags,
712                                   &countWritten, &partialWritten);
713       if (bytesWritten < 0) {
714         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
715                                withAddr("writev failed"), errno);
716         return failWrite(__func__, callback, 0, ex);
717       } else if (countWritten == count) {
718         // We successfully wrote everything.
719         // Invoke the callback and return.
720         if (callback) {
721           callback->writeSuccess();
722         }
723         return;
724       } // else { continue writing the next writeReq }
725       mustRegister = true;
726     }
727   } else if (!connecting()) {
728     // Invalid state for writing
729     return invalidState(callback);
730   }
731
732   // Create a new WriteRequest to add to the queue
733   WriteRequest* req;
734   try {
735     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
736                                         count - countWritten, partialWritten,
737                                         bytesWritten, std::move(ioBuf), flags);
738   } catch (const std::exception& ex) {
739     // we mainly expect to catch std::bad_alloc here
740     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
741         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
742     return failWrite(__func__, callback, bytesWritten, tex);
743   }
744   req->consume();
745   if (writeReqTail_ == nullptr) {
746     assert(writeReqHead_ == nullptr);
747     writeReqHead_ = writeReqTail_ = req;
748   } else {
749     writeReqTail_->append(req);
750     writeReqTail_ = req;
751   }
752
753   // Register for write events if are established and not currently
754   // waiting on write events
755   if (mustRegister) {
756     assert(state_ == StateEnum::ESTABLISHED);
757     assert((eventFlags_ & EventHandler::WRITE) == 0);
758     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
759       assert(state_ == StateEnum::ERROR);
760       return;
761     }
762     if (sendTimeout_ > 0) {
763       // Schedule a timeout to fire if the write takes too long.
764       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
765         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
766                                withAddr("failed to schedule send timeout"));
767         return failWrite(__func__, ex);
768       }
769     }
770   }
771 }
772
773 void AsyncSocket::close() {
774   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
775           << ", state=" << state_ << ", shutdownFlags="
776           << std::hex << (int) shutdownFlags_;
777
778   // close() is only different from closeNow() when there are pending writes
779   // that need to drain before we can close.  In all other cases, just call
780   // closeNow().
781   //
782   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
783   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
784   // is still running.  (e.g., If there are multiple pending writes, and we
785   // call writeError() on the first one, it may call close().  In this case we
786   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
787   // writes will still be in the queue.)
788   //
789   // We only need to drain pending writes if we are still in STATE_CONNECTING
790   // or STATE_ESTABLISHED
791   if ((writeReqHead_ == nullptr) ||
792       !(state_ == StateEnum::CONNECTING ||
793       state_ == StateEnum::ESTABLISHED)) {
794     closeNow();
795     return;
796   }
797
798   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
799   // destroyed until close() returns.
800   DestructorGuard dg(this);
801   assert(eventBase_->isInEventBaseThread());
802
803   // Since there are write requests pending, we have to set the
804   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
805   // connect finishes and we finish writing these requests.
806   //
807   // Set SHUT_READ to indicate that reads are shut down, and set the
808   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
809   // pending writes complete.
810   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
811
812   // If a read callback is set, invoke readEOF() immediately to inform it that
813   // the socket has been closed and no more data can be read.
814   if (readCallback_) {
815     // Disable reads if they are enabled
816     if (!updateEventRegistration(0, EventHandler::READ)) {
817       // We're now in the error state; callbacks have been cleaned up
818       assert(state_ == StateEnum::ERROR);
819       assert(readCallback_ == nullptr);
820     } else {
821       ReadCallback* callback = readCallback_;
822       readCallback_ = nullptr;
823       callback->readEOF();
824     }
825   }
826 }
827
828 void AsyncSocket::closeNow() {
829   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
830           << ", state=" << state_ << ", shutdownFlags="
831           << std::hex << (int) shutdownFlags_;
832   DestructorGuard dg(this);
833   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
834
835   switch (state_) {
836     case StateEnum::ESTABLISHED:
837     case StateEnum::CONNECTING:
838     {
839       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
840       state_ = StateEnum::CLOSED;
841
842       // If the write timeout was set, cancel it.
843       writeTimeout_.cancelTimeout();
844
845       // If we are registered for I/O events, unregister.
846       if (eventFlags_ != EventHandler::NONE) {
847         eventFlags_ = EventHandler::NONE;
848         if (!updateEventRegistration()) {
849           // We will have been moved into the error state.
850           assert(state_ == StateEnum::ERROR);
851           return;
852         }
853       }
854
855       if (fd_ >= 0) {
856         ioHandler_.changeHandlerFD(-1);
857         doClose();
858       }
859
860       if (connectCallback_) {
861         ConnectCallback* callback = connectCallback_;
862         connectCallback_ = nullptr;
863         callback->connectErr(socketClosedLocallyEx);
864       }
865
866       failAllWrites(socketClosedLocallyEx);
867
868       if (readCallback_) {
869         ReadCallback* callback = readCallback_;
870         readCallback_ = nullptr;
871         callback->readEOF();
872       }
873       return;
874     }
875     case StateEnum::CLOSED:
876       // Do nothing.  It's possible that we are being called recursively
877       // from inside a callback that we invoked inside another call to close()
878       // that is still running.
879       return;
880     case StateEnum::ERROR:
881       // Do nothing.  The error handling code has performed (or is performing)
882       // cleanup.
883       return;
884     case StateEnum::UNINIT:
885       assert(eventFlags_ == EventHandler::NONE);
886       assert(connectCallback_ == nullptr);
887       assert(readCallback_ == nullptr);
888       assert(writeReqHead_ == nullptr);
889       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
890       state_ = StateEnum::CLOSED;
891       return;
892   }
893
894   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
895               << ") called in unknown state " << state_;
896 }
897
898 void AsyncSocket::closeWithReset() {
899   // Enable SO_LINGER, with the linger timeout set to 0.
900   // This will trigger a TCP reset when we close the socket.
901   if (fd_ >= 0) {
902     struct linger optLinger = {1, 0};
903     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
904       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
905               << "on " << fd_ << ": errno=" << errno;
906     }
907   }
908
909   // Then let closeNow() take care of the rest
910   closeNow();
911 }
912
913 void AsyncSocket::shutdownWrite() {
914   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
915           << ", state=" << state_ << ", shutdownFlags="
916           << std::hex << (int) shutdownFlags_;
917
918   // If there are no pending writes, shutdownWrite() is identical to
919   // shutdownWriteNow().
920   if (writeReqHead_ == nullptr) {
921     shutdownWriteNow();
922     return;
923   }
924
925   assert(eventBase_->isInEventBaseThread());
926
927   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
928   // shutdown will be performed once all writes complete.
929   shutdownFlags_ |= SHUT_WRITE_PENDING;
930 }
931
932 void AsyncSocket::shutdownWriteNow() {
933   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
934           << ", fd=" << fd_ << ", state=" << state_
935           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
936
937   if (shutdownFlags_ & SHUT_WRITE) {
938     // Writes are already shutdown; nothing else to do.
939     return;
940   }
941
942   // If SHUT_READ is already set, just call closeNow() to completely
943   // close the socket.  This can happen if close() was called with writes
944   // pending, and then shutdownWriteNow() is called before all pending writes
945   // complete.
946   if (shutdownFlags_ & SHUT_READ) {
947     closeNow();
948     return;
949   }
950
951   DestructorGuard dg(this);
952   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
953
954   switch (static_cast<StateEnum>(state_)) {
955     case StateEnum::ESTABLISHED:
956     {
957       shutdownFlags_ |= SHUT_WRITE;
958
959       // If the write timeout was set, cancel it.
960       writeTimeout_.cancelTimeout();
961
962       // If we are registered for write events, unregister.
963       if (!updateEventRegistration(0, EventHandler::WRITE)) {
964         // We will have been moved into the error state.
965         assert(state_ == StateEnum::ERROR);
966         return;
967       }
968
969       // Shutdown writes on the file descriptor
970       ::shutdown(fd_, SHUT_WR);
971
972       // Immediately fail all write requests
973       failAllWrites(socketShutdownForWritesEx);
974       return;
975     }
976     case StateEnum::CONNECTING:
977     {
978       // Set the SHUT_WRITE_PENDING flag.
979       // When the connection completes, it will check this flag,
980       // shutdown the write half of the socket, and then set SHUT_WRITE.
981       shutdownFlags_ |= SHUT_WRITE_PENDING;
982
983       // Immediately fail all write requests
984       failAllWrites(socketShutdownForWritesEx);
985       return;
986     }
987     case StateEnum::UNINIT:
988       // Callers normally shouldn't call shutdownWriteNow() before the socket
989       // even starts connecting.  Nonetheless, go ahead and set
990       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
991       // immediately shut down the write side of the socket.
992       shutdownFlags_ |= SHUT_WRITE_PENDING;
993       return;
994     case StateEnum::CLOSED:
995     case StateEnum::ERROR:
996       // We should never get here.  SHUT_WRITE should always be set
997       // in STATE_CLOSED and STATE_ERROR.
998       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
999                  << ", fd=" << fd_ << ") in unexpected state " << state_
1000                  << " with SHUT_WRITE not set ("
1001                  << std::hex << (int) shutdownFlags_ << ")";
1002       assert(false);
1003       return;
1004   }
1005
1006   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
1007               << fd_ << ") called in unknown state " << state_;
1008 }
1009
1010 bool AsyncSocket::readable() const {
1011   if (fd_ == -1) {
1012     return false;
1013   }
1014   struct pollfd fds[1];
1015   fds[0].fd = fd_;
1016   fds[0].events = POLLIN;
1017   fds[0].revents = 0;
1018   int rc = poll(fds, 1, 0);
1019   return rc == 1;
1020 }
1021
1022 bool AsyncSocket::isPending() const {
1023   return ioHandler_.isPending();
1024 }
1025
1026 bool AsyncSocket::hangup() const {
1027   if (fd_ == -1) {
1028     // sanity check, no one should ask for hangup if we are not connected.
1029     assert(false);
1030     return false;
1031   }
1032 #ifdef POLLRDHUP // Linux-only
1033   struct pollfd fds[1];
1034   fds[0].fd = fd_;
1035   fds[0].events = POLLRDHUP|POLLHUP;
1036   fds[0].revents = 0;
1037   poll(fds, 1, 0);
1038   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1039 #else
1040   return false;
1041 #endif
1042 }
1043
1044 bool AsyncSocket::good() const {
1045   return ((state_ == StateEnum::CONNECTING ||
1046           state_ == StateEnum::ESTABLISHED) &&
1047           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1048 }
1049
1050 bool AsyncSocket::error() const {
1051   return (state_ == StateEnum::ERROR);
1052 }
1053
1054 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1055   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1056           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1057           << ", state=" << state_ << ", events="
1058           << std::hex << eventFlags_ << ")";
1059   assert(eventBase_ == nullptr);
1060   assert(eventBase->isInEventBaseThread());
1061
1062   eventBase_ = eventBase;
1063   ioHandler_.attachEventBase(eventBase);
1064   writeTimeout_.attachEventBase(eventBase);
1065 }
1066
1067 void AsyncSocket::detachEventBase() {
1068   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1069           << ", old evb=" << eventBase_ << ", state=" << state_
1070           << ", events=" << std::hex << eventFlags_ << ")";
1071   assert(eventBase_ != nullptr);
1072   assert(eventBase_->isInEventBaseThread());
1073
1074   eventBase_ = nullptr;
1075   ioHandler_.detachEventBase();
1076   writeTimeout_.detachEventBase();
1077 }
1078
1079 bool AsyncSocket::isDetachable() const {
1080   DCHECK(eventBase_ != nullptr);
1081   DCHECK(eventBase_->isInEventBaseThread());
1082
1083   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1084 }
1085
1086 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1087   address->setFromLocalAddress(fd_);
1088 }
1089
1090 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1091   if (!addr_.isInitialized()) {
1092     addr_.setFromPeerAddress(fd_);
1093   }
1094   *address = addr_;
1095 }
1096
1097 int AsyncSocket::setNoDelay(bool noDelay) {
1098   if (fd_ < 0) {
1099     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1100                << this << "(state=" << state_ << ")";
1101     return EINVAL;
1102
1103   }
1104
1105   int value = noDelay ? 1 : 0;
1106   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1107     int errnoCopy = errno;
1108     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1109             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1110             << strerror(errnoCopy);
1111     return errnoCopy;
1112   }
1113
1114   return 0;
1115 }
1116
1117 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1118
1119   #ifndef TCP_CONGESTION
1120   #define TCP_CONGESTION  13
1121   #endif
1122
1123   if (fd_ < 0) {
1124     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1125                << "socket " << this << "(state=" << state_ << ")";
1126     return EINVAL;
1127
1128   }
1129
1130   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1131         cname.length() + 1) != 0) {
1132     int errnoCopy = errno;
1133     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1134             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1135             << strerror(errnoCopy);
1136     return errnoCopy;
1137   }
1138
1139   return 0;
1140 }
1141
1142 int AsyncSocket::setQuickAck(bool quickack) {
1143   if (fd_ < 0) {
1144     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1145                << this << "(state=" << state_ << ")";
1146     return EINVAL;
1147
1148   }
1149
1150 #ifdef TCP_QUICKACK // Linux-only
1151   int value = quickack ? 1 : 0;
1152   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1153     int errnoCopy = errno;
1154     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1155             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1156             << strerror(errnoCopy);
1157     return errnoCopy;
1158   }
1159
1160   return 0;
1161 #else
1162   return ENOSYS;
1163 #endif
1164 }
1165
1166 int AsyncSocket::setSendBufSize(size_t bufsize) {
1167   if (fd_ < 0) {
1168     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1169                << this << "(state=" << state_ << ")";
1170     return EINVAL;
1171   }
1172
1173   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1174     int errnoCopy = errno;
1175     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1176             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1177             << strerror(errnoCopy);
1178     return errnoCopy;
1179   }
1180
1181   return 0;
1182 }
1183
1184 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1185   if (fd_ < 0) {
1186     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1187                << this << "(state=" << state_ << ")";
1188     return EINVAL;
1189   }
1190
1191   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1192     int errnoCopy = errno;
1193     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1194             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1195             << strerror(errnoCopy);
1196     return errnoCopy;
1197   }
1198
1199   return 0;
1200 }
1201
1202 int AsyncSocket::setTCPProfile(int profd) {
1203   if (fd_ < 0) {
1204     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1205                << this << "(state=" << state_ << ")";
1206     return EINVAL;
1207   }
1208
1209   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1210     int errnoCopy = errno;
1211     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1212             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1213             << strerror(errnoCopy);
1214     return errnoCopy;
1215   }
1216
1217   return 0;
1218 }
1219
1220 void AsyncSocket::ioReady(uint16_t events) noexcept {
1221   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1222           << ", events=" << std::hex << events << ", state=" << state_;
1223   DestructorGuard dg(this);
1224   assert(events & EventHandler::READ_WRITE);
1225   assert(eventBase_->isInEventBaseThread());
1226
1227   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1228   if (relevantEvents == EventHandler::READ) {
1229     handleRead();
1230   } else if (relevantEvents == EventHandler::WRITE) {
1231     handleWrite();
1232   } else if (relevantEvents == EventHandler::READ_WRITE) {
1233     EventBase* originalEventBase = eventBase_;
1234     // If both read and write events are ready, process writes first.
1235     handleWrite();
1236
1237     // Return now if handleWrite() detached us from our EventBase
1238     if (eventBase_ != originalEventBase) {
1239       return;
1240     }
1241
1242     // Only call handleRead() if a read callback is still installed.
1243     // (It's possible that the read callback was uninstalled during
1244     // handleWrite().)
1245     if (readCallback_) {
1246       handleRead();
1247     }
1248   } else {
1249     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1250                << std::hex << events << "(this=" << this << ")";
1251     abort();
1252   }
1253 }
1254
1255 ssize_t AsyncSocket::performRead(void* buf, size_t buflen) {
1256   ssize_t bytes = recv(fd_, buf, buflen, MSG_DONTWAIT);
1257   if (bytes < 0) {
1258     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1259       // No more data to read right now.
1260       return READ_BLOCKING;
1261     } else {
1262       return READ_ERROR;
1263     }
1264   } else {
1265     appBytesReceived_ += bytes;
1266     return bytes;
1267   }
1268 }
1269
1270 void AsyncSocket::handleRead() noexcept {
1271   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1272           << ", state=" << state_;
1273   assert(state_ == StateEnum::ESTABLISHED);
1274   assert((shutdownFlags_ & SHUT_READ) == 0);
1275   assert(readCallback_ != nullptr);
1276   assert(eventFlags_ & EventHandler::READ);
1277
1278   // Loop until:
1279   // - a read attempt would block
1280   // - readCallback_ is uninstalled
1281   // - the number of loop iterations exceeds the optional maximum
1282   // - this AsyncSocket is moved to another EventBase
1283   //
1284   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1285   // which is why need to check for it here.
1286   //
1287   // The last bullet point is slightly subtle.  readDataAvailable() may also
1288   // detach this socket from this EventBase.  However, before
1289   // readDataAvailable() returns another thread may pick it up, attach it to
1290   // a different EventBase, and install another readCallback_.  We need to
1291   // exit immediately after readDataAvailable() returns if the eventBase_ has
1292   // changed.  (The caller must perform some sort of locking to transfer the
1293   // AsyncSocket between threads properly.  This will be sufficient to ensure
1294   // that this thread sees the updated eventBase_ variable after
1295   // readDataAvailable() returns.)
1296   uint16_t numReads = 0;
1297   EventBase* originalEventBase = eventBase_;
1298   while (readCallback_ && eventBase_ == originalEventBase) {
1299     // Get the buffer to read into.
1300     void* buf = nullptr;
1301     size_t buflen = 0;
1302     try {
1303       readCallback_->getReadBuffer(&buf, &buflen);
1304     } catch (const AsyncSocketException& ex) {
1305       return failRead(__func__, ex);
1306     } catch (const std::exception& ex) {
1307       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1308                               string("ReadCallback::getReadBuffer() "
1309                                      "threw exception: ") +
1310                               ex.what());
1311       return failRead(__func__, tex);
1312     } catch (...) {
1313       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1314                              "ReadCallback::getReadBuffer() threw "
1315                              "non-exception type");
1316       return failRead(__func__, ex);
1317     }
1318     if (buf == nullptr || buflen == 0) {
1319       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1320                              "ReadCallback::getReadBuffer() returned "
1321                              "empty buffer");
1322       return failRead(__func__, ex);
1323     }
1324
1325     // Perform the read
1326     ssize_t bytesRead = performRead(buf, buflen);
1327     if (bytesRead > 0) {
1328       readCallback_->readDataAvailable(bytesRead);
1329       // Fall through and continue around the loop if the read
1330       // completely filled the available buffer.
1331       // Note that readCallback_ may have been uninstalled or changed inside
1332       // readDataAvailable().
1333       if (size_t(bytesRead) < buflen) {
1334         return;
1335       }
1336     } else if (bytesRead == READ_BLOCKING) {
1337         // No more data to read right now.
1338         return;
1339     } else if (bytesRead == READ_ERROR) {
1340       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1341                              withAddr("recv() failed"), errno);
1342       return failRead(__func__, ex);
1343     } else {
1344       assert(bytesRead == READ_EOF);
1345       // EOF
1346       shutdownFlags_ |= SHUT_READ;
1347       if (!updateEventRegistration(0, EventHandler::READ)) {
1348         // we've already been moved into STATE_ERROR
1349         assert(state_ == StateEnum::ERROR);
1350         assert(readCallback_ == nullptr);
1351         return;
1352       }
1353
1354       ReadCallback* callback = readCallback_;
1355       readCallback_ = nullptr;
1356       callback->readEOF();
1357       return;
1358     }
1359     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1360       return;
1361     }
1362   }
1363 }
1364
1365 /**
1366  * This function attempts to write as much data as possible, until no more data
1367  * can be written.
1368  *
1369  * - If it sends all available data, it unregisters for write events, and stops
1370  *   the writeTimeout_.
1371  *
1372  * - If not all of the data can be sent immediately, it reschedules
1373  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1374  *   registered for write events.
1375  */
1376 void AsyncSocket::handleWrite() noexcept {
1377   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1378           << ", state=" << state_;
1379   if (state_ == StateEnum::CONNECTING) {
1380     handleConnect();
1381     return;
1382   }
1383
1384   // Normal write
1385   assert(state_ == StateEnum::ESTABLISHED);
1386   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1387   assert(writeReqHead_ != nullptr);
1388
1389   // Loop until we run out of write requests,
1390   // or until this socket is moved to another EventBase.
1391   // (See the comment in handleRead() explaining how this can happen.)
1392   EventBase* originalEventBase = eventBase_;
1393   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1394     if (!writeReqHead_->performWrite()) {
1395       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1396                              withAddr("writev() failed"), errno);
1397       return failWrite(__func__, ex);
1398     } else if (writeReqHead_->isComplete()) {
1399       // We finished this request
1400       WriteRequest* req = writeReqHead_;
1401       writeReqHead_ = req->getNext();
1402
1403       if (writeReqHead_ == nullptr) {
1404         writeReqTail_ = nullptr;
1405         // This is the last write request.
1406         // Unregister for write events and cancel the send timer
1407         // before we invoke the callback.  We have to update the state properly
1408         // before calling the callback, since it may want to detach us from
1409         // the EventBase.
1410         if (eventFlags_ & EventHandler::WRITE) {
1411           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1412             assert(state_ == StateEnum::ERROR);
1413             return;
1414           }
1415           // Stop the send timeout
1416           writeTimeout_.cancelTimeout();
1417         }
1418         assert(!writeTimeout_.isScheduled());
1419
1420         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1421         // we finish sending the last write request.
1422         //
1423         // We have to do this before invoking writeSuccess(), since
1424         // writeSuccess() may detach us from our EventBase.
1425         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1426           assert(connectCallback_ == nullptr);
1427           shutdownFlags_ |= SHUT_WRITE;
1428
1429           if (shutdownFlags_ & SHUT_READ) {
1430             // Reads have already been shutdown.  Fully close the socket and
1431             // move to STATE_CLOSED.
1432             //
1433             // Note: This code currently moves us to STATE_CLOSED even if
1434             // close() hasn't ever been called.  This can occur if we have
1435             // received EOF from the peer and shutdownWrite() has been called
1436             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1437             // case, until close() is actually called?  I can't think of a
1438             // reason why we would need to do so.  No other operations besides
1439             // calling close() or destroying the socket can be performed at
1440             // this point.
1441             assert(readCallback_ == nullptr);
1442             state_ = StateEnum::CLOSED;
1443             if (fd_ >= 0) {
1444               ioHandler_.changeHandlerFD(-1);
1445               doClose();
1446             }
1447           } else {
1448             // Reads are still enabled, so we are only doing a half-shutdown
1449             ::shutdown(fd_, SHUT_WR);
1450           }
1451         }
1452       }
1453
1454       // Invoke the callback
1455       WriteCallback* callback = req->getCallback();
1456       req->destroy();
1457       if (callback) {
1458         callback->writeSuccess();
1459       }
1460       // We'll continue around the loop, trying to write another request
1461     } else {
1462       // Partial write.
1463       writeReqHead_->consume();
1464       // Stop after a partial write; it's highly likely that a subsequent write
1465       // attempt will just return EAGAIN.
1466       //
1467       // Ensure that we are registered for write events.
1468       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1469         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1470           assert(state_ == StateEnum::ERROR);
1471           return;
1472         }
1473       }
1474
1475       // Reschedule the send timeout, since we have made some write progress.
1476       if (sendTimeout_ > 0) {
1477         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1478           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1479               withAddr("failed to reschedule write timeout"));
1480           return failWrite(__func__, ex);
1481         }
1482       }
1483       return;
1484     }
1485   }
1486 }
1487
1488 void AsyncSocket::checkForImmediateRead() noexcept {
1489   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1490   // (However, note that some subclasses do override this method.)
1491   //
1492   // Simply calling handleRead() here would be bad, as this would call
1493   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1494   // buffer even though no data may be available.  This would waste lots of
1495   // memory, since the buffer will sit around unused until the socket actually
1496   // becomes readable.
1497   //
1498   // Checking if the socket is readable now also seems like it would probably
1499   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1500   // would just waste an extra system call.  Even if it is readable, waiting to
1501   // find out from libevent on the next event loop doesn't seem that bad.
1502 }
1503
1504 void AsyncSocket::handleInitialReadWrite() noexcept {
1505   // Our callers should already be holding a DestructorGuard, but grab
1506   // one here just to make sure, in case one of our calling code paths ever
1507   // changes.
1508   DestructorGuard dg(this);
1509
1510   // If we have a readCallback_, make sure we enable read events.  We
1511   // may already be registered for reads if connectSuccess() set
1512   // the read calback.
1513   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1514     assert(state_ == StateEnum::ESTABLISHED);
1515     assert((shutdownFlags_ & SHUT_READ) == 0);
1516     if (!updateEventRegistration(EventHandler::READ, 0)) {
1517       assert(state_ == StateEnum::ERROR);
1518       return;
1519     }
1520     checkForImmediateRead();
1521   } else if (readCallback_ == nullptr) {
1522     // Unregister for read events.
1523     updateEventRegistration(0, EventHandler::READ);
1524   }
1525
1526   // If we have write requests pending, try to send them immediately.
1527   // Since we just finished accepting, there is a very good chance that we can
1528   // write without blocking.
1529   //
1530   // However, we only process them if EventHandler::WRITE is not already set,
1531   // which means that we're already blocked on a write attempt.  (This can
1532   // happen if connectSuccess() called write() before returning.)
1533   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1534     // Call handleWrite() to perform write processing.
1535     handleWrite();
1536   } else if (writeReqHead_ == nullptr) {
1537     // Unregister for write event.
1538     updateEventRegistration(0, EventHandler::WRITE);
1539   }
1540 }
1541
1542 void AsyncSocket::handleConnect() noexcept {
1543   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1544           << ", state=" << state_;
1545   assert(state_ == StateEnum::CONNECTING);
1546   // SHUT_WRITE can never be set while we are still connecting;
1547   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1548   // finishes
1549   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1550
1551   // In case we had a connect timeout, cancel the timeout
1552   writeTimeout_.cancelTimeout();
1553   // We don't use a persistent registration when waiting on a connect event,
1554   // so we have been automatically unregistered now.  Update eventFlags_ to
1555   // reflect reality.
1556   assert(eventFlags_ == EventHandler::WRITE);
1557   eventFlags_ = EventHandler::NONE;
1558
1559   // Call getsockopt() to check if the connect succeeded
1560   int error;
1561   socklen_t len = sizeof(error);
1562   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1563   if (rv != 0) {
1564     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1565                            withAddr("error calling getsockopt() after connect"),
1566                            errno);
1567     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1568                << fd_ << " host=" << addr_.describe()
1569                << ") exception:" << ex.what();
1570     return failConnect(__func__, ex);
1571   }
1572
1573   if (error != 0) {
1574     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1575                            "connect failed", error);
1576     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1577             << fd_ << " host=" << addr_.describe()
1578             << ") exception: " << ex.what();
1579     return failConnect(__func__, ex);
1580   }
1581
1582   // Move into STATE_ESTABLISHED
1583   state_ = StateEnum::ESTABLISHED;
1584
1585   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1586   // perform, immediately shutdown the write half of the socket.
1587   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1588     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1589     // are still connecting we just abort the connect rather than waiting for
1590     // it to complete.
1591     assert((shutdownFlags_ & SHUT_READ) == 0);
1592     ::shutdown(fd_, SHUT_WR);
1593     shutdownFlags_ |= SHUT_WRITE;
1594   }
1595
1596   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1597           << "successfully connected; state=" << state_;
1598
1599   // Remember the EventBase we are attached to, before we start invoking any
1600   // callbacks (since the callbacks may call detachEventBase()).
1601   EventBase* originalEventBase = eventBase_;
1602
1603   // Call the connect callback.
1604   if (connectCallback_) {
1605     ConnectCallback* callback = connectCallback_;
1606     connectCallback_ = nullptr;
1607     callback->connectSuccess();
1608   }
1609
1610   // Note that the connect callback may have changed our state.
1611   // (set or unset the read callback, called write(), closed the socket, etc.)
1612   // The following code needs to handle these situations correctly.
1613   //
1614   // If the socket has been closed, readCallback_ and writeReqHead_ will
1615   // always be nullptr, so that will prevent us from trying to read or write.
1616   //
1617   // The main thing to check for is if eventBase_ is still originalEventBase.
1618   // If not, we have been detached from this event base, so we shouldn't
1619   // perform any more operations.
1620   if (eventBase_ != originalEventBase) {
1621     return;
1622   }
1623
1624   handleInitialReadWrite();
1625 }
1626
1627 void AsyncSocket::timeoutExpired() noexcept {
1628   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1629           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1630   DestructorGuard dg(this);
1631   assert(eventBase_->isInEventBaseThread());
1632
1633   if (state_ == StateEnum::CONNECTING) {
1634     // connect() timed out
1635     // Unregister for I/O events.
1636     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1637                            "connect timed out");
1638     failConnect(__func__, ex);
1639   } else {
1640     // a normal write operation timed out
1641     assert(state_ == StateEnum::ESTABLISHED);
1642     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1643     failWrite(__func__, ex);
1644   }
1645 }
1646
1647 ssize_t AsyncSocket::performWrite(const iovec* vec,
1648                                    uint32_t count,
1649                                    WriteFlags flags,
1650                                    uint32_t* countWritten,
1651                                    uint32_t* partialWritten) {
1652   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1653   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1654   // (since it may terminate the program if the main program doesn't explicitly
1655   // ignore it).
1656   struct msghdr msg;
1657   msg.msg_name = nullptr;
1658   msg.msg_namelen = 0;
1659   msg.msg_iov = const_cast<iovec *>(vec);
1660 #ifdef IOV_MAX // not defined on Android
1661   msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX);
1662 #else
1663   msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV);
1664 #endif
1665   msg.msg_control = nullptr;
1666   msg.msg_controllen = 0;
1667   msg.msg_flags = 0;
1668
1669   int msg_flags = MSG_DONTWAIT;
1670
1671 #ifdef MSG_NOSIGNAL // Linux-only
1672   msg_flags |= MSG_NOSIGNAL;
1673   if (isSet(flags, WriteFlags::CORK)) {
1674     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1675     // give it the rest of the data rather than immediately sending a partial
1676     // frame, even when TCP_NODELAY is enabled.
1677     msg_flags |= MSG_MORE;
1678   }
1679 #endif
1680   if (isSet(flags, WriteFlags::EOR)) {
1681     // marks that this is the last byte of a record (response)
1682     msg_flags |= MSG_EOR;
1683   }
1684   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1685   if (totalWritten < 0) {
1686     if (errno == EAGAIN) {
1687       // TCP buffer is full; we can't write any more data right now.
1688       *countWritten = 0;
1689       *partialWritten = 0;
1690       return 0;
1691     }
1692     // error
1693     *countWritten = 0;
1694     *partialWritten = 0;
1695     return -1;
1696   }
1697
1698   appBytesWritten_ += totalWritten;
1699
1700   uint32_t bytesWritten;
1701   uint32_t n;
1702   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1703     const iovec* v = vec + n;
1704     if (v->iov_len > bytesWritten) {
1705       // Partial write finished in the middle of this iovec
1706       *countWritten = n;
1707       *partialWritten = bytesWritten;
1708       return totalWritten;
1709     }
1710
1711     bytesWritten -= v->iov_len;
1712   }
1713
1714   assert(bytesWritten == 0);
1715   *countWritten = n;
1716   *partialWritten = 0;
1717   return totalWritten;
1718 }
1719
1720 /**
1721  * Re-register the EventHandler after eventFlags_ has changed.
1722  *
1723  * If an error occurs, fail() is called to move the socket into the error state
1724  * and call all currently installed callbacks.  After an error, the
1725  * AsyncSocket is completely unregistered.
1726  *
1727  * @return Returns true on succcess, or false on error.
1728  */
1729 bool AsyncSocket::updateEventRegistration() {
1730   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1731           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1732           << ", events=" << std::hex << eventFlags_;
1733   assert(eventBase_->isInEventBaseThread());
1734   if (eventFlags_ == EventHandler::NONE) {
1735     ioHandler_.unregisterHandler();
1736     return true;
1737   }
1738
1739   // Always register for persistent events, so we don't have to re-register
1740   // after being called back.
1741   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1742     eventFlags_ = EventHandler::NONE; // we're not registered after error
1743     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1744         withAddr("failed to update AsyncSocket event registration"));
1745     fail("updateEventRegistration", ex);
1746     return false;
1747   }
1748
1749   return true;
1750 }
1751
1752 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1753                                            uint16_t disable) {
1754   uint16_t oldFlags = eventFlags_;
1755   eventFlags_ |= enable;
1756   eventFlags_ &= ~disable;
1757   if (eventFlags_ == oldFlags) {
1758     return true;
1759   } else {
1760     return updateEventRegistration();
1761   }
1762 }
1763
1764 void AsyncSocket::startFail() {
1765   // startFail() should only be called once
1766   assert(state_ != StateEnum::ERROR);
1767   assert(getDestructorGuardCount() > 0);
1768   state_ = StateEnum::ERROR;
1769   // Ensure that SHUT_READ and SHUT_WRITE are set,
1770   // so all future attempts to read or write will be rejected
1771   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1772
1773   if (eventFlags_ != EventHandler::NONE) {
1774     eventFlags_ = EventHandler::NONE;
1775     ioHandler_.unregisterHandler();
1776   }
1777   writeTimeout_.cancelTimeout();
1778
1779   if (fd_ >= 0) {
1780     ioHandler_.changeHandlerFD(-1);
1781     doClose();
1782   }
1783 }
1784
1785 void AsyncSocket::finishFail() {
1786   assert(state_ == StateEnum::ERROR);
1787   assert(getDestructorGuardCount() > 0);
1788
1789   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1790                          withAddr("socket closing after error"));
1791   if (connectCallback_) {
1792     ConnectCallback* callback = connectCallback_;
1793     connectCallback_ = nullptr;
1794     callback->connectErr(ex);
1795   }
1796
1797   failAllWrites(ex);
1798
1799   if (readCallback_) {
1800     ReadCallback* callback = readCallback_;
1801     readCallback_ = nullptr;
1802     callback->readErr(ex);
1803   }
1804 }
1805
1806 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1807   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1808              << state_ << " host=" << addr_.describe()
1809              << "): failed in " << fn << "(): "
1810              << ex.what();
1811   startFail();
1812   finishFail();
1813 }
1814
1815 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1816   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1817                << state_ << " host=" << addr_.describe()
1818                << "): failed while connecting in " << fn << "(): "
1819                << ex.what();
1820   startFail();
1821
1822   if (connectCallback_ != nullptr) {
1823     ConnectCallback* callback = connectCallback_;
1824     connectCallback_ = nullptr;
1825     callback->connectErr(ex);
1826   }
1827
1828   finishFail();
1829 }
1830
1831 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1832   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1833                << state_ << " host=" << addr_.describe()
1834                << "): failed while reading in " << fn << "(): "
1835                << ex.what();
1836   startFail();
1837
1838   if (readCallback_ != nullptr) {
1839     ReadCallback* callback = readCallback_;
1840     readCallback_ = nullptr;
1841     callback->readErr(ex);
1842   }
1843
1844   finishFail();
1845 }
1846
1847 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1848   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1849                << state_ << " host=" << addr_.describe()
1850                << "): failed while writing in " << fn << "(): "
1851                << ex.what();
1852   startFail();
1853
1854   // Only invoke the first write callback, since the error occurred while
1855   // writing this request.  Let any other pending write callbacks be invoked in
1856   // finishFail().
1857   if (writeReqHead_ != nullptr) {
1858     WriteRequest* req = writeReqHead_;
1859     writeReqHead_ = req->getNext();
1860     WriteCallback* callback = req->getCallback();
1861     uint32_t bytesWritten = req->getTotalBytesWritten();
1862     req->destroy();
1863     if (callback) {
1864       callback->writeErr(bytesWritten, ex);
1865     }
1866   }
1867
1868   finishFail();
1869 }
1870
1871 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1872                              size_t bytesWritten,
1873                              const AsyncSocketException& ex) {
1874   // This version of failWrite() is used when the failure occurs before
1875   // we've added the callback to writeReqHead_.
1876   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1877              << state_ << " host=" << addr_.describe()
1878              <<"): failed while writing in " << fn << "(): "
1879              << ex.what();
1880   startFail();
1881
1882   if (callback != nullptr) {
1883     callback->writeErr(bytesWritten, ex);
1884   }
1885
1886   finishFail();
1887 }
1888
1889 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1890   // Invoke writeError() on all write callbacks.
1891   // This is used when writes are forcibly shutdown with write requests
1892   // pending, or when an error occurs with writes pending.
1893   while (writeReqHead_ != nullptr) {
1894     WriteRequest* req = writeReqHead_;
1895     writeReqHead_ = req->getNext();
1896     WriteCallback* callback = req->getCallback();
1897     if (callback) {
1898       callback->writeErr(req->getTotalBytesWritten(), ex);
1899     }
1900     req->destroy();
1901   }
1902 }
1903
1904 void AsyncSocket::invalidState(ConnectCallback* callback) {
1905   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1906              << "): connect() called in invalid state " << state_;
1907
1908   /*
1909    * The invalidState() methods don't use the normal failure mechanisms,
1910    * since we don't know what state we are in.  We don't want to call
1911    * startFail()/finishFail() recursively if we are already in the middle of
1912    * cleaning up.
1913    */
1914
1915   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1916                          "connect() called with socket in invalid state");
1917   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1918     if (callback) {
1919       callback->connectErr(ex);
1920     }
1921   } else {
1922     // We can't use failConnect() here since connectCallback_
1923     // may already be set to another callback.  Invoke this ConnectCallback
1924     // here; any other connectCallback_ will be invoked in finishFail()
1925     startFail();
1926     if (callback) {
1927       callback->connectErr(ex);
1928     }
1929     finishFail();
1930   }
1931 }
1932
1933 void AsyncSocket::invalidState(ReadCallback* callback) {
1934   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1935              << "): setReadCallback(" << callback
1936              << ") called in invalid state " << state_;
1937
1938   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1939                          "setReadCallback() called with socket in "
1940                          "invalid state");
1941   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1942     if (callback) {
1943       callback->readErr(ex);
1944     }
1945   } else {
1946     startFail();
1947     if (callback) {
1948       callback->readErr(ex);
1949     }
1950     finishFail();
1951   }
1952 }
1953
1954 void AsyncSocket::invalidState(WriteCallback* callback) {
1955   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1956              << "): write() called in invalid state " << state_;
1957
1958   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1959                          withAddr("write() called with socket in invalid state"));
1960   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1961     if (callback) {
1962       callback->writeErr(0, ex);
1963     }
1964   } else {
1965     startFail();
1966     if (callback) {
1967       callback->writeErr(0, ex);
1968     }
1969     finishFail();
1970   }
1971 }
1972
1973 void AsyncSocket::doClose() {
1974   if (fd_ == -1) return;
1975   if (shutdownSocketSet_) {
1976     shutdownSocketSet_->close(fd_);
1977   } else {
1978     ::close(fd_);
1979   }
1980   fd_ = -1;
1981 }
1982
1983 std::ostream& operator << (std::ostream& os,
1984                            const AsyncSocket::StateEnum& state) {
1985   os << static_cast<int>(state);
1986   return os;
1987 }
1988
1989 std::string AsyncSocket::withAddr(const std::string& s) {
1990   // Don't use addr_ directly because it may not be initialized
1991   // e.g. if constructed from fd
1992   folly::SocketAddress peer, local;
1993   try {
1994     getPeerAddress(&peer);
1995     getLocalAddress(&local);
1996   } catch (const std::exception&) {
1997     // ignore
1998   } catch (...) {
1999     // ignore
2000   }
2001   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2002 }
2003
2004 } // folly