86eba2ff39d1153a7322b4ea90c7c2d8dc52d218
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23
24 #include <poll.h>
25 #include <errno.h>
26 #include <limits.h>
27 #include <unistd.h>
28 #include <thread>
29 #include <fcntl.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34 #include <boost/preprocessor/control/if.hpp>
35
36 using std::string;
37 using std::unique_ptr;
38
39 namespace folly {
40
41 // static members initializers
42 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
43
44 const AsyncSocketException socketClosedLocallyEx(
45     AsyncSocketException::END_OF_FILE, "socket closed locally");
46 const AsyncSocketException socketShutdownForWritesEx(
47     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
48
49 // TODO: It might help performance to provide a version of BytesWriteRequest that
50 // users could derive from, so we can avoid the extra allocation for each call
51 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
52 // protocols are currently templatized for transports.
53 //
54 // We would need the version for external users where they provide the iovec
55 // storage space, and only our internal version would allocate it at the end of
56 // the WriteRequest.
57
58 /* The default WriteRequest implementation, used for write(), writev() and
59  * writeChain()
60  *
61  * A new BytesWriteRequest operation is allocated on the heap for all write
62  * operations that cannot be completed immediately.
63  */
64 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
65  public:
66   static BytesWriteRequest* newRequest(AsyncSocket* socket,
67                                        WriteCallback* callback,
68                                        const iovec* ops,
69                                        uint32_t opCount,
70                                        uint32_t partialWritten,
71                                        uint32_t bytesWritten,
72                                        unique_ptr<IOBuf>&& ioBuf,
73                                        WriteFlags flags) {
74     assert(opCount > 0);
75     // Since we put a variable size iovec array at the end
76     // of each BytesWriteRequest, we have to manually allocate the memory.
77     void* buf = malloc(sizeof(BytesWriteRequest) +
78                        (opCount * sizeof(struct iovec)));
79     if (buf == nullptr) {
80       throw std::bad_alloc();
81     }
82
83     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
84                                       partialWritten, bytesWritten,
85                                       std::move(ioBuf), flags);
86   }
87
88   void destroy() override {
89     this->~BytesWriteRequest();
90     free(this);
91   }
92
93   bool performWrite() override {
94     WriteFlags writeFlags = flags_;
95     if (getNext() != nullptr) {
96       writeFlags = writeFlags | WriteFlags::CORK;
97     }
98     bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
99                                           &opsWritten_, &partialBytes_);
100     return bytesWritten_ >= 0;
101   }
102
103   bool isComplete() override {
104     return opsWritten_ == getOpCount();
105   }
106
107   void consume() override {
108     // Advance opIndex_ forward by opsWritten_
109     opIndex_ += opsWritten_;
110     assert(opIndex_ < opCount_);
111
112     // If we've finished writing any IOBufs, release them
113     if (ioBuf_) {
114       for (uint32_t i = opsWritten_; i != 0; --i) {
115         assert(ioBuf_);
116         ioBuf_ = ioBuf_->pop();
117       }
118     }
119
120     // Move partialBytes_ forward into the current iovec buffer
121     struct iovec* currentOp = writeOps_ + opIndex_;
122     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
123     currentOp->iov_base =
124       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
125     currentOp->iov_len -= partialBytes_;
126
127     // Increment the totalBytesWritten_ count by bytesWritten_;
128     totalBytesWritten_ += bytesWritten_;
129   }
130
131  private:
132   BytesWriteRequest(AsyncSocket* socket,
133                     WriteCallback* callback,
134                     const struct iovec* ops,
135                     uint32_t opCount,
136                     uint32_t partialBytes,
137                     uint32_t bytesWritten,
138                     unique_ptr<IOBuf>&& ioBuf,
139                     WriteFlags flags)
140     : AsyncSocket::WriteRequest(socket, callback)
141     , opCount_(opCount)
142     , opIndex_(0)
143     , flags_(flags)
144     , ioBuf_(std::move(ioBuf))
145     , opsWritten_(0)
146     , partialBytes_(partialBytes)
147     , bytesWritten_(bytesWritten) {
148     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
149   }
150
151   // private destructor, to ensure callers use destroy()
152   ~BytesWriteRequest() override = default;
153
154   const struct iovec* getOps() const {
155     assert(opCount_ > opIndex_);
156     return writeOps_ + opIndex_;
157   }
158
159   uint32_t getOpCount() const {
160     assert(opCount_ > opIndex_);
161     return opCount_ - opIndex_;
162   }
163
164   uint32_t opCount_;            ///< number of entries in writeOps_
165   uint32_t opIndex_;            ///< current index into writeOps_
166   WriteFlags flags_;            ///< set for WriteFlags
167   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
168
169   // for consume(), how much we wrote on the last write
170   uint32_t opsWritten_;         ///< complete ops written
171   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
172   ssize_t bytesWritten_;        ///< bytes written altogether
173
174   struct iovec writeOps_[];     ///< write operation(s) list
175 };
176
177 AsyncSocket::AsyncSocket()
178   : eventBase_(nullptr)
179   , writeTimeout_(this, nullptr)
180   , ioHandler_(this, nullptr)
181   , immediateReadHandler_(this) {
182   VLOG(5) << "new AsyncSocket()";
183   init();
184 }
185
186 AsyncSocket::AsyncSocket(EventBase* evb)
187   : eventBase_(evb)
188   , writeTimeout_(this, evb)
189   , ioHandler_(this, evb)
190   , immediateReadHandler_(this) {
191   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
192   init();
193 }
194
195 AsyncSocket::AsyncSocket(EventBase* evb,
196                            const folly::SocketAddress& address,
197                            uint32_t connectTimeout)
198   : AsyncSocket(evb) {
199   connect(nullptr, address, connectTimeout);
200 }
201
202 AsyncSocket::AsyncSocket(EventBase* evb,
203                            const std::string& ip,
204                            uint16_t port,
205                            uint32_t connectTimeout)
206   : AsyncSocket(evb) {
207   connect(nullptr, ip, port, connectTimeout);
208 }
209
210 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
211   : eventBase_(evb)
212   , writeTimeout_(this, evb)
213   , ioHandler_(this, evb, fd)
214   , immediateReadHandler_(this) {
215   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
216           << fd << ")";
217   init();
218   fd_ = fd;
219   setCloseOnExec();
220   state_ = StateEnum::ESTABLISHED;
221 }
222
223 // init() method, since constructor forwarding isn't supported in most
224 // compilers yet.
225 void AsyncSocket::init() {
226   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
227   shutdownFlags_ = 0;
228   state_ = StateEnum::UNINIT;
229   eventFlags_ = EventHandler::NONE;
230   fd_ = -1;
231   sendTimeout_ = 0;
232   maxReadsPerEvent_ = 16;
233   connectCallback_ = nullptr;
234   readCallback_ = nullptr;
235   writeReqHead_ = nullptr;
236   writeReqTail_ = nullptr;
237   shutdownSocketSet_ = nullptr;
238   appBytesWritten_ = 0;
239   appBytesReceived_ = 0;
240 }
241
242 AsyncSocket::~AsyncSocket() {
243   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
244           << ", evb=" << eventBase_ << ", fd=" << fd_
245           << ", state=" << state_ << ")";
246 }
247
248 void AsyncSocket::destroy() {
249   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
250           << ", fd=" << fd_ << ", state=" << state_;
251   // When destroy is called, close the socket immediately
252   closeNow();
253
254   // Then call DelayedDestruction::destroy() to take care of
255   // whether or not we need immediate or delayed destruction
256   DelayedDestruction::destroy();
257 }
258
259 int AsyncSocket::detachFd() {
260   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
261           << ", evb=" << eventBase_ << ", state=" << state_
262           << ", events=" << std::hex << eventFlags_ << ")";
263   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
264   // actually close the descriptor.
265   if (shutdownSocketSet_) {
266     shutdownSocketSet_->remove(fd_);
267   }
268   int fd = fd_;
269   fd_ = -1;
270   // Call closeNow() to invoke all pending callbacks with an error.
271   closeNow();
272   // Update the EventHandler to stop using this fd.
273   // This can only be done after closeNow() unregisters the handler.
274   ioHandler_.changeHandlerFD(-1);
275   return fd;
276 }
277
278 const folly::SocketAddress& AsyncSocket::anyAddress() {
279   static const folly::SocketAddress anyAddress =
280     folly::SocketAddress("0.0.0.0", 0);
281   return anyAddress;
282 }
283
284 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
285   if (shutdownSocketSet_ == newSS) {
286     return;
287   }
288   if (shutdownSocketSet_ && fd_ != -1) {
289     shutdownSocketSet_->remove(fd_);
290   }
291   shutdownSocketSet_ = newSS;
292   if (shutdownSocketSet_ && fd_ != -1) {
293     shutdownSocketSet_->add(fd_);
294   }
295 }
296
297 void AsyncSocket::setCloseOnExec() {
298   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
299   if (rv != 0) {
300     throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
301                                withAddr("failed to set close-on-exec flag"),
302                                errno);
303   }
304 }
305
306 void AsyncSocket::connect(ConnectCallback* callback,
307                            const folly::SocketAddress& address,
308                            int timeout,
309                            const OptionMap &options,
310                            const folly::SocketAddress& bindAddr) noexcept {
311   DestructorGuard dg(this);
312   assert(eventBase_->isInEventBaseThread());
313
314   addr_ = address;
315
316   // Make sure we're in the uninitialized state
317   if (state_ != StateEnum::UNINIT) {
318     return invalidState(callback);
319   }
320
321   connectStartTime_ = std::chrono::steady_clock::now();
322   // Make connect end time at least >= connectStartTime.
323   connectEndTime_ = connectStartTime_;
324
325   assert(fd_ == -1);
326   state_ = StateEnum::CONNECTING;
327   connectCallback_ = callback;
328
329   sockaddr_storage addrStorage;
330   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
331
332   try {
333     // Create the socket
334     // Technically the first parameter should actually be a protocol family
335     // constant (PF_xxx) rather than an address family (AF_xxx), but the
336     // distinction is mainly just historical.  In pretty much all
337     // implementations the PF_foo and AF_foo constants are identical.
338     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
339     if (fd_ < 0) {
340       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
341                                 withAddr("failed to create socket"), errno);
342     }
343     if (shutdownSocketSet_) {
344       shutdownSocketSet_->add(fd_);
345     }
346     ioHandler_.changeHandlerFD(fd_);
347
348     setCloseOnExec();
349
350     // Put the socket in non-blocking mode
351     int flags = fcntl(fd_, F_GETFL, 0);
352     if (flags == -1) {
353       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
354                                 withAddr("failed to get socket flags"), errno);
355     }
356     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
357     if (rv == -1) {
358       throw AsyncSocketException(
359           AsyncSocketException::INTERNAL_ERROR,
360           withAddr("failed to put socket in non-blocking mode"),
361           errno);
362     }
363
364 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
365     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
366     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
367     if (rv == -1) {
368       throw AsyncSocketException(
369           AsyncSocketException::INTERNAL_ERROR,
370           "failed to enable F_SETNOSIGPIPE on socket",
371           errno);
372     }
373 #endif
374
375     // By default, turn on TCP_NODELAY
376     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
377     // setNoDelay() will log an error message if it fails.
378     if (address.getFamily() != AF_UNIX) {
379       (void)setNoDelay(true);
380     }
381
382     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
383             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
384
385     // bind the socket
386     if (bindAddr != anyAddress()) {
387       int one = 1;
388       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
389         doClose();
390         throw AsyncSocketException(
391           AsyncSocketException::NOT_OPEN,
392           "failed to setsockopt prior to bind on " + bindAddr.describe(),
393           errno);
394       }
395
396       bindAddr.getAddress(&addrStorage);
397
398       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
399         doClose();
400         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
401                                   "failed to bind to async socket: " +
402                                   bindAddr.describe(),
403                                   errno);
404       }
405     }
406
407     // Apply the additional options if any.
408     for (const auto& opt: options) {
409       int rv = opt.first.apply(fd_, opt.second);
410       if (rv != 0) {
411         throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
412                                   withAddr("failed to set socket option"),
413                                   errno);
414       }
415     }
416
417     // Perform the connect()
418     address.getAddress(&addrStorage);
419
420     rv = ::connect(fd_, saddr, address.getActualSize());
421     if (rv < 0) {
422       if (errno == EINPROGRESS) {
423         // Connection in progress.
424         if (timeout > 0) {
425           // Start a timer in case the connection takes too long.
426           if (!writeTimeout_.scheduleTimeout(timeout)) {
427             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
428                 withAddr("failed to schedule AsyncSocket connect timeout"));
429           }
430         }
431
432         // Register for write events, so we'll
433         // be notified when the connection finishes/fails.
434         // Note that we don't register for a persistent event here.
435         assert(eventFlags_ == EventHandler::NONE);
436         eventFlags_ = EventHandler::WRITE;
437         if (!ioHandler_.registerHandler(eventFlags_)) {
438           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
439               withAddr("failed to register AsyncSocket connect handler"));
440         }
441         return;
442       } else {
443         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
444                                   "connect failed (immediately)", errno);
445       }
446     }
447
448     // If we're still here the connect() succeeded immediately.
449     // Fall through to call the callback outside of this try...catch block
450   } catch (const AsyncSocketException& ex) {
451     return failConnect(__func__, ex);
452   } catch (const std::exception& ex) {
453     // shouldn't happen, but handle it just in case
454     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
455                << "): unexpected " << typeid(ex).name() << " exception: "
456                << ex.what();
457     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
458                             withAddr(string("unexpected exception: ") +
459                                      ex.what()));
460     return failConnect(__func__, tex);
461   }
462
463   // The connection succeeded immediately
464   // The read callback may not have been set yet, and no writes may be pending
465   // yet, so we don't have to register for any events at the moment.
466   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
467   assert(readCallback_ == nullptr);
468   assert(writeReqHead_ == nullptr);
469   state_ = StateEnum::ESTABLISHED;
470   invokeConnectSuccess();
471 }
472
473 void AsyncSocket::connect(ConnectCallback* callback,
474                            const string& ip, uint16_t port,
475                            int timeout,
476                            const OptionMap &options) noexcept {
477   DestructorGuard dg(this);
478   try {
479     connectCallback_ = callback;
480     connect(callback, folly::SocketAddress(ip, port), timeout, options);
481   } catch (const std::exception& ex) {
482     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
483                             ex.what());
484     return failConnect(__func__, tex);
485   }
486 }
487
488 void AsyncSocket::cancelConnect() {
489   connectCallback_ = nullptr;
490   if (state_ == StateEnum::CONNECTING) {
491     closeNow();
492   }
493 }
494
495 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
496   sendTimeout_ = milliseconds;
497   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
498
499   // If we are currently pending on write requests, immediately update
500   // writeTimeout_ with the new value.
501   if ((eventFlags_ & EventHandler::WRITE) &&
502       (state_ != StateEnum::CONNECTING)) {
503     assert(state_ == StateEnum::ESTABLISHED);
504     assert((shutdownFlags_ & SHUT_WRITE) == 0);
505     if (sendTimeout_ > 0) {
506       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
507         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
508             withAddr("failed to reschedule send timeout in setSendTimeout"));
509         return failWrite(__func__, ex);
510       }
511     } else {
512       writeTimeout_.cancelTimeout();
513     }
514   }
515 }
516
517 void AsyncSocket::setReadCB(ReadCallback *callback) {
518   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
519           << ", callback=" << callback << ", state=" << state_;
520
521   // Short circuit if callback is the same as the existing readCallback_.
522   //
523   // Note that this is needed for proper functioning during some cleanup cases.
524   // During cleanup we allow setReadCallback(nullptr) to be called even if the
525   // read callback is already unset and we have been detached from an event
526   // base.  This check prevents us from asserting
527   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
528   if (callback == readCallback_) {
529     return;
530   }
531
532   /* We are removing a read callback */
533   if (callback == nullptr &&
534       immediateReadHandler_.isLoopCallbackScheduled()) {
535     immediateReadHandler_.cancelLoopCallback();
536   }
537
538   if (shutdownFlags_ & SHUT_READ) {
539     // Reads have already been shut down on this socket.
540     //
541     // Allow setReadCallback(nullptr) to be called in this case, but don't
542     // allow a new callback to be set.
543     //
544     // For example, setReadCallback(nullptr) can happen after an error if we
545     // invoke some other error callback before invoking readError().  The other
546     // error callback that is invoked first may go ahead and clear the read
547     // callback before we get a chance to invoke readError().
548     if (callback != nullptr) {
549       return invalidState(callback);
550     }
551     assert((eventFlags_ & EventHandler::READ) == 0);
552     readCallback_ = nullptr;
553     return;
554   }
555
556   DestructorGuard dg(this);
557   assert(eventBase_->isInEventBaseThread());
558
559   switch ((StateEnum)state_) {
560     case StateEnum::CONNECTING:
561       // For convenience, we allow the read callback to be set while we are
562       // still connecting.  We just store the callback for now.  Once the
563       // connection completes we'll register for read events.
564       readCallback_ = callback;
565       return;
566     case StateEnum::ESTABLISHED:
567     {
568       readCallback_ = callback;
569       uint16_t oldFlags = eventFlags_;
570       if (readCallback_) {
571         eventFlags_ |= EventHandler::READ;
572       } else {
573         eventFlags_ &= ~EventHandler::READ;
574       }
575
576       // Update our registration if our flags have changed
577       if (eventFlags_ != oldFlags) {
578         // We intentionally ignore the return value here.
579         // updateEventRegistration() will move us into the error state if it
580         // fails, and we don't need to do anything else here afterwards.
581         (void)updateEventRegistration();
582       }
583
584       if (readCallback_) {
585         checkForImmediateRead();
586       }
587       return;
588     }
589     case StateEnum::CLOSED:
590     case StateEnum::ERROR:
591       // We should never reach here.  SHUT_READ should always be set
592       // if we are in STATE_CLOSED or STATE_ERROR.
593       assert(false);
594       return invalidState(callback);
595     case StateEnum::UNINIT:
596       // We do not allow setReadCallback() to be called before we start
597       // connecting.
598       return invalidState(callback);
599   }
600
601   // We don't put a default case in the switch statement, so that the compiler
602   // will warn us to update the switch statement if a new state is added.
603   return invalidState(callback);
604 }
605
606 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
607   return readCallback_;
608 }
609
610 void AsyncSocket::write(WriteCallback* callback,
611                          const void* buf, size_t bytes, WriteFlags flags) {
612   iovec op;
613   op.iov_base = const_cast<void*>(buf);
614   op.iov_len = bytes;
615   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
616 }
617
618 void AsyncSocket::writev(WriteCallback* callback,
619                           const iovec* vec,
620                           size_t count,
621                           WriteFlags flags) {
622   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
623 }
624
625 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
626                               WriteFlags flags) {
627   constexpr size_t kSmallSizeMax = 64;
628   size_t count = buf->countChainElements();
629   if (count <= kSmallSizeMax) {
630     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
631     writeChainImpl(callback, vec, count, std::move(buf), flags);
632   } else {
633     iovec* vec = new iovec[count];
634     writeChainImpl(callback, vec, count, std::move(buf), flags);
635     delete[] vec;
636   }
637 }
638
639 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
640     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
641   size_t veclen = buf->fillIov(vec, count);
642   writeImpl(callback, vec, veclen, std::move(buf), flags);
643 }
644
645 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
646                              size_t count, unique_ptr<IOBuf>&& buf,
647                              WriteFlags flags) {
648   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
649           << ", callback=" << callback << ", count=" << count
650           << ", state=" << state_;
651   DestructorGuard dg(this);
652   unique_ptr<IOBuf>ioBuf(std::move(buf));
653   assert(eventBase_->isInEventBaseThread());
654
655   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
656     // No new writes may be performed after the write side of the socket has
657     // been shutdown.
658     //
659     // We could just call callback->writeError() here to fail just this write.
660     // However, fail hard and use invalidState() to fail all outstanding
661     // callbacks and move the socket into the error state.  There's most likely
662     // a bug in the caller's code, so we abort everything rather than trying to
663     // proceed as best we can.
664     return invalidState(callback);
665   }
666
667   uint32_t countWritten = 0;
668   uint32_t partialWritten = 0;
669   int bytesWritten = 0;
670   bool mustRegister = false;
671   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
672     if (writeReqHead_ == nullptr) {
673       // If we are established and there are no other writes pending,
674       // we can attempt to perform the write immediately.
675       assert(writeReqTail_ == nullptr);
676       assert((eventFlags_ & EventHandler::WRITE) == 0);
677
678       bytesWritten = performWrite(vec, count, flags,
679                                   &countWritten, &partialWritten);
680       if (bytesWritten < 0) {
681         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
682                                withAddr("writev failed"), errno);
683         return failWrite(__func__, callback, 0, ex);
684       } else if (countWritten == count) {
685         // We successfully wrote everything.
686         // Invoke the callback and return.
687         if (callback) {
688           callback->writeSuccess();
689         }
690         return;
691       } // else { continue writing the next writeReq }
692       mustRegister = true;
693     }
694   } else if (!connecting()) {
695     // Invalid state for writing
696     return invalidState(callback);
697   }
698
699   // Create a new WriteRequest to add to the queue
700   WriteRequest* req;
701   try {
702     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
703                                         count - countWritten, partialWritten,
704                                         bytesWritten, std::move(ioBuf), flags);
705   } catch (const std::exception& ex) {
706     // we mainly expect to catch std::bad_alloc here
707     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
708         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
709     return failWrite(__func__, callback, bytesWritten, tex);
710   }
711   req->consume();
712   if (writeReqTail_ == nullptr) {
713     assert(writeReqHead_ == nullptr);
714     writeReqHead_ = writeReqTail_ = req;
715   } else {
716     writeReqTail_->append(req);
717     writeReqTail_ = req;
718   }
719
720   // Register for write events if are established and not currently
721   // waiting on write events
722   if (mustRegister) {
723     assert(state_ == StateEnum::ESTABLISHED);
724     assert((eventFlags_ & EventHandler::WRITE) == 0);
725     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
726       assert(state_ == StateEnum::ERROR);
727       return;
728     }
729     if (sendTimeout_ > 0) {
730       // Schedule a timeout to fire if the write takes too long.
731       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
732         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
733                                withAddr("failed to schedule send timeout"));
734         return failWrite(__func__, ex);
735       }
736     }
737   }
738 }
739
740 void AsyncSocket::writeRequest(WriteRequest* req) {
741   if (writeReqTail_ == nullptr) {
742     assert(writeReqHead_ == nullptr);
743     writeReqHead_ = writeReqTail_ = req;
744     req->start();
745   } else {
746     writeReqTail_->append(req);
747     writeReqTail_ = req;
748   }
749 }
750
751 void AsyncSocket::close() {
752   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
753           << ", state=" << state_ << ", shutdownFlags="
754           << std::hex << (int) shutdownFlags_;
755
756   // close() is only different from closeNow() when there are pending writes
757   // that need to drain before we can close.  In all other cases, just call
758   // closeNow().
759   //
760   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
761   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
762   // is still running.  (e.g., If there are multiple pending writes, and we
763   // call writeError() on the first one, it may call close().  In this case we
764   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
765   // writes will still be in the queue.)
766   //
767   // We only need to drain pending writes if we are still in STATE_CONNECTING
768   // or STATE_ESTABLISHED
769   if ((writeReqHead_ == nullptr) ||
770       !(state_ == StateEnum::CONNECTING ||
771       state_ == StateEnum::ESTABLISHED)) {
772     closeNow();
773     return;
774   }
775
776   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
777   // destroyed until close() returns.
778   DestructorGuard dg(this);
779   assert(eventBase_->isInEventBaseThread());
780
781   // Since there are write requests pending, we have to set the
782   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
783   // connect finishes and we finish writing these requests.
784   //
785   // Set SHUT_READ to indicate that reads are shut down, and set the
786   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
787   // pending writes complete.
788   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
789
790   // If a read callback is set, invoke readEOF() immediately to inform it that
791   // the socket has been closed and no more data can be read.
792   if (readCallback_) {
793     // Disable reads if they are enabled
794     if (!updateEventRegistration(0, EventHandler::READ)) {
795       // We're now in the error state; callbacks have been cleaned up
796       assert(state_ == StateEnum::ERROR);
797       assert(readCallback_ == nullptr);
798     } else {
799       ReadCallback* callback = readCallback_;
800       readCallback_ = nullptr;
801       callback->readEOF();
802     }
803   }
804 }
805
806 void AsyncSocket::closeNow() {
807   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
808           << ", state=" << state_ << ", shutdownFlags="
809           << std::hex << (int) shutdownFlags_;
810   DestructorGuard dg(this);
811   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
812
813   switch (state_) {
814     case StateEnum::ESTABLISHED:
815     case StateEnum::CONNECTING:
816     {
817       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
818       state_ = StateEnum::CLOSED;
819
820       // If the write timeout was set, cancel it.
821       writeTimeout_.cancelTimeout();
822
823       // If we are registered for I/O events, unregister.
824       if (eventFlags_ != EventHandler::NONE) {
825         eventFlags_ = EventHandler::NONE;
826         if (!updateEventRegistration()) {
827           // We will have been moved into the error state.
828           assert(state_ == StateEnum::ERROR);
829           return;
830         }
831       }
832
833       if (immediateReadHandler_.isLoopCallbackScheduled()) {
834         immediateReadHandler_.cancelLoopCallback();
835       }
836
837       if (fd_ >= 0) {
838         ioHandler_.changeHandlerFD(-1);
839         doClose();
840       }
841
842       invokeConnectErr(socketClosedLocallyEx);
843
844       failAllWrites(socketClosedLocallyEx);
845
846       if (readCallback_) {
847         ReadCallback* callback = readCallback_;
848         readCallback_ = nullptr;
849         callback->readEOF();
850       }
851       return;
852     }
853     case StateEnum::CLOSED:
854       // Do nothing.  It's possible that we are being called recursively
855       // from inside a callback that we invoked inside another call to close()
856       // that is still running.
857       return;
858     case StateEnum::ERROR:
859       // Do nothing.  The error handling code has performed (or is performing)
860       // cleanup.
861       return;
862     case StateEnum::UNINIT:
863       assert(eventFlags_ == EventHandler::NONE);
864       assert(connectCallback_ == nullptr);
865       assert(readCallback_ == nullptr);
866       assert(writeReqHead_ == nullptr);
867       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
868       state_ = StateEnum::CLOSED;
869       return;
870   }
871
872   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
873               << ") called in unknown state " << state_;
874 }
875
876 void AsyncSocket::closeWithReset() {
877   // Enable SO_LINGER, with the linger timeout set to 0.
878   // This will trigger a TCP reset when we close the socket.
879   if (fd_ >= 0) {
880     struct linger optLinger = {1, 0};
881     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
882       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
883               << "on " << fd_ << ": errno=" << errno;
884     }
885   }
886
887   // Then let closeNow() take care of the rest
888   closeNow();
889 }
890
891 void AsyncSocket::shutdownWrite() {
892   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
893           << ", state=" << state_ << ", shutdownFlags="
894           << std::hex << (int) shutdownFlags_;
895
896   // If there are no pending writes, shutdownWrite() is identical to
897   // shutdownWriteNow().
898   if (writeReqHead_ == nullptr) {
899     shutdownWriteNow();
900     return;
901   }
902
903   assert(eventBase_->isInEventBaseThread());
904
905   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
906   // shutdown will be performed once all writes complete.
907   shutdownFlags_ |= SHUT_WRITE_PENDING;
908 }
909
910 void AsyncSocket::shutdownWriteNow() {
911   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
912           << ", fd=" << fd_ << ", state=" << state_
913           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
914
915   if (shutdownFlags_ & SHUT_WRITE) {
916     // Writes are already shutdown; nothing else to do.
917     return;
918   }
919
920   // If SHUT_READ is already set, just call closeNow() to completely
921   // close the socket.  This can happen if close() was called with writes
922   // pending, and then shutdownWriteNow() is called before all pending writes
923   // complete.
924   if (shutdownFlags_ & SHUT_READ) {
925     closeNow();
926     return;
927   }
928
929   DestructorGuard dg(this);
930   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
931
932   switch (static_cast<StateEnum>(state_)) {
933     case StateEnum::ESTABLISHED:
934     {
935       shutdownFlags_ |= SHUT_WRITE;
936
937       // If the write timeout was set, cancel it.
938       writeTimeout_.cancelTimeout();
939
940       // If we are registered for write events, unregister.
941       if (!updateEventRegistration(0, EventHandler::WRITE)) {
942         // We will have been moved into the error state.
943         assert(state_ == StateEnum::ERROR);
944         return;
945       }
946
947       // Shutdown writes on the file descriptor
948       ::shutdown(fd_, SHUT_WR);
949
950       // Immediately fail all write requests
951       failAllWrites(socketShutdownForWritesEx);
952       return;
953     }
954     case StateEnum::CONNECTING:
955     {
956       // Set the SHUT_WRITE_PENDING flag.
957       // When the connection completes, it will check this flag,
958       // shutdown the write half of the socket, and then set SHUT_WRITE.
959       shutdownFlags_ |= SHUT_WRITE_PENDING;
960
961       // Immediately fail all write requests
962       failAllWrites(socketShutdownForWritesEx);
963       return;
964     }
965     case StateEnum::UNINIT:
966       // Callers normally shouldn't call shutdownWriteNow() before the socket
967       // even starts connecting.  Nonetheless, go ahead and set
968       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
969       // immediately shut down the write side of the socket.
970       shutdownFlags_ |= SHUT_WRITE_PENDING;
971       return;
972     case StateEnum::CLOSED:
973     case StateEnum::ERROR:
974       // We should never get here.  SHUT_WRITE should always be set
975       // in STATE_CLOSED and STATE_ERROR.
976       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
977                  << ", fd=" << fd_ << ") in unexpected state " << state_
978                  << " with SHUT_WRITE not set ("
979                  << std::hex << (int) shutdownFlags_ << ")";
980       assert(false);
981       return;
982   }
983
984   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
985               << fd_ << ") called in unknown state " << state_;
986 }
987
988 bool AsyncSocket::readable() const {
989   if (fd_ == -1) {
990     return false;
991   }
992   struct pollfd fds[1];
993   fds[0].fd = fd_;
994   fds[0].events = POLLIN;
995   fds[0].revents = 0;
996   int rc = poll(fds, 1, 0);
997   return rc == 1;
998 }
999
1000 bool AsyncSocket::isPending() const {
1001   return ioHandler_.isPending();
1002 }
1003
1004 bool AsyncSocket::hangup() const {
1005   if (fd_ == -1) {
1006     // sanity check, no one should ask for hangup if we are not connected.
1007     assert(false);
1008     return false;
1009   }
1010 #ifdef POLLRDHUP // Linux-only
1011   struct pollfd fds[1];
1012   fds[0].fd = fd_;
1013   fds[0].events = POLLRDHUP|POLLHUP;
1014   fds[0].revents = 0;
1015   poll(fds, 1, 0);
1016   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1017 #else
1018   return false;
1019 #endif
1020 }
1021
1022 bool AsyncSocket::good() const {
1023   return ((state_ == StateEnum::CONNECTING ||
1024           state_ == StateEnum::ESTABLISHED) &&
1025           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1026 }
1027
1028 bool AsyncSocket::error() const {
1029   return (state_ == StateEnum::ERROR);
1030 }
1031
1032 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1033   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1034           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1035           << ", state=" << state_ << ", events="
1036           << std::hex << eventFlags_ << ")";
1037   assert(eventBase_ == nullptr);
1038   assert(eventBase->isInEventBaseThread());
1039
1040   eventBase_ = eventBase;
1041   ioHandler_.attachEventBase(eventBase);
1042   writeTimeout_.attachEventBase(eventBase);
1043 }
1044
1045 void AsyncSocket::detachEventBase() {
1046   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1047           << ", old evb=" << eventBase_ << ", state=" << state_
1048           << ", events=" << std::hex << eventFlags_ << ")";
1049   assert(eventBase_ != nullptr);
1050   assert(eventBase_->isInEventBaseThread());
1051
1052   eventBase_ = nullptr;
1053   ioHandler_.detachEventBase();
1054   writeTimeout_.detachEventBase();
1055 }
1056
1057 bool AsyncSocket::isDetachable() const {
1058   DCHECK(eventBase_ != nullptr);
1059   DCHECK(eventBase_->isInEventBaseThread());
1060
1061   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1062 }
1063
1064 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1065   address->setFromLocalAddress(fd_);
1066 }
1067
1068 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1069   if (!addr_.isInitialized()) {
1070     addr_.setFromPeerAddress(fd_);
1071   }
1072   *address = addr_;
1073 }
1074
1075 int AsyncSocket::setNoDelay(bool noDelay) {
1076   if (fd_ < 0) {
1077     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1078                << this << "(state=" << state_ << ")";
1079     return EINVAL;
1080
1081   }
1082
1083   int value = noDelay ? 1 : 0;
1084   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1085     int errnoCopy = errno;
1086     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1087             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1088             << strerror(errnoCopy);
1089     return errnoCopy;
1090   }
1091
1092   return 0;
1093 }
1094
1095 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1096
1097   #ifndef TCP_CONGESTION
1098   #define TCP_CONGESTION  13
1099   #endif
1100
1101   if (fd_ < 0) {
1102     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1103                << "socket " << this << "(state=" << state_ << ")";
1104     return EINVAL;
1105
1106   }
1107
1108   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1109         cname.length() + 1) != 0) {
1110     int errnoCopy = errno;
1111     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1112             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1113             << strerror(errnoCopy);
1114     return errnoCopy;
1115   }
1116
1117   return 0;
1118 }
1119
1120 int AsyncSocket::setQuickAck(bool quickack) {
1121   if (fd_ < 0) {
1122     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1123                << this << "(state=" << state_ << ")";
1124     return EINVAL;
1125
1126   }
1127
1128 #ifdef TCP_QUICKACK // Linux-only
1129   int value = quickack ? 1 : 0;
1130   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1131     int errnoCopy = errno;
1132     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1133             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1134             << strerror(errnoCopy);
1135     return errnoCopy;
1136   }
1137
1138   return 0;
1139 #else
1140   return ENOSYS;
1141 #endif
1142 }
1143
1144 int AsyncSocket::setSendBufSize(size_t bufsize) {
1145   if (fd_ < 0) {
1146     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1147                << this << "(state=" << state_ << ")";
1148     return EINVAL;
1149   }
1150
1151   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1152     int errnoCopy = errno;
1153     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1154             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1155             << strerror(errnoCopy);
1156     return errnoCopy;
1157   }
1158
1159   return 0;
1160 }
1161
1162 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1163   if (fd_ < 0) {
1164     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1165                << this << "(state=" << state_ << ")";
1166     return EINVAL;
1167   }
1168
1169   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1170     int errnoCopy = errno;
1171     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1172             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1173             << strerror(errnoCopy);
1174     return errnoCopy;
1175   }
1176
1177   return 0;
1178 }
1179
1180 int AsyncSocket::setTCPProfile(int profd) {
1181   if (fd_ < 0) {
1182     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1183                << this << "(state=" << state_ << ")";
1184     return EINVAL;
1185   }
1186
1187   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1188     int errnoCopy = errno;
1189     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1190             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1191             << strerror(errnoCopy);
1192     return errnoCopy;
1193   }
1194
1195   return 0;
1196 }
1197
1198 void AsyncSocket::ioReady(uint16_t events) noexcept {
1199   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1200           << ", events=" << std::hex << events << ", state=" << state_;
1201   DestructorGuard dg(this);
1202   assert(events & EventHandler::READ_WRITE);
1203   assert(eventBase_->isInEventBaseThread());
1204
1205   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1206   if (relevantEvents == EventHandler::READ) {
1207     handleRead();
1208   } else if (relevantEvents == EventHandler::WRITE) {
1209     handleWrite();
1210   } else if (relevantEvents == EventHandler::READ_WRITE) {
1211     EventBase* originalEventBase = eventBase_;
1212     // If both read and write events are ready, process writes first.
1213     handleWrite();
1214
1215     // Return now if handleWrite() detached us from our EventBase
1216     if (eventBase_ != originalEventBase) {
1217       return;
1218     }
1219
1220     // Only call handleRead() if a read callback is still installed.
1221     // (It's possible that the read callback was uninstalled during
1222     // handleWrite().)
1223     if (readCallback_) {
1224       handleRead();
1225     }
1226   } else {
1227     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1228                << std::hex << events << "(this=" << this << ")";
1229     abort();
1230   }
1231 }
1232
1233 ssize_t AsyncSocket::performRead(void** buf, size_t* buflen, size_t* offset) {
1234   VLOG(5) << "AsyncSocket::performRead() this=" << this
1235           << ", buf=" << *buf << ", buflen=" << *buflen;
1236
1237   int recvFlags = 0;
1238   if (peek_) {
1239     recvFlags |= MSG_PEEK;
1240   }
1241
1242   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1243   if (bytes < 0) {
1244     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1245       // No more data to read right now.
1246       return READ_BLOCKING;
1247     } else {
1248       return READ_ERROR;
1249     }
1250   } else {
1251     appBytesReceived_ += bytes;
1252     return bytes;
1253   }
1254 }
1255
1256 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1257   // no matter what, buffer should be preapared for non-ssl socket
1258   CHECK(readCallback_);
1259   readCallback_->getReadBuffer(buf, buflen);
1260 }
1261
1262 void AsyncSocket::handleRead() noexcept {
1263   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1264           << ", state=" << state_;
1265   assert(state_ == StateEnum::ESTABLISHED);
1266   assert((shutdownFlags_ & SHUT_READ) == 0);
1267   assert(readCallback_ != nullptr);
1268   assert(eventFlags_ & EventHandler::READ);
1269
1270   // Loop until:
1271   // - a read attempt would block
1272   // - readCallback_ is uninstalled
1273   // - the number of loop iterations exceeds the optional maximum
1274   // - this AsyncSocket is moved to another EventBase
1275   //
1276   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1277   // which is why need to check for it here.
1278   //
1279   // The last bullet point is slightly subtle.  readDataAvailable() may also
1280   // detach this socket from this EventBase.  However, before
1281   // readDataAvailable() returns another thread may pick it up, attach it to
1282   // a different EventBase, and install another readCallback_.  We need to
1283   // exit immediately after readDataAvailable() returns if the eventBase_ has
1284   // changed.  (The caller must perform some sort of locking to transfer the
1285   // AsyncSocket between threads properly.  This will be sufficient to ensure
1286   // that this thread sees the updated eventBase_ variable after
1287   // readDataAvailable() returns.)
1288   uint16_t numReads = 0;
1289   EventBase* originalEventBase = eventBase_;
1290   while (readCallback_ && eventBase_ == originalEventBase) {
1291     // Get the buffer to read into.
1292     void* buf = nullptr;
1293     size_t buflen = 0, offset = 0;
1294     try {
1295       prepareReadBuffer(&buf, &buflen);
1296       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1297     } catch (const AsyncSocketException& ex) {
1298       return failRead(__func__, ex);
1299     } catch (const std::exception& ex) {
1300       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1301                               string("ReadCallback::getReadBuffer() "
1302                                      "threw exception: ") +
1303                               ex.what());
1304       return failRead(__func__, tex);
1305     } catch (...) {
1306       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1307                              "ReadCallback::getReadBuffer() threw "
1308                              "non-exception type");
1309       return failRead(__func__, ex);
1310     }
1311     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1312       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1313                              "ReadCallback::getReadBuffer() returned "
1314                              "empty buffer");
1315       return failRead(__func__, ex);
1316     }
1317
1318     // Perform the read
1319     ssize_t bytesRead = performRead(&buf, &buflen, &offset);
1320     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1321             << bytesRead << " bytes";
1322     if (bytesRead > 0) {
1323       if (!isBufferMovable_) {
1324         readCallback_->readDataAvailable(bytesRead);
1325       } else {
1326         CHECK(kOpenSslModeMoveBufferOwnership);
1327         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1328                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1329                 << ", offset=" << offset;
1330         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1331         readBuf->trimStart(offset);
1332         readBuf->trimEnd(buflen - offset - bytesRead);
1333         readCallback_->readBufferAvailable(std::move(readBuf));
1334       }
1335
1336       // Fall through and continue around the loop if the read
1337       // completely filled the available buffer.
1338       // Note that readCallback_ may have been uninstalled or changed inside
1339       // readDataAvailable().
1340       if (size_t(bytesRead) < buflen) {
1341         return;
1342       }
1343     } else if (bytesRead == READ_BLOCKING) {
1344         // No more data to read right now.
1345         return;
1346     } else if (bytesRead == READ_ERROR) {
1347       readErr_ = READ_ERROR;
1348       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1349                              withAddr("recv() failed"), errno);
1350       return failRead(__func__, ex);
1351     } else {
1352       assert(bytesRead == READ_EOF);
1353       readErr_ = READ_EOF;
1354       // EOF
1355       shutdownFlags_ |= SHUT_READ;
1356       if (!updateEventRegistration(0, EventHandler::READ)) {
1357         // we've already been moved into STATE_ERROR
1358         assert(state_ == StateEnum::ERROR);
1359         assert(readCallback_ == nullptr);
1360         return;
1361       }
1362
1363       ReadCallback* callback = readCallback_;
1364       readCallback_ = nullptr;
1365       callback->readEOF();
1366       return;
1367     }
1368     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1369       if (readCallback_ != nullptr) {
1370         // We might still have data in the socket.
1371         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1372         scheduleImmediateRead();
1373       }
1374       return;
1375     }
1376   }
1377 }
1378
1379 /**
1380  * This function attempts to write as much data as possible, until no more data
1381  * can be written.
1382  *
1383  * - If it sends all available data, it unregisters for write events, and stops
1384  *   the writeTimeout_.
1385  *
1386  * - If not all of the data can be sent immediately, it reschedules
1387  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1388  *   registered for write events.
1389  */
1390 void AsyncSocket::handleWrite() noexcept {
1391   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1392           << ", state=" << state_;
1393   if (state_ == StateEnum::CONNECTING) {
1394     handleConnect();
1395     return;
1396   }
1397
1398   // Normal write
1399   assert(state_ == StateEnum::ESTABLISHED);
1400   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1401   assert(writeReqHead_ != nullptr);
1402
1403   // Loop until we run out of write requests,
1404   // or until this socket is moved to another EventBase.
1405   // (See the comment in handleRead() explaining how this can happen.)
1406   EventBase* originalEventBase = eventBase_;
1407   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1408     if (!writeReqHead_->performWrite()) {
1409       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1410                              withAddr("writev() failed"), errno);
1411       return failWrite(__func__, ex);
1412     } else if (writeReqHead_->isComplete()) {
1413       // We finished this request
1414       WriteRequest* req = writeReqHead_;
1415       writeReqHead_ = req->getNext();
1416
1417       if (writeReqHead_ == nullptr) {
1418         writeReqTail_ = nullptr;
1419         // This is the last write request.
1420         // Unregister for write events and cancel the send timer
1421         // before we invoke the callback.  We have to update the state properly
1422         // before calling the callback, since it may want to detach us from
1423         // the EventBase.
1424         if (eventFlags_ & EventHandler::WRITE) {
1425           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1426             assert(state_ == StateEnum::ERROR);
1427             return;
1428           }
1429           // Stop the send timeout
1430           writeTimeout_.cancelTimeout();
1431         }
1432         assert(!writeTimeout_.isScheduled());
1433
1434         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1435         // we finish sending the last write request.
1436         //
1437         // We have to do this before invoking writeSuccess(), since
1438         // writeSuccess() may detach us from our EventBase.
1439         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1440           assert(connectCallback_ == nullptr);
1441           shutdownFlags_ |= SHUT_WRITE;
1442
1443           if (shutdownFlags_ & SHUT_READ) {
1444             // Reads have already been shutdown.  Fully close the socket and
1445             // move to STATE_CLOSED.
1446             //
1447             // Note: This code currently moves us to STATE_CLOSED even if
1448             // close() hasn't ever been called.  This can occur if we have
1449             // received EOF from the peer and shutdownWrite() has been called
1450             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1451             // case, until close() is actually called?  I can't think of a
1452             // reason why we would need to do so.  No other operations besides
1453             // calling close() or destroying the socket can be performed at
1454             // this point.
1455             assert(readCallback_ == nullptr);
1456             state_ = StateEnum::CLOSED;
1457             if (fd_ >= 0) {
1458               ioHandler_.changeHandlerFD(-1);
1459               doClose();
1460             }
1461           } else {
1462             // Reads are still enabled, so we are only doing a half-shutdown
1463             ::shutdown(fd_, SHUT_WR);
1464           }
1465         }
1466       }
1467
1468       // Invoke the callback
1469       WriteCallback* callback = req->getCallback();
1470       req->destroy();
1471       if (callback) {
1472         callback->writeSuccess();
1473       }
1474       // We'll continue around the loop, trying to write another request
1475     } else {
1476       // Partial write.
1477       writeReqHead_->consume();
1478       // Stop after a partial write; it's highly likely that a subsequent write
1479       // attempt will just return EAGAIN.
1480       //
1481       // Ensure that we are registered for write events.
1482       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1483         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1484           assert(state_ == StateEnum::ERROR);
1485           return;
1486         }
1487       }
1488
1489       // Reschedule the send timeout, since we have made some write progress.
1490       if (sendTimeout_ > 0) {
1491         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1492           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1493               withAddr("failed to reschedule write timeout"));
1494           return failWrite(__func__, ex);
1495         }
1496       }
1497       return;
1498     }
1499   }
1500 }
1501
1502 void AsyncSocket::checkForImmediateRead() noexcept {
1503   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1504   // (However, note that some subclasses do override this method.)
1505   //
1506   // Simply calling handleRead() here would be bad, as this would call
1507   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1508   // buffer even though no data may be available.  This would waste lots of
1509   // memory, since the buffer will sit around unused until the socket actually
1510   // becomes readable.
1511   //
1512   // Checking if the socket is readable now also seems like it would probably
1513   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1514   // would just waste an extra system call.  Even if it is readable, waiting to
1515   // find out from libevent on the next event loop doesn't seem that bad.
1516 }
1517
1518 void AsyncSocket::handleInitialReadWrite() noexcept {
1519   // Our callers should already be holding a DestructorGuard, but grab
1520   // one here just to make sure, in case one of our calling code paths ever
1521   // changes.
1522   DestructorGuard dg(this);
1523
1524   // If we have a readCallback_, make sure we enable read events.  We
1525   // may already be registered for reads if connectSuccess() set
1526   // the read calback.
1527   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1528     assert(state_ == StateEnum::ESTABLISHED);
1529     assert((shutdownFlags_ & SHUT_READ) == 0);
1530     if (!updateEventRegistration(EventHandler::READ, 0)) {
1531       assert(state_ == StateEnum::ERROR);
1532       return;
1533     }
1534     checkForImmediateRead();
1535   } else if (readCallback_ == nullptr) {
1536     // Unregister for read events.
1537     updateEventRegistration(0, EventHandler::READ);
1538   }
1539
1540   // If we have write requests pending, try to send them immediately.
1541   // Since we just finished accepting, there is a very good chance that we can
1542   // write without blocking.
1543   //
1544   // However, we only process them if EventHandler::WRITE is not already set,
1545   // which means that we're already blocked on a write attempt.  (This can
1546   // happen if connectSuccess() called write() before returning.)
1547   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1548     // Call handleWrite() to perform write processing.
1549     handleWrite();
1550   } else if (writeReqHead_ == nullptr) {
1551     // Unregister for write event.
1552     updateEventRegistration(0, EventHandler::WRITE);
1553   }
1554 }
1555
1556 void AsyncSocket::handleConnect() noexcept {
1557   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1558           << ", state=" << state_;
1559   assert(state_ == StateEnum::CONNECTING);
1560   // SHUT_WRITE can never be set while we are still connecting;
1561   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1562   // finishes
1563   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1564
1565   // In case we had a connect timeout, cancel the timeout
1566   writeTimeout_.cancelTimeout();
1567   // We don't use a persistent registration when waiting on a connect event,
1568   // so we have been automatically unregistered now.  Update eventFlags_ to
1569   // reflect reality.
1570   assert(eventFlags_ == EventHandler::WRITE);
1571   eventFlags_ = EventHandler::NONE;
1572
1573   // Call getsockopt() to check if the connect succeeded
1574   int error;
1575   socklen_t len = sizeof(error);
1576   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1577   if (rv != 0) {
1578     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1579                            withAddr("error calling getsockopt() after connect"),
1580                            errno);
1581     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1582                << fd_ << " host=" << addr_.describe()
1583                << ") exception:" << ex.what();
1584     return failConnect(__func__, ex);
1585   }
1586
1587   if (error != 0) {
1588     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1589                            "connect failed", error);
1590     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1591             << fd_ << " host=" << addr_.describe()
1592             << ") exception: " << ex.what();
1593     return failConnect(__func__, ex);
1594   }
1595
1596   // Move into STATE_ESTABLISHED
1597   state_ = StateEnum::ESTABLISHED;
1598
1599   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1600   // perform, immediately shutdown the write half of the socket.
1601   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1602     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1603     // are still connecting we just abort the connect rather than waiting for
1604     // it to complete.
1605     assert((shutdownFlags_ & SHUT_READ) == 0);
1606     ::shutdown(fd_, SHUT_WR);
1607     shutdownFlags_ |= SHUT_WRITE;
1608   }
1609
1610   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1611           << "successfully connected; state=" << state_;
1612
1613   // Remember the EventBase we are attached to, before we start invoking any
1614   // callbacks (since the callbacks may call detachEventBase()).
1615   EventBase* originalEventBase = eventBase_;
1616
1617   invokeConnectSuccess();
1618   // Note that the connect callback may have changed our state.
1619   // (set or unset the read callback, called write(), closed the socket, etc.)
1620   // The following code needs to handle these situations correctly.
1621   //
1622   // If the socket has been closed, readCallback_ and writeReqHead_ will
1623   // always be nullptr, so that will prevent us from trying to read or write.
1624   //
1625   // The main thing to check for is if eventBase_ is still originalEventBase.
1626   // If not, we have been detached from this event base, so we shouldn't
1627   // perform any more operations.
1628   if (eventBase_ != originalEventBase) {
1629     return;
1630   }
1631
1632   handleInitialReadWrite();
1633 }
1634
1635 void AsyncSocket::timeoutExpired() noexcept {
1636   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1637           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1638   DestructorGuard dg(this);
1639   assert(eventBase_->isInEventBaseThread());
1640
1641   if (state_ == StateEnum::CONNECTING) {
1642     // connect() timed out
1643     // Unregister for I/O events.
1644     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1645                            "connect timed out");
1646     failConnect(__func__, ex);
1647   } else {
1648     // a normal write operation timed out
1649     assert(state_ == StateEnum::ESTABLISHED);
1650     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1651     failWrite(__func__, ex);
1652   }
1653 }
1654
1655 ssize_t AsyncSocket::performWrite(const iovec* vec,
1656                                    uint32_t count,
1657                                    WriteFlags flags,
1658                                    uint32_t* countWritten,
1659                                    uint32_t* partialWritten) {
1660   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1661   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1662   // (since it may terminate the program if the main program doesn't explicitly
1663   // ignore it).
1664   struct msghdr msg;
1665   msg.msg_name = nullptr;
1666   msg.msg_namelen = 0;
1667   msg.msg_iov = const_cast<iovec *>(vec);
1668 #ifdef IOV_MAX // not defined on Android
1669   msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX);
1670 #else
1671   msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV);
1672 #endif
1673   msg.msg_control = nullptr;
1674   msg.msg_controllen = 0;
1675   msg.msg_flags = 0;
1676
1677   int msg_flags = MSG_DONTWAIT;
1678
1679 #ifdef MSG_NOSIGNAL // Linux-only
1680   msg_flags |= MSG_NOSIGNAL;
1681   if (isSet(flags, WriteFlags::CORK)) {
1682     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1683     // give it the rest of the data rather than immediately sending a partial
1684     // frame, even when TCP_NODELAY is enabled.
1685     msg_flags |= MSG_MORE;
1686   }
1687 #endif
1688   if (isSet(flags, WriteFlags::EOR)) {
1689     // marks that this is the last byte of a record (response)
1690     msg_flags |= MSG_EOR;
1691   }
1692   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1693   if (totalWritten < 0) {
1694     if (errno == EAGAIN) {
1695       // TCP buffer is full; we can't write any more data right now.
1696       *countWritten = 0;
1697       *partialWritten = 0;
1698       return 0;
1699     }
1700     // error
1701     *countWritten = 0;
1702     *partialWritten = 0;
1703     return -1;
1704   }
1705
1706   appBytesWritten_ += totalWritten;
1707
1708   uint32_t bytesWritten;
1709   uint32_t n;
1710   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1711     const iovec* v = vec + n;
1712     if (v->iov_len > bytesWritten) {
1713       // Partial write finished in the middle of this iovec
1714       *countWritten = n;
1715       *partialWritten = bytesWritten;
1716       return totalWritten;
1717     }
1718
1719     bytesWritten -= v->iov_len;
1720   }
1721
1722   assert(bytesWritten == 0);
1723   *countWritten = n;
1724   *partialWritten = 0;
1725   return totalWritten;
1726 }
1727
1728 /**
1729  * Re-register the EventHandler after eventFlags_ has changed.
1730  *
1731  * If an error occurs, fail() is called to move the socket into the error state
1732  * and call all currently installed callbacks.  After an error, the
1733  * AsyncSocket is completely unregistered.
1734  *
1735  * @return Returns true on succcess, or false on error.
1736  */
1737 bool AsyncSocket::updateEventRegistration() {
1738   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1739           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1740           << ", events=" << std::hex << eventFlags_;
1741   assert(eventBase_->isInEventBaseThread());
1742   if (eventFlags_ == EventHandler::NONE) {
1743     ioHandler_.unregisterHandler();
1744     return true;
1745   }
1746
1747   // Always register for persistent events, so we don't have to re-register
1748   // after being called back.
1749   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1750     eventFlags_ = EventHandler::NONE; // we're not registered after error
1751     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1752         withAddr("failed to update AsyncSocket event registration"));
1753     fail("updateEventRegistration", ex);
1754     return false;
1755   }
1756
1757   return true;
1758 }
1759
1760 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1761                                            uint16_t disable) {
1762   uint16_t oldFlags = eventFlags_;
1763   eventFlags_ |= enable;
1764   eventFlags_ &= ~disable;
1765   if (eventFlags_ == oldFlags) {
1766     return true;
1767   } else {
1768     return updateEventRegistration();
1769   }
1770 }
1771
1772 void AsyncSocket::startFail() {
1773   // startFail() should only be called once
1774   assert(state_ != StateEnum::ERROR);
1775   assert(getDestructorGuardCount() > 0);
1776   state_ = StateEnum::ERROR;
1777   // Ensure that SHUT_READ and SHUT_WRITE are set,
1778   // so all future attempts to read or write will be rejected
1779   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1780
1781   if (eventFlags_ != EventHandler::NONE) {
1782     eventFlags_ = EventHandler::NONE;
1783     ioHandler_.unregisterHandler();
1784   }
1785   writeTimeout_.cancelTimeout();
1786
1787   if (fd_ >= 0) {
1788     ioHandler_.changeHandlerFD(-1);
1789     doClose();
1790   }
1791 }
1792
1793 void AsyncSocket::finishFail() {
1794   assert(state_ == StateEnum::ERROR);
1795   assert(getDestructorGuardCount() > 0);
1796
1797   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1798                          withAddr("socket closing after error"));
1799   invokeConnectErr(ex);
1800   failAllWrites(ex);
1801
1802   if (readCallback_) {
1803     ReadCallback* callback = readCallback_;
1804     readCallback_ = nullptr;
1805     callback->readErr(ex);
1806   }
1807 }
1808
1809 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1810   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1811              << state_ << " host=" << addr_.describe()
1812              << "): failed in " << fn << "(): "
1813              << ex.what();
1814   startFail();
1815   finishFail();
1816 }
1817
1818 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1819   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1820                << state_ << " host=" << addr_.describe()
1821                << "): failed while connecting in " << fn << "(): "
1822                << ex.what();
1823   startFail();
1824
1825   invokeConnectErr(ex);
1826   finishFail();
1827 }
1828
1829 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1830   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1831                << state_ << " host=" << addr_.describe()
1832                << "): failed while reading in " << fn << "(): "
1833                << ex.what();
1834   startFail();
1835
1836   if (readCallback_ != nullptr) {
1837     ReadCallback* callback = readCallback_;
1838     readCallback_ = nullptr;
1839     callback->readErr(ex);
1840   }
1841
1842   finishFail();
1843 }
1844
1845 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1846   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1847                << state_ << " host=" << addr_.describe()
1848                << "): failed while writing in " << fn << "(): "
1849                << ex.what();
1850   startFail();
1851
1852   // Only invoke the first write callback, since the error occurred while
1853   // writing this request.  Let any other pending write callbacks be invoked in
1854   // finishFail().
1855   if (writeReqHead_ != nullptr) {
1856     WriteRequest* req = writeReqHead_;
1857     writeReqHead_ = req->getNext();
1858     WriteCallback* callback = req->getCallback();
1859     uint32_t bytesWritten = req->getTotalBytesWritten();
1860     req->destroy();
1861     if (callback) {
1862       callback->writeErr(bytesWritten, ex);
1863     }
1864   }
1865
1866   finishFail();
1867 }
1868
1869 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1870                              size_t bytesWritten,
1871                              const AsyncSocketException& ex) {
1872   // This version of failWrite() is used when the failure occurs before
1873   // we've added the callback to writeReqHead_.
1874   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1875              << state_ << " host=" << addr_.describe()
1876              <<"): failed while writing in " << fn << "(): "
1877              << ex.what();
1878   startFail();
1879
1880   if (callback != nullptr) {
1881     callback->writeErr(bytesWritten, ex);
1882   }
1883
1884   finishFail();
1885 }
1886
1887 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1888   // Invoke writeError() on all write callbacks.
1889   // This is used when writes are forcibly shutdown with write requests
1890   // pending, or when an error occurs with writes pending.
1891   while (writeReqHead_ != nullptr) {
1892     WriteRequest* req = writeReqHead_;
1893     writeReqHead_ = req->getNext();
1894     WriteCallback* callback = req->getCallback();
1895     if (callback) {
1896       callback->writeErr(req->getTotalBytesWritten(), ex);
1897     }
1898     req->destroy();
1899   }
1900 }
1901
1902 void AsyncSocket::invalidState(ConnectCallback* callback) {
1903   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1904              << "): connect() called in invalid state " << state_;
1905
1906   /*
1907    * The invalidState() methods don't use the normal failure mechanisms,
1908    * since we don't know what state we are in.  We don't want to call
1909    * startFail()/finishFail() recursively if we are already in the middle of
1910    * cleaning up.
1911    */
1912
1913   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1914                          "connect() called with socket in invalid state");
1915   connectEndTime_ = std::chrono::steady_clock::now();
1916   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1917     if (callback) {
1918       callback->connectErr(ex);
1919     }
1920   } else {
1921     // We can't use failConnect() here since connectCallback_
1922     // may already be set to another callback.  Invoke this ConnectCallback
1923     // here; any other connectCallback_ will be invoked in finishFail()
1924     startFail();
1925     if (callback) {
1926       callback->connectErr(ex);
1927     }
1928     finishFail();
1929   }
1930 }
1931
1932 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
1933   connectEndTime_ = std::chrono::steady_clock::now();
1934   if (connectCallback_) {
1935     ConnectCallback* callback = connectCallback_;
1936     connectCallback_ = nullptr;
1937     callback->connectErr(ex);
1938   }
1939 }
1940
1941 void AsyncSocket::invokeConnectSuccess() {
1942   connectEndTime_ = std::chrono::steady_clock::now();
1943   if (connectCallback_) {
1944     ConnectCallback* callback = connectCallback_;
1945     connectCallback_ = nullptr;
1946     callback->connectSuccess();
1947   }
1948 }
1949
1950 void AsyncSocket::invalidState(ReadCallback* callback) {
1951   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1952              << "): setReadCallback(" << callback
1953              << ") called in invalid state " << state_;
1954
1955   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1956                          "setReadCallback() called with socket in "
1957                          "invalid state");
1958   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1959     if (callback) {
1960       callback->readErr(ex);
1961     }
1962   } else {
1963     startFail();
1964     if (callback) {
1965       callback->readErr(ex);
1966     }
1967     finishFail();
1968   }
1969 }
1970
1971 void AsyncSocket::invalidState(WriteCallback* callback) {
1972   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1973              << "): write() called in invalid state " << state_;
1974
1975   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1976                          withAddr("write() called with socket in invalid state"));
1977   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1978     if (callback) {
1979       callback->writeErr(0, ex);
1980     }
1981   } else {
1982     startFail();
1983     if (callback) {
1984       callback->writeErr(0, ex);
1985     }
1986     finishFail();
1987   }
1988 }
1989
1990 void AsyncSocket::doClose() {
1991   if (fd_ == -1) return;
1992   if (shutdownSocketSet_) {
1993     shutdownSocketSet_->close(fd_);
1994   } else {
1995     ::close(fd_);
1996   }
1997   fd_ = -1;
1998 }
1999
2000 std::ostream& operator << (std::ostream& os,
2001                            const AsyncSocket::StateEnum& state) {
2002   os << static_cast<int>(state);
2003   return os;
2004 }
2005
2006 std::string AsyncSocket::withAddr(const std::string& s) {
2007   // Don't use addr_ directly because it may not be initialized
2008   // e.g. if constructed from fd
2009   folly::SocketAddress peer, local;
2010   try {
2011     getPeerAddress(&peer);
2012     getLocalAddress(&local);
2013   } catch (const std::exception&) {
2014     // ignore
2015   } catch (...) {
2016     // ignore
2017   }
2018   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2019 }
2020
2021 } // folly