3ff2b345eba1855ad30057100da9ded2f6be8e81
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23
24 #include <poll.h>
25 #include <errno.h>
26 #include <limits.h>
27 #include <unistd.h>
28 #include <thread>
29 #include <fcntl.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34
35 using std::string;
36 using std::unique_ptr;
37
38 namespace folly {
39
40 // static members initializers
41 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
42
43 const AsyncSocketException socketClosedLocallyEx(
44     AsyncSocketException::END_OF_FILE, "socket closed locally");
45 const AsyncSocketException socketShutdownForWritesEx(
46     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
47
48 // TODO: It might help performance to provide a version of BytesWriteRequest that
49 // users could derive from, so we can avoid the extra allocation for each call
50 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
51 // protocols are currently templatized for transports.
52 //
53 // We would need the version for external users where they provide the iovec
54 // storage space, and only our internal version would allocate it at the end of
55 // the WriteRequest.
56
57 /* The default WriteRequest implementation, used for write(), writev() and
58  * writeChain()
59  *
60  * A new BytesWriteRequest operation is allocated on the heap for all write
61  * operations that cannot be completed immediately.
62  */
63 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
64  public:
65   static BytesWriteRequest* newRequest(AsyncSocket* socket,
66                                        WriteCallback* callback,
67                                        const iovec* ops,
68                                        uint32_t opCount,
69                                        uint32_t partialWritten,
70                                        uint32_t bytesWritten,
71                                        unique_ptr<IOBuf>&& ioBuf,
72                                        WriteFlags flags) {
73     assert(opCount > 0);
74     // Since we put a variable size iovec array at the end
75     // of each BytesWriteRequest, we have to manually allocate the memory.
76     void* buf = malloc(sizeof(BytesWriteRequest) +
77                        (opCount * sizeof(struct iovec)));
78     if (buf == nullptr) {
79       throw std::bad_alloc();
80     }
81
82     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
83                                       partialWritten, bytesWritten,
84                                       std::move(ioBuf), flags);
85   }
86
87   void destroy() override {
88     this->~BytesWriteRequest();
89     free(this);
90   }
91
92   bool performWrite() override {
93     WriteFlags writeFlags = flags_;
94     if (getNext() != nullptr) {
95       writeFlags = writeFlags | WriteFlags::CORK;
96     }
97     bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
98                                           &opsWritten_, &partialBytes_);
99     return bytesWritten_ >= 0;
100   }
101
102   bool isComplete() override {
103     return opsWritten_ == getOpCount();
104   }
105
106   void consume() override {
107     // Advance opIndex_ forward by opsWritten_
108     opIndex_ += opsWritten_;
109     assert(opIndex_ < opCount_);
110
111     // If we've finished writing any IOBufs, release them
112     if (ioBuf_) {
113       for (uint32_t i = opsWritten_; i != 0; --i) {
114         assert(ioBuf_);
115         ioBuf_ = ioBuf_->pop();
116       }
117     }
118
119     // Move partialBytes_ forward into the current iovec buffer
120     struct iovec* currentOp = writeOps_ + opIndex_;
121     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
122     currentOp->iov_base =
123       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
124     currentOp->iov_len -= partialBytes_;
125
126     // Increment the totalBytesWritten_ count by bytesWritten_;
127     totalBytesWritten_ += bytesWritten_;
128   }
129
130  private:
131   BytesWriteRequest(AsyncSocket* socket,
132                     WriteCallback* callback,
133                     const struct iovec* ops,
134                     uint32_t opCount,
135                     uint32_t partialBytes,
136                     uint32_t bytesWritten,
137                     unique_ptr<IOBuf>&& ioBuf,
138                     WriteFlags flags)
139     : AsyncSocket::WriteRequest(socket, callback)
140     , opCount_(opCount)
141     , opIndex_(0)
142     , flags_(flags)
143     , ioBuf_(std::move(ioBuf))
144     , opsWritten_(0)
145     , partialBytes_(partialBytes)
146     , bytesWritten_(bytesWritten) {
147     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
148   }
149
150   // private destructor, to ensure callers use destroy()
151   virtual ~BytesWriteRequest() = default;
152
153   const struct iovec* getOps() const {
154     assert(opCount_ > opIndex_);
155     return writeOps_ + opIndex_;
156   }
157
158   uint32_t getOpCount() const {
159     assert(opCount_ > opIndex_);
160     return opCount_ - opIndex_;
161   }
162
163   uint32_t opCount_;            ///< number of entries in writeOps_
164   uint32_t opIndex_;            ///< current index into writeOps_
165   WriteFlags flags_;            ///< set for WriteFlags
166   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
167
168   // for consume(), how much we wrote on the last write
169   uint32_t opsWritten_;         ///< complete ops written
170   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
171   ssize_t bytesWritten_;        ///< bytes written altogether
172
173   struct iovec writeOps_[];     ///< write operation(s) list
174 };
175
176 AsyncSocket::AsyncSocket()
177   : eventBase_(nullptr)
178   , writeTimeout_(this, nullptr)
179   , ioHandler_(this, nullptr)
180   , immediateReadHandler_(this) {
181   VLOG(5) << "new AsyncSocket()";
182   init();
183 }
184
185 AsyncSocket::AsyncSocket(EventBase* evb)
186   : eventBase_(evb)
187   , writeTimeout_(this, evb)
188   , ioHandler_(this, evb)
189   , immediateReadHandler_(this) {
190   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
191   init();
192 }
193
194 AsyncSocket::AsyncSocket(EventBase* evb,
195                            const folly::SocketAddress& address,
196                            uint32_t connectTimeout)
197   : AsyncSocket(evb) {
198   connect(nullptr, address, connectTimeout);
199 }
200
201 AsyncSocket::AsyncSocket(EventBase* evb,
202                            const std::string& ip,
203                            uint16_t port,
204                            uint32_t connectTimeout)
205   : AsyncSocket(evb) {
206   connect(nullptr, ip, port, connectTimeout);
207 }
208
209 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
210   : eventBase_(evb)
211   , writeTimeout_(this, evb)
212   , ioHandler_(this, evb, fd)
213   , immediateReadHandler_(this) {
214   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
215           << fd << ")";
216   init();
217   fd_ = fd;
218   setCloseOnExec();
219   state_ = StateEnum::ESTABLISHED;
220 }
221
222 // init() method, since constructor forwarding isn't supported in most
223 // compilers yet.
224 void AsyncSocket::init() {
225   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
226   shutdownFlags_ = 0;
227   state_ = StateEnum::UNINIT;
228   eventFlags_ = EventHandler::NONE;
229   fd_ = -1;
230   sendTimeout_ = 0;
231   maxReadsPerEvent_ = 16;
232   connectCallback_ = nullptr;
233   readCallback_ = nullptr;
234   writeReqHead_ = nullptr;
235   writeReqTail_ = nullptr;
236   shutdownSocketSet_ = nullptr;
237   appBytesWritten_ = 0;
238   appBytesReceived_ = 0;
239 }
240
241 AsyncSocket::~AsyncSocket() {
242   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
243           << ", evb=" << eventBase_ << ", fd=" << fd_
244           << ", state=" << state_ << ")";
245 }
246
247 void AsyncSocket::destroy() {
248   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
249           << ", fd=" << fd_ << ", state=" << state_;
250   // When destroy is called, close the socket immediately
251   closeNow();
252
253   // Then call DelayedDestruction::destroy() to take care of
254   // whether or not we need immediate or delayed destruction
255   DelayedDestruction::destroy();
256 }
257
258 int AsyncSocket::detachFd() {
259   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
260           << ", evb=" << eventBase_ << ", state=" << state_
261           << ", events=" << std::hex << eventFlags_ << ")";
262   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
263   // actually close the descriptor.
264   if (shutdownSocketSet_) {
265     shutdownSocketSet_->remove(fd_);
266   }
267   int fd = fd_;
268   fd_ = -1;
269   // Call closeNow() to invoke all pending callbacks with an error.
270   closeNow();
271   // Update the EventHandler to stop using this fd.
272   // This can only be done after closeNow() unregisters the handler.
273   ioHandler_.changeHandlerFD(-1);
274   return fd;
275 }
276
277 const folly::SocketAddress& AsyncSocket::anyAddress() {
278   static const folly::SocketAddress anyAddress =
279     folly::SocketAddress("0.0.0.0", 0);
280   return anyAddress;
281 }
282
283 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
284   if (shutdownSocketSet_ == newSS) {
285     return;
286   }
287   if (shutdownSocketSet_ && fd_ != -1) {
288     shutdownSocketSet_->remove(fd_);
289   }
290   shutdownSocketSet_ = newSS;
291   if (shutdownSocketSet_ && fd_ != -1) {
292     shutdownSocketSet_->add(fd_);
293   }
294 }
295
296 void AsyncSocket::setCloseOnExec() {
297   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
298   if (rv != 0) {
299     throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
300                                withAddr("failed to set close-on-exec flag"),
301                                errno);
302   }
303 }
304
305 void AsyncSocket::connect(ConnectCallback* callback,
306                            const folly::SocketAddress& address,
307                            int timeout,
308                            const OptionMap &options,
309                            const folly::SocketAddress& bindAddr) noexcept {
310   DestructorGuard dg(this);
311   assert(eventBase_->isInEventBaseThread());
312
313   addr_ = address;
314
315   // Make sure we're in the uninitialized state
316   if (state_ != StateEnum::UNINIT) {
317     return invalidState(callback);
318   }
319
320   assert(fd_ == -1);
321   state_ = StateEnum::CONNECTING;
322   connectCallback_ = callback;
323
324   sockaddr_storage addrStorage;
325   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
326
327   try {
328     // Create the socket
329     // Technically the first parameter should actually be a protocol family
330     // constant (PF_xxx) rather than an address family (AF_xxx), but the
331     // distinction is mainly just historical.  In pretty much all
332     // implementations the PF_foo and AF_foo constants are identical.
333     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
334     if (fd_ < 0) {
335       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
336                                 withAddr("failed to create socket"), errno);
337     }
338     if (shutdownSocketSet_) {
339       shutdownSocketSet_->add(fd_);
340     }
341     ioHandler_.changeHandlerFD(fd_);
342
343     setCloseOnExec();
344
345     // Put the socket in non-blocking mode
346     int flags = fcntl(fd_, F_GETFL, 0);
347     if (flags == -1) {
348       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
349                                 withAddr("failed to get socket flags"), errno);
350     }
351     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
352     if (rv == -1) {
353       throw AsyncSocketException(
354           AsyncSocketException::INTERNAL_ERROR,
355           withAddr("failed to put socket in non-blocking mode"),
356           errno);
357     }
358
359 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
360     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
361     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
362     if (rv == -1) {
363       throw AsyncSocketException(
364           AsyncSocketException::INTERNAL_ERROR,
365           "failed to enable F_SETNOSIGPIPE on socket",
366           errno);
367     }
368 #endif
369
370     // By default, turn on TCP_NODELAY
371     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
372     // setNoDelay() will log an error message if it fails.
373     if (address.getFamily() != AF_UNIX) {
374       (void)setNoDelay(true);
375     }
376
377     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
378             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
379
380     // bind the socket
381     if (bindAddr != anyAddress()) {
382       int one = 1;
383       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
384         doClose();
385         throw AsyncSocketException(
386           AsyncSocketException::NOT_OPEN,
387           "failed to setsockopt prior to bind on " + bindAddr.describe(),
388           errno);
389       }
390
391       bindAddr.getAddress(&addrStorage);
392
393       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
394         doClose();
395         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
396                                   "failed to bind to async socket: " +
397                                   bindAddr.describe(),
398                                   errno);
399       }
400     }
401
402     // Apply the additional options if any.
403     for (const auto& opt: options) {
404       int rv = opt.first.apply(fd_, opt.second);
405       if (rv != 0) {
406         throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
407                                   withAddr("failed to set socket option"),
408                                   errno);
409       }
410     }
411
412     // Perform the connect()
413     address.getAddress(&addrStorage);
414
415     rv = ::connect(fd_, saddr, address.getActualSize());
416     if (rv < 0) {
417       if (errno == EINPROGRESS) {
418         // Connection in progress.
419         if (timeout > 0) {
420           // Start a timer in case the connection takes too long.
421           if (!writeTimeout_.scheduleTimeout(timeout)) {
422             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
423                 withAddr("failed to schedule AsyncSocket connect timeout"));
424           }
425         }
426
427         // Register for write events, so we'll
428         // be notified when the connection finishes/fails.
429         // Note that we don't register for a persistent event here.
430         assert(eventFlags_ == EventHandler::NONE);
431         eventFlags_ = EventHandler::WRITE;
432         if (!ioHandler_.registerHandler(eventFlags_)) {
433           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
434               withAddr("failed to register AsyncSocket connect handler"));
435         }
436         return;
437       } else {
438         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
439                                   "connect failed (immediately)", errno);
440       }
441     }
442
443     // If we're still here the connect() succeeded immediately.
444     // Fall through to call the callback outside of this try...catch block
445   } catch (const AsyncSocketException& ex) {
446     return failConnect(__func__, ex);
447   } catch (const std::exception& ex) {
448     // shouldn't happen, but handle it just in case
449     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
450                << "): unexpected " << typeid(ex).name() << " exception: "
451                << ex.what();
452     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
453                             withAddr(string("unexpected exception: ") +
454                                      ex.what()));
455     return failConnect(__func__, tex);
456   }
457
458   // The connection succeeded immediately
459   // The read callback may not have been set yet, and no writes may be pending
460   // yet, so we don't have to register for any events at the moment.
461   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
462   assert(readCallback_ == nullptr);
463   assert(writeReqHead_ == nullptr);
464   state_ = StateEnum::ESTABLISHED;
465   if (callback) {
466     connectCallback_ = nullptr;
467     callback->connectSuccess();
468   }
469 }
470
471 void AsyncSocket::connect(ConnectCallback* callback,
472                            const string& ip, uint16_t port,
473                            int timeout,
474                            const OptionMap &options) noexcept {
475   DestructorGuard dg(this);
476   try {
477     connectCallback_ = callback;
478     connect(callback, folly::SocketAddress(ip, port), timeout, options);
479   } catch (const std::exception& ex) {
480     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
481                             ex.what());
482     return failConnect(__func__, tex);
483   }
484 }
485
486 void AsyncSocket::cancelConnect() {
487   connectCallback_ = nullptr;
488   if (state_ == StateEnum::CONNECTING) {
489     closeNow();
490   }
491 }
492
493 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
494   sendTimeout_ = milliseconds;
495   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
496
497   // If we are currently pending on write requests, immediately update
498   // writeTimeout_ with the new value.
499   if ((eventFlags_ & EventHandler::WRITE) &&
500       (state_ != StateEnum::CONNECTING)) {
501     assert(state_ == StateEnum::ESTABLISHED);
502     assert((shutdownFlags_ & SHUT_WRITE) == 0);
503     if (sendTimeout_ > 0) {
504       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
505         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
506             withAddr("failed to reschedule send timeout in setSendTimeout"));
507         return failWrite(__func__, ex);
508       }
509     } else {
510       writeTimeout_.cancelTimeout();
511     }
512   }
513 }
514
515 void AsyncSocket::setReadCB(ReadCallback *callback) {
516   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
517           << ", callback=" << callback << ", state=" << state_;
518
519   // Short circuit if callback is the same as the existing readCallback_.
520   //
521   // Note that this is needed for proper functioning during some cleanup cases.
522   // During cleanup we allow setReadCallback(nullptr) to be called even if the
523   // read callback is already unset and we have been detached from an event
524   // base.  This check prevents us from asserting
525   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
526   if (callback == readCallback_) {
527     return;
528   }
529
530   if (shutdownFlags_ & SHUT_READ) {
531     // Reads have already been shut down on this socket.
532     //
533     // Allow setReadCallback(nullptr) to be called in this case, but don't
534     // allow a new callback to be set.
535     //
536     // For example, setReadCallback(nullptr) can happen after an error if we
537     // invoke some other error callback before invoking readError().  The other
538     // error callback that is invoked first may go ahead and clear the read
539     // callback before we get a chance to invoke readError().
540     if (callback != nullptr) {
541       return invalidState(callback);
542     }
543     assert((eventFlags_ & EventHandler::READ) == 0);
544     readCallback_ = nullptr;
545     return;
546   }
547
548   DestructorGuard dg(this);
549   assert(eventBase_->isInEventBaseThread());
550
551   switch ((StateEnum)state_) {
552     case StateEnum::CONNECTING:
553       // For convenience, we allow the read callback to be set while we are
554       // still connecting.  We just store the callback for now.  Once the
555       // connection completes we'll register for read events.
556       readCallback_ = callback;
557       return;
558     case StateEnum::ESTABLISHED:
559     {
560       readCallback_ = callback;
561       uint16_t oldFlags = eventFlags_;
562       if (readCallback_) {
563         eventFlags_ |= EventHandler::READ;
564       } else {
565         eventFlags_ &= ~EventHandler::READ;
566       }
567
568       // Update our registration if our flags have changed
569       if (eventFlags_ != oldFlags) {
570         // We intentionally ignore the return value here.
571         // updateEventRegistration() will move us into the error state if it
572         // fails, and we don't need to do anything else here afterwards.
573         (void)updateEventRegistration();
574       }
575
576       if (readCallback_) {
577         checkForImmediateRead();
578       }
579       return;
580     }
581     case StateEnum::CLOSED:
582     case StateEnum::ERROR:
583       // We should never reach here.  SHUT_READ should always be set
584       // if we are in STATE_CLOSED or STATE_ERROR.
585       assert(false);
586       return invalidState(callback);
587     case StateEnum::UNINIT:
588       // We do not allow setReadCallback() to be called before we start
589       // connecting.
590       return invalidState(callback);
591   }
592
593   // We don't put a default case in the switch statement, so that the compiler
594   // will warn us to update the switch statement if a new state is added.
595   return invalidState(callback);
596 }
597
598 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
599   return readCallback_;
600 }
601
602 void AsyncSocket::write(WriteCallback* callback,
603                          const void* buf, size_t bytes, WriteFlags flags) {
604   iovec op;
605   op.iov_base = const_cast<void*>(buf);
606   op.iov_len = bytes;
607   writeImpl(callback, &op, 1, std::move(unique_ptr<IOBuf>()), flags);
608 }
609
610 void AsyncSocket::writev(WriteCallback* callback,
611                           const iovec* vec,
612                           size_t count,
613                           WriteFlags flags) {
614   writeImpl(callback, vec, count, std::move(unique_ptr<IOBuf>()), flags);
615 }
616
617 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
618                               WriteFlags flags) {
619   size_t count = buf->countChainElements();
620   if (count <= 64) {
621     iovec vec[count];
622     writeChainImpl(callback, vec, count, std::move(buf), flags);
623   } else {
624     iovec* vec = new iovec[count];
625     writeChainImpl(callback, vec, count, std::move(buf), flags);
626     delete[] vec;
627   }
628 }
629
630 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
631     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
632   size_t veclen = buf->fillIov(vec, count);
633   writeImpl(callback, vec, veclen, std::move(buf), flags);
634 }
635
636 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
637                              size_t count, unique_ptr<IOBuf>&& buf,
638                              WriteFlags flags) {
639   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
640           << ", callback=" << callback << ", count=" << count
641           << ", state=" << state_;
642   DestructorGuard dg(this);
643   unique_ptr<IOBuf>ioBuf(std::move(buf));
644   assert(eventBase_->isInEventBaseThread());
645
646   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
647     // No new writes may be performed after the write side of the socket has
648     // been shutdown.
649     //
650     // We could just call callback->writeError() here to fail just this write.
651     // However, fail hard and use invalidState() to fail all outstanding
652     // callbacks and move the socket into the error state.  There's most likely
653     // a bug in the caller's code, so we abort everything rather than trying to
654     // proceed as best we can.
655     return invalidState(callback);
656   }
657
658   uint32_t countWritten = 0;
659   uint32_t partialWritten = 0;
660   int bytesWritten = 0;
661   bool mustRegister = false;
662   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
663     if (writeReqHead_ == nullptr) {
664       // If we are established and there are no other writes pending,
665       // we can attempt to perform the write immediately.
666       assert(writeReqTail_ == nullptr);
667       assert((eventFlags_ & EventHandler::WRITE) == 0);
668
669       bytesWritten = performWrite(vec, count, flags,
670                                   &countWritten, &partialWritten);
671       if (bytesWritten < 0) {
672         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
673                                withAddr("writev failed"), errno);
674         return failWrite(__func__, callback, 0, ex);
675       } else if (countWritten == count) {
676         // We successfully wrote everything.
677         // Invoke the callback and return.
678         if (callback) {
679           callback->writeSuccess();
680         }
681         return;
682       } // else { continue writing the next writeReq }
683       mustRegister = true;
684     }
685   } else if (!connecting()) {
686     // Invalid state for writing
687     return invalidState(callback);
688   }
689
690   // Create a new WriteRequest to add to the queue
691   WriteRequest* req;
692   try {
693     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
694                                         count - countWritten, partialWritten,
695                                         bytesWritten, std::move(ioBuf), flags);
696   } catch (const std::exception& ex) {
697     // we mainly expect to catch std::bad_alloc here
698     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
699         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
700     return failWrite(__func__, callback, bytesWritten, tex);
701   }
702   req->consume();
703   if (writeReqTail_ == nullptr) {
704     assert(writeReqHead_ == nullptr);
705     writeReqHead_ = writeReqTail_ = req;
706   } else {
707     writeReqTail_->append(req);
708     writeReqTail_ = req;
709   }
710
711   // Register for write events if are established and not currently
712   // waiting on write events
713   if (mustRegister) {
714     assert(state_ == StateEnum::ESTABLISHED);
715     assert((eventFlags_ & EventHandler::WRITE) == 0);
716     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
717       assert(state_ == StateEnum::ERROR);
718       return;
719     }
720     if (sendTimeout_ > 0) {
721       // Schedule a timeout to fire if the write takes too long.
722       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
723         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
724                                withAddr("failed to schedule send timeout"));
725         return failWrite(__func__, ex);
726       }
727     }
728   }
729 }
730
731 void AsyncSocket::writeRequest(WriteRequest* req) {
732   if (writeReqTail_ == nullptr) {
733     assert(writeReqHead_ == nullptr);
734     writeReqHead_ = writeReqTail_ = req;
735     req->start();
736   } else {
737     writeReqTail_->append(req);
738     writeReqTail_ = req;
739   }
740 }
741
742 void AsyncSocket::close() {
743   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
744           << ", state=" << state_ << ", shutdownFlags="
745           << std::hex << (int) shutdownFlags_;
746
747   // close() is only different from closeNow() when there are pending writes
748   // that need to drain before we can close.  In all other cases, just call
749   // closeNow().
750   //
751   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
752   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
753   // is still running.  (e.g., If there are multiple pending writes, and we
754   // call writeError() on the first one, it may call close().  In this case we
755   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
756   // writes will still be in the queue.)
757   //
758   // We only need to drain pending writes if we are still in STATE_CONNECTING
759   // or STATE_ESTABLISHED
760   if ((writeReqHead_ == nullptr) ||
761       !(state_ == StateEnum::CONNECTING ||
762       state_ == StateEnum::ESTABLISHED)) {
763     closeNow();
764     return;
765   }
766
767   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
768   // destroyed until close() returns.
769   DestructorGuard dg(this);
770   assert(eventBase_->isInEventBaseThread());
771
772   // Since there are write requests pending, we have to set the
773   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
774   // connect finishes and we finish writing these requests.
775   //
776   // Set SHUT_READ to indicate that reads are shut down, and set the
777   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
778   // pending writes complete.
779   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
780
781   // If a read callback is set, invoke readEOF() immediately to inform it that
782   // the socket has been closed and no more data can be read.
783   if (readCallback_) {
784     // Disable reads if they are enabled
785     if (!updateEventRegistration(0, EventHandler::READ)) {
786       // We're now in the error state; callbacks have been cleaned up
787       assert(state_ == StateEnum::ERROR);
788       assert(readCallback_ == nullptr);
789     } else {
790       ReadCallback* callback = readCallback_;
791       readCallback_ = nullptr;
792       callback->readEOF();
793     }
794   }
795 }
796
797 void AsyncSocket::closeNow() {
798   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
799           << ", state=" << state_ << ", shutdownFlags="
800           << std::hex << (int) shutdownFlags_;
801   DestructorGuard dg(this);
802   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
803
804   switch (state_) {
805     case StateEnum::ESTABLISHED:
806     case StateEnum::CONNECTING:
807     {
808       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
809       state_ = StateEnum::CLOSED;
810
811       // If the write timeout was set, cancel it.
812       writeTimeout_.cancelTimeout();
813
814       // If we are registered for I/O events, unregister.
815       if (eventFlags_ != EventHandler::NONE) {
816         eventFlags_ = EventHandler::NONE;
817         if (!updateEventRegistration()) {
818           // We will have been moved into the error state.
819           assert(state_ == StateEnum::ERROR);
820           return;
821         }
822       }
823
824       if (immediateReadHandler_.isLoopCallbackScheduled()) {
825         immediateReadHandler_.cancelLoopCallback();
826       }
827
828       if (fd_ >= 0) {
829         ioHandler_.changeHandlerFD(-1);
830         doClose();
831       }
832
833       if (connectCallback_) {
834         ConnectCallback* callback = connectCallback_;
835         connectCallback_ = nullptr;
836         callback->connectErr(socketClosedLocallyEx);
837       }
838
839       failAllWrites(socketClosedLocallyEx);
840
841       if (readCallback_) {
842         ReadCallback* callback = readCallback_;
843         readCallback_ = nullptr;
844         callback->readEOF();
845       }
846       return;
847     }
848     case StateEnum::CLOSED:
849       // Do nothing.  It's possible that we are being called recursively
850       // from inside a callback that we invoked inside another call to close()
851       // that is still running.
852       return;
853     case StateEnum::ERROR:
854       // Do nothing.  The error handling code has performed (or is performing)
855       // cleanup.
856       return;
857     case StateEnum::UNINIT:
858       assert(eventFlags_ == EventHandler::NONE);
859       assert(connectCallback_ == nullptr);
860       assert(readCallback_ == nullptr);
861       assert(writeReqHead_ == nullptr);
862       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
863       state_ = StateEnum::CLOSED;
864       return;
865   }
866
867   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
868               << ") called in unknown state " << state_;
869 }
870
871 void AsyncSocket::closeWithReset() {
872   // Enable SO_LINGER, with the linger timeout set to 0.
873   // This will trigger a TCP reset when we close the socket.
874   if (fd_ >= 0) {
875     struct linger optLinger = {1, 0};
876     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
877       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
878               << "on " << fd_ << ": errno=" << errno;
879     }
880   }
881
882   // Then let closeNow() take care of the rest
883   closeNow();
884 }
885
886 void AsyncSocket::shutdownWrite() {
887   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
888           << ", state=" << state_ << ", shutdownFlags="
889           << std::hex << (int) shutdownFlags_;
890
891   // If there are no pending writes, shutdownWrite() is identical to
892   // shutdownWriteNow().
893   if (writeReqHead_ == nullptr) {
894     shutdownWriteNow();
895     return;
896   }
897
898   assert(eventBase_->isInEventBaseThread());
899
900   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
901   // shutdown will be performed once all writes complete.
902   shutdownFlags_ |= SHUT_WRITE_PENDING;
903 }
904
905 void AsyncSocket::shutdownWriteNow() {
906   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
907           << ", fd=" << fd_ << ", state=" << state_
908           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
909
910   if (shutdownFlags_ & SHUT_WRITE) {
911     // Writes are already shutdown; nothing else to do.
912     return;
913   }
914
915   // If SHUT_READ is already set, just call closeNow() to completely
916   // close the socket.  This can happen if close() was called with writes
917   // pending, and then shutdownWriteNow() is called before all pending writes
918   // complete.
919   if (shutdownFlags_ & SHUT_READ) {
920     closeNow();
921     return;
922   }
923
924   DestructorGuard dg(this);
925   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
926
927   switch (static_cast<StateEnum>(state_)) {
928     case StateEnum::ESTABLISHED:
929     {
930       shutdownFlags_ |= SHUT_WRITE;
931
932       // If the write timeout was set, cancel it.
933       writeTimeout_.cancelTimeout();
934
935       // If we are registered for write events, unregister.
936       if (!updateEventRegistration(0, EventHandler::WRITE)) {
937         // We will have been moved into the error state.
938         assert(state_ == StateEnum::ERROR);
939         return;
940       }
941
942       // Shutdown writes on the file descriptor
943       ::shutdown(fd_, SHUT_WR);
944
945       // Immediately fail all write requests
946       failAllWrites(socketShutdownForWritesEx);
947       return;
948     }
949     case StateEnum::CONNECTING:
950     {
951       // Set the SHUT_WRITE_PENDING flag.
952       // When the connection completes, it will check this flag,
953       // shutdown the write half of the socket, and then set SHUT_WRITE.
954       shutdownFlags_ |= SHUT_WRITE_PENDING;
955
956       // Immediately fail all write requests
957       failAllWrites(socketShutdownForWritesEx);
958       return;
959     }
960     case StateEnum::UNINIT:
961       // Callers normally shouldn't call shutdownWriteNow() before the socket
962       // even starts connecting.  Nonetheless, go ahead and set
963       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
964       // immediately shut down the write side of the socket.
965       shutdownFlags_ |= SHUT_WRITE_PENDING;
966       return;
967     case StateEnum::CLOSED:
968     case StateEnum::ERROR:
969       // We should never get here.  SHUT_WRITE should always be set
970       // in STATE_CLOSED and STATE_ERROR.
971       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
972                  << ", fd=" << fd_ << ") in unexpected state " << state_
973                  << " with SHUT_WRITE not set ("
974                  << std::hex << (int) shutdownFlags_ << ")";
975       assert(false);
976       return;
977   }
978
979   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
980               << fd_ << ") called in unknown state " << state_;
981 }
982
983 bool AsyncSocket::readable() const {
984   if (fd_ == -1) {
985     return false;
986   }
987   struct pollfd fds[1];
988   fds[0].fd = fd_;
989   fds[0].events = POLLIN;
990   fds[0].revents = 0;
991   int rc = poll(fds, 1, 0);
992   return rc == 1;
993 }
994
995 bool AsyncSocket::isPending() const {
996   return ioHandler_.isPending();
997 }
998
999 bool AsyncSocket::hangup() const {
1000   if (fd_ == -1) {
1001     // sanity check, no one should ask for hangup if we are not connected.
1002     assert(false);
1003     return false;
1004   }
1005 #ifdef POLLRDHUP // Linux-only
1006   struct pollfd fds[1];
1007   fds[0].fd = fd_;
1008   fds[0].events = POLLRDHUP|POLLHUP;
1009   fds[0].revents = 0;
1010   poll(fds, 1, 0);
1011   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1012 #else
1013   return false;
1014 #endif
1015 }
1016
1017 bool AsyncSocket::good() const {
1018   return ((state_ == StateEnum::CONNECTING ||
1019           state_ == StateEnum::ESTABLISHED) &&
1020           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1021 }
1022
1023 bool AsyncSocket::error() const {
1024   return (state_ == StateEnum::ERROR);
1025 }
1026
1027 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1028   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1029           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1030           << ", state=" << state_ << ", events="
1031           << std::hex << eventFlags_ << ")";
1032   assert(eventBase_ == nullptr);
1033   assert(eventBase->isInEventBaseThread());
1034
1035   eventBase_ = eventBase;
1036   ioHandler_.attachEventBase(eventBase);
1037   writeTimeout_.attachEventBase(eventBase);
1038 }
1039
1040 void AsyncSocket::detachEventBase() {
1041   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1042           << ", old evb=" << eventBase_ << ", state=" << state_
1043           << ", events=" << std::hex << eventFlags_ << ")";
1044   assert(eventBase_ != nullptr);
1045   assert(eventBase_->isInEventBaseThread());
1046
1047   eventBase_ = nullptr;
1048   ioHandler_.detachEventBase();
1049   writeTimeout_.detachEventBase();
1050 }
1051
1052 bool AsyncSocket::isDetachable() const {
1053   DCHECK(eventBase_ != nullptr);
1054   DCHECK(eventBase_->isInEventBaseThread());
1055
1056   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1057 }
1058
1059 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1060   address->setFromLocalAddress(fd_);
1061 }
1062
1063 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1064   if (!addr_.isInitialized()) {
1065     addr_.setFromPeerAddress(fd_);
1066   }
1067   *address = addr_;
1068 }
1069
1070 int AsyncSocket::setNoDelay(bool noDelay) {
1071   if (fd_ < 0) {
1072     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1073                << this << "(state=" << state_ << ")";
1074     return EINVAL;
1075
1076   }
1077
1078   int value = noDelay ? 1 : 0;
1079   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1080     int errnoCopy = errno;
1081     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1082             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1083             << strerror(errnoCopy);
1084     return errnoCopy;
1085   }
1086
1087   return 0;
1088 }
1089
1090 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1091
1092   #ifndef TCP_CONGESTION
1093   #define TCP_CONGESTION  13
1094   #endif
1095
1096   if (fd_ < 0) {
1097     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1098                << "socket " << this << "(state=" << state_ << ")";
1099     return EINVAL;
1100
1101   }
1102
1103   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1104         cname.length() + 1) != 0) {
1105     int errnoCopy = errno;
1106     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1107             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1108             << strerror(errnoCopy);
1109     return errnoCopy;
1110   }
1111
1112   return 0;
1113 }
1114
1115 int AsyncSocket::setQuickAck(bool quickack) {
1116   if (fd_ < 0) {
1117     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1118                << this << "(state=" << state_ << ")";
1119     return EINVAL;
1120
1121   }
1122
1123 #ifdef TCP_QUICKACK // Linux-only
1124   int value = quickack ? 1 : 0;
1125   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1126     int errnoCopy = errno;
1127     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1128             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1129             << strerror(errnoCopy);
1130     return errnoCopy;
1131   }
1132
1133   return 0;
1134 #else
1135   return ENOSYS;
1136 #endif
1137 }
1138
1139 int AsyncSocket::setSendBufSize(size_t bufsize) {
1140   if (fd_ < 0) {
1141     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1142                << this << "(state=" << state_ << ")";
1143     return EINVAL;
1144   }
1145
1146   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1147     int errnoCopy = errno;
1148     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1149             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1150             << strerror(errnoCopy);
1151     return errnoCopy;
1152   }
1153
1154   return 0;
1155 }
1156
1157 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1158   if (fd_ < 0) {
1159     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1160                << this << "(state=" << state_ << ")";
1161     return EINVAL;
1162   }
1163
1164   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1165     int errnoCopy = errno;
1166     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1167             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1168             << strerror(errnoCopy);
1169     return errnoCopy;
1170   }
1171
1172   return 0;
1173 }
1174
1175 int AsyncSocket::setTCPProfile(int profd) {
1176   if (fd_ < 0) {
1177     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1178                << this << "(state=" << state_ << ")";
1179     return EINVAL;
1180   }
1181
1182   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1183     int errnoCopy = errno;
1184     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1185             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1186             << strerror(errnoCopy);
1187     return errnoCopy;
1188   }
1189
1190   return 0;
1191 }
1192
1193 void AsyncSocket::ioReady(uint16_t events) noexcept {
1194   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1195           << ", events=" << std::hex << events << ", state=" << state_;
1196   DestructorGuard dg(this);
1197   assert(events & EventHandler::READ_WRITE);
1198   assert(eventBase_->isInEventBaseThread());
1199
1200   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1201   if (relevantEvents == EventHandler::READ) {
1202     handleRead();
1203   } else if (relevantEvents == EventHandler::WRITE) {
1204     handleWrite();
1205   } else if (relevantEvents == EventHandler::READ_WRITE) {
1206     EventBase* originalEventBase = eventBase_;
1207     // If both read and write events are ready, process writes first.
1208     handleWrite();
1209
1210     // Return now if handleWrite() detached us from our EventBase
1211     if (eventBase_ != originalEventBase) {
1212       return;
1213     }
1214
1215     // Only call handleRead() if a read callback is still installed.
1216     // (It's possible that the read callback was uninstalled during
1217     // handleWrite().)
1218     if (readCallback_) {
1219       handleRead();
1220     }
1221   } else {
1222     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1223                << std::hex << events << "(this=" << this << ")";
1224     abort();
1225   }
1226 }
1227
1228 ssize_t AsyncSocket::performRead(void* buf, size_t buflen) {
1229   ssize_t bytes = recv(fd_, buf, buflen, MSG_DONTWAIT);
1230   if (bytes < 0) {
1231     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1232       // No more data to read right now.
1233       return READ_BLOCKING;
1234     } else {
1235       return READ_ERROR;
1236     }
1237   } else {
1238     appBytesReceived_ += bytes;
1239     return bytes;
1240   }
1241 }
1242
1243 void AsyncSocket::handleRead() noexcept {
1244   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1245           << ", state=" << state_;
1246   assert(state_ == StateEnum::ESTABLISHED);
1247   assert((shutdownFlags_ & SHUT_READ) == 0);
1248   assert(readCallback_ != nullptr);
1249   assert(eventFlags_ & EventHandler::READ);
1250
1251   // Loop until:
1252   // - a read attempt would block
1253   // - readCallback_ is uninstalled
1254   // - the number of loop iterations exceeds the optional maximum
1255   // - this AsyncSocket is moved to another EventBase
1256   //
1257   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1258   // which is why need to check for it here.
1259   //
1260   // The last bullet point is slightly subtle.  readDataAvailable() may also
1261   // detach this socket from this EventBase.  However, before
1262   // readDataAvailable() returns another thread may pick it up, attach it to
1263   // a different EventBase, and install another readCallback_.  We need to
1264   // exit immediately after readDataAvailable() returns if the eventBase_ has
1265   // changed.  (The caller must perform some sort of locking to transfer the
1266   // AsyncSocket between threads properly.  This will be sufficient to ensure
1267   // that this thread sees the updated eventBase_ variable after
1268   // readDataAvailable() returns.)
1269   uint16_t numReads = 0;
1270   EventBase* originalEventBase = eventBase_;
1271   while (readCallback_ && eventBase_ == originalEventBase) {
1272     // Get the buffer to read into.
1273     void* buf = nullptr;
1274     size_t buflen = 0;
1275     try {
1276       readCallback_->getReadBuffer(&buf, &buflen);
1277     } catch (const AsyncSocketException& ex) {
1278       return failRead(__func__, ex);
1279     } catch (const std::exception& ex) {
1280       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1281                               string("ReadCallback::getReadBuffer() "
1282                                      "threw exception: ") +
1283                               ex.what());
1284       return failRead(__func__, tex);
1285     } catch (...) {
1286       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1287                              "ReadCallback::getReadBuffer() threw "
1288                              "non-exception type");
1289       return failRead(__func__, ex);
1290     }
1291     if (buf == nullptr || buflen == 0) {
1292       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1293                              "ReadCallback::getReadBuffer() returned "
1294                              "empty buffer");
1295       return failRead(__func__, ex);
1296     }
1297
1298     // Perform the read
1299     ssize_t bytesRead = performRead(buf, buflen);
1300     if (bytesRead > 0) {
1301       readCallback_->readDataAvailable(bytesRead);
1302       // Fall through and continue around the loop if the read
1303       // completely filled the available buffer.
1304       // Note that readCallback_ may have been uninstalled or changed inside
1305       // readDataAvailable().
1306       if (size_t(bytesRead) < buflen) {
1307         return;
1308       }
1309     } else if (bytesRead == READ_BLOCKING) {
1310         // No more data to read right now.
1311         return;
1312     } else if (bytesRead == READ_ERROR) {
1313       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1314                              withAddr("recv() failed"), errno);
1315       return failRead(__func__, ex);
1316     } else {
1317       assert(bytesRead == READ_EOF);
1318       // EOF
1319       shutdownFlags_ |= SHUT_READ;
1320       if (!updateEventRegistration(0, EventHandler::READ)) {
1321         // we've already been moved into STATE_ERROR
1322         assert(state_ == StateEnum::ERROR);
1323         assert(readCallback_ == nullptr);
1324         return;
1325       }
1326
1327       ReadCallback* callback = readCallback_;
1328       readCallback_ = nullptr;
1329       callback->readEOF();
1330       return;
1331     }
1332     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1333       // We might still have data in the socket.
1334       // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1335       scheduleImmediateRead();
1336       return;
1337     }
1338   }
1339 }
1340
1341 /**
1342  * This function attempts to write as much data as possible, until no more data
1343  * can be written.
1344  *
1345  * - If it sends all available data, it unregisters for write events, and stops
1346  *   the writeTimeout_.
1347  *
1348  * - If not all of the data can be sent immediately, it reschedules
1349  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1350  *   registered for write events.
1351  */
1352 void AsyncSocket::handleWrite() noexcept {
1353   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1354           << ", state=" << state_;
1355   if (state_ == StateEnum::CONNECTING) {
1356     handleConnect();
1357     return;
1358   }
1359
1360   // Normal write
1361   assert(state_ == StateEnum::ESTABLISHED);
1362   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1363   assert(writeReqHead_ != nullptr);
1364
1365   // Loop until we run out of write requests,
1366   // or until this socket is moved to another EventBase.
1367   // (See the comment in handleRead() explaining how this can happen.)
1368   EventBase* originalEventBase = eventBase_;
1369   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1370     if (!writeReqHead_->performWrite()) {
1371       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1372                              withAddr("writev() failed"), errno);
1373       return failWrite(__func__, ex);
1374     } else if (writeReqHead_->isComplete()) {
1375       // We finished this request
1376       WriteRequest* req = writeReqHead_;
1377       writeReqHead_ = req->getNext();
1378
1379       if (writeReqHead_ == nullptr) {
1380         writeReqTail_ = nullptr;
1381         // This is the last write request.
1382         // Unregister for write events and cancel the send timer
1383         // before we invoke the callback.  We have to update the state properly
1384         // before calling the callback, since it may want to detach us from
1385         // the EventBase.
1386         if (eventFlags_ & EventHandler::WRITE) {
1387           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1388             assert(state_ == StateEnum::ERROR);
1389             return;
1390           }
1391           // Stop the send timeout
1392           writeTimeout_.cancelTimeout();
1393         }
1394         assert(!writeTimeout_.isScheduled());
1395
1396         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1397         // we finish sending the last write request.
1398         //
1399         // We have to do this before invoking writeSuccess(), since
1400         // writeSuccess() may detach us from our EventBase.
1401         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1402           assert(connectCallback_ == nullptr);
1403           shutdownFlags_ |= SHUT_WRITE;
1404
1405           if (shutdownFlags_ & SHUT_READ) {
1406             // Reads have already been shutdown.  Fully close the socket and
1407             // move to STATE_CLOSED.
1408             //
1409             // Note: This code currently moves us to STATE_CLOSED even if
1410             // close() hasn't ever been called.  This can occur if we have
1411             // received EOF from the peer and shutdownWrite() has been called
1412             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1413             // case, until close() is actually called?  I can't think of a
1414             // reason why we would need to do so.  No other operations besides
1415             // calling close() or destroying the socket can be performed at
1416             // this point.
1417             assert(readCallback_ == nullptr);
1418             state_ = StateEnum::CLOSED;
1419             if (fd_ >= 0) {
1420               ioHandler_.changeHandlerFD(-1);
1421               doClose();
1422             }
1423           } else {
1424             // Reads are still enabled, so we are only doing a half-shutdown
1425             ::shutdown(fd_, SHUT_WR);
1426           }
1427         }
1428       }
1429
1430       // Invoke the callback
1431       WriteCallback* callback = req->getCallback();
1432       req->destroy();
1433       if (callback) {
1434         callback->writeSuccess();
1435       }
1436       // We'll continue around the loop, trying to write another request
1437     } else {
1438       // Partial write.
1439       writeReqHead_->consume();
1440       // Stop after a partial write; it's highly likely that a subsequent write
1441       // attempt will just return EAGAIN.
1442       //
1443       // Ensure that we are registered for write events.
1444       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1445         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1446           assert(state_ == StateEnum::ERROR);
1447           return;
1448         }
1449       }
1450
1451       // Reschedule the send timeout, since we have made some write progress.
1452       if (sendTimeout_ > 0) {
1453         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1454           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1455               withAddr("failed to reschedule write timeout"));
1456           return failWrite(__func__, ex);
1457         }
1458       }
1459       return;
1460     }
1461   }
1462 }
1463
1464 void AsyncSocket::checkForImmediateRead() noexcept {
1465   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1466   // (However, note that some subclasses do override this method.)
1467   //
1468   // Simply calling handleRead() here would be bad, as this would call
1469   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1470   // buffer even though no data may be available.  This would waste lots of
1471   // memory, since the buffer will sit around unused until the socket actually
1472   // becomes readable.
1473   //
1474   // Checking if the socket is readable now also seems like it would probably
1475   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1476   // would just waste an extra system call.  Even if it is readable, waiting to
1477   // find out from libevent on the next event loop doesn't seem that bad.
1478 }
1479
1480 void AsyncSocket::handleInitialReadWrite() noexcept {
1481   // Our callers should already be holding a DestructorGuard, but grab
1482   // one here just to make sure, in case one of our calling code paths ever
1483   // changes.
1484   DestructorGuard dg(this);
1485
1486   // If we have a readCallback_, make sure we enable read events.  We
1487   // may already be registered for reads if connectSuccess() set
1488   // the read calback.
1489   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1490     assert(state_ == StateEnum::ESTABLISHED);
1491     assert((shutdownFlags_ & SHUT_READ) == 0);
1492     if (!updateEventRegistration(EventHandler::READ, 0)) {
1493       assert(state_ == StateEnum::ERROR);
1494       return;
1495     }
1496     checkForImmediateRead();
1497   } else if (readCallback_ == nullptr) {
1498     // Unregister for read events.
1499     updateEventRegistration(0, EventHandler::READ);
1500   }
1501
1502   // If we have write requests pending, try to send them immediately.
1503   // Since we just finished accepting, there is a very good chance that we can
1504   // write without blocking.
1505   //
1506   // However, we only process them if EventHandler::WRITE is not already set,
1507   // which means that we're already blocked on a write attempt.  (This can
1508   // happen if connectSuccess() called write() before returning.)
1509   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1510     // Call handleWrite() to perform write processing.
1511     handleWrite();
1512   } else if (writeReqHead_ == nullptr) {
1513     // Unregister for write event.
1514     updateEventRegistration(0, EventHandler::WRITE);
1515   }
1516 }
1517
1518 void AsyncSocket::handleConnect() noexcept {
1519   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1520           << ", state=" << state_;
1521   assert(state_ == StateEnum::CONNECTING);
1522   // SHUT_WRITE can never be set while we are still connecting;
1523   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1524   // finishes
1525   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1526
1527   // In case we had a connect timeout, cancel the timeout
1528   writeTimeout_.cancelTimeout();
1529   // We don't use a persistent registration when waiting on a connect event,
1530   // so we have been automatically unregistered now.  Update eventFlags_ to
1531   // reflect reality.
1532   assert(eventFlags_ == EventHandler::WRITE);
1533   eventFlags_ = EventHandler::NONE;
1534
1535   // Call getsockopt() to check if the connect succeeded
1536   int error;
1537   socklen_t len = sizeof(error);
1538   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1539   if (rv != 0) {
1540     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1541                            withAddr("error calling getsockopt() after connect"),
1542                            errno);
1543     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1544                << fd_ << " host=" << addr_.describe()
1545                << ") exception:" << ex.what();
1546     return failConnect(__func__, ex);
1547   }
1548
1549   if (error != 0) {
1550     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1551                            "connect failed", error);
1552     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1553             << fd_ << " host=" << addr_.describe()
1554             << ") exception: " << ex.what();
1555     return failConnect(__func__, ex);
1556   }
1557
1558   // Move into STATE_ESTABLISHED
1559   state_ = StateEnum::ESTABLISHED;
1560
1561   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1562   // perform, immediately shutdown the write half of the socket.
1563   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1564     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1565     // are still connecting we just abort the connect rather than waiting for
1566     // it to complete.
1567     assert((shutdownFlags_ & SHUT_READ) == 0);
1568     ::shutdown(fd_, SHUT_WR);
1569     shutdownFlags_ |= SHUT_WRITE;
1570   }
1571
1572   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1573           << "successfully connected; state=" << state_;
1574
1575   // Remember the EventBase we are attached to, before we start invoking any
1576   // callbacks (since the callbacks may call detachEventBase()).
1577   EventBase* originalEventBase = eventBase_;
1578
1579   // Call the connect callback.
1580   if (connectCallback_) {
1581     ConnectCallback* callback = connectCallback_;
1582     connectCallback_ = nullptr;
1583     callback->connectSuccess();
1584   }
1585
1586   // Note that the connect callback may have changed our state.
1587   // (set or unset the read callback, called write(), closed the socket, etc.)
1588   // The following code needs to handle these situations correctly.
1589   //
1590   // If the socket has been closed, readCallback_ and writeReqHead_ will
1591   // always be nullptr, so that will prevent us from trying to read or write.
1592   //
1593   // The main thing to check for is if eventBase_ is still originalEventBase.
1594   // If not, we have been detached from this event base, so we shouldn't
1595   // perform any more operations.
1596   if (eventBase_ != originalEventBase) {
1597     return;
1598   }
1599
1600   handleInitialReadWrite();
1601 }
1602
1603 void AsyncSocket::timeoutExpired() noexcept {
1604   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1605           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1606   DestructorGuard dg(this);
1607   assert(eventBase_->isInEventBaseThread());
1608
1609   if (state_ == StateEnum::CONNECTING) {
1610     // connect() timed out
1611     // Unregister for I/O events.
1612     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1613                            "connect timed out");
1614     failConnect(__func__, ex);
1615   } else {
1616     // a normal write operation timed out
1617     assert(state_ == StateEnum::ESTABLISHED);
1618     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1619     failWrite(__func__, ex);
1620   }
1621 }
1622
1623 ssize_t AsyncSocket::performWrite(const iovec* vec,
1624                                    uint32_t count,
1625                                    WriteFlags flags,
1626                                    uint32_t* countWritten,
1627                                    uint32_t* partialWritten) {
1628   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1629   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1630   // (since it may terminate the program if the main program doesn't explicitly
1631   // ignore it).
1632   struct msghdr msg;
1633   msg.msg_name = nullptr;
1634   msg.msg_namelen = 0;
1635   msg.msg_iov = const_cast<iovec *>(vec);
1636 #ifdef IOV_MAX // not defined on Android
1637   msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX);
1638 #else
1639   msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV);
1640 #endif
1641   msg.msg_control = nullptr;
1642   msg.msg_controllen = 0;
1643   msg.msg_flags = 0;
1644
1645   int msg_flags = MSG_DONTWAIT;
1646
1647 #ifdef MSG_NOSIGNAL // Linux-only
1648   msg_flags |= MSG_NOSIGNAL;
1649   if (isSet(flags, WriteFlags::CORK)) {
1650     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1651     // give it the rest of the data rather than immediately sending a partial
1652     // frame, even when TCP_NODELAY is enabled.
1653     msg_flags |= MSG_MORE;
1654   }
1655 #endif
1656   if (isSet(flags, WriteFlags::EOR)) {
1657     // marks that this is the last byte of a record (response)
1658     msg_flags |= MSG_EOR;
1659   }
1660   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1661   if (totalWritten < 0) {
1662     if (errno == EAGAIN) {
1663       // TCP buffer is full; we can't write any more data right now.
1664       *countWritten = 0;
1665       *partialWritten = 0;
1666       return 0;
1667     }
1668     // error
1669     *countWritten = 0;
1670     *partialWritten = 0;
1671     return -1;
1672   }
1673
1674   appBytesWritten_ += totalWritten;
1675
1676   uint32_t bytesWritten;
1677   uint32_t n;
1678   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1679     const iovec* v = vec + n;
1680     if (v->iov_len > bytesWritten) {
1681       // Partial write finished in the middle of this iovec
1682       *countWritten = n;
1683       *partialWritten = bytesWritten;
1684       return totalWritten;
1685     }
1686
1687     bytesWritten -= v->iov_len;
1688   }
1689
1690   assert(bytesWritten == 0);
1691   *countWritten = n;
1692   *partialWritten = 0;
1693   return totalWritten;
1694 }
1695
1696 /**
1697  * Re-register the EventHandler after eventFlags_ has changed.
1698  *
1699  * If an error occurs, fail() is called to move the socket into the error state
1700  * and call all currently installed callbacks.  After an error, the
1701  * AsyncSocket is completely unregistered.
1702  *
1703  * @return Returns true on succcess, or false on error.
1704  */
1705 bool AsyncSocket::updateEventRegistration() {
1706   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1707           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1708           << ", events=" << std::hex << eventFlags_;
1709   assert(eventBase_->isInEventBaseThread());
1710   if (eventFlags_ == EventHandler::NONE) {
1711     ioHandler_.unregisterHandler();
1712     return true;
1713   }
1714
1715   // Always register for persistent events, so we don't have to re-register
1716   // after being called back.
1717   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1718     eventFlags_ = EventHandler::NONE; // we're not registered after error
1719     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1720         withAddr("failed to update AsyncSocket event registration"));
1721     fail("updateEventRegistration", ex);
1722     return false;
1723   }
1724
1725   return true;
1726 }
1727
1728 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1729                                            uint16_t disable) {
1730   uint16_t oldFlags = eventFlags_;
1731   eventFlags_ |= enable;
1732   eventFlags_ &= ~disable;
1733   if (eventFlags_ == oldFlags) {
1734     return true;
1735   } else {
1736     return updateEventRegistration();
1737   }
1738 }
1739
1740 void AsyncSocket::startFail() {
1741   // startFail() should only be called once
1742   assert(state_ != StateEnum::ERROR);
1743   assert(getDestructorGuardCount() > 0);
1744   state_ = StateEnum::ERROR;
1745   // Ensure that SHUT_READ and SHUT_WRITE are set,
1746   // so all future attempts to read or write will be rejected
1747   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1748
1749   if (eventFlags_ != EventHandler::NONE) {
1750     eventFlags_ = EventHandler::NONE;
1751     ioHandler_.unregisterHandler();
1752   }
1753   writeTimeout_.cancelTimeout();
1754
1755   if (fd_ >= 0) {
1756     ioHandler_.changeHandlerFD(-1);
1757     doClose();
1758   }
1759 }
1760
1761 void AsyncSocket::finishFail() {
1762   assert(state_ == StateEnum::ERROR);
1763   assert(getDestructorGuardCount() > 0);
1764
1765   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1766                          withAddr("socket closing after error"));
1767   if (connectCallback_) {
1768     ConnectCallback* callback = connectCallback_;
1769     connectCallback_ = nullptr;
1770     callback->connectErr(ex);
1771   }
1772
1773   failAllWrites(ex);
1774
1775   if (readCallback_) {
1776     ReadCallback* callback = readCallback_;
1777     readCallback_ = nullptr;
1778     callback->readErr(ex);
1779   }
1780 }
1781
1782 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1783   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1784              << state_ << " host=" << addr_.describe()
1785              << "): failed in " << fn << "(): "
1786              << ex.what();
1787   startFail();
1788   finishFail();
1789 }
1790
1791 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1792   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1793                << state_ << " host=" << addr_.describe()
1794                << "): failed while connecting in " << fn << "(): "
1795                << ex.what();
1796   startFail();
1797
1798   if (connectCallback_ != nullptr) {
1799     ConnectCallback* callback = connectCallback_;
1800     connectCallback_ = nullptr;
1801     callback->connectErr(ex);
1802   }
1803
1804   finishFail();
1805 }
1806
1807 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1808   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1809                << state_ << " host=" << addr_.describe()
1810                << "): failed while reading in " << fn << "(): "
1811                << ex.what();
1812   startFail();
1813
1814   if (readCallback_ != nullptr) {
1815     ReadCallback* callback = readCallback_;
1816     readCallback_ = nullptr;
1817     callback->readErr(ex);
1818   }
1819
1820   finishFail();
1821 }
1822
1823 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1824   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1825                << state_ << " host=" << addr_.describe()
1826                << "): failed while writing in " << fn << "(): "
1827                << ex.what();
1828   startFail();
1829
1830   // Only invoke the first write callback, since the error occurred while
1831   // writing this request.  Let any other pending write callbacks be invoked in
1832   // finishFail().
1833   if (writeReqHead_ != nullptr) {
1834     WriteRequest* req = writeReqHead_;
1835     writeReqHead_ = req->getNext();
1836     WriteCallback* callback = req->getCallback();
1837     uint32_t bytesWritten = req->getTotalBytesWritten();
1838     req->destroy();
1839     if (callback) {
1840       callback->writeErr(bytesWritten, ex);
1841     }
1842   }
1843
1844   finishFail();
1845 }
1846
1847 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1848                              size_t bytesWritten,
1849                              const AsyncSocketException& ex) {
1850   // This version of failWrite() is used when the failure occurs before
1851   // we've added the callback to writeReqHead_.
1852   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1853              << state_ << " host=" << addr_.describe()
1854              <<"): failed while writing in " << fn << "(): "
1855              << ex.what();
1856   startFail();
1857
1858   if (callback != nullptr) {
1859     callback->writeErr(bytesWritten, ex);
1860   }
1861
1862   finishFail();
1863 }
1864
1865 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1866   // Invoke writeError() on all write callbacks.
1867   // This is used when writes are forcibly shutdown with write requests
1868   // pending, or when an error occurs with writes pending.
1869   while (writeReqHead_ != nullptr) {
1870     WriteRequest* req = writeReqHead_;
1871     writeReqHead_ = req->getNext();
1872     WriteCallback* callback = req->getCallback();
1873     if (callback) {
1874       callback->writeErr(req->getTotalBytesWritten(), ex);
1875     }
1876     req->destroy();
1877   }
1878 }
1879
1880 void AsyncSocket::invalidState(ConnectCallback* callback) {
1881   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1882              << "): connect() called in invalid state " << state_;
1883
1884   /*
1885    * The invalidState() methods don't use the normal failure mechanisms,
1886    * since we don't know what state we are in.  We don't want to call
1887    * startFail()/finishFail() recursively if we are already in the middle of
1888    * cleaning up.
1889    */
1890
1891   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1892                          "connect() called with socket in invalid state");
1893   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1894     if (callback) {
1895       callback->connectErr(ex);
1896     }
1897   } else {
1898     // We can't use failConnect() here since connectCallback_
1899     // may already be set to another callback.  Invoke this ConnectCallback
1900     // here; any other connectCallback_ will be invoked in finishFail()
1901     startFail();
1902     if (callback) {
1903       callback->connectErr(ex);
1904     }
1905     finishFail();
1906   }
1907 }
1908
1909 void AsyncSocket::invalidState(ReadCallback* callback) {
1910   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1911              << "): setReadCallback(" << callback
1912              << ") called in invalid state " << state_;
1913
1914   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1915                          "setReadCallback() called with socket in "
1916                          "invalid state");
1917   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1918     if (callback) {
1919       callback->readErr(ex);
1920     }
1921   } else {
1922     startFail();
1923     if (callback) {
1924       callback->readErr(ex);
1925     }
1926     finishFail();
1927   }
1928 }
1929
1930 void AsyncSocket::invalidState(WriteCallback* callback) {
1931   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1932              << "): write() called in invalid state " << state_;
1933
1934   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1935                          withAddr("write() called with socket in invalid state"));
1936   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1937     if (callback) {
1938       callback->writeErr(0, ex);
1939     }
1940   } else {
1941     startFail();
1942     if (callback) {
1943       callback->writeErr(0, ex);
1944     }
1945     finishFail();
1946   }
1947 }
1948
1949 void AsyncSocket::doClose() {
1950   if (fd_ == -1) return;
1951   if (shutdownSocketSet_) {
1952     shutdownSocketSet_->close(fd_);
1953   } else {
1954     ::close(fd_);
1955   }
1956   fd_ = -1;
1957 }
1958
1959 std::ostream& operator << (std::ostream& os,
1960                            const AsyncSocket::StateEnum& state) {
1961   os << static_cast<int>(state);
1962   return os;
1963 }
1964
1965 std::string AsyncSocket::withAddr(const std::string& s) {
1966   // Don't use addr_ directly because it may not be initialized
1967   // e.g. if constructed from fd
1968   folly::SocketAddress peer, local;
1969   try {
1970     getPeerAddress(&peer);
1971     getLocalAddress(&local);
1972   } catch (const std::exception&) {
1973     // ignore
1974   } catch (...) {
1975     // ignore
1976   }
1977   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
1978 }
1979
1980 } // folly