Add missing override and remove redundant virtual in folly
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23
24 #include <poll.h>
25 #include <errno.h>
26 #include <limits.h>
27 #include <unistd.h>
28 #include <thread>
29 #include <fcntl.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34
35 using std::string;
36 using std::unique_ptr;
37
38 namespace folly {
39
40 // static members initializers
41 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
42
43 const AsyncSocketException socketClosedLocallyEx(
44     AsyncSocketException::END_OF_FILE, "socket closed locally");
45 const AsyncSocketException socketShutdownForWritesEx(
46     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
47
48 // TODO: It might help performance to provide a version of BytesWriteRequest that
49 // users could derive from, so we can avoid the extra allocation for each call
50 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
51 // protocols are currently templatized for transports.
52 //
53 // We would need the version for external users where they provide the iovec
54 // storage space, and only our internal version would allocate it at the end of
55 // the WriteRequest.
56
57 /* The default WriteRequest implementation, used for write(), writev() and
58  * writeChain()
59  *
60  * A new BytesWriteRequest operation is allocated on the heap for all write
61  * operations that cannot be completed immediately.
62  */
63 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
64  public:
65   static BytesWriteRequest* newRequest(AsyncSocket* socket,
66                                        WriteCallback* callback,
67                                        const iovec* ops,
68                                        uint32_t opCount,
69                                        uint32_t partialWritten,
70                                        uint32_t bytesWritten,
71                                        unique_ptr<IOBuf>&& ioBuf,
72                                        WriteFlags flags) {
73     assert(opCount > 0);
74     // Since we put a variable size iovec array at the end
75     // of each BytesWriteRequest, we have to manually allocate the memory.
76     void* buf = malloc(sizeof(BytesWriteRequest) +
77                        (opCount * sizeof(struct iovec)));
78     if (buf == nullptr) {
79       throw std::bad_alloc();
80     }
81
82     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
83                                       partialWritten, bytesWritten,
84                                       std::move(ioBuf), flags);
85   }
86
87   void destroy() override {
88     this->~BytesWriteRequest();
89     free(this);
90   }
91
92   bool performWrite() override {
93     WriteFlags writeFlags = flags_;
94     if (getNext() != nullptr) {
95       writeFlags = writeFlags | WriteFlags::CORK;
96     }
97     bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
98                                           &opsWritten_, &partialBytes_);
99     return bytesWritten_ >= 0;
100   }
101
102   bool isComplete() override {
103     return opsWritten_ == getOpCount();
104   }
105
106   void consume() override {
107     // Advance opIndex_ forward by opsWritten_
108     opIndex_ += opsWritten_;
109     assert(opIndex_ < opCount_);
110
111     // If we've finished writing any IOBufs, release them
112     if (ioBuf_) {
113       for (uint32_t i = opsWritten_; i != 0; --i) {
114         assert(ioBuf_);
115         ioBuf_ = ioBuf_->pop();
116       }
117     }
118
119     // Move partialBytes_ forward into the current iovec buffer
120     struct iovec* currentOp = writeOps_ + opIndex_;
121     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
122     currentOp->iov_base =
123       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
124     currentOp->iov_len -= partialBytes_;
125
126     // Increment the totalBytesWritten_ count by bytesWritten_;
127     totalBytesWritten_ += bytesWritten_;
128   }
129
130  private:
131   BytesWriteRequest(AsyncSocket* socket,
132                     WriteCallback* callback,
133                     const struct iovec* ops,
134                     uint32_t opCount,
135                     uint32_t partialBytes,
136                     uint32_t bytesWritten,
137                     unique_ptr<IOBuf>&& ioBuf,
138                     WriteFlags flags)
139     : AsyncSocket::WriteRequest(socket, callback)
140     , opCount_(opCount)
141     , opIndex_(0)
142     , flags_(flags)
143     , ioBuf_(std::move(ioBuf))
144     , opsWritten_(0)
145     , partialBytes_(partialBytes)
146     , bytesWritten_(bytesWritten) {
147     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
148   }
149
150   // private destructor, to ensure callers use destroy()
151   ~BytesWriteRequest() override = default;
152
153   const struct iovec* getOps() const {
154     assert(opCount_ > opIndex_);
155     return writeOps_ + opIndex_;
156   }
157
158   uint32_t getOpCount() const {
159     assert(opCount_ > opIndex_);
160     return opCount_ - opIndex_;
161   }
162
163   uint32_t opCount_;            ///< number of entries in writeOps_
164   uint32_t opIndex_;            ///< current index into writeOps_
165   WriteFlags flags_;            ///< set for WriteFlags
166   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
167
168   // for consume(), how much we wrote on the last write
169   uint32_t opsWritten_;         ///< complete ops written
170   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
171   ssize_t bytesWritten_;        ///< bytes written altogether
172
173   struct iovec writeOps_[];     ///< write operation(s) list
174 };
175
176 AsyncSocket::AsyncSocket()
177   : eventBase_(nullptr)
178   , writeTimeout_(this, nullptr)
179   , ioHandler_(this, nullptr)
180   , immediateReadHandler_(this) {
181   VLOG(5) << "new AsyncSocket()";
182   init();
183 }
184
185 AsyncSocket::AsyncSocket(EventBase* evb)
186   : eventBase_(evb)
187   , writeTimeout_(this, evb)
188   , ioHandler_(this, evb)
189   , immediateReadHandler_(this) {
190   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
191   init();
192 }
193
194 AsyncSocket::AsyncSocket(EventBase* evb,
195                            const folly::SocketAddress& address,
196                            uint32_t connectTimeout)
197   : AsyncSocket(evb) {
198   connect(nullptr, address, connectTimeout);
199 }
200
201 AsyncSocket::AsyncSocket(EventBase* evb,
202                            const std::string& ip,
203                            uint16_t port,
204                            uint32_t connectTimeout)
205   : AsyncSocket(evb) {
206   connect(nullptr, ip, port, connectTimeout);
207 }
208
209 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
210   : eventBase_(evb)
211   , writeTimeout_(this, evb)
212   , ioHandler_(this, evb, fd)
213   , immediateReadHandler_(this) {
214   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
215           << fd << ")";
216   init();
217   fd_ = fd;
218   setCloseOnExec();
219   state_ = StateEnum::ESTABLISHED;
220 }
221
222 // init() method, since constructor forwarding isn't supported in most
223 // compilers yet.
224 void AsyncSocket::init() {
225   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
226   shutdownFlags_ = 0;
227   state_ = StateEnum::UNINIT;
228   eventFlags_ = EventHandler::NONE;
229   fd_ = -1;
230   sendTimeout_ = 0;
231   maxReadsPerEvent_ = 16;
232   connectCallback_ = nullptr;
233   readCallback_ = nullptr;
234   writeReqHead_ = nullptr;
235   writeReqTail_ = nullptr;
236   shutdownSocketSet_ = nullptr;
237   appBytesWritten_ = 0;
238   appBytesReceived_ = 0;
239 }
240
241 AsyncSocket::~AsyncSocket() {
242   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
243           << ", evb=" << eventBase_ << ", fd=" << fd_
244           << ", state=" << state_ << ")";
245 }
246
247 void AsyncSocket::destroy() {
248   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
249           << ", fd=" << fd_ << ", state=" << state_;
250   // When destroy is called, close the socket immediately
251   closeNow();
252
253   // Then call DelayedDestruction::destroy() to take care of
254   // whether or not we need immediate or delayed destruction
255   DelayedDestruction::destroy();
256 }
257
258 int AsyncSocket::detachFd() {
259   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
260           << ", evb=" << eventBase_ << ", state=" << state_
261           << ", events=" << std::hex << eventFlags_ << ")";
262   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
263   // actually close the descriptor.
264   if (shutdownSocketSet_) {
265     shutdownSocketSet_->remove(fd_);
266   }
267   int fd = fd_;
268   fd_ = -1;
269   // Call closeNow() to invoke all pending callbacks with an error.
270   closeNow();
271   // Update the EventHandler to stop using this fd.
272   // This can only be done after closeNow() unregisters the handler.
273   ioHandler_.changeHandlerFD(-1);
274   return fd;
275 }
276
277 const folly::SocketAddress& AsyncSocket::anyAddress() {
278   static const folly::SocketAddress anyAddress =
279     folly::SocketAddress("0.0.0.0", 0);
280   return anyAddress;
281 }
282
283 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
284   if (shutdownSocketSet_ == newSS) {
285     return;
286   }
287   if (shutdownSocketSet_ && fd_ != -1) {
288     shutdownSocketSet_->remove(fd_);
289   }
290   shutdownSocketSet_ = newSS;
291   if (shutdownSocketSet_ && fd_ != -1) {
292     shutdownSocketSet_->add(fd_);
293   }
294 }
295
296 void AsyncSocket::setCloseOnExec() {
297   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
298   if (rv != 0) {
299     throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
300                                withAddr("failed to set close-on-exec flag"),
301                                errno);
302   }
303 }
304
305 void AsyncSocket::connect(ConnectCallback* callback,
306                            const folly::SocketAddress& address,
307                            int timeout,
308                            const OptionMap &options,
309                            const folly::SocketAddress& bindAddr) noexcept {
310   DestructorGuard dg(this);
311   assert(eventBase_->isInEventBaseThread());
312
313   addr_ = address;
314
315   // Make sure we're in the uninitialized state
316   if (state_ != StateEnum::UNINIT) {
317     return invalidState(callback);
318   }
319
320   assert(fd_ == -1);
321   state_ = StateEnum::CONNECTING;
322   connectCallback_ = callback;
323
324   sockaddr_storage addrStorage;
325   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
326
327   try {
328     // Create the socket
329     // Technically the first parameter should actually be a protocol family
330     // constant (PF_xxx) rather than an address family (AF_xxx), but the
331     // distinction is mainly just historical.  In pretty much all
332     // implementations the PF_foo and AF_foo constants are identical.
333     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
334     if (fd_ < 0) {
335       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
336                                 withAddr("failed to create socket"), errno);
337     }
338     if (shutdownSocketSet_) {
339       shutdownSocketSet_->add(fd_);
340     }
341     ioHandler_.changeHandlerFD(fd_);
342
343     setCloseOnExec();
344
345     // Put the socket in non-blocking mode
346     int flags = fcntl(fd_, F_GETFL, 0);
347     if (flags == -1) {
348       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
349                                 withAddr("failed to get socket flags"), errno);
350     }
351     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
352     if (rv == -1) {
353       throw AsyncSocketException(
354           AsyncSocketException::INTERNAL_ERROR,
355           withAddr("failed to put socket in non-blocking mode"),
356           errno);
357     }
358
359 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
360     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
361     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
362     if (rv == -1) {
363       throw AsyncSocketException(
364           AsyncSocketException::INTERNAL_ERROR,
365           "failed to enable F_SETNOSIGPIPE on socket",
366           errno);
367     }
368 #endif
369
370     // By default, turn on TCP_NODELAY
371     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
372     // setNoDelay() will log an error message if it fails.
373     if (address.getFamily() != AF_UNIX) {
374       (void)setNoDelay(true);
375     }
376
377     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
378             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
379
380     // bind the socket
381     if (bindAddr != anyAddress()) {
382       int one = 1;
383       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
384         doClose();
385         throw AsyncSocketException(
386           AsyncSocketException::NOT_OPEN,
387           "failed to setsockopt prior to bind on " + bindAddr.describe(),
388           errno);
389       }
390
391       bindAddr.getAddress(&addrStorage);
392
393       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
394         doClose();
395         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
396                                   "failed to bind to async socket: " +
397                                   bindAddr.describe(),
398                                   errno);
399       }
400     }
401
402     // Apply the additional options if any.
403     for (const auto& opt: options) {
404       int rv = opt.first.apply(fd_, opt.second);
405       if (rv != 0) {
406         throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
407                                   withAddr("failed to set socket option"),
408                                   errno);
409       }
410     }
411
412     // Perform the connect()
413     address.getAddress(&addrStorage);
414
415     rv = ::connect(fd_, saddr, address.getActualSize());
416     if (rv < 0) {
417       if (errno == EINPROGRESS) {
418         // Connection in progress.
419         if (timeout > 0) {
420           // Start a timer in case the connection takes too long.
421           if (!writeTimeout_.scheduleTimeout(timeout)) {
422             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
423                 withAddr("failed to schedule AsyncSocket connect timeout"));
424           }
425         }
426
427         // Register for write events, so we'll
428         // be notified when the connection finishes/fails.
429         // Note that we don't register for a persistent event here.
430         assert(eventFlags_ == EventHandler::NONE);
431         eventFlags_ = EventHandler::WRITE;
432         if (!ioHandler_.registerHandler(eventFlags_)) {
433           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
434               withAddr("failed to register AsyncSocket connect handler"));
435         }
436         return;
437       } else {
438         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
439                                   "connect failed (immediately)", errno);
440       }
441     }
442
443     // If we're still here the connect() succeeded immediately.
444     // Fall through to call the callback outside of this try...catch block
445   } catch (const AsyncSocketException& ex) {
446     return failConnect(__func__, ex);
447   } catch (const std::exception& ex) {
448     // shouldn't happen, but handle it just in case
449     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
450                << "): unexpected " << typeid(ex).name() << " exception: "
451                << ex.what();
452     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
453                             withAddr(string("unexpected exception: ") +
454                                      ex.what()));
455     return failConnect(__func__, tex);
456   }
457
458   // The connection succeeded immediately
459   // The read callback may not have been set yet, and no writes may be pending
460   // yet, so we don't have to register for any events at the moment.
461   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
462   assert(readCallback_ == nullptr);
463   assert(writeReqHead_ == nullptr);
464   state_ = StateEnum::ESTABLISHED;
465   if (callback) {
466     connectCallback_ = nullptr;
467     callback->connectSuccess();
468   }
469 }
470
471 void AsyncSocket::connect(ConnectCallback* callback,
472                            const string& ip, uint16_t port,
473                            int timeout,
474                            const OptionMap &options) noexcept {
475   DestructorGuard dg(this);
476   try {
477     connectCallback_ = callback;
478     connect(callback, folly::SocketAddress(ip, port), timeout, options);
479   } catch (const std::exception& ex) {
480     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
481                             ex.what());
482     return failConnect(__func__, tex);
483   }
484 }
485
486 void AsyncSocket::cancelConnect() {
487   connectCallback_ = nullptr;
488   if (state_ == StateEnum::CONNECTING) {
489     closeNow();
490   }
491 }
492
493 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
494   sendTimeout_ = milliseconds;
495   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
496
497   // If we are currently pending on write requests, immediately update
498   // writeTimeout_ with the new value.
499   if ((eventFlags_ & EventHandler::WRITE) &&
500       (state_ != StateEnum::CONNECTING)) {
501     assert(state_ == StateEnum::ESTABLISHED);
502     assert((shutdownFlags_ & SHUT_WRITE) == 0);
503     if (sendTimeout_ > 0) {
504       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
505         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
506             withAddr("failed to reschedule send timeout in setSendTimeout"));
507         return failWrite(__func__, ex);
508       }
509     } else {
510       writeTimeout_.cancelTimeout();
511     }
512   }
513 }
514
515 void AsyncSocket::setReadCB(ReadCallback *callback) {
516   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
517           << ", callback=" << callback << ", state=" << state_;
518
519   // Short circuit if callback is the same as the existing readCallback_.
520   //
521   // Note that this is needed for proper functioning during some cleanup cases.
522   // During cleanup we allow setReadCallback(nullptr) to be called even if the
523   // read callback is already unset and we have been detached from an event
524   // base.  This check prevents us from asserting
525   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
526   if (callback == readCallback_) {
527     return;
528   }
529
530   /* We are removing a read callback */
531   if (callback == nullptr &&
532       immediateReadHandler_.isLoopCallbackScheduled()) {
533     immediateReadHandler_.cancelLoopCallback();
534   }
535
536   if (shutdownFlags_ & SHUT_READ) {
537     // Reads have already been shut down on this socket.
538     //
539     // Allow setReadCallback(nullptr) to be called in this case, but don't
540     // allow a new callback to be set.
541     //
542     // For example, setReadCallback(nullptr) can happen after an error if we
543     // invoke some other error callback before invoking readError().  The other
544     // error callback that is invoked first may go ahead and clear the read
545     // callback before we get a chance to invoke readError().
546     if (callback != nullptr) {
547       return invalidState(callback);
548     }
549     assert((eventFlags_ & EventHandler::READ) == 0);
550     readCallback_ = nullptr;
551     return;
552   }
553
554   DestructorGuard dg(this);
555   assert(eventBase_->isInEventBaseThread());
556
557   switch ((StateEnum)state_) {
558     case StateEnum::CONNECTING:
559       // For convenience, we allow the read callback to be set while we are
560       // still connecting.  We just store the callback for now.  Once the
561       // connection completes we'll register for read events.
562       readCallback_ = callback;
563       return;
564     case StateEnum::ESTABLISHED:
565     {
566       readCallback_ = callback;
567       uint16_t oldFlags = eventFlags_;
568       if (readCallback_) {
569         eventFlags_ |= EventHandler::READ;
570       } else {
571         eventFlags_ &= ~EventHandler::READ;
572       }
573
574       // Update our registration if our flags have changed
575       if (eventFlags_ != oldFlags) {
576         // We intentionally ignore the return value here.
577         // updateEventRegistration() will move us into the error state if it
578         // fails, and we don't need to do anything else here afterwards.
579         (void)updateEventRegistration();
580       }
581
582       if (readCallback_) {
583         checkForImmediateRead();
584       }
585       return;
586     }
587     case StateEnum::CLOSED:
588     case StateEnum::ERROR:
589       // We should never reach here.  SHUT_READ should always be set
590       // if we are in STATE_CLOSED or STATE_ERROR.
591       assert(false);
592       return invalidState(callback);
593     case StateEnum::UNINIT:
594       // We do not allow setReadCallback() to be called before we start
595       // connecting.
596       return invalidState(callback);
597   }
598
599   // We don't put a default case in the switch statement, so that the compiler
600   // will warn us to update the switch statement if a new state is added.
601   return invalidState(callback);
602 }
603
604 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
605   return readCallback_;
606 }
607
608 void AsyncSocket::write(WriteCallback* callback,
609                          const void* buf, size_t bytes, WriteFlags flags) {
610   iovec op;
611   op.iov_base = const_cast<void*>(buf);
612   op.iov_len = bytes;
613   writeImpl(callback, &op, 1, std::move(unique_ptr<IOBuf>()), flags);
614 }
615
616 void AsyncSocket::writev(WriteCallback* callback,
617                           const iovec* vec,
618                           size_t count,
619                           WriteFlags flags) {
620   writeImpl(callback, vec, count, std::move(unique_ptr<IOBuf>()), flags);
621 }
622
623 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
624                               WriteFlags flags) {
625   size_t count = buf->countChainElements();
626   if (count <= 64) {
627     iovec vec[count];
628     writeChainImpl(callback, vec, count, std::move(buf), flags);
629   } else {
630     iovec* vec = new iovec[count];
631     writeChainImpl(callback, vec, count, std::move(buf), flags);
632     delete[] vec;
633   }
634 }
635
636 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
637     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
638   size_t veclen = buf->fillIov(vec, count);
639   writeImpl(callback, vec, veclen, std::move(buf), flags);
640 }
641
642 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
643                              size_t count, unique_ptr<IOBuf>&& buf,
644                              WriteFlags flags) {
645   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
646           << ", callback=" << callback << ", count=" << count
647           << ", state=" << state_;
648   DestructorGuard dg(this);
649   unique_ptr<IOBuf>ioBuf(std::move(buf));
650   assert(eventBase_->isInEventBaseThread());
651
652   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
653     // No new writes may be performed after the write side of the socket has
654     // been shutdown.
655     //
656     // We could just call callback->writeError() here to fail just this write.
657     // However, fail hard and use invalidState() to fail all outstanding
658     // callbacks and move the socket into the error state.  There's most likely
659     // a bug in the caller's code, so we abort everything rather than trying to
660     // proceed as best we can.
661     return invalidState(callback);
662   }
663
664   uint32_t countWritten = 0;
665   uint32_t partialWritten = 0;
666   int bytesWritten = 0;
667   bool mustRegister = false;
668   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
669     if (writeReqHead_ == nullptr) {
670       // If we are established and there are no other writes pending,
671       // we can attempt to perform the write immediately.
672       assert(writeReqTail_ == nullptr);
673       assert((eventFlags_ & EventHandler::WRITE) == 0);
674
675       bytesWritten = performWrite(vec, count, flags,
676                                   &countWritten, &partialWritten);
677       if (bytesWritten < 0) {
678         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
679                                withAddr("writev failed"), errno);
680         return failWrite(__func__, callback, 0, ex);
681       } else if (countWritten == count) {
682         // We successfully wrote everything.
683         // Invoke the callback and return.
684         if (callback) {
685           callback->writeSuccess();
686         }
687         return;
688       } // else { continue writing the next writeReq }
689       mustRegister = true;
690     }
691   } else if (!connecting()) {
692     // Invalid state for writing
693     return invalidState(callback);
694   }
695
696   // Create a new WriteRequest to add to the queue
697   WriteRequest* req;
698   try {
699     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
700                                         count - countWritten, partialWritten,
701                                         bytesWritten, std::move(ioBuf), flags);
702   } catch (const std::exception& ex) {
703     // we mainly expect to catch std::bad_alloc here
704     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
705         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
706     return failWrite(__func__, callback, bytesWritten, tex);
707   }
708   req->consume();
709   if (writeReqTail_ == nullptr) {
710     assert(writeReqHead_ == nullptr);
711     writeReqHead_ = writeReqTail_ = req;
712   } else {
713     writeReqTail_->append(req);
714     writeReqTail_ = req;
715   }
716
717   // Register for write events if are established and not currently
718   // waiting on write events
719   if (mustRegister) {
720     assert(state_ == StateEnum::ESTABLISHED);
721     assert((eventFlags_ & EventHandler::WRITE) == 0);
722     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
723       assert(state_ == StateEnum::ERROR);
724       return;
725     }
726     if (sendTimeout_ > 0) {
727       // Schedule a timeout to fire if the write takes too long.
728       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
729         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
730                                withAddr("failed to schedule send timeout"));
731         return failWrite(__func__, ex);
732       }
733     }
734   }
735 }
736
737 void AsyncSocket::writeRequest(WriteRequest* req) {
738   if (writeReqTail_ == nullptr) {
739     assert(writeReqHead_ == nullptr);
740     writeReqHead_ = writeReqTail_ = req;
741     req->start();
742   } else {
743     writeReqTail_->append(req);
744     writeReqTail_ = req;
745   }
746 }
747
748 void AsyncSocket::close() {
749   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
750           << ", state=" << state_ << ", shutdownFlags="
751           << std::hex << (int) shutdownFlags_;
752
753   // close() is only different from closeNow() when there are pending writes
754   // that need to drain before we can close.  In all other cases, just call
755   // closeNow().
756   //
757   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
758   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
759   // is still running.  (e.g., If there are multiple pending writes, and we
760   // call writeError() on the first one, it may call close().  In this case we
761   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
762   // writes will still be in the queue.)
763   //
764   // We only need to drain pending writes if we are still in STATE_CONNECTING
765   // or STATE_ESTABLISHED
766   if ((writeReqHead_ == nullptr) ||
767       !(state_ == StateEnum::CONNECTING ||
768       state_ == StateEnum::ESTABLISHED)) {
769     closeNow();
770     return;
771   }
772
773   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
774   // destroyed until close() returns.
775   DestructorGuard dg(this);
776   assert(eventBase_->isInEventBaseThread());
777
778   // Since there are write requests pending, we have to set the
779   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
780   // connect finishes and we finish writing these requests.
781   //
782   // Set SHUT_READ to indicate that reads are shut down, and set the
783   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
784   // pending writes complete.
785   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
786
787   // If a read callback is set, invoke readEOF() immediately to inform it that
788   // the socket has been closed and no more data can be read.
789   if (readCallback_) {
790     // Disable reads if they are enabled
791     if (!updateEventRegistration(0, EventHandler::READ)) {
792       // We're now in the error state; callbacks have been cleaned up
793       assert(state_ == StateEnum::ERROR);
794       assert(readCallback_ == nullptr);
795     } else {
796       ReadCallback* callback = readCallback_;
797       readCallback_ = nullptr;
798       callback->readEOF();
799     }
800   }
801 }
802
803 void AsyncSocket::closeNow() {
804   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
805           << ", state=" << state_ << ", shutdownFlags="
806           << std::hex << (int) shutdownFlags_;
807   DestructorGuard dg(this);
808   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
809
810   switch (state_) {
811     case StateEnum::ESTABLISHED:
812     case StateEnum::CONNECTING:
813     {
814       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
815       state_ = StateEnum::CLOSED;
816
817       // If the write timeout was set, cancel it.
818       writeTimeout_.cancelTimeout();
819
820       // If we are registered for I/O events, unregister.
821       if (eventFlags_ != EventHandler::NONE) {
822         eventFlags_ = EventHandler::NONE;
823         if (!updateEventRegistration()) {
824           // We will have been moved into the error state.
825           assert(state_ == StateEnum::ERROR);
826           return;
827         }
828       }
829
830       if (immediateReadHandler_.isLoopCallbackScheduled()) {
831         immediateReadHandler_.cancelLoopCallback();
832       }
833
834       if (fd_ >= 0) {
835         ioHandler_.changeHandlerFD(-1);
836         doClose();
837       }
838
839       if (connectCallback_) {
840         ConnectCallback* callback = connectCallback_;
841         connectCallback_ = nullptr;
842         callback->connectErr(socketClosedLocallyEx);
843       }
844
845       failAllWrites(socketClosedLocallyEx);
846
847       if (readCallback_) {
848         ReadCallback* callback = readCallback_;
849         readCallback_ = nullptr;
850         callback->readEOF();
851       }
852       return;
853     }
854     case StateEnum::CLOSED:
855       // Do nothing.  It's possible that we are being called recursively
856       // from inside a callback that we invoked inside another call to close()
857       // that is still running.
858       return;
859     case StateEnum::ERROR:
860       // Do nothing.  The error handling code has performed (or is performing)
861       // cleanup.
862       return;
863     case StateEnum::UNINIT:
864       assert(eventFlags_ == EventHandler::NONE);
865       assert(connectCallback_ == nullptr);
866       assert(readCallback_ == nullptr);
867       assert(writeReqHead_ == nullptr);
868       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
869       state_ = StateEnum::CLOSED;
870       return;
871   }
872
873   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
874               << ") called in unknown state " << state_;
875 }
876
877 void AsyncSocket::closeWithReset() {
878   // Enable SO_LINGER, with the linger timeout set to 0.
879   // This will trigger a TCP reset when we close the socket.
880   if (fd_ >= 0) {
881     struct linger optLinger = {1, 0};
882     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
883       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
884               << "on " << fd_ << ": errno=" << errno;
885     }
886   }
887
888   // Then let closeNow() take care of the rest
889   closeNow();
890 }
891
892 void AsyncSocket::shutdownWrite() {
893   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
894           << ", state=" << state_ << ", shutdownFlags="
895           << std::hex << (int) shutdownFlags_;
896
897   // If there are no pending writes, shutdownWrite() is identical to
898   // shutdownWriteNow().
899   if (writeReqHead_ == nullptr) {
900     shutdownWriteNow();
901     return;
902   }
903
904   assert(eventBase_->isInEventBaseThread());
905
906   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
907   // shutdown will be performed once all writes complete.
908   shutdownFlags_ |= SHUT_WRITE_PENDING;
909 }
910
911 void AsyncSocket::shutdownWriteNow() {
912   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
913           << ", fd=" << fd_ << ", state=" << state_
914           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
915
916   if (shutdownFlags_ & SHUT_WRITE) {
917     // Writes are already shutdown; nothing else to do.
918     return;
919   }
920
921   // If SHUT_READ is already set, just call closeNow() to completely
922   // close the socket.  This can happen if close() was called with writes
923   // pending, and then shutdownWriteNow() is called before all pending writes
924   // complete.
925   if (shutdownFlags_ & SHUT_READ) {
926     closeNow();
927     return;
928   }
929
930   DestructorGuard dg(this);
931   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
932
933   switch (static_cast<StateEnum>(state_)) {
934     case StateEnum::ESTABLISHED:
935     {
936       shutdownFlags_ |= SHUT_WRITE;
937
938       // If the write timeout was set, cancel it.
939       writeTimeout_.cancelTimeout();
940
941       // If we are registered for write events, unregister.
942       if (!updateEventRegistration(0, EventHandler::WRITE)) {
943         // We will have been moved into the error state.
944         assert(state_ == StateEnum::ERROR);
945         return;
946       }
947
948       // Shutdown writes on the file descriptor
949       ::shutdown(fd_, SHUT_WR);
950
951       // Immediately fail all write requests
952       failAllWrites(socketShutdownForWritesEx);
953       return;
954     }
955     case StateEnum::CONNECTING:
956     {
957       // Set the SHUT_WRITE_PENDING flag.
958       // When the connection completes, it will check this flag,
959       // shutdown the write half of the socket, and then set SHUT_WRITE.
960       shutdownFlags_ |= SHUT_WRITE_PENDING;
961
962       // Immediately fail all write requests
963       failAllWrites(socketShutdownForWritesEx);
964       return;
965     }
966     case StateEnum::UNINIT:
967       // Callers normally shouldn't call shutdownWriteNow() before the socket
968       // even starts connecting.  Nonetheless, go ahead and set
969       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
970       // immediately shut down the write side of the socket.
971       shutdownFlags_ |= SHUT_WRITE_PENDING;
972       return;
973     case StateEnum::CLOSED:
974     case StateEnum::ERROR:
975       // We should never get here.  SHUT_WRITE should always be set
976       // in STATE_CLOSED and STATE_ERROR.
977       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
978                  << ", fd=" << fd_ << ") in unexpected state " << state_
979                  << " with SHUT_WRITE not set ("
980                  << std::hex << (int) shutdownFlags_ << ")";
981       assert(false);
982       return;
983   }
984
985   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
986               << fd_ << ") called in unknown state " << state_;
987 }
988
989 bool AsyncSocket::readable() const {
990   if (fd_ == -1) {
991     return false;
992   }
993   struct pollfd fds[1];
994   fds[0].fd = fd_;
995   fds[0].events = POLLIN;
996   fds[0].revents = 0;
997   int rc = poll(fds, 1, 0);
998   return rc == 1;
999 }
1000
1001 bool AsyncSocket::isPending() const {
1002   return ioHandler_.isPending();
1003 }
1004
1005 bool AsyncSocket::hangup() const {
1006   if (fd_ == -1) {
1007     // sanity check, no one should ask for hangup if we are not connected.
1008     assert(false);
1009     return false;
1010   }
1011 #ifdef POLLRDHUP // Linux-only
1012   struct pollfd fds[1];
1013   fds[0].fd = fd_;
1014   fds[0].events = POLLRDHUP|POLLHUP;
1015   fds[0].revents = 0;
1016   poll(fds, 1, 0);
1017   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1018 #else
1019   return false;
1020 #endif
1021 }
1022
1023 bool AsyncSocket::good() const {
1024   return ((state_ == StateEnum::CONNECTING ||
1025           state_ == StateEnum::ESTABLISHED) &&
1026           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1027 }
1028
1029 bool AsyncSocket::error() const {
1030   return (state_ == StateEnum::ERROR);
1031 }
1032
1033 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1034   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1035           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1036           << ", state=" << state_ << ", events="
1037           << std::hex << eventFlags_ << ")";
1038   assert(eventBase_ == nullptr);
1039   assert(eventBase->isInEventBaseThread());
1040
1041   eventBase_ = eventBase;
1042   ioHandler_.attachEventBase(eventBase);
1043   writeTimeout_.attachEventBase(eventBase);
1044 }
1045
1046 void AsyncSocket::detachEventBase() {
1047   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1048           << ", old evb=" << eventBase_ << ", state=" << state_
1049           << ", events=" << std::hex << eventFlags_ << ")";
1050   assert(eventBase_ != nullptr);
1051   assert(eventBase_->isInEventBaseThread());
1052
1053   eventBase_ = nullptr;
1054   ioHandler_.detachEventBase();
1055   writeTimeout_.detachEventBase();
1056 }
1057
1058 bool AsyncSocket::isDetachable() const {
1059   DCHECK(eventBase_ != nullptr);
1060   DCHECK(eventBase_->isInEventBaseThread());
1061
1062   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1063 }
1064
1065 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1066   address->setFromLocalAddress(fd_);
1067 }
1068
1069 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1070   if (!addr_.isInitialized()) {
1071     addr_.setFromPeerAddress(fd_);
1072   }
1073   *address = addr_;
1074 }
1075
1076 int AsyncSocket::setNoDelay(bool noDelay) {
1077   if (fd_ < 0) {
1078     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1079                << this << "(state=" << state_ << ")";
1080     return EINVAL;
1081
1082   }
1083
1084   int value = noDelay ? 1 : 0;
1085   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1086     int errnoCopy = errno;
1087     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1088             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1089             << strerror(errnoCopy);
1090     return errnoCopy;
1091   }
1092
1093   return 0;
1094 }
1095
1096 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1097
1098   #ifndef TCP_CONGESTION
1099   #define TCP_CONGESTION  13
1100   #endif
1101
1102   if (fd_ < 0) {
1103     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1104                << "socket " << this << "(state=" << state_ << ")";
1105     return EINVAL;
1106
1107   }
1108
1109   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1110         cname.length() + 1) != 0) {
1111     int errnoCopy = errno;
1112     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1113             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1114             << strerror(errnoCopy);
1115     return errnoCopy;
1116   }
1117
1118   return 0;
1119 }
1120
1121 int AsyncSocket::setQuickAck(bool quickack) {
1122   if (fd_ < 0) {
1123     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1124                << this << "(state=" << state_ << ")";
1125     return EINVAL;
1126
1127   }
1128
1129 #ifdef TCP_QUICKACK // Linux-only
1130   int value = quickack ? 1 : 0;
1131   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1132     int errnoCopy = errno;
1133     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1134             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1135             << strerror(errnoCopy);
1136     return errnoCopy;
1137   }
1138
1139   return 0;
1140 #else
1141   return ENOSYS;
1142 #endif
1143 }
1144
1145 int AsyncSocket::setSendBufSize(size_t bufsize) {
1146   if (fd_ < 0) {
1147     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1148                << this << "(state=" << state_ << ")";
1149     return EINVAL;
1150   }
1151
1152   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1153     int errnoCopy = errno;
1154     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1155             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1156             << strerror(errnoCopy);
1157     return errnoCopy;
1158   }
1159
1160   return 0;
1161 }
1162
1163 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1164   if (fd_ < 0) {
1165     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1166                << this << "(state=" << state_ << ")";
1167     return EINVAL;
1168   }
1169
1170   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1171     int errnoCopy = errno;
1172     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1173             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1174             << strerror(errnoCopy);
1175     return errnoCopy;
1176   }
1177
1178   return 0;
1179 }
1180
1181 int AsyncSocket::setTCPProfile(int profd) {
1182   if (fd_ < 0) {
1183     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1184                << this << "(state=" << state_ << ")";
1185     return EINVAL;
1186   }
1187
1188   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1189     int errnoCopy = errno;
1190     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1191             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1192             << strerror(errnoCopy);
1193     return errnoCopy;
1194   }
1195
1196   return 0;
1197 }
1198
1199 void AsyncSocket::ioReady(uint16_t events) noexcept {
1200   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1201           << ", events=" << std::hex << events << ", state=" << state_;
1202   DestructorGuard dg(this);
1203   assert(events & EventHandler::READ_WRITE);
1204   assert(eventBase_->isInEventBaseThread());
1205
1206   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1207   if (relevantEvents == EventHandler::READ) {
1208     handleRead();
1209   } else if (relevantEvents == EventHandler::WRITE) {
1210     handleWrite();
1211   } else if (relevantEvents == EventHandler::READ_WRITE) {
1212     EventBase* originalEventBase = eventBase_;
1213     // If both read and write events are ready, process writes first.
1214     handleWrite();
1215
1216     // Return now if handleWrite() detached us from our EventBase
1217     if (eventBase_ != originalEventBase) {
1218       return;
1219     }
1220
1221     // Only call handleRead() if a read callback is still installed.
1222     // (It's possible that the read callback was uninstalled during
1223     // handleWrite().)
1224     if (readCallback_) {
1225       handleRead();
1226     }
1227   } else {
1228     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1229                << std::hex << events << "(this=" << this << ")";
1230     abort();
1231   }
1232 }
1233
1234 ssize_t AsyncSocket::performRead(void* buf, size_t buflen) {
1235   ssize_t bytes = recv(fd_, buf, buflen, MSG_DONTWAIT);
1236   if (bytes < 0) {
1237     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1238       // No more data to read right now.
1239       return READ_BLOCKING;
1240     } else {
1241       return READ_ERROR;
1242     }
1243   } else {
1244     appBytesReceived_ += bytes;
1245     return bytes;
1246   }
1247 }
1248
1249 void AsyncSocket::handleRead() noexcept {
1250   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1251           << ", state=" << state_;
1252   assert(state_ == StateEnum::ESTABLISHED);
1253   assert((shutdownFlags_ & SHUT_READ) == 0);
1254   assert(readCallback_ != nullptr);
1255   assert(eventFlags_ & EventHandler::READ);
1256
1257   // Loop until:
1258   // - a read attempt would block
1259   // - readCallback_ is uninstalled
1260   // - the number of loop iterations exceeds the optional maximum
1261   // - this AsyncSocket is moved to another EventBase
1262   //
1263   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1264   // which is why need to check for it here.
1265   //
1266   // The last bullet point is slightly subtle.  readDataAvailable() may also
1267   // detach this socket from this EventBase.  However, before
1268   // readDataAvailable() returns another thread may pick it up, attach it to
1269   // a different EventBase, and install another readCallback_.  We need to
1270   // exit immediately after readDataAvailable() returns if the eventBase_ has
1271   // changed.  (The caller must perform some sort of locking to transfer the
1272   // AsyncSocket between threads properly.  This will be sufficient to ensure
1273   // that this thread sees the updated eventBase_ variable after
1274   // readDataAvailable() returns.)
1275   uint16_t numReads = 0;
1276   EventBase* originalEventBase = eventBase_;
1277   while (readCallback_ && eventBase_ == originalEventBase) {
1278     // Get the buffer to read into.
1279     void* buf = nullptr;
1280     size_t buflen = 0;
1281     try {
1282       readCallback_->getReadBuffer(&buf, &buflen);
1283     } catch (const AsyncSocketException& ex) {
1284       return failRead(__func__, ex);
1285     } catch (const std::exception& ex) {
1286       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1287                               string("ReadCallback::getReadBuffer() "
1288                                      "threw exception: ") +
1289                               ex.what());
1290       return failRead(__func__, tex);
1291     } catch (...) {
1292       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1293                              "ReadCallback::getReadBuffer() threw "
1294                              "non-exception type");
1295       return failRead(__func__, ex);
1296     }
1297     if (buf == nullptr || buflen == 0) {
1298       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1299                              "ReadCallback::getReadBuffer() returned "
1300                              "empty buffer");
1301       return failRead(__func__, ex);
1302     }
1303
1304     // Perform the read
1305     ssize_t bytesRead = performRead(buf, buflen);
1306     if (bytesRead > 0) {
1307       readCallback_->readDataAvailable(bytesRead);
1308       // Fall through and continue around the loop if the read
1309       // completely filled the available buffer.
1310       // Note that readCallback_ may have been uninstalled or changed inside
1311       // readDataAvailable().
1312       if (size_t(bytesRead) < buflen) {
1313         return;
1314       }
1315     } else if (bytesRead == READ_BLOCKING) {
1316         // No more data to read right now.
1317         return;
1318     } else if (bytesRead == READ_ERROR) {
1319       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1320                              withAddr("recv() failed"), errno);
1321       return failRead(__func__, ex);
1322     } else {
1323       assert(bytesRead == READ_EOF);
1324       // EOF
1325       shutdownFlags_ |= SHUT_READ;
1326       if (!updateEventRegistration(0, EventHandler::READ)) {
1327         // we've already been moved into STATE_ERROR
1328         assert(state_ == StateEnum::ERROR);
1329         assert(readCallback_ == nullptr);
1330         return;
1331       }
1332
1333       ReadCallback* callback = readCallback_;
1334       readCallback_ = nullptr;
1335       callback->readEOF();
1336       return;
1337     }
1338     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1339       if (readCallback_ != nullptr) {
1340         // We might still have data in the socket.
1341         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1342         scheduleImmediateRead();
1343       }
1344       return;
1345     }
1346   }
1347 }
1348
1349 /**
1350  * This function attempts to write as much data as possible, until no more data
1351  * can be written.
1352  *
1353  * - If it sends all available data, it unregisters for write events, and stops
1354  *   the writeTimeout_.
1355  *
1356  * - If not all of the data can be sent immediately, it reschedules
1357  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1358  *   registered for write events.
1359  */
1360 void AsyncSocket::handleWrite() noexcept {
1361   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1362           << ", state=" << state_;
1363   if (state_ == StateEnum::CONNECTING) {
1364     handleConnect();
1365     return;
1366   }
1367
1368   // Normal write
1369   assert(state_ == StateEnum::ESTABLISHED);
1370   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1371   assert(writeReqHead_ != nullptr);
1372
1373   // Loop until we run out of write requests,
1374   // or until this socket is moved to another EventBase.
1375   // (See the comment in handleRead() explaining how this can happen.)
1376   EventBase* originalEventBase = eventBase_;
1377   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1378     if (!writeReqHead_->performWrite()) {
1379       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1380                              withAddr("writev() failed"), errno);
1381       return failWrite(__func__, ex);
1382     } else if (writeReqHead_->isComplete()) {
1383       // We finished this request
1384       WriteRequest* req = writeReqHead_;
1385       writeReqHead_ = req->getNext();
1386
1387       if (writeReqHead_ == nullptr) {
1388         writeReqTail_ = nullptr;
1389         // This is the last write request.
1390         // Unregister for write events and cancel the send timer
1391         // before we invoke the callback.  We have to update the state properly
1392         // before calling the callback, since it may want to detach us from
1393         // the EventBase.
1394         if (eventFlags_ & EventHandler::WRITE) {
1395           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1396             assert(state_ == StateEnum::ERROR);
1397             return;
1398           }
1399           // Stop the send timeout
1400           writeTimeout_.cancelTimeout();
1401         }
1402         assert(!writeTimeout_.isScheduled());
1403
1404         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1405         // we finish sending the last write request.
1406         //
1407         // We have to do this before invoking writeSuccess(), since
1408         // writeSuccess() may detach us from our EventBase.
1409         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1410           assert(connectCallback_ == nullptr);
1411           shutdownFlags_ |= SHUT_WRITE;
1412
1413           if (shutdownFlags_ & SHUT_READ) {
1414             // Reads have already been shutdown.  Fully close the socket and
1415             // move to STATE_CLOSED.
1416             //
1417             // Note: This code currently moves us to STATE_CLOSED even if
1418             // close() hasn't ever been called.  This can occur if we have
1419             // received EOF from the peer and shutdownWrite() has been called
1420             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1421             // case, until close() is actually called?  I can't think of a
1422             // reason why we would need to do so.  No other operations besides
1423             // calling close() or destroying the socket can be performed at
1424             // this point.
1425             assert(readCallback_ == nullptr);
1426             state_ = StateEnum::CLOSED;
1427             if (fd_ >= 0) {
1428               ioHandler_.changeHandlerFD(-1);
1429               doClose();
1430             }
1431           } else {
1432             // Reads are still enabled, so we are only doing a half-shutdown
1433             ::shutdown(fd_, SHUT_WR);
1434           }
1435         }
1436       }
1437
1438       // Invoke the callback
1439       WriteCallback* callback = req->getCallback();
1440       req->destroy();
1441       if (callback) {
1442         callback->writeSuccess();
1443       }
1444       // We'll continue around the loop, trying to write another request
1445     } else {
1446       // Partial write.
1447       writeReqHead_->consume();
1448       // Stop after a partial write; it's highly likely that a subsequent write
1449       // attempt will just return EAGAIN.
1450       //
1451       // Ensure that we are registered for write events.
1452       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1453         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1454           assert(state_ == StateEnum::ERROR);
1455           return;
1456         }
1457       }
1458
1459       // Reschedule the send timeout, since we have made some write progress.
1460       if (sendTimeout_ > 0) {
1461         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1462           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1463               withAddr("failed to reschedule write timeout"));
1464           return failWrite(__func__, ex);
1465         }
1466       }
1467       return;
1468     }
1469   }
1470 }
1471
1472 void AsyncSocket::checkForImmediateRead() noexcept {
1473   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1474   // (However, note that some subclasses do override this method.)
1475   //
1476   // Simply calling handleRead() here would be bad, as this would call
1477   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1478   // buffer even though no data may be available.  This would waste lots of
1479   // memory, since the buffer will sit around unused until the socket actually
1480   // becomes readable.
1481   //
1482   // Checking if the socket is readable now also seems like it would probably
1483   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1484   // would just waste an extra system call.  Even if it is readable, waiting to
1485   // find out from libevent on the next event loop doesn't seem that bad.
1486 }
1487
1488 void AsyncSocket::handleInitialReadWrite() noexcept {
1489   // Our callers should already be holding a DestructorGuard, but grab
1490   // one here just to make sure, in case one of our calling code paths ever
1491   // changes.
1492   DestructorGuard dg(this);
1493
1494   // If we have a readCallback_, make sure we enable read events.  We
1495   // may already be registered for reads if connectSuccess() set
1496   // the read calback.
1497   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1498     assert(state_ == StateEnum::ESTABLISHED);
1499     assert((shutdownFlags_ & SHUT_READ) == 0);
1500     if (!updateEventRegistration(EventHandler::READ, 0)) {
1501       assert(state_ == StateEnum::ERROR);
1502       return;
1503     }
1504     checkForImmediateRead();
1505   } else if (readCallback_ == nullptr) {
1506     // Unregister for read events.
1507     updateEventRegistration(0, EventHandler::READ);
1508   }
1509
1510   // If we have write requests pending, try to send them immediately.
1511   // Since we just finished accepting, there is a very good chance that we can
1512   // write without blocking.
1513   //
1514   // However, we only process them if EventHandler::WRITE is not already set,
1515   // which means that we're already blocked on a write attempt.  (This can
1516   // happen if connectSuccess() called write() before returning.)
1517   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1518     // Call handleWrite() to perform write processing.
1519     handleWrite();
1520   } else if (writeReqHead_ == nullptr) {
1521     // Unregister for write event.
1522     updateEventRegistration(0, EventHandler::WRITE);
1523   }
1524 }
1525
1526 void AsyncSocket::handleConnect() noexcept {
1527   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1528           << ", state=" << state_;
1529   assert(state_ == StateEnum::CONNECTING);
1530   // SHUT_WRITE can never be set while we are still connecting;
1531   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1532   // finishes
1533   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1534
1535   // In case we had a connect timeout, cancel the timeout
1536   writeTimeout_.cancelTimeout();
1537   // We don't use a persistent registration when waiting on a connect event,
1538   // so we have been automatically unregistered now.  Update eventFlags_ to
1539   // reflect reality.
1540   assert(eventFlags_ == EventHandler::WRITE);
1541   eventFlags_ = EventHandler::NONE;
1542
1543   // Call getsockopt() to check if the connect succeeded
1544   int error;
1545   socklen_t len = sizeof(error);
1546   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1547   if (rv != 0) {
1548     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1549                            withAddr("error calling getsockopt() after connect"),
1550                            errno);
1551     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1552                << fd_ << " host=" << addr_.describe()
1553                << ") exception:" << ex.what();
1554     return failConnect(__func__, ex);
1555   }
1556
1557   if (error != 0) {
1558     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1559                            "connect failed", error);
1560     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1561             << fd_ << " host=" << addr_.describe()
1562             << ") exception: " << ex.what();
1563     return failConnect(__func__, ex);
1564   }
1565
1566   // Move into STATE_ESTABLISHED
1567   state_ = StateEnum::ESTABLISHED;
1568
1569   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1570   // perform, immediately shutdown the write half of the socket.
1571   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1572     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1573     // are still connecting we just abort the connect rather than waiting for
1574     // it to complete.
1575     assert((shutdownFlags_ & SHUT_READ) == 0);
1576     ::shutdown(fd_, SHUT_WR);
1577     shutdownFlags_ |= SHUT_WRITE;
1578   }
1579
1580   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1581           << "successfully connected; state=" << state_;
1582
1583   // Remember the EventBase we are attached to, before we start invoking any
1584   // callbacks (since the callbacks may call detachEventBase()).
1585   EventBase* originalEventBase = eventBase_;
1586
1587   // Call the connect callback.
1588   if (connectCallback_) {
1589     ConnectCallback* callback = connectCallback_;
1590     connectCallback_ = nullptr;
1591     callback->connectSuccess();
1592   }
1593
1594   // Note that the connect callback may have changed our state.
1595   // (set or unset the read callback, called write(), closed the socket, etc.)
1596   // The following code needs to handle these situations correctly.
1597   //
1598   // If the socket has been closed, readCallback_ and writeReqHead_ will
1599   // always be nullptr, so that will prevent us from trying to read or write.
1600   //
1601   // The main thing to check for is if eventBase_ is still originalEventBase.
1602   // If not, we have been detached from this event base, so we shouldn't
1603   // perform any more operations.
1604   if (eventBase_ != originalEventBase) {
1605     return;
1606   }
1607
1608   handleInitialReadWrite();
1609 }
1610
1611 void AsyncSocket::timeoutExpired() noexcept {
1612   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1613           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1614   DestructorGuard dg(this);
1615   assert(eventBase_->isInEventBaseThread());
1616
1617   if (state_ == StateEnum::CONNECTING) {
1618     // connect() timed out
1619     // Unregister for I/O events.
1620     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1621                            "connect timed out");
1622     failConnect(__func__, ex);
1623   } else {
1624     // a normal write operation timed out
1625     assert(state_ == StateEnum::ESTABLISHED);
1626     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1627     failWrite(__func__, ex);
1628   }
1629 }
1630
1631 ssize_t AsyncSocket::performWrite(const iovec* vec,
1632                                    uint32_t count,
1633                                    WriteFlags flags,
1634                                    uint32_t* countWritten,
1635                                    uint32_t* partialWritten) {
1636   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1637   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1638   // (since it may terminate the program if the main program doesn't explicitly
1639   // ignore it).
1640   struct msghdr msg;
1641   msg.msg_name = nullptr;
1642   msg.msg_namelen = 0;
1643   msg.msg_iov = const_cast<iovec *>(vec);
1644 #ifdef IOV_MAX // not defined on Android
1645   msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX);
1646 #else
1647   msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV);
1648 #endif
1649   msg.msg_control = nullptr;
1650   msg.msg_controllen = 0;
1651   msg.msg_flags = 0;
1652
1653   int msg_flags = MSG_DONTWAIT;
1654
1655 #ifdef MSG_NOSIGNAL // Linux-only
1656   msg_flags |= MSG_NOSIGNAL;
1657   if (isSet(flags, WriteFlags::CORK)) {
1658     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1659     // give it the rest of the data rather than immediately sending a partial
1660     // frame, even when TCP_NODELAY is enabled.
1661     msg_flags |= MSG_MORE;
1662   }
1663 #endif
1664   if (isSet(flags, WriteFlags::EOR)) {
1665     // marks that this is the last byte of a record (response)
1666     msg_flags |= MSG_EOR;
1667   }
1668   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1669   if (totalWritten < 0) {
1670     if (errno == EAGAIN) {
1671       // TCP buffer is full; we can't write any more data right now.
1672       *countWritten = 0;
1673       *partialWritten = 0;
1674       return 0;
1675     }
1676     // error
1677     *countWritten = 0;
1678     *partialWritten = 0;
1679     return -1;
1680   }
1681
1682   appBytesWritten_ += totalWritten;
1683
1684   uint32_t bytesWritten;
1685   uint32_t n;
1686   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1687     const iovec* v = vec + n;
1688     if (v->iov_len > bytesWritten) {
1689       // Partial write finished in the middle of this iovec
1690       *countWritten = n;
1691       *partialWritten = bytesWritten;
1692       return totalWritten;
1693     }
1694
1695     bytesWritten -= v->iov_len;
1696   }
1697
1698   assert(bytesWritten == 0);
1699   *countWritten = n;
1700   *partialWritten = 0;
1701   return totalWritten;
1702 }
1703
1704 /**
1705  * Re-register the EventHandler after eventFlags_ has changed.
1706  *
1707  * If an error occurs, fail() is called to move the socket into the error state
1708  * and call all currently installed callbacks.  After an error, the
1709  * AsyncSocket is completely unregistered.
1710  *
1711  * @return Returns true on succcess, or false on error.
1712  */
1713 bool AsyncSocket::updateEventRegistration() {
1714   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1715           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1716           << ", events=" << std::hex << eventFlags_;
1717   assert(eventBase_->isInEventBaseThread());
1718   if (eventFlags_ == EventHandler::NONE) {
1719     ioHandler_.unregisterHandler();
1720     return true;
1721   }
1722
1723   // Always register for persistent events, so we don't have to re-register
1724   // after being called back.
1725   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1726     eventFlags_ = EventHandler::NONE; // we're not registered after error
1727     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1728         withAddr("failed to update AsyncSocket event registration"));
1729     fail("updateEventRegistration", ex);
1730     return false;
1731   }
1732
1733   return true;
1734 }
1735
1736 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1737                                            uint16_t disable) {
1738   uint16_t oldFlags = eventFlags_;
1739   eventFlags_ |= enable;
1740   eventFlags_ &= ~disable;
1741   if (eventFlags_ == oldFlags) {
1742     return true;
1743   } else {
1744     return updateEventRegistration();
1745   }
1746 }
1747
1748 void AsyncSocket::startFail() {
1749   // startFail() should only be called once
1750   assert(state_ != StateEnum::ERROR);
1751   assert(getDestructorGuardCount() > 0);
1752   state_ = StateEnum::ERROR;
1753   // Ensure that SHUT_READ and SHUT_WRITE are set,
1754   // so all future attempts to read or write will be rejected
1755   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1756
1757   if (eventFlags_ != EventHandler::NONE) {
1758     eventFlags_ = EventHandler::NONE;
1759     ioHandler_.unregisterHandler();
1760   }
1761   writeTimeout_.cancelTimeout();
1762
1763   if (fd_ >= 0) {
1764     ioHandler_.changeHandlerFD(-1);
1765     doClose();
1766   }
1767 }
1768
1769 void AsyncSocket::finishFail() {
1770   assert(state_ == StateEnum::ERROR);
1771   assert(getDestructorGuardCount() > 0);
1772
1773   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1774                          withAddr("socket closing after error"));
1775   if (connectCallback_) {
1776     ConnectCallback* callback = connectCallback_;
1777     connectCallback_ = nullptr;
1778     callback->connectErr(ex);
1779   }
1780
1781   failAllWrites(ex);
1782
1783   if (readCallback_) {
1784     ReadCallback* callback = readCallback_;
1785     readCallback_ = nullptr;
1786     callback->readErr(ex);
1787   }
1788 }
1789
1790 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1791   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1792              << state_ << " host=" << addr_.describe()
1793              << "): failed in " << fn << "(): "
1794              << ex.what();
1795   startFail();
1796   finishFail();
1797 }
1798
1799 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1800   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1801                << state_ << " host=" << addr_.describe()
1802                << "): failed while connecting in " << fn << "(): "
1803                << ex.what();
1804   startFail();
1805
1806   if (connectCallback_ != nullptr) {
1807     ConnectCallback* callback = connectCallback_;
1808     connectCallback_ = nullptr;
1809     callback->connectErr(ex);
1810   }
1811
1812   finishFail();
1813 }
1814
1815 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1816   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1817                << state_ << " host=" << addr_.describe()
1818                << "): failed while reading in " << fn << "(): "
1819                << ex.what();
1820   startFail();
1821
1822   if (readCallback_ != nullptr) {
1823     ReadCallback* callback = readCallback_;
1824     readCallback_ = nullptr;
1825     callback->readErr(ex);
1826   }
1827
1828   finishFail();
1829 }
1830
1831 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1832   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1833                << state_ << " host=" << addr_.describe()
1834                << "): failed while writing in " << fn << "(): "
1835                << ex.what();
1836   startFail();
1837
1838   // Only invoke the first write callback, since the error occurred while
1839   // writing this request.  Let any other pending write callbacks be invoked in
1840   // finishFail().
1841   if (writeReqHead_ != nullptr) {
1842     WriteRequest* req = writeReqHead_;
1843     writeReqHead_ = req->getNext();
1844     WriteCallback* callback = req->getCallback();
1845     uint32_t bytesWritten = req->getTotalBytesWritten();
1846     req->destroy();
1847     if (callback) {
1848       callback->writeErr(bytesWritten, ex);
1849     }
1850   }
1851
1852   finishFail();
1853 }
1854
1855 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1856                              size_t bytesWritten,
1857                              const AsyncSocketException& ex) {
1858   // This version of failWrite() is used when the failure occurs before
1859   // we've added the callback to writeReqHead_.
1860   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1861              << state_ << " host=" << addr_.describe()
1862              <<"): failed while writing in " << fn << "(): "
1863              << ex.what();
1864   startFail();
1865
1866   if (callback != nullptr) {
1867     callback->writeErr(bytesWritten, ex);
1868   }
1869
1870   finishFail();
1871 }
1872
1873 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1874   // Invoke writeError() on all write callbacks.
1875   // This is used when writes are forcibly shutdown with write requests
1876   // pending, or when an error occurs with writes pending.
1877   while (writeReqHead_ != nullptr) {
1878     WriteRequest* req = writeReqHead_;
1879     writeReqHead_ = req->getNext();
1880     WriteCallback* callback = req->getCallback();
1881     if (callback) {
1882       callback->writeErr(req->getTotalBytesWritten(), ex);
1883     }
1884     req->destroy();
1885   }
1886 }
1887
1888 void AsyncSocket::invalidState(ConnectCallback* callback) {
1889   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1890              << "): connect() called in invalid state " << state_;
1891
1892   /*
1893    * The invalidState() methods don't use the normal failure mechanisms,
1894    * since we don't know what state we are in.  We don't want to call
1895    * startFail()/finishFail() recursively if we are already in the middle of
1896    * cleaning up.
1897    */
1898
1899   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1900                          "connect() called with socket in invalid state");
1901   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1902     if (callback) {
1903       callback->connectErr(ex);
1904     }
1905   } else {
1906     // We can't use failConnect() here since connectCallback_
1907     // may already be set to another callback.  Invoke this ConnectCallback
1908     // here; any other connectCallback_ will be invoked in finishFail()
1909     startFail();
1910     if (callback) {
1911       callback->connectErr(ex);
1912     }
1913     finishFail();
1914   }
1915 }
1916
1917 void AsyncSocket::invalidState(ReadCallback* callback) {
1918   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1919              << "): setReadCallback(" << callback
1920              << ") called in invalid state " << state_;
1921
1922   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1923                          "setReadCallback() called with socket in "
1924                          "invalid state");
1925   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1926     if (callback) {
1927       callback->readErr(ex);
1928     }
1929   } else {
1930     startFail();
1931     if (callback) {
1932       callback->readErr(ex);
1933     }
1934     finishFail();
1935   }
1936 }
1937
1938 void AsyncSocket::invalidState(WriteCallback* callback) {
1939   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1940              << "): write() called in invalid state " << state_;
1941
1942   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1943                          withAddr("write() called with socket in invalid state"));
1944   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1945     if (callback) {
1946       callback->writeErr(0, ex);
1947     }
1948   } else {
1949     startFail();
1950     if (callback) {
1951       callback->writeErr(0, ex);
1952     }
1953     finishFail();
1954   }
1955 }
1956
1957 void AsyncSocket::doClose() {
1958   if (fd_ == -1) return;
1959   if (shutdownSocketSet_) {
1960     shutdownSocketSet_->close(fd_);
1961   } else {
1962     ::close(fd_);
1963   }
1964   fd_ = -1;
1965 }
1966
1967 std::ostream& operator << (std::ostream& os,
1968                            const AsyncSocket::StateEnum& state) {
1969   os << static_cast<int>(state);
1970   return os;
1971 }
1972
1973 std::string AsyncSocket::withAddr(const std::string& s) {
1974   // Don't use addr_ directly because it may not be initialized
1975   // e.g. if constructed from fd
1976   folly::SocketAddress peer, local;
1977   try {
1978     getPeerAddress(&peer);
1979     getLocalAddress(&local);
1980   } catch (const std::exception&) {
1981     // ignore
1982   } catch (...) {
1983     // ignore
1984   }
1985   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
1986 }
1987
1988 } // folly