Fix -Wsign-compare
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/SocketAddress.h>
21 #include <folly/io/IOBuf.h>
22
23 #include <poll.h>
24 #include <errno.h>
25 #include <limits.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <netinet/in.h>
31 #include <netinet/tcp.h>
32
33 using std::string;
34 using std::unique_ptr;
35
36 namespace folly {
37
38 // static members initializers
39 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
40 const folly::SocketAddress AsyncSocket::anyAddress =
41   folly::SocketAddress("0.0.0.0", 0);
42
43 const AsyncSocketException socketClosedLocallyEx(
44     AsyncSocketException::END_OF_FILE, "socket closed locally");
45 const AsyncSocketException socketShutdownForWritesEx(
46     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
47
48 // TODO: It might help performance to provide a version of WriteRequest that
49 // users could derive from, so we can avoid the extra allocation for each call
50 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
51 // protocols are currently templatized for transports.
52 //
53 // We would need the version for external users where they provide the iovec
54 // storage space, and only our internal version would allocate it at the end of
55 // the WriteRequest.
56
57 /**
58  * A WriteRequest object tracks information about a pending write() or writev()
59  * operation.
60  *
61  * A new WriteRequest operation is allocated on the heap for all write
62  * operations that cannot be completed immediately.
63  */
64 class AsyncSocket::WriteRequest {
65  public:
66   static WriteRequest* newRequest(WriteCallback* callback,
67                                   const iovec* ops,
68                                   uint32_t opCount,
69                                   unique_ptr<IOBuf>&& ioBuf,
70                                   WriteFlags flags) {
71     assert(opCount > 0);
72     // Since we put a variable size iovec array at the end
73     // of each WriteRequest, we have to manually allocate the memory.
74     void* buf = malloc(sizeof(WriteRequest) +
75                        (opCount * sizeof(struct iovec)));
76     if (buf == nullptr) {
77       throw std::bad_alloc();
78     }
79
80     return new(buf) WriteRequest(callback, ops, opCount, std::move(ioBuf),
81                                  flags);
82   }
83
84   void destroy() {
85     this->~WriteRequest();
86     free(this);
87   }
88
89   bool cork() const {
90     return isSet(flags_, WriteFlags::CORK);
91   }
92
93   WriteFlags flags() const {
94     return flags_;
95   }
96
97   WriteRequest* getNext() const {
98     return next_;
99   }
100
101   WriteCallback* getCallback() const {
102     return callback_;
103   }
104
105   uint32_t getBytesWritten() const {
106     return bytesWritten_;
107   }
108
109   const struct iovec* getOps() const {
110     assert(opCount_ > opIndex_);
111     return writeOps_ + opIndex_;
112   }
113
114   uint32_t getOpCount() const {
115     assert(opCount_ > opIndex_);
116     return opCount_ - opIndex_;
117   }
118
119   void consume(uint32_t wholeOps, uint32_t partialBytes,
120                uint32_t totalBytesWritten) {
121     // Advance opIndex_ forward by wholeOps
122     opIndex_ += wholeOps;
123     assert(opIndex_ < opCount_);
124
125     // If we've finished writing any IOBufs, release them
126     if (ioBuf_) {
127       for (uint32_t i = wholeOps; i != 0; --i) {
128         assert(ioBuf_);
129         ioBuf_ = ioBuf_->pop();
130       }
131     }
132
133     // Move partialBytes forward into the current iovec buffer
134     struct iovec* currentOp = writeOps_ + opIndex_;
135     assert((partialBytes < currentOp->iov_len) || (currentOp->iov_len == 0));
136     currentOp->iov_base =
137       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes;
138     currentOp->iov_len -= partialBytes;
139
140     // Increment the bytesWritten_ count by totalBytesWritten
141     bytesWritten_ += totalBytesWritten;
142   }
143
144   void append(WriteRequest* next) {
145     assert(next_ == nullptr);
146     next_ = next;
147   }
148
149  private:
150   WriteRequest(WriteCallback* callback,
151                const struct iovec* ops,
152                uint32_t opCount,
153                unique_ptr<IOBuf>&& ioBuf,
154                WriteFlags flags)
155     : next_(nullptr)
156     , callback_(callback)
157     , bytesWritten_(0)
158     , opCount_(opCount)
159     , opIndex_(0)
160     , flags_(flags)
161     , ioBuf_(std::move(ioBuf)) {
162     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
163   }
164
165   // Private destructor, to ensure callers use destroy()
166   ~WriteRequest() {}
167
168   WriteRequest* next_;          ///< pointer to next WriteRequest
169   WriteCallback* callback_;     ///< completion callback
170   uint32_t bytesWritten_;       ///< bytes written
171   uint32_t opCount_;            ///< number of entries in writeOps_
172   uint32_t opIndex_;            ///< current index into writeOps_
173   WriteFlags flags_;            ///< set for WriteFlags
174   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
175   struct iovec writeOps_[];     ///< write operation(s) list
176 };
177
178 AsyncSocket::AsyncSocket(EventBase* evb)
179   : eventBase_(evb)
180   , writeTimeout_(this, evb)
181   , ioHandler_(this, evb) {
182   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
183   init();
184 }
185
186 AsyncSocket::AsyncSocket(EventBase* evb,
187                            const folly::SocketAddress& address,
188                            uint32_t connectTimeout)
189   : eventBase_(evb)
190   , writeTimeout_(this, evb)
191   , ioHandler_(this, evb) {
192   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
193   init();
194   connect(nullptr, address, connectTimeout);
195 }
196
197 AsyncSocket::AsyncSocket(EventBase* evb,
198                            const std::string& ip,
199                            uint16_t port,
200                            uint32_t connectTimeout)
201   : eventBase_(evb)
202   , writeTimeout_(this, evb)
203   , ioHandler_(this, evb) {
204   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
205   init();
206   connect(nullptr, ip, port, connectTimeout);
207 }
208
209 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
210   : eventBase_(evb)
211   , writeTimeout_(this, evb)
212   , ioHandler_(this, evb, fd) {
213   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
214           << fd << ")";
215   init();
216   fd_ = fd;
217   state_ = StateEnum::ESTABLISHED;
218 }
219
220 // init() method, since constructor forwarding isn't supported in most
221 // compilers yet.
222 void AsyncSocket::init() {
223   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
224   shutdownFlags_ = 0;
225   state_ = StateEnum::UNINIT;
226   eventFlags_ = EventHandler::NONE;
227   fd_ = -1;
228   sendTimeout_ = 0;
229   maxReadsPerEvent_ = 0;
230   connectCallback_ = nullptr;
231   readCallback_ = nullptr;
232   writeReqHead_ = nullptr;
233   writeReqTail_ = nullptr;
234   shutdownSocketSet_ = nullptr;
235   appBytesWritten_ = 0;
236   appBytesReceived_ = 0;
237 }
238
239 AsyncSocket::~AsyncSocket() {
240   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
241           << ", evb=" << eventBase_ << ", fd=" << fd_
242           << ", state=" << state_ << ")";
243 }
244
245 void AsyncSocket::destroy() {
246   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
247           << ", fd=" << fd_ << ", state=" << state_;
248   // When destroy is called, close the socket immediately
249   closeNow();
250
251   // Then call DelayedDestruction::destroy() to take care of
252   // whether or not we need immediate or delayed destruction
253   DelayedDestruction::destroy();
254 }
255
256 int AsyncSocket::detachFd() {
257   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
258           << ", evb=" << eventBase_ << ", state=" << state_
259           << ", events=" << std::hex << eventFlags_ << ")";
260   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
261   // actually close the descriptor.
262   if (shutdownSocketSet_) {
263     shutdownSocketSet_->remove(fd_);
264   }
265   int fd = fd_;
266   fd_ = -1;
267   // Call closeNow() to invoke all pending callbacks with an error.
268   closeNow();
269   // Update the EventHandler to stop using this fd.
270   // This can only be done after closeNow() unregisters the handler.
271   ioHandler_.changeHandlerFD(-1);
272   return fd;
273 }
274
275 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
276   if (shutdownSocketSet_ == newSS) {
277     return;
278   }
279   if (shutdownSocketSet_ && fd_ != -1) {
280     shutdownSocketSet_->remove(fd_);
281   }
282   shutdownSocketSet_ = newSS;
283   if (shutdownSocketSet_ && fd_ != -1) {
284     shutdownSocketSet_->add(fd_);
285   }
286 }
287
288 void AsyncSocket::connect(ConnectCallback* callback,
289                            const folly::SocketAddress& address,
290                            int timeout,
291                            const OptionMap &options,
292                            const folly::SocketAddress& bindAddr) noexcept {
293   DestructorGuard dg(this);
294   assert(eventBase_->isInEventBaseThread());
295
296   addr_ = address;
297
298   // Make sure we're in the uninitialized state
299   if (state_ != StateEnum::UNINIT) {
300     return invalidState(callback);
301   }
302
303   assert(fd_ == -1);
304   state_ = StateEnum::CONNECTING;
305   connectCallback_ = callback;
306
307   sockaddr_storage addrStorage;
308   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
309
310   try {
311     // Create the socket
312     // Technically the first parameter should actually be a protocol family
313     // constant (PF_xxx) rather than an address family (AF_xxx), but the
314     // distinction is mainly just historical.  In pretty much all
315     // implementations the PF_foo and AF_foo constants are identical.
316     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
317     if (fd_ < 0) {
318       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
319                                 withAddr("failed to create socket"), errno);
320     }
321     if (shutdownSocketSet_) {
322       shutdownSocketSet_->add(fd_);
323     }
324     ioHandler_.changeHandlerFD(fd_);
325
326     // Set the FD_CLOEXEC flag so that the socket will be closed if the program
327     // later forks and execs.
328     int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
329     if (rv != 0) {
330       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
331                                 withAddr("failed to set close-on-exec flag"),
332                                 errno);
333     }
334
335     // Put the socket in non-blocking mode
336     int flags = fcntl(fd_, F_GETFL, 0);
337     if (flags == -1) {
338       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
339                                 withAddr("failed to get socket flags"), errno);
340     }
341     rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
342     if (rv == -1) {
343       throw AsyncSocketException(
344           AsyncSocketException::INTERNAL_ERROR,
345           withAddr("failed to put socket in non-blocking mode"),
346           errno);
347     }
348
349 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
350     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
351     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
352     if (rv == -1) {
353       throw AsyncSocketException(
354           AsyncSocketException::INTERNAL_ERROR,
355           "failed to enable F_SETNOSIGPIPE on socket",
356           errno);
357     }
358 #endif
359
360     // By default, turn on TCP_NODELAY
361     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
362     // setNoDelay() will log an error message if it fails.
363     if (address.getFamily() != AF_UNIX) {
364       (void)setNoDelay(true);
365     }
366
367     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
368             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
369
370     // bind the socket
371     if (bindAddr != anyAddress) {
372       int one = 1;
373       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
374         doClose();
375         throw AsyncSocketException(
376           AsyncSocketException::NOT_OPEN,
377           "failed to setsockopt prior to bind on " + bindAddr.describe(),
378           errno);
379       }
380
381       bindAddr.getAddress(&addrStorage);
382
383       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
384         doClose();
385         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
386                                   "failed to bind to async socket: " +
387                                   bindAddr.describe(),
388                                   errno);
389       }
390     }
391
392     // Apply the additional options if any.
393     for (const auto& opt: options) {
394       int rv = opt.first.apply(fd_, opt.second);
395       if (rv != 0) {
396         throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
397                                   withAddr("failed to set socket option"),
398                                   errno);
399       }
400     }
401
402     // Perform the connect()
403     address.getAddress(&addrStorage);
404
405     rv = ::connect(fd_, saddr, address.getActualSize());
406     if (rv < 0) {
407       if (errno == EINPROGRESS) {
408         // Connection in progress.
409         if (timeout > 0) {
410           // Start a timer in case the connection takes too long.
411           if (!writeTimeout_.scheduleTimeout(timeout)) {
412             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
413                 withAddr("failed to schedule AsyncSocket connect timeout"));
414           }
415         }
416
417         // Register for write events, so we'll
418         // be notified when the connection finishes/fails.
419         // Note that we don't register for a persistent event here.
420         assert(eventFlags_ == EventHandler::NONE);
421         eventFlags_ = EventHandler::WRITE;
422         if (!ioHandler_.registerHandler(eventFlags_)) {
423           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
424               withAddr("failed to register AsyncSocket connect handler"));
425         }
426         return;
427       } else {
428         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
429                                   "connect failed (immediately)", errno);
430       }
431     }
432
433     // If we're still here the connect() succeeded immediately.
434     // Fall through to call the callback outside of this try...catch block
435   } catch (const AsyncSocketException& ex) {
436     return failConnect(__func__, ex);
437   } catch (const std::exception& ex) {
438     // shouldn't happen, but handle it just in case
439     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
440                << "): unexpected " << typeid(ex).name() << " exception: "
441                << ex.what();
442     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
443                             withAddr(string("unexpected exception: ") +
444                                      ex.what()));
445     return failConnect(__func__, tex);
446   }
447
448   // The connection succeeded immediately
449   // The read callback may not have been set yet, and no writes may be pending
450   // yet, so we don't have to register for any events at the moment.
451   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
452   assert(readCallback_ == nullptr);
453   assert(writeReqHead_ == nullptr);
454   state_ = StateEnum::ESTABLISHED;
455   if (callback) {
456     connectCallback_ = nullptr;
457     callback->connectSuccess();
458   }
459 }
460
461 void AsyncSocket::connect(ConnectCallback* callback,
462                            const string& ip, uint16_t port,
463                            int timeout,
464                            const OptionMap &options) noexcept {
465   DestructorGuard dg(this);
466   try {
467     connectCallback_ = callback;
468     connect(callback, folly::SocketAddress(ip, port), timeout, options);
469   } catch (const std::exception& ex) {
470     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
471                             ex.what());
472     return failConnect(__func__, tex);
473   }
474 }
475
476 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
477   sendTimeout_ = milliseconds;
478   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
479
480   // If we are currently pending on write requests, immediately update
481   // writeTimeout_ with the new value.
482   if ((eventFlags_ & EventHandler::WRITE) &&
483       (state_ != StateEnum::CONNECTING)) {
484     assert(state_ == StateEnum::ESTABLISHED);
485     assert((shutdownFlags_ & SHUT_WRITE) == 0);
486     if (sendTimeout_ > 0) {
487       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
488         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
489             withAddr("failed to reschedule send timeout in setSendTimeout"));
490         return failWrite(__func__, ex);
491       }
492     } else {
493       writeTimeout_.cancelTimeout();
494     }
495   }
496 }
497
498 void AsyncSocket::setReadCB(ReadCallback *callback) {
499   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
500           << ", callback=" << callback << ", state=" << state_;
501
502   // Short circuit if callback is the same as the existing readCallback_.
503   //
504   // Note that this is needed for proper functioning during some cleanup cases.
505   // During cleanup we allow setReadCallback(nullptr) to be called even if the
506   // read callback is already unset and we have been detached from an event
507   // base.  This check prevents us from asserting
508   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
509   if (callback == readCallback_) {
510     return;
511   }
512
513   if (shutdownFlags_ & SHUT_READ) {
514     // Reads have already been shut down on this socket.
515     //
516     // Allow setReadCallback(nullptr) to be called in this case, but don't
517     // allow a new callback to be set.
518     //
519     // For example, setReadCallback(nullptr) can happen after an error if we
520     // invoke some other error callback before invoking readError().  The other
521     // error callback that is invoked first may go ahead and clear the read
522     // callback before we get a chance to invoke readError().
523     if (callback != nullptr) {
524       return invalidState(callback);
525     }
526     assert((eventFlags_ & EventHandler::READ) == 0);
527     readCallback_ = nullptr;
528     return;
529   }
530
531   DestructorGuard dg(this);
532   assert(eventBase_->isInEventBaseThread());
533
534   switch ((StateEnum)state_) {
535     case StateEnum::CONNECTING:
536       // For convenience, we allow the read callback to be set while we are
537       // still connecting.  We just store the callback for now.  Once the
538       // connection completes we'll register for read events.
539       readCallback_ = callback;
540       return;
541     case StateEnum::ESTABLISHED:
542     {
543       readCallback_ = callback;
544       uint16_t oldFlags = eventFlags_;
545       if (readCallback_) {
546         eventFlags_ |= EventHandler::READ;
547       } else {
548         eventFlags_ &= ~EventHandler::READ;
549       }
550
551       // Update our registration if our flags have changed
552       if (eventFlags_ != oldFlags) {
553         // We intentionally ignore the return value here.
554         // updateEventRegistration() will move us into the error state if it
555         // fails, and we don't need to do anything else here afterwards.
556         (void)updateEventRegistration();
557       }
558
559       if (readCallback_) {
560         checkForImmediateRead();
561       }
562       return;
563     }
564     case StateEnum::CLOSED:
565     case StateEnum::ERROR:
566       // We should never reach here.  SHUT_READ should always be set
567       // if we are in STATE_CLOSED or STATE_ERROR.
568       assert(false);
569       return invalidState(callback);
570     case StateEnum::UNINIT:
571       // We do not allow setReadCallback() to be called before we start
572       // connecting.
573       return invalidState(callback);
574   }
575
576   // We don't put a default case in the switch statement, so that the compiler
577   // will warn us to update the switch statement if a new state is added.
578   return invalidState(callback);
579 }
580
581 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
582   return readCallback_;
583 }
584
585 void AsyncSocket::write(WriteCallback* callback,
586                          const void* buf, size_t bytes, WriteFlags flags) {
587   iovec op;
588   op.iov_base = const_cast<void*>(buf);
589   op.iov_len = bytes;
590   writeImpl(callback, &op, 1, std::move(unique_ptr<IOBuf>()), flags);
591 }
592
593 void AsyncSocket::writev(WriteCallback* callback,
594                           const iovec* vec,
595                           size_t count,
596                           WriteFlags flags) {
597   writeImpl(callback, vec, count, std::move(unique_ptr<IOBuf>()), flags);
598 }
599
600 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
601                               WriteFlags flags) {
602   size_t count = buf->countChainElements();
603   if (count <= 64) {
604     iovec vec[count];
605     writeChainImpl(callback, vec, count, std::move(buf), flags);
606   } else {
607     iovec* vec = new iovec[count];
608     writeChainImpl(callback, vec, count, std::move(buf), flags);
609     delete[] vec;
610   }
611 }
612
613 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
614     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
615   const IOBuf* head = buf.get();
616   const IOBuf* next = head;
617   unsigned i = 0;
618   do {
619     vec[i].iov_base = const_cast<uint8_t *>(next->data());
620     vec[i].iov_len = next->length();
621     // IOBuf can get confused by empty iovec buffers, so increment the
622     // output pointer only if the iovec buffer is non-empty.  We could
623     // end the loop with i < count, but that's ok.
624     if (vec[i].iov_len != 0) {
625       i++;
626     }
627     next = next->next();
628   } while (next != head);
629   writeImpl(callback, vec, i, std::move(buf), flags);
630 }
631
632 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
633                              size_t count, unique_ptr<IOBuf>&& buf,
634                              WriteFlags flags) {
635   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
636           << ", callback=" << callback << ", count=" << count
637           << ", state=" << state_;
638   DestructorGuard dg(this);
639   unique_ptr<IOBuf>ioBuf(std::move(buf));
640   assert(eventBase_->isInEventBaseThread());
641
642   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
643     // No new writes may be performed after the write side of the socket has
644     // been shutdown.
645     //
646     // We could just call callback->writeError() here to fail just this write.
647     // However, fail hard and use invalidState() to fail all outstanding
648     // callbacks and move the socket into the error state.  There's most likely
649     // a bug in the caller's code, so we abort everything rather than trying to
650     // proceed as best we can.
651     return invalidState(callback);
652   }
653
654   uint32_t countWritten = 0;
655   uint32_t partialWritten = 0;
656   int bytesWritten = 0;
657   bool mustRegister = false;
658   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
659     if (writeReqHead_ == nullptr) {
660       // If we are established and there are no other writes pending,
661       // we can attempt to perform the write immediately.
662       assert(writeReqTail_ == nullptr);
663       assert((eventFlags_ & EventHandler::WRITE) == 0);
664
665       bytesWritten = performWrite(vec, count, flags,
666                                   &countWritten, &partialWritten);
667       if (bytesWritten < 0) {
668         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
669                                withAddr("writev failed"), errno);
670         return failWrite(__func__, callback, 0, ex);
671       } else if (countWritten == count) {
672         // We successfully wrote everything.
673         // Invoke the callback and return.
674         if (callback) {
675           callback->writeSuccess();
676         }
677         return;
678       } // else { continue writing the next writeReq }
679       mustRegister = true;
680     }
681   } else if (!connecting()) {
682     // Invalid state for writing
683     return invalidState(callback);
684   }
685
686   // Create a new WriteRequest to add to the queue
687   WriteRequest* req;
688   try {
689     req = WriteRequest::newRequest(callback, vec + countWritten,
690                                    count - countWritten, std::move(ioBuf),
691                                    flags);
692   } catch (const std::exception& ex) {
693     // we mainly expect to catch std::bad_alloc here
694     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
695         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
696     return failWrite(__func__, callback, bytesWritten, tex);
697   }
698   req->consume(0, partialWritten, bytesWritten);
699   if (writeReqTail_ == nullptr) {
700     assert(writeReqHead_ == nullptr);
701     writeReqHead_ = writeReqTail_ = req;
702   } else {
703     writeReqTail_->append(req);
704     writeReqTail_ = req;
705   }
706
707   // Register for write events if are established and not currently
708   // waiting on write events
709   if (mustRegister) {
710     assert(state_ == StateEnum::ESTABLISHED);
711     assert((eventFlags_ & EventHandler::WRITE) == 0);
712     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
713       assert(state_ == StateEnum::ERROR);
714       return;
715     }
716     if (sendTimeout_ > 0) {
717       // Schedule a timeout to fire if the write takes too long.
718       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
719         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
720                                withAddr("failed to schedule send timeout"));
721         return failWrite(__func__, ex);
722       }
723     }
724   }
725 }
726
727 void AsyncSocket::close() {
728   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
729           << ", state=" << state_ << ", shutdownFlags="
730           << std::hex << (int) shutdownFlags_;
731
732   // close() is only different from closeNow() when there are pending writes
733   // that need to drain before we can close.  In all other cases, just call
734   // closeNow().
735   //
736   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
737   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
738   // is still running.  (e.g., If there are multiple pending writes, and we
739   // call writeError() on the first one, it may call close().  In this case we
740   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
741   // writes will still be in the queue.)
742   //
743   // We only need to drain pending writes if we are still in STATE_CONNECTING
744   // or STATE_ESTABLISHED
745   if ((writeReqHead_ == nullptr) ||
746       !(state_ == StateEnum::CONNECTING ||
747       state_ == StateEnum::ESTABLISHED)) {
748     closeNow();
749     return;
750   }
751
752   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
753   // destroyed until close() returns.
754   DestructorGuard dg(this);
755   assert(eventBase_->isInEventBaseThread());
756
757   // Since there are write requests pending, we have to set the
758   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
759   // connect finishes and we finish writing these requests.
760   //
761   // Set SHUT_READ to indicate that reads are shut down, and set the
762   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
763   // pending writes complete.
764   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
765
766   // If a read callback is set, invoke readEOF() immediately to inform it that
767   // the socket has been closed and no more data can be read.
768   if (readCallback_) {
769     // Disable reads if they are enabled
770     if (!updateEventRegistration(0, EventHandler::READ)) {
771       // We're now in the error state; callbacks have been cleaned up
772       assert(state_ == StateEnum::ERROR);
773       assert(readCallback_ == nullptr);
774     } else {
775       ReadCallback* callback = readCallback_;
776       readCallback_ = nullptr;
777       callback->readEOF();
778     }
779   }
780 }
781
782 void AsyncSocket::closeNow() {
783   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
784           << ", state=" << state_ << ", shutdownFlags="
785           << std::hex << (int) shutdownFlags_;
786   DestructorGuard dg(this);
787   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
788
789   switch (state_) {
790     case StateEnum::ESTABLISHED:
791     case StateEnum::CONNECTING:
792     {
793       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
794       state_ = StateEnum::CLOSED;
795
796       // If the write timeout was set, cancel it.
797       writeTimeout_.cancelTimeout();
798
799       // If we are registered for I/O events, unregister.
800       if (eventFlags_ != EventHandler::NONE) {
801         eventFlags_ = EventHandler::NONE;
802         if (!updateEventRegistration()) {
803           // We will have been moved into the error state.
804           assert(state_ == StateEnum::ERROR);
805           return;
806         }
807       }
808
809       if (fd_ >= 0) {
810         ioHandler_.changeHandlerFD(-1);
811         doClose();
812       }
813
814       if (connectCallback_) {
815         ConnectCallback* callback = connectCallback_;
816         connectCallback_ = nullptr;
817         callback->connectErr(socketClosedLocallyEx);
818       }
819
820       failAllWrites(socketClosedLocallyEx);
821
822       if (readCallback_) {
823         ReadCallback* callback = readCallback_;
824         readCallback_ = nullptr;
825         callback->readEOF();
826       }
827       return;
828     }
829     case StateEnum::CLOSED:
830       // Do nothing.  It's possible that we are being called recursively
831       // from inside a callback that we invoked inside another call to close()
832       // that is still running.
833       return;
834     case StateEnum::ERROR:
835       // Do nothing.  The error handling code has performed (or is performing)
836       // cleanup.
837       return;
838     case StateEnum::UNINIT:
839       assert(eventFlags_ == EventHandler::NONE);
840       assert(connectCallback_ == nullptr);
841       assert(readCallback_ == nullptr);
842       assert(writeReqHead_ == nullptr);
843       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
844       state_ = StateEnum::CLOSED;
845       return;
846   }
847
848   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
849               << ") called in unknown state " << state_;
850 }
851
852 void AsyncSocket::closeWithReset() {
853   // Enable SO_LINGER, with the linger timeout set to 0.
854   // This will trigger a TCP reset when we close the socket.
855   if (fd_ >= 0) {
856     struct linger optLinger = {1, 0};
857     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
858       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
859               << "on " << fd_ << ": errno=" << errno;
860     }
861   }
862
863   // Then let closeNow() take care of the rest
864   closeNow();
865 }
866
867 void AsyncSocket::shutdownWrite() {
868   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
869           << ", state=" << state_ << ", shutdownFlags="
870           << std::hex << (int) shutdownFlags_;
871
872   // If there are no pending writes, shutdownWrite() is identical to
873   // shutdownWriteNow().
874   if (writeReqHead_ == nullptr) {
875     shutdownWriteNow();
876     return;
877   }
878
879   assert(eventBase_->isInEventBaseThread());
880
881   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
882   // shutdown will be performed once all writes complete.
883   shutdownFlags_ |= SHUT_WRITE_PENDING;
884 }
885
886 void AsyncSocket::shutdownWriteNow() {
887   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
888           << ", fd=" << fd_ << ", state=" << state_
889           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
890
891   if (shutdownFlags_ & SHUT_WRITE) {
892     // Writes are already shutdown; nothing else to do.
893     return;
894   }
895
896   // If SHUT_READ is already set, just call closeNow() to completely
897   // close the socket.  This can happen if close() was called with writes
898   // pending, and then shutdownWriteNow() is called before all pending writes
899   // complete.
900   if (shutdownFlags_ & SHUT_READ) {
901     closeNow();
902     return;
903   }
904
905   DestructorGuard dg(this);
906   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
907
908   switch (static_cast<StateEnum>(state_)) {
909     case StateEnum::ESTABLISHED:
910     {
911       shutdownFlags_ |= SHUT_WRITE;
912
913       // If the write timeout was set, cancel it.
914       writeTimeout_.cancelTimeout();
915
916       // If we are registered for write events, unregister.
917       if (!updateEventRegistration(0, EventHandler::WRITE)) {
918         // We will have been moved into the error state.
919         assert(state_ == StateEnum::ERROR);
920         return;
921       }
922
923       // Shutdown writes on the file descriptor
924       ::shutdown(fd_, SHUT_WR);
925
926       // Immediately fail all write requests
927       failAllWrites(socketShutdownForWritesEx);
928       return;
929     }
930     case StateEnum::CONNECTING:
931     {
932       // Set the SHUT_WRITE_PENDING flag.
933       // When the connection completes, it will check this flag,
934       // shutdown the write half of the socket, and then set SHUT_WRITE.
935       shutdownFlags_ |= SHUT_WRITE_PENDING;
936
937       // Immediately fail all write requests
938       failAllWrites(socketShutdownForWritesEx);
939       return;
940     }
941     case StateEnum::UNINIT:
942       // Callers normally shouldn't call shutdownWriteNow() before the socket
943       // even starts connecting.  Nonetheless, go ahead and set
944       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
945       // immediately shut down the write side of the socket.
946       shutdownFlags_ |= SHUT_WRITE_PENDING;
947       return;
948     case StateEnum::CLOSED:
949     case StateEnum::ERROR:
950       // We should never get here.  SHUT_WRITE should always be set
951       // in STATE_CLOSED and STATE_ERROR.
952       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
953                  << ", fd=" << fd_ << ") in unexpected state " << state_
954                  << " with SHUT_WRITE not set ("
955                  << std::hex << (int) shutdownFlags_ << ")";
956       assert(false);
957       return;
958   }
959
960   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
961               << fd_ << ") called in unknown state " << state_;
962 }
963
964 bool AsyncSocket::readable() const {
965   if (fd_ == -1) {
966     return false;
967   }
968   struct pollfd fds[1];
969   fds[0].fd = fd_;
970   fds[0].events = POLLIN;
971   fds[0].revents = 0;
972   int rc = poll(fds, 1, 0);
973   return rc == 1;
974 }
975
976 bool AsyncSocket::isPending() const {
977   return ioHandler_.isPending();
978 }
979
980 bool AsyncSocket::hangup() const {
981   if (fd_ == -1) {
982     // sanity check, no one should ask for hangup if we are not connected.
983     assert(false);
984     return false;
985   }
986 #ifdef POLLRDHUP // Linux-only
987   struct pollfd fds[1];
988   fds[0].fd = fd_;
989   fds[0].events = POLLRDHUP|POLLHUP;
990   fds[0].revents = 0;
991   poll(fds, 1, 0);
992   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
993 #else
994   return false;
995 #endif
996 }
997
998 bool AsyncSocket::good() const {
999   return ((state_ == StateEnum::CONNECTING ||
1000           state_ == StateEnum::ESTABLISHED) &&
1001           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1002 }
1003
1004 bool AsyncSocket::error() const {
1005   return (state_ == StateEnum::ERROR);
1006 }
1007
1008 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1009   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1010           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1011           << ", state=" << state_ << ", events="
1012           << std::hex << eventFlags_ << ")";
1013   assert(eventBase_ == nullptr);
1014   assert(eventBase->isInEventBaseThread());
1015
1016   eventBase_ = eventBase;
1017   ioHandler_.attachEventBase(eventBase);
1018   writeTimeout_.attachEventBase(eventBase);
1019 }
1020
1021 void AsyncSocket::detachEventBase() {
1022   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1023           << ", old evb=" << eventBase_ << ", state=" << state_
1024           << ", events=" << std::hex << eventFlags_ << ")";
1025   assert(eventBase_ != nullptr);
1026   assert(eventBase_->isInEventBaseThread());
1027
1028   eventBase_ = nullptr;
1029   ioHandler_.detachEventBase();
1030   writeTimeout_.detachEventBase();
1031 }
1032
1033 bool AsyncSocket::isDetachable() const {
1034   DCHECK(eventBase_ != nullptr);
1035   DCHECK(eventBase_->isInEventBaseThread());
1036
1037   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1038 }
1039
1040 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1041   address->setFromLocalAddress(fd_);
1042 }
1043
1044 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1045   if (!addr_.isInitialized()) {
1046     addr_.setFromPeerAddress(fd_);
1047   }
1048   *address = addr_;
1049 }
1050
1051 int AsyncSocket::setNoDelay(bool noDelay) {
1052   if (fd_ < 0) {
1053     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1054                << this << "(state=" << state_ << ")";
1055     return EINVAL;
1056
1057   }
1058
1059   int value = noDelay ? 1 : 0;
1060   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1061     int errnoCopy = errno;
1062     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1063             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1064             << strerror(errnoCopy);
1065     return errnoCopy;
1066   }
1067
1068   return 0;
1069 }
1070
1071 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1072
1073   #ifndef TCP_CONGESTION
1074   #define TCP_CONGESTION  13
1075   #endif
1076
1077   if (fd_ < 0) {
1078     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1079                << "socket " << this << "(state=" << state_ << ")";
1080     return EINVAL;
1081
1082   }
1083
1084   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1085         cname.length() + 1) != 0) {
1086     int errnoCopy = errno;
1087     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1088             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1089             << strerror(errnoCopy);
1090     return errnoCopy;
1091   }
1092
1093   return 0;
1094 }
1095
1096 int AsyncSocket::setQuickAck(bool quickack) {
1097   if (fd_ < 0) {
1098     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1099                << this << "(state=" << state_ << ")";
1100     return EINVAL;
1101
1102   }
1103
1104 #ifdef TCP_QUICKACK // Linux-only
1105   int value = quickack ? 1 : 0;
1106   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1107     int errnoCopy = errno;
1108     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1109             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1110             << strerror(errnoCopy);
1111     return errnoCopy;
1112   }
1113
1114   return 0;
1115 #else
1116   return ENOSYS;
1117 #endif
1118 }
1119
1120 int AsyncSocket::setSendBufSize(size_t bufsize) {
1121   if (fd_ < 0) {
1122     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1123                << this << "(state=" << state_ << ")";
1124     return EINVAL;
1125   }
1126
1127   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1128     int errnoCopy = errno;
1129     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1130             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1131             << strerror(errnoCopy);
1132     return errnoCopy;
1133   }
1134
1135   return 0;
1136 }
1137
1138 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1139   if (fd_ < 0) {
1140     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1141                << this << "(state=" << state_ << ")";
1142     return EINVAL;
1143   }
1144
1145   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1146     int errnoCopy = errno;
1147     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1148             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1149             << strerror(errnoCopy);
1150     return errnoCopy;
1151   }
1152
1153   return 0;
1154 }
1155
1156 int AsyncSocket::setTCPProfile(int profd) {
1157   if (fd_ < 0) {
1158     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1159                << this << "(state=" << state_ << ")";
1160     return EINVAL;
1161   }
1162
1163   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1164     int errnoCopy = errno;
1165     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1166             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1167             << strerror(errnoCopy);
1168     return errnoCopy;
1169   }
1170
1171   return 0;
1172 }
1173
1174 void AsyncSocket::ioReady(uint16_t events) noexcept {
1175   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1176           << ", events=" << std::hex << events << ", state=" << state_;
1177   DestructorGuard dg(this);
1178   assert(events & EventHandler::READ_WRITE);
1179   assert(eventBase_->isInEventBaseThread());
1180
1181   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1182   if (relevantEvents == EventHandler::READ) {
1183     handleRead();
1184   } else if (relevantEvents == EventHandler::WRITE) {
1185     handleWrite();
1186   } else if (relevantEvents == EventHandler::READ_WRITE) {
1187     EventBase* originalEventBase = eventBase_;
1188     // If both read and write events are ready, process writes first.
1189     handleWrite();
1190
1191     // Return now if handleWrite() detached us from our EventBase
1192     if (eventBase_ != originalEventBase) {
1193       return;
1194     }
1195
1196     // Only call handleRead() if a read callback is still installed.
1197     // (It's possible that the read callback was uninstalled during
1198     // handleWrite().)
1199     if (readCallback_) {
1200       handleRead();
1201     }
1202   } else {
1203     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1204                << std::hex << events << "(this=" << this << ")";
1205     abort();
1206   }
1207 }
1208
1209 ssize_t AsyncSocket::performRead(void* buf, size_t buflen) {
1210   ssize_t bytes = recv(fd_, buf, buflen, MSG_DONTWAIT);
1211   if (bytes < 0) {
1212     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1213       // No more data to read right now.
1214       return READ_BLOCKING;
1215     } else {
1216       return READ_ERROR;
1217     }
1218   } else {
1219     appBytesReceived_ += bytes;
1220     return bytes;
1221   }
1222 }
1223
1224 void AsyncSocket::handleRead() noexcept {
1225   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1226           << ", state=" << state_;
1227   assert(state_ == StateEnum::ESTABLISHED);
1228   assert((shutdownFlags_ & SHUT_READ) == 0);
1229   assert(readCallback_ != nullptr);
1230   assert(eventFlags_ & EventHandler::READ);
1231
1232   // Loop until:
1233   // - a read attempt would block
1234   // - readCallback_ is uninstalled
1235   // - the number of loop iterations exceeds the optional maximum
1236   // - this AsyncSocket is moved to another EventBase
1237   //
1238   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1239   // which is why need to check for it here.
1240   //
1241   // The last bullet point is slightly subtle.  readDataAvailable() may also
1242   // detach this socket from this EventBase.  However, before
1243   // readDataAvailable() returns another thread may pick it up, attach it to
1244   // a different EventBase, and install another readCallback_.  We need to
1245   // exit immediately after readDataAvailable() returns if the eventBase_ has
1246   // changed.  (The caller must perform some sort of locking to transfer the
1247   // AsyncSocket between threads properly.  This will be sufficient to ensure
1248   // that this thread sees the updated eventBase_ variable after
1249   // readDataAvailable() returns.)
1250   uint16_t numReads = 0;
1251   EventBase* originalEventBase = eventBase_;
1252   while (readCallback_ && eventBase_ == originalEventBase) {
1253     // Get the buffer to read into.
1254     void* buf = nullptr;
1255     size_t buflen = 0;
1256     try {
1257       readCallback_->getReadBuffer(&buf, &buflen);
1258     } catch (const AsyncSocketException& ex) {
1259       return failRead(__func__, ex);
1260     } catch (const std::exception& ex) {
1261       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1262                               string("ReadCallback::getReadBuffer() "
1263                                      "threw exception: ") +
1264                               ex.what());
1265       return failRead(__func__, tex);
1266     } catch (...) {
1267       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1268                              "ReadCallback::getReadBuffer() threw "
1269                              "non-exception type");
1270       return failRead(__func__, ex);
1271     }
1272     if (buf == nullptr || buflen == 0) {
1273       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1274                              "ReadCallback::getReadBuffer() returned "
1275                              "empty buffer");
1276       return failRead(__func__, ex);
1277     }
1278
1279     // Perform the read
1280     ssize_t bytesRead = performRead(buf, buflen);
1281     if (bytesRead > 0) {
1282       readCallback_->readDataAvailable(bytesRead);
1283       // Fall through and continue around the loop if the read
1284       // completely filled the available buffer.
1285       // Note that readCallback_ may have been uninstalled or changed inside
1286       // readDataAvailable().
1287       if (size_t(bytesRead) < buflen) {
1288         return;
1289       }
1290     } else if (bytesRead == READ_BLOCKING) {
1291         // No more data to read right now.
1292         return;
1293     } else if (bytesRead == READ_ERROR) {
1294       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1295                              withAddr("recv() failed"), errno);
1296       return failRead(__func__, ex);
1297     } else {
1298       assert(bytesRead == READ_EOF);
1299       // EOF
1300       shutdownFlags_ |= SHUT_READ;
1301       if (!updateEventRegistration(0, EventHandler::READ)) {
1302         // we've already been moved into STATE_ERROR
1303         assert(state_ == StateEnum::ERROR);
1304         assert(readCallback_ == nullptr);
1305         return;
1306       }
1307
1308       ReadCallback* callback = readCallback_;
1309       readCallback_ = nullptr;
1310       callback->readEOF();
1311       return;
1312     }
1313     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1314       return;
1315     }
1316   }
1317 }
1318
1319 /**
1320  * This function attempts to write as much data as possible, until no more data
1321  * can be written.
1322  *
1323  * - If it sends all available data, it unregisters for write events, and stops
1324  *   the writeTimeout_.
1325  *
1326  * - If not all of the data can be sent immediately, it reschedules
1327  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1328  *   registered for write events.
1329  */
1330 void AsyncSocket::handleWrite() noexcept {
1331   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1332           << ", state=" << state_;
1333   if (state_ == StateEnum::CONNECTING) {
1334     handleConnect();
1335     return;
1336   }
1337
1338   // Normal write
1339   assert(state_ == StateEnum::ESTABLISHED);
1340   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1341   assert(writeReqHead_ != nullptr);
1342
1343   // Loop until we run out of write requests,
1344   // or until this socket is moved to another EventBase.
1345   // (See the comment in handleRead() explaining how this can happen.)
1346   EventBase* originalEventBase = eventBase_;
1347   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1348     uint32_t countWritten;
1349     uint32_t partialWritten;
1350     WriteFlags writeFlags = writeReqHead_->flags();
1351     if (writeReqHead_->getNext() != nullptr) {
1352       writeFlags = writeFlags | WriteFlags::CORK;
1353     }
1354     int bytesWritten = performWrite(writeReqHead_->getOps(),
1355                                     writeReqHead_->getOpCount(),
1356                                     writeFlags, &countWritten, &partialWritten);
1357     if (bytesWritten < 0) {
1358       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1359                              withAddr("writev() failed"), errno);
1360       return failWrite(__func__, ex);
1361     } else if (countWritten == writeReqHead_->getOpCount()) {
1362       // We finished this request
1363       WriteRequest* req = writeReqHead_;
1364       writeReqHead_ = req->getNext();
1365
1366       if (writeReqHead_ == nullptr) {
1367         writeReqTail_ = nullptr;
1368         // This is the last write request.
1369         // Unregister for write events and cancel the send timer
1370         // before we invoke the callback.  We have to update the state properly
1371         // before calling the callback, since it may want to detach us from
1372         // the EventBase.
1373         if (eventFlags_ & EventHandler::WRITE) {
1374           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1375             assert(state_ == StateEnum::ERROR);
1376             return;
1377           }
1378           // Stop the send timeout
1379           writeTimeout_.cancelTimeout();
1380         }
1381         assert(!writeTimeout_.isScheduled());
1382
1383         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1384         // we finish sending the last write request.
1385         //
1386         // We have to do this before invoking writeSuccess(), since
1387         // writeSuccess() may detach us from our EventBase.
1388         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1389           assert(connectCallback_ == nullptr);
1390           shutdownFlags_ |= SHUT_WRITE;
1391
1392           if (shutdownFlags_ & SHUT_READ) {
1393             // Reads have already been shutdown.  Fully close the socket and
1394             // move to STATE_CLOSED.
1395             //
1396             // Note: This code currently moves us to STATE_CLOSED even if
1397             // close() hasn't ever been called.  This can occur if we have
1398             // received EOF from the peer and shutdownWrite() has been called
1399             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1400             // case, until close() is actually called?  I can't think of a
1401             // reason why we would need to do so.  No other operations besides
1402             // calling close() or destroying the socket can be performed at
1403             // this point.
1404             assert(readCallback_ == nullptr);
1405             state_ = StateEnum::CLOSED;
1406             if (fd_ >= 0) {
1407               ioHandler_.changeHandlerFD(-1);
1408               doClose();
1409             }
1410           } else {
1411             // Reads are still enabled, so we are only doing a half-shutdown
1412             ::shutdown(fd_, SHUT_WR);
1413           }
1414         }
1415       }
1416
1417       // Invoke the callback
1418       WriteCallback* callback = req->getCallback();
1419       req->destroy();
1420       if (callback) {
1421         callback->writeSuccess();
1422       }
1423       // We'll continue around the loop, trying to write another request
1424     } else {
1425       // Partial write.
1426       writeReqHead_->consume(countWritten, partialWritten, bytesWritten);
1427       // Stop after a partial write; it's highly likely that a subsequent write
1428       // attempt will just return EAGAIN.
1429       //
1430       // Ensure that we are registered for write events.
1431       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1432         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1433           assert(state_ == StateEnum::ERROR);
1434           return;
1435         }
1436       }
1437
1438       // Reschedule the send timeout, since we have made some write progress.
1439       if (sendTimeout_ > 0) {
1440         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1441           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1442               withAddr("failed to reschedule write timeout"));
1443           return failWrite(__func__, ex);
1444         }
1445       }
1446       return;
1447     }
1448   }
1449 }
1450
1451 void AsyncSocket::checkForImmediateRead() noexcept {
1452   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1453   // (However, note that some subclasses do override this method.)
1454   //
1455   // Simply calling handleRead() here would be bad, as this would call
1456   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1457   // buffer even though no data may be available.  This would waste lots of
1458   // memory, since the buffer will sit around unused until the socket actually
1459   // becomes readable.
1460   //
1461   // Checking if the socket is readable now also seems like it would probably
1462   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1463   // would just waste an extra system call.  Even if it is readable, waiting to
1464   // find out from libevent on the next event loop doesn't seem that bad.
1465 }
1466
1467 void AsyncSocket::handleInitialReadWrite() noexcept {
1468   // Our callers should already be holding a DestructorGuard, but grab
1469   // one here just to make sure, in case one of our calling code paths ever
1470   // changes.
1471   DestructorGuard dg(this);
1472
1473   // If we have a readCallback_, make sure we enable read events.  We
1474   // may already be registered for reads if connectSuccess() set
1475   // the read calback.
1476   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1477     assert(state_ == StateEnum::ESTABLISHED);
1478     assert((shutdownFlags_ & SHUT_READ) == 0);
1479     if (!updateEventRegistration(EventHandler::READ, 0)) {
1480       assert(state_ == StateEnum::ERROR);
1481       return;
1482     }
1483     checkForImmediateRead();
1484   } else if (readCallback_ == nullptr) {
1485     // Unregister for read events.
1486     updateEventRegistration(0, EventHandler::READ);
1487   }
1488
1489   // If we have write requests pending, try to send them immediately.
1490   // Since we just finished accepting, there is a very good chance that we can
1491   // write without blocking.
1492   //
1493   // However, we only process them if EventHandler::WRITE is not already set,
1494   // which means that we're already blocked on a write attempt.  (This can
1495   // happen if connectSuccess() called write() before returning.)
1496   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1497     // Call handleWrite() to perform write processing.
1498     handleWrite();
1499   } else if (writeReqHead_ == nullptr) {
1500     // Unregister for write event.
1501     updateEventRegistration(0, EventHandler::WRITE);
1502   }
1503 }
1504
1505 void AsyncSocket::handleConnect() noexcept {
1506   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1507           << ", state=" << state_;
1508   assert(state_ == StateEnum::CONNECTING);
1509   // SHUT_WRITE can never be set while we are still connecting;
1510   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1511   // finishes
1512   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1513
1514   // In case we had a connect timeout, cancel the timeout
1515   writeTimeout_.cancelTimeout();
1516   // We don't use a persistent registration when waiting on a connect event,
1517   // so we have been automatically unregistered now.  Update eventFlags_ to
1518   // reflect reality.
1519   assert(eventFlags_ == EventHandler::WRITE);
1520   eventFlags_ = EventHandler::NONE;
1521
1522   // Call getsockopt() to check if the connect succeeded
1523   int error;
1524   socklen_t len = sizeof(error);
1525   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1526   if (rv != 0) {
1527     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1528                            withAddr("error calling getsockopt() after connect"),
1529                            errno);
1530     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1531                << fd_ << " host=" << addr_.describe()
1532                << ") exception:" << ex.what();
1533     return failConnect(__func__, ex);
1534   }
1535
1536   if (error != 0) {
1537     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1538                            "connect failed", error);
1539     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1540             << fd_ << " host=" << addr_.describe()
1541             << ") exception: " << ex.what();
1542     return failConnect(__func__, ex);
1543   }
1544
1545   // Move into STATE_ESTABLISHED
1546   state_ = StateEnum::ESTABLISHED;
1547
1548   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1549   // perform, immediately shutdown the write half of the socket.
1550   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1551     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1552     // are still connecting we just abort the connect rather than waiting for
1553     // it to complete.
1554     assert((shutdownFlags_ & SHUT_READ) == 0);
1555     ::shutdown(fd_, SHUT_WR);
1556     shutdownFlags_ |= SHUT_WRITE;
1557   }
1558
1559   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1560           << "successfully connected; state=" << state_;
1561
1562   // Remember the EventBase we are attached to, before we start invoking any
1563   // callbacks (since the callbacks may call detachEventBase()).
1564   EventBase* originalEventBase = eventBase_;
1565
1566   // Call the connect callback.
1567   if (connectCallback_) {
1568     ConnectCallback* callback = connectCallback_;
1569     connectCallback_ = nullptr;
1570     callback->connectSuccess();
1571   }
1572
1573   // Note that the connect callback may have changed our state.
1574   // (set or unset the read callback, called write(), closed the socket, etc.)
1575   // The following code needs to handle these situations correctly.
1576   //
1577   // If the socket has been closed, readCallback_ and writeReqHead_ will
1578   // always be nullptr, so that will prevent us from trying to read or write.
1579   //
1580   // The main thing to check for is if eventBase_ is still originalEventBase.
1581   // If not, we have been detached from this event base, so we shouldn't
1582   // perform any more operations.
1583   if (eventBase_ != originalEventBase) {
1584     return;
1585   }
1586
1587   handleInitialReadWrite();
1588 }
1589
1590 void AsyncSocket::timeoutExpired() noexcept {
1591   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1592           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1593   DestructorGuard dg(this);
1594   assert(eventBase_->isInEventBaseThread());
1595
1596   if (state_ == StateEnum::CONNECTING) {
1597     // connect() timed out
1598     // Unregister for I/O events.
1599     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1600                            "connect timed out");
1601     failConnect(__func__, ex);
1602   } else {
1603     // a normal write operation timed out
1604     assert(state_ == StateEnum::ESTABLISHED);
1605     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1606     failWrite(__func__, ex);
1607   }
1608 }
1609
1610 ssize_t AsyncSocket::performWrite(const iovec* vec,
1611                                    uint32_t count,
1612                                    WriteFlags flags,
1613                                    uint32_t* countWritten,
1614                                    uint32_t* partialWritten) {
1615   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1616   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1617   // (since it may terminate the program if the main program doesn't explicitly
1618   // ignore it).
1619   struct msghdr msg;
1620   msg.msg_name = nullptr;
1621   msg.msg_namelen = 0;
1622   msg.msg_iov = const_cast<iovec *>(vec);
1623 #ifdef IOV_MAX // not defined on Android
1624   msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX);
1625 #else
1626   msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV);
1627 #endif
1628   msg.msg_control = nullptr;
1629   msg.msg_controllen = 0;
1630   msg.msg_flags = 0;
1631
1632   int msg_flags = MSG_DONTWAIT;
1633
1634 #ifdef MSG_NOSIGNAL // Linux-only
1635   msg_flags |= MSG_NOSIGNAL;
1636   if (isSet(flags, WriteFlags::CORK)) {
1637     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1638     // give it the rest of the data rather than immediately sending a partial
1639     // frame, even when TCP_NODELAY is enabled.
1640     msg_flags |= MSG_MORE;
1641   }
1642 #endif
1643   if (isSet(flags, WriteFlags::EOR)) {
1644     // marks that this is the last byte of a record (response)
1645     msg_flags |= MSG_EOR;
1646   }
1647   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1648   if (totalWritten < 0) {
1649     if (errno == EAGAIN) {
1650       // TCP buffer is full; we can't write any more data right now.
1651       *countWritten = 0;
1652       *partialWritten = 0;
1653       return 0;
1654     }
1655     // error
1656     *countWritten = 0;
1657     *partialWritten = 0;
1658     return -1;
1659   }
1660
1661   appBytesWritten_ += totalWritten;
1662
1663   uint32_t bytesWritten;
1664   uint32_t n;
1665   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1666     const iovec* v = vec + n;
1667     if (v->iov_len > bytesWritten) {
1668       // Partial write finished in the middle of this iovec
1669       *countWritten = n;
1670       *partialWritten = bytesWritten;
1671       return totalWritten;
1672     }
1673
1674     bytesWritten -= v->iov_len;
1675   }
1676
1677   assert(bytesWritten == 0);
1678   *countWritten = n;
1679   *partialWritten = 0;
1680   return totalWritten;
1681 }
1682
1683 /**
1684  * Re-register the EventHandler after eventFlags_ has changed.
1685  *
1686  * If an error occurs, fail() is called to move the socket into the error state
1687  * and call all currently installed callbacks.  After an error, the
1688  * AsyncSocket is completely unregistered.
1689  *
1690  * @return Returns true on succcess, or false on error.
1691  */
1692 bool AsyncSocket::updateEventRegistration() {
1693   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1694           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1695           << ", events=" << std::hex << eventFlags_;
1696   assert(eventBase_->isInEventBaseThread());
1697   if (eventFlags_ == EventHandler::NONE) {
1698     ioHandler_.unregisterHandler();
1699     return true;
1700   }
1701
1702   // Always register for persistent events, so we don't have to re-register
1703   // after being called back.
1704   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1705     eventFlags_ = EventHandler::NONE; // we're not registered after error
1706     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1707         withAddr("failed to update AsyncSocket event registration"));
1708     fail("updateEventRegistration", ex);
1709     return false;
1710   }
1711
1712   return true;
1713 }
1714
1715 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1716                                            uint16_t disable) {
1717   uint16_t oldFlags = eventFlags_;
1718   eventFlags_ |= enable;
1719   eventFlags_ &= ~disable;
1720   if (eventFlags_ == oldFlags) {
1721     return true;
1722   } else {
1723     return updateEventRegistration();
1724   }
1725 }
1726
1727 void AsyncSocket::startFail() {
1728   // startFail() should only be called once
1729   assert(state_ != StateEnum::ERROR);
1730   assert(getDestructorGuardCount() > 0);
1731   state_ = StateEnum::ERROR;
1732   // Ensure that SHUT_READ and SHUT_WRITE are set,
1733   // so all future attempts to read or write will be rejected
1734   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1735
1736   if (eventFlags_ != EventHandler::NONE) {
1737     eventFlags_ = EventHandler::NONE;
1738     ioHandler_.unregisterHandler();
1739   }
1740   writeTimeout_.cancelTimeout();
1741
1742   if (fd_ >= 0) {
1743     ioHandler_.changeHandlerFD(-1);
1744     doClose();
1745   }
1746 }
1747
1748 void AsyncSocket::finishFail() {
1749   assert(state_ == StateEnum::ERROR);
1750   assert(getDestructorGuardCount() > 0);
1751
1752   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1753                          withAddr("socket closing after error"));
1754   if (connectCallback_) {
1755     ConnectCallback* callback = connectCallback_;
1756     connectCallback_ = nullptr;
1757     callback->connectErr(ex);
1758   }
1759
1760   failAllWrites(ex);
1761
1762   if (readCallback_) {
1763     ReadCallback* callback = readCallback_;
1764     readCallback_ = nullptr;
1765     callback->readErr(ex);
1766   }
1767 }
1768
1769 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1770   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1771              << state_ << " host=" << addr_.describe()
1772              << "): failed in " << fn << "(): "
1773              << ex.what();
1774   startFail();
1775   finishFail();
1776 }
1777
1778 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1779   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1780                << state_ << " host=" << addr_.describe()
1781                << "): failed while connecting in " << fn << "(): "
1782                << ex.what();
1783   startFail();
1784
1785   if (connectCallback_ != nullptr) {
1786     ConnectCallback* callback = connectCallback_;
1787     connectCallback_ = nullptr;
1788     callback->connectErr(ex);
1789   }
1790
1791   finishFail();
1792 }
1793
1794 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1795   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1796                << state_ << " host=" << addr_.describe()
1797                << "): failed while reading in " << fn << "(): "
1798                << ex.what();
1799   startFail();
1800
1801   if (readCallback_ != nullptr) {
1802     ReadCallback* callback = readCallback_;
1803     readCallback_ = nullptr;
1804     callback->readErr(ex);
1805   }
1806
1807   finishFail();
1808 }
1809
1810 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1811   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1812                << state_ << " host=" << addr_.describe()
1813                << "): failed while writing in " << fn << "(): "
1814                << ex.what();
1815   startFail();
1816
1817   // Only invoke the first write callback, since the error occurred while
1818   // writing this request.  Let any other pending write callbacks be invoked in
1819   // finishFail().
1820   if (writeReqHead_ != nullptr) {
1821     WriteRequest* req = writeReqHead_;
1822     writeReqHead_ = req->getNext();
1823     WriteCallback* callback = req->getCallback();
1824     uint32_t bytesWritten = req->getBytesWritten();
1825     req->destroy();
1826     if (callback) {
1827       callback->writeErr(bytesWritten, ex);
1828     }
1829   }
1830
1831   finishFail();
1832 }
1833
1834 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1835                              size_t bytesWritten,
1836                              const AsyncSocketException& ex) {
1837   // This version of failWrite() is used when the failure occurs before
1838   // we've added the callback to writeReqHead_.
1839   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1840              << state_ << " host=" << addr_.describe()
1841              <<"): failed while writing in " << fn << "(): "
1842              << ex.what();
1843   startFail();
1844
1845   if (callback != nullptr) {
1846     callback->writeErr(bytesWritten, ex);
1847   }
1848
1849   finishFail();
1850 }
1851
1852 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1853   // Invoke writeError() on all write callbacks.
1854   // This is used when writes are forcibly shutdown with write requests
1855   // pending, or when an error occurs with writes pending.
1856   while (writeReqHead_ != nullptr) {
1857     WriteRequest* req = writeReqHead_;
1858     writeReqHead_ = req->getNext();
1859     WriteCallback* callback = req->getCallback();
1860     if (callback) {
1861       callback->writeErr(req->getBytesWritten(), ex);
1862     }
1863     req->destroy();
1864   }
1865 }
1866
1867 void AsyncSocket::invalidState(ConnectCallback* callback) {
1868   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1869              << "): connect() called in invalid state " << state_;
1870
1871   /*
1872    * The invalidState() methods don't use the normal failure mechanisms,
1873    * since we don't know what state we are in.  We don't want to call
1874    * startFail()/finishFail() recursively if we are already in the middle of
1875    * cleaning up.
1876    */
1877
1878   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1879                          "connect() called with socket in invalid state");
1880   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1881     if (callback) {
1882       callback->connectErr(ex);
1883     }
1884   } else {
1885     // We can't use failConnect() here since connectCallback_
1886     // may already be set to another callback.  Invoke this ConnectCallback
1887     // here; any other connectCallback_ will be invoked in finishFail()
1888     startFail();
1889     if (callback) {
1890       callback->connectErr(ex);
1891     }
1892     finishFail();
1893   }
1894 }
1895
1896 void AsyncSocket::invalidState(ReadCallback* callback) {
1897   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1898              << "): setReadCallback(" << callback
1899              << ") called in invalid state " << state_;
1900
1901   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1902                          "setReadCallback() called with socket in "
1903                          "invalid state");
1904   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1905     if (callback) {
1906       callback->readErr(ex);
1907     }
1908   } else {
1909     startFail();
1910     if (callback) {
1911       callback->readErr(ex);
1912     }
1913     finishFail();
1914   }
1915 }
1916
1917 void AsyncSocket::invalidState(WriteCallback* callback) {
1918   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1919              << "): write() called in invalid state " << state_;
1920
1921   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1922                          withAddr("write() called with socket in invalid state"));
1923   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1924     if (callback) {
1925       callback->writeErr(0, ex);
1926     }
1927   } else {
1928     startFail();
1929     if (callback) {
1930       callback->writeErr(0, ex);
1931     }
1932     finishFail();
1933   }
1934 }
1935
1936 void AsyncSocket::doClose() {
1937   if (fd_ == -1) return;
1938   if (shutdownSocketSet_) {
1939     shutdownSocketSet_->close(fd_);
1940   } else {
1941     ::close(fd_);
1942   }
1943   fd_ = -1;
1944 }
1945
1946 std::ostream& operator << (std::ostream& os,
1947                            const AsyncSocket::StateEnum& state) {
1948   os << static_cast<int>(state);
1949   return os;
1950 }
1951
1952 std::string AsyncSocket::withAddr(const std::string& s) {
1953   // Don't use addr_ directly because it may not be initialized
1954   // e.g. if constructed from fd
1955   folly::SocketAddress peer, local;
1956   try {
1957     getPeerAddress(&peer);
1958     getLocalAddress(&local);
1959   } catch (const std::exception&) {
1960     // ignore
1961   } catch (...) {
1962     // ignore
1963   }
1964   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
1965 }
1966
1967 } // folly