Add a buffer callback to AsyncSocket
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23
24 #include <poll.h>
25 #include <errno.h>
26 #include <limits.h>
27 #include <unistd.h>
28 #include <thread>
29 #include <fcntl.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34 #include <boost/preprocessor/control/if.hpp>
35
36 using std::string;
37 using std::unique_ptr;
38
39 namespace folly {
40
41 // static members initializers
42 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
43
44 const AsyncSocketException socketClosedLocallyEx(
45     AsyncSocketException::END_OF_FILE, "socket closed locally");
46 const AsyncSocketException socketShutdownForWritesEx(
47     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
48
49 // TODO: It might help performance to provide a version of BytesWriteRequest that
50 // users could derive from, so we can avoid the extra allocation for each call
51 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
52 // protocols are currently templatized for transports.
53 //
54 // We would need the version for external users where they provide the iovec
55 // storage space, and only our internal version would allocate it at the end of
56 // the WriteRequest.
57
58 /* The default WriteRequest implementation, used for write(), writev() and
59  * writeChain()
60  *
61  * A new BytesWriteRequest operation is allocated on the heap for all write
62  * operations that cannot be completed immediately.
63  */
64 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
65  public:
66   static BytesWriteRequest* newRequest(
67       AsyncSocket* socket,
68       WriteCallback* callback,
69       const iovec* ops,
70       uint32_t opCount,
71       uint32_t partialWritten,
72       uint32_t bytesWritten,
73       unique_ptr<IOBuf>&& ioBuf,
74       WriteFlags flags,
75       BufferCallback* bufferCallback = nullptr) {
76     assert(opCount > 0);
77     // Since we put a variable size iovec array at the end
78     // of each BytesWriteRequest, we have to manually allocate the memory.
79     void* buf = malloc(sizeof(BytesWriteRequest) +
80                        (opCount * sizeof(struct iovec)));
81     if (buf == nullptr) {
82       throw std::bad_alloc();
83     }
84
85     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
86                                       partialWritten, bytesWritten,
87                                       std::move(ioBuf), flags, bufferCallback);
88   }
89
90   void destroy() override {
91     this->~BytesWriteRequest();
92     free(this);
93   }
94
95   bool performWrite() override {
96     WriteFlags writeFlags = flags_;
97     if (getNext() != nullptr) {
98       writeFlags = writeFlags | WriteFlags::CORK;
99     }
100     bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
101                                           &opsWritten_, &partialBytes_);
102     return bytesWritten_ >= 0;
103   }
104
105   bool isComplete() override {
106     return opsWritten_ == getOpCount();
107   }
108
109   void consume() override {
110     // Advance opIndex_ forward by opsWritten_
111     opIndex_ += opsWritten_;
112     assert(opIndex_ < opCount_);
113
114     // If we've finished writing any IOBufs, release them
115     if (ioBuf_) {
116       for (uint32_t i = opsWritten_; i != 0; --i) {
117         assert(ioBuf_);
118         ioBuf_ = ioBuf_->pop();
119       }
120     }
121
122     // Move partialBytes_ forward into the current iovec buffer
123     struct iovec* currentOp = writeOps_ + opIndex_;
124     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
125     currentOp->iov_base =
126       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
127     currentOp->iov_len -= partialBytes_;
128
129     // Increment the totalBytesWritten_ count by bytesWritten_;
130     totalBytesWritten_ += bytesWritten_;
131   }
132
133  private:
134   BytesWriteRequest(AsyncSocket* socket,
135                     WriteCallback* callback,
136                     const struct iovec* ops,
137                     uint32_t opCount,
138                     uint32_t partialBytes,
139                     uint32_t bytesWritten,
140                     unique_ptr<IOBuf>&& ioBuf,
141                     WriteFlags flags,
142                     BufferCallback* bufferCallback = nullptr)
143     : AsyncSocket::WriteRequest(socket, callback, bufferCallback)
144     , opCount_(opCount)
145     , opIndex_(0)
146     , flags_(flags)
147     , ioBuf_(std::move(ioBuf))
148     , opsWritten_(0)
149     , partialBytes_(partialBytes)
150     , bytesWritten_(bytesWritten) {
151     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
152   }
153
154   // private destructor, to ensure callers use destroy()
155   ~BytesWriteRequest() override = default;
156
157   const struct iovec* getOps() const {
158     assert(opCount_ > opIndex_);
159     return writeOps_ + opIndex_;
160   }
161
162   uint32_t getOpCount() const {
163     assert(opCount_ > opIndex_);
164     return opCount_ - opIndex_;
165   }
166
167   uint32_t opCount_;            ///< number of entries in writeOps_
168   uint32_t opIndex_;            ///< current index into writeOps_
169   WriteFlags flags_;            ///< set for WriteFlags
170   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
171
172   // for consume(), how much we wrote on the last write
173   uint32_t opsWritten_;         ///< complete ops written
174   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
175   ssize_t bytesWritten_;        ///< bytes written altogether
176
177   struct iovec writeOps_[];     ///< write operation(s) list
178 };
179
180 AsyncSocket::AsyncSocket()
181   : eventBase_(nullptr)
182   , writeTimeout_(this, nullptr)
183   , ioHandler_(this, nullptr)
184   , immediateReadHandler_(this) {
185   VLOG(5) << "new AsyncSocket()";
186   init();
187 }
188
189 AsyncSocket::AsyncSocket(EventBase* evb)
190   : eventBase_(evb)
191   , writeTimeout_(this, evb)
192   , ioHandler_(this, evb)
193   , immediateReadHandler_(this) {
194   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
195   init();
196 }
197
198 AsyncSocket::AsyncSocket(EventBase* evb,
199                            const folly::SocketAddress& address,
200                            uint32_t connectTimeout)
201   : AsyncSocket(evb) {
202   connect(nullptr, address, connectTimeout);
203 }
204
205 AsyncSocket::AsyncSocket(EventBase* evb,
206                            const std::string& ip,
207                            uint16_t port,
208                            uint32_t connectTimeout)
209   : AsyncSocket(evb) {
210   connect(nullptr, ip, port, connectTimeout);
211 }
212
213 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
214   : eventBase_(evb)
215   , writeTimeout_(this, evb)
216   , ioHandler_(this, evb, fd)
217   , immediateReadHandler_(this) {
218   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
219           << fd << ")";
220   init();
221   fd_ = fd;
222   setCloseOnExec();
223   state_ = StateEnum::ESTABLISHED;
224 }
225
226 // init() method, since constructor forwarding isn't supported in most
227 // compilers yet.
228 void AsyncSocket::init() {
229   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
230   shutdownFlags_ = 0;
231   state_ = StateEnum::UNINIT;
232   eventFlags_ = EventHandler::NONE;
233   fd_ = -1;
234   sendTimeout_ = 0;
235   maxReadsPerEvent_ = 16;
236   connectCallback_ = nullptr;
237   readCallback_ = nullptr;
238   writeReqHead_ = nullptr;
239   writeReqTail_ = nullptr;
240   shutdownSocketSet_ = nullptr;
241   appBytesWritten_ = 0;
242   appBytesReceived_ = 0;
243 }
244
245 AsyncSocket::~AsyncSocket() {
246   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
247           << ", evb=" << eventBase_ << ", fd=" << fd_
248           << ", state=" << state_ << ")";
249 }
250
251 void AsyncSocket::destroy() {
252   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
253           << ", fd=" << fd_ << ", state=" << state_;
254   // When destroy is called, close the socket immediately
255   closeNow();
256
257   // Then call DelayedDestruction::destroy() to take care of
258   // whether or not we need immediate or delayed destruction
259   DelayedDestruction::destroy();
260 }
261
262 int AsyncSocket::detachFd() {
263   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
264           << ", evb=" << eventBase_ << ", state=" << state_
265           << ", events=" << std::hex << eventFlags_ << ")";
266   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
267   // actually close the descriptor.
268   if (shutdownSocketSet_) {
269     shutdownSocketSet_->remove(fd_);
270   }
271   int fd = fd_;
272   fd_ = -1;
273   // Call closeNow() to invoke all pending callbacks with an error.
274   closeNow();
275   // Update the EventHandler to stop using this fd.
276   // This can only be done after closeNow() unregisters the handler.
277   ioHandler_.changeHandlerFD(-1);
278   return fd;
279 }
280
281 const folly::SocketAddress& AsyncSocket::anyAddress() {
282   static const folly::SocketAddress anyAddress =
283     folly::SocketAddress("0.0.0.0", 0);
284   return anyAddress;
285 }
286
287 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
288   if (shutdownSocketSet_ == newSS) {
289     return;
290   }
291   if (shutdownSocketSet_ && fd_ != -1) {
292     shutdownSocketSet_->remove(fd_);
293   }
294   shutdownSocketSet_ = newSS;
295   if (shutdownSocketSet_ && fd_ != -1) {
296     shutdownSocketSet_->add(fd_);
297   }
298 }
299
300 void AsyncSocket::setCloseOnExec() {
301   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
302   if (rv != 0) {
303     throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
304                                withAddr("failed to set close-on-exec flag"),
305                                errno);
306   }
307 }
308
309 void AsyncSocket::connect(ConnectCallback* callback,
310                            const folly::SocketAddress& address,
311                            int timeout,
312                            const OptionMap &options,
313                            const folly::SocketAddress& bindAddr) noexcept {
314   DestructorGuard dg(this);
315   assert(eventBase_->isInEventBaseThread());
316
317   addr_ = address;
318
319   // Make sure we're in the uninitialized state
320   if (state_ != StateEnum::UNINIT) {
321     return invalidState(callback);
322   }
323
324   connectStartTime_ = std::chrono::steady_clock::now();
325   // Make connect end time at least >= connectStartTime.
326   connectEndTime_ = connectStartTime_;
327
328   assert(fd_ == -1);
329   state_ = StateEnum::CONNECTING;
330   connectCallback_ = callback;
331
332   sockaddr_storage addrStorage;
333   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
334
335   try {
336     // Create the socket
337     // Technically the first parameter should actually be a protocol family
338     // constant (PF_xxx) rather than an address family (AF_xxx), but the
339     // distinction is mainly just historical.  In pretty much all
340     // implementations the PF_foo and AF_foo constants are identical.
341     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
342     if (fd_ < 0) {
343       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
344                                 withAddr("failed to create socket"), errno);
345     }
346     if (shutdownSocketSet_) {
347       shutdownSocketSet_->add(fd_);
348     }
349     ioHandler_.changeHandlerFD(fd_);
350
351     setCloseOnExec();
352
353     // Put the socket in non-blocking mode
354     int flags = fcntl(fd_, F_GETFL, 0);
355     if (flags == -1) {
356       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
357                                 withAddr("failed to get socket flags"), errno);
358     }
359     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
360     if (rv == -1) {
361       throw AsyncSocketException(
362           AsyncSocketException::INTERNAL_ERROR,
363           withAddr("failed to put socket in non-blocking mode"),
364           errno);
365     }
366
367 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
368     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
369     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
370     if (rv == -1) {
371       throw AsyncSocketException(
372           AsyncSocketException::INTERNAL_ERROR,
373           "failed to enable F_SETNOSIGPIPE on socket",
374           errno);
375     }
376 #endif
377
378     // By default, turn on TCP_NODELAY
379     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
380     // setNoDelay() will log an error message if it fails.
381     if (address.getFamily() != AF_UNIX) {
382       (void)setNoDelay(true);
383     }
384
385     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
386             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
387
388     // bind the socket
389     if (bindAddr != anyAddress()) {
390       int one = 1;
391       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
392         doClose();
393         throw AsyncSocketException(
394           AsyncSocketException::NOT_OPEN,
395           "failed to setsockopt prior to bind on " + bindAddr.describe(),
396           errno);
397       }
398
399       bindAddr.getAddress(&addrStorage);
400
401       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
402         doClose();
403         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
404                                   "failed to bind to async socket: " +
405                                   bindAddr.describe(),
406                                   errno);
407       }
408     }
409
410     // Apply the additional options if any.
411     for (const auto& opt: options) {
412       int rv = opt.first.apply(fd_, opt.second);
413       if (rv != 0) {
414         throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
415                                   withAddr("failed to set socket option"),
416                                   errno);
417       }
418     }
419
420     // Perform the connect()
421     address.getAddress(&addrStorage);
422
423     rv = ::connect(fd_, saddr, address.getActualSize());
424     if (rv < 0) {
425       if (errno == EINPROGRESS) {
426         // Connection in progress.
427         if (timeout > 0) {
428           // Start a timer in case the connection takes too long.
429           if (!writeTimeout_.scheduleTimeout(timeout)) {
430             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
431                 withAddr("failed to schedule AsyncSocket connect timeout"));
432           }
433         }
434
435         // Register for write events, so we'll
436         // be notified when the connection finishes/fails.
437         // Note that we don't register for a persistent event here.
438         assert(eventFlags_ == EventHandler::NONE);
439         eventFlags_ = EventHandler::WRITE;
440         if (!ioHandler_.registerHandler(eventFlags_)) {
441           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
442               withAddr("failed to register AsyncSocket connect handler"));
443         }
444         return;
445       } else {
446         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
447                                   "connect failed (immediately)", errno);
448       }
449     }
450
451     // If we're still here the connect() succeeded immediately.
452     // Fall through to call the callback outside of this try...catch block
453   } catch (const AsyncSocketException& ex) {
454     return failConnect(__func__, ex);
455   } catch (const std::exception& ex) {
456     // shouldn't happen, but handle it just in case
457     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
458                << "): unexpected " << typeid(ex).name() << " exception: "
459                << ex.what();
460     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
461                             withAddr(string("unexpected exception: ") +
462                                      ex.what()));
463     return failConnect(__func__, tex);
464   }
465
466   // The connection succeeded immediately
467   // The read callback may not have been set yet, and no writes may be pending
468   // yet, so we don't have to register for any events at the moment.
469   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
470   assert(readCallback_ == nullptr);
471   assert(writeReqHead_ == nullptr);
472   state_ = StateEnum::ESTABLISHED;
473   invokeConnectSuccess();
474 }
475
476 void AsyncSocket::connect(ConnectCallback* callback,
477                            const string& ip, uint16_t port,
478                            int timeout,
479                            const OptionMap &options) noexcept {
480   DestructorGuard dg(this);
481   try {
482     connectCallback_ = callback;
483     connect(callback, folly::SocketAddress(ip, port), timeout, options);
484   } catch (const std::exception& ex) {
485     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
486                             ex.what());
487     return failConnect(__func__, tex);
488   }
489 }
490
491 void AsyncSocket::cancelConnect() {
492   connectCallback_ = nullptr;
493   if (state_ == StateEnum::CONNECTING) {
494     closeNow();
495   }
496 }
497
498 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
499   sendTimeout_ = milliseconds;
500   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
501
502   // If we are currently pending on write requests, immediately update
503   // writeTimeout_ with the new value.
504   if ((eventFlags_ & EventHandler::WRITE) &&
505       (state_ != StateEnum::CONNECTING)) {
506     assert(state_ == StateEnum::ESTABLISHED);
507     assert((shutdownFlags_ & SHUT_WRITE) == 0);
508     if (sendTimeout_ > 0) {
509       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
510         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
511             withAddr("failed to reschedule send timeout in setSendTimeout"));
512         return failWrite(__func__, ex);
513       }
514     } else {
515       writeTimeout_.cancelTimeout();
516     }
517   }
518 }
519
520 void AsyncSocket::setReadCB(ReadCallback *callback) {
521   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
522           << ", callback=" << callback << ", state=" << state_;
523
524   // Short circuit if callback is the same as the existing readCallback_.
525   //
526   // Note that this is needed for proper functioning during some cleanup cases.
527   // During cleanup we allow setReadCallback(nullptr) to be called even if the
528   // read callback is already unset and we have been detached from an event
529   // base.  This check prevents us from asserting
530   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
531   if (callback == readCallback_) {
532     return;
533   }
534
535   /* We are removing a read callback */
536   if (callback == nullptr &&
537       immediateReadHandler_.isLoopCallbackScheduled()) {
538     immediateReadHandler_.cancelLoopCallback();
539   }
540
541   if (shutdownFlags_ & SHUT_READ) {
542     // Reads have already been shut down on this socket.
543     //
544     // Allow setReadCallback(nullptr) to be called in this case, but don't
545     // allow a new callback to be set.
546     //
547     // For example, setReadCallback(nullptr) can happen after an error if we
548     // invoke some other error callback before invoking readError().  The other
549     // error callback that is invoked first may go ahead and clear the read
550     // callback before we get a chance to invoke readError().
551     if (callback != nullptr) {
552       return invalidState(callback);
553     }
554     assert((eventFlags_ & EventHandler::READ) == 0);
555     readCallback_ = nullptr;
556     return;
557   }
558
559   DestructorGuard dg(this);
560   assert(eventBase_->isInEventBaseThread());
561
562   switch ((StateEnum)state_) {
563     case StateEnum::CONNECTING:
564       // For convenience, we allow the read callback to be set while we are
565       // still connecting.  We just store the callback for now.  Once the
566       // connection completes we'll register for read events.
567       readCallback_ = callback;
568       return;
569     case StateEnum::ESTABLISHED:
570     {
571       readCallback_ = callback;
572       uint16_t oldFlags = eventFlags_;
573       if (readCallback_) {
574         eventFlags_ |= EventHandler::READ;
575       } else {
576         eventFlags_ &= ~EventHandler::READ;
577       }
578
579       // Update our registration if our flags have changed
580       if (eventFlags_ != oldFlags) {
581         // We intentionally ignore the return value here.
582         // updateEventRegistration() will move us into the error state if it
583         // fails, and we don't need to do anything else here afterwards.
584         (void)updateEventRegistration();
585       }
586
587       if (readCallback_) {
588         checkForImmediateRead();
589       }
590       return;
591     }
592     case StateEnum::CLOSED:
593     case StateEnum::ERROR:
594       // We should never reach here.  SHUT_READ should always be set
595       // if we are in STATE_CLOSED or STATE_ERROR.
596       assert(false);
597       return invalidState(callback);
598     case StateEnum::UNINIT:
599       // We do not allow setReadCallback() to be called before we start
600       // connecting.
601       return invalidState(callback);
602   }
603
604   // We don't put a default case in the switch statement, so that the compiler
605   // will warn us to update the switch statement if a new state is added.
606   return invalidState(callback);
607 }
608
609 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
610   return readCallback_;
611 }
612
613 void AsyncSocket::write(WriteCallback* callback,
614                          const void* buf, size_t bytes, WriteFlags flags,
615                          BufferCallback* bufCallback) {
616   iovec op;
617   op.iov_base = const_cast<void*>(buf);
618   op.iov_len = bytes;
619   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags, bufCallback);
620 }
621
622 void AsyncSocket::writev(WriteCallback* callback,
623                           const iovec* vec,
624                           size_t count,
625                           WriteFlags flags,
626                           BufferCallback* bufCallback) {
627   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags, bufCallback);
628 }
629
630 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
631                               WriteFlags flags, BufferCallback* bufCallback) {
632   constexpr size_t kSmallSizeMax = 64;
633   size_t count = buf->countChainElements();
634   if (count <= kSmallSizeMax) {
635     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
636     writeChainImpl(callback, vec, count, std::move(buf), flags, bufCallback);
637   } else {
638     iovec* vec = new iovec[count];
639     writeChainImpl(callback, vec, count, std::move(buf), flags, bufCallback);
640     delete[] vec;
641   }
642 }
643
644 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
645     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags,
646     BufferCallback* bufCallback) {
647   size_t veclen = buf->fillIov(vec, count);
648   writeImpl(callback, vec, veclen, std::move(buf), flags, bufCallback);
649 }
650
651 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
652                              size_t count, unique_ptr<IOBuf>&& buf,
653                              WriteFlags flags, BufferCallback* bufCallback) {
654   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
655           << ", callback=" << callback << ", count=" << count
656           << ", state=" << state_;
657   DestructorGuard dg(this);
658   unique_ptr<IOBuf>ioBuf(std::move(buf));
659   assert(eventBase_->isInEventBaseThread());
660
661   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
662     // No new writes may be performed after the write side of the socket has
663     // been shutdown.
664     //
665     // We could just call callback->writeError() here to fail just this write.
666     // However, fail hard and use invalidState() to fail all outstanding
667     // callbacks and move the socket into the error state.  There's most likely
668     // a bug in the caller's code, so we abort everything rather than trying to
669     // proceed as best we can.
670     return invalidState(callback);
671   }
672
673   uint32_t countWritten = 0;
674   uint32_t partialWritten = 0;
675   int bytesWritten = 0;
676   bool mustRegister = false;
677   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
678     if (writeReqHead_ == nullptr) {
679       // If we are established and there are no other writes pending,
680       // we can attempt to perform the write immediately.
681       assert(writeReqTail_ == nullptr);
682       assert((eventFlags_ & EventHandler::WRITE) == 0);
683
684       bytesWritten = performWrite(vec, count, flags,
685                                   &countWritten, &partialWritten);
686       if (bytesWritten < 0) {
687         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
688                                withAddr("writev failed"), errno);
689         return failWrite(__func__, callback, 0, ex);
690       } else if (countWritten == count) {
691         // We successfully wrote everything.
692         // Invoke the callback and return.
693         if (callback) {
694           callback->writeSuccess();
695         }
696         return;
697       } else { // continue writing the next writeReq
698         if (bufCallback) {
699           bufCallback->onEgressBuffered();
700         }
701       }
702       mustRegister = true;
703     }
704   } else if (!connecting()) {
705     // Invalid state for writing
706     return invalidState(callback);
707   }
708
709   // Create a new WriteRequest to add to the queue
710   WriteRequest* req;
711   try {
712     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
713                                         count - countWritten, partialWritten,
714                                         bytesWritten, std::move(ioBuf), flags,
715                                         bufCallback);
716   } catch (const std::exception& ex) {
717     // we mainly expect to catch std::bad_alloc here
718     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
719         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
720     return failWrite(__func__, callback, bytesWritten, tex);
721   }
722   req->consume();
723   if (writeReqTail_ == nullptr) {
724     assert(writeReqHead_ == nullptr);
725     writeReqHead_ = writeReqTail_ = req;
726   } else {
727     writeReqTail_->append(req);
728     writeReqTail_ = req;
729   }
730
731   // Register for write events if are established and not currently
732   // waiting on write events
733   if (mustRegister) {
734     assert(state_ == StateEnum::ESTABLISHED);
735     assert((eventFlags_ & EventHandler::WRITE) == 0);
736     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
737       assert(state_ == StateEnum::ERROR);
738       return;
739     }
740     if (sendTimeout_ > 0) {
741       // Schedule a timeout to fire if the write takes too long.
742       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
743         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
744                                withAddr("failed to schedule send timeout"));
745         return failWrite(__func__, ex);
746       }
747     }
748   }
749 }
750
751 void AsyncSocket::writeRequest(WriteRequest* req) {
752   if (writeReqTail_ == nullptr) {
753     assert(writeReqHead_ == nullptr);
754     writeReqHead_ = writeReqTail_ = req;
755     req->start();
756   } else {
757     writeReqTail_->append(req);
758     writeReqTail_ = req;
759   }
760 }
761
762 void AsyncSocket::close() {
763   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
764           << ", state=" << state_ << ", shutdownFlags="
765           << std::hex << (int) shutdownFlags_;
766
767   // close() is only different from closeNow() when there are pending writes
768   // that need to drain before we can close.  In all other cases, just call
769   // closeNow().
770   //
771   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
772   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
773   // is still running.  (e.g., If there are multiple pending writes, and we
774   // call writeError() on the first one, it may call close().  In this case we
775   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
776   // writes will still be in the queue.)
777   //
778   // We only need to drain pending writes if we are still in STATE_CONNECTING
779   // or STATE_ESTABLISHED
780   if ((writeReqHead_ == nullptr) ||
781       !(state_ == StateEnum::CONNECTING ||
782       state_ == StateEnum::ESTABLISHED)) {
783     closeNow();
784     return;
785   }
786
787   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
788   // destroyed until close() returns.
789   DestructorGuard dg(this);
790   assert(eventBase_->isInEventBaseThread());
791
792   // Since there are write requests pending, we have to set the
793   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
794   // connect finishes and we finish writing these requests.
795   //
796   // Set SHUT_READ to indicate that reads are shut down, and set the
797   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
798   // pending writes complete.
799   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
800
801   // If a read callback is set, invoke readEOF() immediately to inform it that
802   // the socket has been closed and no more data can be read.
803   if (readCallback_) {
804     // Disable reads if they are enabled
805     if (!updateEventRegistration(0, EventHandler::READ)) {
806       // We're now in the error state; callbacks have been cleaned up
807       assert(state_ == StateEnum::ERROR);
808       assert(readCallback_ == nullptr);
809     } else {
810       ReadCallback* callback = readCallback_;
811       readCallback_ = nullptr;
812       callback->readEOF();
813     }
814   }
815 }
816
817 void AsyncSocket::closeNow() {
818   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
819           << ", state=" << state_ << ", shutdownFlags="
820           << std::hex << (int) shutdownFlags_;
821   DestructorGuard dg(this);
822   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
823
824   switch (state_) {
825     case StateEnum::ESTABLISHED:
826     case StateEnum::CONNECTING:
827     {
828       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
829       state_ = StateEnum::CLOSED;
830
831       // If the write timeout was set, cancel it.
832       writeTimeout_.cancelTimeout();
833
834       // If we are registered for I/O events, unregister.
835       if (eventFlags_ != EventHandler::NONE) {
836         eventFlags_ = EventHandler::NONE;
837         if (!updateEventRegistration()) {
838           // We will have been moved into the error state.
839           assert(state_ == StateEnum::ERROR);
840           return;
841         }
842       }
843
844       if (immediateReadHandler_.isLoopCallbackScheduled()) {
845         immediateReadHandler_.cancelLoopCallback();
846       }
847
848       if (fd_ >= 0) {
849         ioHandler_.changeHandlerFD(-1);
850         doClose();
851       }
852
853       invokeConnectErr(socketClosedLocallyEx);
854
855       failAllWrites(socketClosedLocallyEx);
856
857       if (readCallback_) {
858         ReadCallback* callback = readCallback_;
859         readCallback_ = nullptr;
860         callback->readEOF();
861       }
862       return;
863     }
864     case StateEnum::CLOSED:
865       // Do nothing.  It's possible that we are being called recursively
866       // from inside a callback that we invoked inside another call to close()
867       // that is still running.
868       return;
869     case StateEnum::ERROR:
870       // Do nothing.  The error handling code has performed (or is performing)
871       // cleanup.
872       return;
873     case StateEnum::UNINIT:
874       assert(eventFlags_ == EventHandler::NONE);
875       assert(connectCallback_ == nullptr);
876       assert(readCallback_ == nullptr);
877       assert(writeReqHead_ == nullptr);
878       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
879       state_ = StateEnum::CLOSED;
880       return;
881   }
882
883   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
884               << ") called in unknown state " << state_;
885 }
886
887 void AsyncSocket::closeWithReset() {
888   // Enable SO_LINGER, with the linger timeout set to 0.
889   // This will trigger a TCP reset when we close the socket.
890   if (fd_ >= 0) {
891     struct linger optLinger = {1, 0};
892     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
893       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
894               << "on " << fd_ << ": errno=" << errno;
895     }
896   }
897
898   // Then let closeNow() take care of the rest
899   closeNow();
900 }
901
902 void AsyncSocket::shutdownWrite() {
903   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
904           << ", state=" << state_ << ", shutdownFlags="
905           << std::hex << (int) shutdownFlags_;
906
907   // If there are no pending writes, shutdownWrite() is identical to
908   // shutdownWriteNow().
909   if (writeReqHead_ == nullptr) {
910     shutdownWriteNow();
911     return;
912   }
913
914   assert(eventBase_->isInEventBaseThread());
915
916   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
917   // shutdown will be performed once all writes complete.
918   shutdownFlags_ |= SHUT_WRITE_PENDING;
919 }
920
921 void AsyncSocket::shutdownWriteNow() {
922   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
923           << ", fd=" << fd_ << ", state=" << state_
924           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
925
926   if (shutdownFlags_ & SHUT_WRITE) {
927     // Writes are already shutdown; nothing else to do.
928     return;
929   }
930
931   // If SHUT_READ is already set, just call closeNow() to completely
932   // close the socket.  This can happen if close() was called with writes
933   // pending, and then shutdownWriteNow() is called before all pending writes
934   // complete.
935   if (shutdownFlags_ & SHUT_READ) {
936     closeNow();
937     return;
938   }
939
940   DestructorGuard dg(this);
941   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
942
943   switch (static_cast<StateEnum>(state_)) {
944     case StateEnum::ESTABLISHED:
945     {
946       shutdownFlags_ |= SHUT_WRITE;
947
948       // If the write timeout was set, cancel it.
949       writeTimeout_.cancelTimeout();
950
951       // If we are registered for write events, unregister.
952       if (!updateEventRegistration(0, EventHandler::WRITE)) {
953         // We will have been moved into the error state.
954         assert(state_ == StateEnum::ERROR);
955         return;
956       }
957
958       // Shutdown writes on the file descriptor
959       ::shutdown(fd_, SHUT_WR);
960
961       // Immediately fail all write requests
962       failAllWrites(socketShutdownForWritesEx);
963       return;
964     }
965     case StateEnum::CONNECTING:
966     {
967       // Set the SHUT_WRITE_PENDING flag.
968       // When the connection completes, it will check this flag,
969       // shutdown the write half of the socket, and then set SHUT_WRITE.
970       shutdownFlags_ |= SHUT_WRITE_PENDING;
971
972       // Immediately fail all write requests
973       failAllWrites(socketShutdownForWritesEx);
974       return;
975     }
976     case StateEnum::UNINIT:
977       // Callers normally shouldn't call shutdownWriteNow() before the socket
978       // even starts connecting.  Nonetheless, go ahead and set
979       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
980       // immediately shut down the write side of the socket.
981       shutdownFlags_ |= SHUT_WRITE_PENDING;
982       return;
983     case StateEnum::CLOSED:
984     case StateEnum::ERROR:
985       // We should never get here.  SHUT_WRITE should always be set
986       // in STATE_CLOSED and STATE_ERROR.
987       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
988                  << ", fd=" << fd_ << ") in unexpected state " << state_
989                  << " with SHUT_WRITE not set ("
990                  << std::hex << (int) shutdownFlags_ << ")";
991       assert(false);
992       return;
993   }
994
995   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
996               << fd_ << ") called in unknown state " << state_;
997 }
998
999 bool AsyncSocket::readable() const {
1000   if (fd_ == -1) {
1001     return false;
1002   }
1003   struct pollfd fds[1];
1004   fds[0].fd = fd_;
1005   fds[0].events = POLLIN;
1006   fds[0].revents = 0;
1007   int rc = poll(fds, 1, 0);
1008   return rc == 1;
1009 }
1010
1011 bool AsyncSocket::isPending() const {
1012   return ioHandler_.isPending();
1013 }
1014
1015 bool AsyncSocket::hangup() const {
1016   if (fd_ == -1) {
1017     // sanity check, no one should ask for hangup if we are not connected.
1018     assert(false);
1019     return false;
1020   }
1021 #ifdef POLLRDHUP // Linux-only
1022   struct pollfd fds[1];
1023   fds[0].fd = fd_;
1024   fds[0].events = POLLRDHUP|POLLHUP;
1025   fds[0].revents = 0;
1026   poll(fds, 1, 0);
1027   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1028 #else
1029   return false;
1030 #endif
1031 }
1032
1033 bool AsyncSocket::good() const {
1034   return ((state_ == StateEnum::CONNECTING ||
1035           state_ == StateEnum::ESTABLISHED) &&
1036           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1037 }
1038
1039 bool AsyncSocket::error() const {
1040   return (state_ == StateEnum::ERROR);
1041 }
1042
1043 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1044   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1045           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1046           << ", state=" << state_ << ", events="
1047           << std::hex << eventFlags_ << ")";
1048   assert(eventBase_ == nullptr);
1049   assert(eventBase->isInEventBaseThread());
1050
1051   eventBase_ = eventBase;
1052   ioHandler_.attachEventBase(eventBase);
1053   writeTimeout_.attachEventBase(eventBase);
1054 }
1055
1056 void AsyncSocket::detachEventBase() {
1057   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1058           << ", old evb=" << eventBase_ << ", state=" << state_
1059           << ", events=" << std::hex << eventFlags_ << ")";
1060   assert(eventBase_ != nullptr);
1061   assert(eventBase_->isInEventBaseThread());
1062
1063   eventBase_ = nullptr;
1064   ioHandler_.detachEventBase();
1065   writeTimeout_.detachEventBase();
1066 }
1067
1068 bool AsyncSocket::isDetachable() const {
1069   DCHECK(eventBase_ != nullptr);
1070   DCHECK(eventBase_->isInEventBaseThread());
1071
1072   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1073 }
1074
1075 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1076   address->setFromLocalAddress(fd_);
1077 }
1078
1079 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1080   if (!addr_.isInitialized()) {
1081     addr_.setFromPeerAddress(fd_);
1082   }
1083   *address = addr_;
1084 }
1085
1086 int AsyncSocket::setNoDelay(bool noDelay) {
1087   if (fd_ < 0) {
1088     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1089                << this << "(state=" << state_ << ")";
1090     return EINVAL;
1091
1092   }
1093
1094   int value = noDelay ? 1 : 0;
1095   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1096     int errnoCopy = errno;
1097     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1098             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1099             << strerror(errnoCopy);
1100     return errnoCopy;
1101   }
1102
1103   return 0;
1104 }
1105
1106 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1107
1108   #ifndef TCP_CONGESTION
1109   #define TCP_CONGESTION  13
1110   #endif
1111
1112   if (fd_ < 0) {
1113     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1114                << "socket " << this << "(state=" << state_ << ")";
1115     return EINVAL;
1116
1117   }
1118
1119   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1120         cname.length() + 1) != 0) {
1121     int errnoCopy = errno;
1122     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1123             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1124             << strerror(errnoCopy);
1125     return errnoCopy;
1126   }
1127
1128   return 0;
1129 }
1130
1131 int AsyncSocket::setQuickAck(bool quickack) {
1132   if (fd_ < 0) {
1133     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1134                << this << "(state=" << state_ << ")";
1135     return EINVAL;
1136
1137   }
1138
1139 #ifdef TCP_QUICKACK // Linux-only
1140   int value = quickack ? 1 : 0;
1141   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1142     int errnoCopy = errno;
1143     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1144             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1145             << strerror(errnoCopy);
1146     return errnoCopy;
1147   }
1148
1149   return 0;
1150 #else
1151   return ENOSYS;
1152 #endif
1153 }
1154
1155 int AsyncSocket::setSendBufSize(size_t bufsize) {
1156   if (fd_ < 0) {
1157     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1158                << this << "(state=" << state_ << ")";
1159     return EINVAL;
1160   }
1161
1162   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1163     int errnoCopy = errno;
1164     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1165             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1166             << strerror(errnoCopy);
1167     return errnoCopy;
1168   }
1169
1170   return 0;
1171 }
1172
1173 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1174   if (fd_ < 0) {
1175     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1176                << this << "(state=" << state_ << ")";
1177     return EINVAL;
1178   }
1179
1180   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1181     int errnoCopy = errno;
1182     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1183             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1184             << strerror(errnoCopy);
1185     return errnoCopy;
1186   }
1187
1188   return 0;
1189 }
1190
1191 int AsyncSocket::setTCPProfile(int profd) {
1192   if (fd_ < 0) {
1193     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1194                << this << "(state=" << state_ << ")";
1195     return EINVAL;
1196   }
1197
1198   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1199     int errnoCopy = errno;
1200     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1201             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1202             << strerror(errnoCopy);
1203     return errnoCopy;
1204   }
1205
1206   return 0;
1207 }
1208
1209 void AsyncSocket::ioReady(uint16_t events) noexcept {
1210   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1211           << ", events=" << std::hex << events << ", state=" << state_;
1212   DestructorGuard dg(this);
1213   assert(events & EventHandler::READ_WRITE);
1214   assert(eventBase_->isInEventBaseThread());
1215
1216   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1217   if (relevantEvents == EventHandler::READ) {
1218     handleRead();
1219   } else if (relevantEvents == EventHandler::WRITE) {
1220     handleWrite();
1221   } else if (relevantEvents == EventHandler::READ_WRITE) {
1222     EventBase* originalEventBase = eventBase_;
1223     // If both read and write events are ready, process writes first.
1224     handleWrite();
1225
1226     // Return now if handleWrite() detached us from our EventBase
1227     if (eventBase_ != originalEventBase) {
1228       return;
1229     }
1230
1231     // Only call handleRead() if a read callback is still installed.
1232     // (It's possible that the read callback was uninstalled during
1233     // handleWrite().)
1234     if (readCallback_) {
1235       handleRead();
1236     }
1237   } else {
1238     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1239                << std::hex << events << "(this=" << this << ")";
1240     abort();
1241   }
1242 }
1243
1244 ssize_t AsyncSocket::performRead(void** buf, size_t* buflen, size_t* offset) {
1245   VLOG(5) << "AsyncSocket::performRead() this=" << this
1246           << ", buf=" << *buf << ", buflen=" << *buflen;
1247
1248   int recvFlags = 0;
1249   if (peek_) {
1250     recvFlags |= MSG_PEEK;
1251   }
1252
1253   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1254   if (bytes < 0) {
1255     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1256       // No more data to read right now.
1257       return READ_BLOCKING;
1258     } else {
1259       return READ_ERROR;
1260     }
1261   } else {
1262     appBytesReceived_ += bytes;
1263     return bytes;
1264   }
1265 }
1266
1267 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1268   // no matter what, buffer should be preapared for non-ssl socket
1269   CHECK(readCallback_);
1270   readCallback_->getReadBuffer(buf, buflen);
1271 }
1272
1273 void AsyncSocket::handleRead() noexcept {
1274   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1275           << ", state=" << state_;
1276   assert(state_ == StateEnum::ESTABLISHED);
1277   assert((shutdownFlags_ & SHUT_READ) == 0);
1278   assert(readCallback_ != nullptr);
1279   assert(eventFlags_ & EventHandler::READ);
1280
1281   // Loop until:
1282   // - a read attempt would block
1283   // - readCallback_ is uninstalled
1284   // - the number of loop iterations exceeds the optional maximum
1285   // - this AsyncSocket is moved to another EventBase
1286   //
1287   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1288   // which is why need to check for it here.
1289   //
1290   // The last bullet point is slightly subtle.  readDataAvailable() may also
1291   // detach this socket from this EventBase.  However, before
1292   // readDataAvailable() returns another thread may pick it up, attach it to
1293   // a different EventBase, and install another readCallback_.  We need to
1294   // exit immediately after readDataAvailable() returns if the eventBase_ has
1295   // changed.  (The caller must perform some sort of locking to transfer the
1296   // AsyncSocket between threads properly.  This will be sufficient to ensure
1297   // that this thread sees the updated eventBase_ variable after
1298   // readDataAvailable() returns.)
1299   uint16_t numReads = 0;
1300   EventBase* originalEventBase = eventBase_;
1301   while (readCallback_ && eventBase_ == originalEventBase) {
1302     // Get the buffer to read into.
1303     void* buf = nullptr;
1304     size_t buflen = 0, offset = 0;
1305     try {
1306       prepareReadBuffer(&buf, &buflen);
1307       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1308     } catch (const AsyncSocketException& ex) {
1309       return failRead(__func__, ex);
1310     } catch (const std::exception& ex) {
1311       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1312                               string("ReadCallback::getReadBuffer() "
1313                                      "threw exception: ") +
1314                               ex.what());
1315       return failRead(__func__, tex);
1316     } catch (...) {
1317       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1318                              "ReadCallback::getReadBuffer() threw "
1319                              "non-exception type");
1320       return failRead(__func__, ex);
1321     }
1322     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1323       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1324                              "ReadCallback::getReadBuffer() returned "
1325                              "empty buffer");
1326       return failRead(__func__, ex);
1327     }
1328
1329     // Perform the read
1330     ssize_t bytesRead = performRead(&buf, &buflen, &offset);
1331     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1332             << bytesRead << " bytes";
1333     if (bytesRead > 0) {
1334       if (!isBufferMovable_) {
1335         readCallback_->readDataAvailable(bytesRead);
1336       } else {
1337         CHECK(kOpenSslModeMoveBufferOwnership);
1338         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1339                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1340                 << ", offset=" << offset;
1341         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1342         readBuf->trimStart(offset);
1343         readBuf->trimEnd(buflen - offset - bytesRead);
1344         readCallback_->readBufferAvailable(std::move(readBuf));
1345       }
1346
1347       // Fall through and continue around the loop if the read
1348       // completely filled the available buffer.
1349       // Note that readCallback_ may have been uninstalled or changed inside
1350       // readDataAvailable().
1351       if (size_t(bytesRead) < buflen) {
1352         return;
1353       }
1354     } else if (bytesRead == READ_BLOCKING) {
1355         // No more data to read right now.
1356         return;
1357     } else if (bytesRead == READ_ERROR) {
1358       readErr_ = READ_ERROR;
1359       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1360                              withAddr("recv() failed"), errno);
1361       return failRead(__func__, ex);
1362     } else {
1363       assert(bytesRead == READ_EOF);
1364       readErr_ = READ_EOF;
1365       // EOF
1366       shutdownFlags_ |= SHUT_READ;
1367       if (!updateEventRegistration(0, EventHandler::READ)) {
1368         // we've already been moved into STATE_ERROR
1369         assert(state_ == StateEnum::ERROR);
1370         assert(readCallback_ == nullptr);
1371         return;
1372       }
1373
1374       ReadCallback* callback = readCallback_;
1375       readCallback_ = nullptr;
1376       callback->readEOF();
1377       return;
1378     }
1379     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1380       if (readCallback_ != nullptr) {
1381         // We might still have data in the socket.
1382         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1383         scheduleImmediateRead();
1384       }
1385       return;
1386     }
1387   }
1388 }
1389
1390 /**
1391  * This function attempts to write as much data as possible, until no more data
1392  * can be written.
1393  *
1394  * - If it sends all available data, it unregisters for write events, and stops
1395  *   the writeTimeout_.
1396  *
1397  * - If not all of the data can be sent immediately, it reschedules
1398  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1399  *   registered for write events.
1400  */
1401 void AsyncSocket::handleWrite() noexcept {
1402   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1403           << ", state=" << state_;
1404   if (state_ == StateEnum::CONNECTING) {
1405     handleConnect();
1406     return;
1407   }
1408
1409   // Normal write
1410   assert(state_ == StateEnum::ESTABLISHED);
1411   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1412   assert(writeReqHead_ != nullptr);
1413
1414   // Loop until we run out of write requests,
1415   // or until this socket is moved to another EventBase.
1416   // (See the comment in handleRead() explaining how this can happen.)
1417   EventBase* originalEventBase = eventBase_;
1418   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1419     if (!writeReqHead_->performWrite()) {
1420       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1421                              withAddr("writev() failed"), errno);
1422       return failWrite(__func__, ex);
1423     } else if (writeReqHead_->isComplete()) {
1424       // We finished this request
1425       WriteRequest* req = writeReqHead_;
1426       writeReqHead_ = req->getNext();
1427
1428       if (writeReqHead_ == nullptr) {
1429         writeReqTail_ = nullptr;
1430         // This is the last write request.
1431         // Unregister for write events and cancel the send timer
1432         // before we invoke the callback.  We have to update the state properly
1433         // before calling the callback, since it may want to detach us from
1434         // the EventBase.
1435         if (eventFlags_ & EventHandler::WRITE) {
1436           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1437             assert(state_ == StateEnum::ERROR);
1438             return;
1439           }
1440           // Stop the send timeout
1441           writeTimeout_.cancelTimeout();
1442         }
1443         assert(!writeTimeout_.isScheduled());
1444
1445         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1446         // we finish sending the last write request.
1447         //
1448         // We have to do this before invoking writeSuccess(), since
1449         // writeSuccess() may detach us from our EventBase.
1450         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1451           assert(connectCallback_ == nullptr);
1452           shutdownFlags_ |= SHUT_WRITE;
1453
1454           if (shutdownFlags_ & SHUT_READ) {
1455             // Reads have already been shutdown.  Fully close the socket and
1456             // move to STATE_CLOSED.
1457             //
1458             // Note: This code currently moves us to STATE_CLOSED even if
1459             // close() hasn't ever been called.  This can occur if we have
1460             // received EOF from the peer and shutdownWrite() has been called
1461             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1462             // case, until close() is actually called?  I can't think of a
1463             // reason why we would need to do so.  No other operations besides
1464             // calling close() or destroying the socket can be performed at
1465             // this point.
1466             assert(readCallback_ == nullptr);
1467             state_ = StateEnum::CLOSED;
1468             if (fd_ >= 0) {
1469               ioHandler_.changeHandlerFD(-1);
1470               doClose();
1471             }
1472           } else {
1473             // Reads are still enabled, so we are only doing a half-shutdown
1474             ::shutdown(fd_, SHUT_WR);
1475           }
1476         }
1477       }
1478
1479       // Invoke the callback
1480       WriteCallback* callback = req->getCallback();
1481       req->destroy();
1482       if (callback) {
1483         callback->writeSuccess();
1484       }
1485       // We'll continue around the loop, trying to write another request
1486     } else {
1487       // Notify BufferCallback:
1488       BufferCallback* bufferCallback = writeReqHead_->getBufferCallback();
1489       if (bufferCallback) {
1490         bufferCallback->onEgressBuffered();
1491       }
1492       // Partial write.
1493       writeReqHead_->consume();
1494       // Stop after a partial write; it's highly likely that a subsequent write
1495       // attempt will just return EAGAIN.
1496       //
1497       // Ensure that we are registered for write events.
1498       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1499         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1500           assert(state_ == StateEnum::ERROR);
1501           return;
1502         }
1503       }
1504
1505       // Reschedule the send timeout, since we have made some write progress.
1506       if (sendTimeout_ > 0) {
1507         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1508           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1509               withAddr("failed to reschedule write timeout"));
1510           return failWrite(__func__, ex);
1511         }
1512       }
1513       return;
1514     }
1515   }
1516 }
1517
1518 void AsyncSocket::checkForImmediateRead() noexcept {
1519   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1520   // (However, note that some subclasses do override this method.)
1521   //
1522   // Simply calling handleRead() here would be bad, as this would call
1523   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1524   // buffer even though no data may be available.  This would waste lots of
1525   // memory, since the buffer will sit around unused until the socket actually
1526   // becomes readable.
1527   //
1528   // Checking if the socket is readable now also seems like it would probably
1529   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1530   // would just waste an extra system call.  Even if it is readable, waiting to
1531   // find out from libevent on the next event loop doesn't seem that bad.
1532 }
1533
1534 void AsyncSocket::handleInitialReadWrite() noexcept {
1535   // Our callers should already be holding a DestructorGuard, but grab
1536   // one here just to make sure, in case one of our calling code paths ever
1537   // changes.
1538   DestructorGuard dg(this);
1539
1540   // If we have a readCallback_, make sure we enable read events.  We
1541   // may already be registered for reads if connectSuccess() set
1542   // the read calback.
1543   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1544     assert(state_ == StateEnum::ESTABLISHED);
1545     assert((shutdownFlags_ & SHUT_READ) == 0);
1546     if (!updateEventRegistration(EventHandler::READ, 0)) {
1547       assert(state_ == StateEnum::ERROR);
1548       return;
1549     }
1550     checkForImmediateRead();
1551   } else if (readCallback_ == nullptr) {
1552     // Unregister for read events.
1553     updateEventRegistration(0, EventHandler::READ);
1554   }
1555
1556   // If we have write requests pending, try to send them immediately.
1557   // Since we just finished accepting, there is a very good chance that we can
1558   // write without blocking.
1559   //
1560   // However, we only process them if EventHandler::WRITE is not already set,
1561   // which means that we're already blocked on a write attempt.  (This can
1562   // happen if connectSuccess() called write() before returning.)
1563   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1564     // Call handleWrite() to perform write processing.
1565     handleWrite();
1566   } else if (writeReqHead_ == nullptr) {
1567     // Unregister for write event.
1568     updateEventRegistration(0, EventHandler::WRITE);
1569   }
1570 }
1571
1572 void AsyncSocket::handleConnect() noexcept {
1573   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1574           << ", state=" << state_;
1575   assert(state_ == StateEnum::CONNECTING);
1576   // SHUT_WRITE can never be set while we are still connecting;
1577   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1578   // finishes
1579   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1580
1581   // In case we had a connect timeout, cancel the timeout
1582   writeTimeout_.cancelTimeout();
1583   // We don't use a persistent registration when waiting on a connect event,
1584   // so we have been automatically unregistered now.  Update eventFlags_ to
1585   // reflect reality.
1586   assert(eventFlags_ == EventHandler::WRITE);
1587   eventFlags_ = EventHandler::NONE;
1588
1589   // Call getsockopt() to check if the connect succeeded
1590   int error;
1591   socklen_t len = sizeof(error);
1592   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1593   if (rv != 0) {
1594     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1595                            withAddr("error calling getsockopt() after connect"),
1596                            errno);
1597     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1598                << fd_ << " host=" << addr_.describe()
1599                << ") exception:" << ex.what();
1600     return failConnect(__func__, ex);
1601   }
1602
1603   if (error != 0) {
1604     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1605                            "connect failed", error);
1606     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1607             << fd_ << " host=" << addr_.describe()
1608             << ") exception: " << ex.what();
1609     return failConnect(__func__, ex);
1610   }
1611
1612   // Move into STATE_ESTABLISHED
1613   state_ = StateEnum::ESTABLISHED;
1614
1615   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1616   // perform, immediately shutdown the write half of the socket.
1617   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1618     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1619     // are still connecting we just abort the connect rather than waiting for
1620     // it to complete.
1621     assert((shutdownFlags_ & SHUT_READ) == 0);
1622     ::shutdown(fd_, SHUT_WR);
1623     shutdownFlags_ |= SHUT_WRITE;
1624   }
1625
1626   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1627           << "successfully connected; state=" << state_;
1628
1629   // Remember the EventBase we are attached to, before we start invoking any
1630   // callbacks (since the callbacks may call detachEventBase()).
1631   EventBase* originalEventBase = eventBase_;
1632
1633   invokeConnectSuccess();
1634   // Note that the connect callback may have changed our state.
1635   // (set or unset the read callback, called write(), closed the socket, etc.)
1636   // The following code needs to handle these situations correctly.
1637   //
1638   // If the socket has been closed, readCallback_ and writeReqHead_ will
1639   // always be nullptr, so that will prevent us from trying to read or write.
1640   //
1641   // The main thing to check for is if eventBase_ is still originalEventBase.
1642   // If not, we have been detached from this event base, so we shouldn't
1643   // perform any more operations.
1644   if (eventBase_ != originalEventBase) {
1645     return;
1646   }
1647
1648   handleInitialReadWrite();
1649 }
1650
1651 void AsyncSocket::timeoutExpired() noexcept {
1652   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1653           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1654   DestructorGuard dg(this);
1655   assert(eventBase_->isInEventBaseThread());
1656
1657   if (state_ == StateEnum::CONNECTING) {
1658     // connect() timed out
1659     // Unregister for I/O events.
1660     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1661                            "connect timed out");
1662     failConnect(__func__, ex);
1663   } else {
1664     // a normal write operation timed out
1665     assert(state_ == StateEnum::ESTABLISHED);
1666     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1667     failWrite(__func__, ex);
1668   }
1669 }
1670
1671 ssize_t AsyncSocket::performWrite(const iovec* vec,
1672                                    uint32_t count,
1673                                    WriteFlags flags,
1674                                    uint32_t* countWritten,
1675                                    uint32_t* partialWritten) {
1676   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1677   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1678   // (since it may terminate the program if the main program doesn't explicitly
1679   // ignore it).
1680   struct msghdr msg;
1681   msg.msg_name = nullptr;
1682   msg.msg_namelen = 0;
1683   msg.msg_iov = const_cast<iovec *>(vec);
1684 #ifdef IOV_MAX // not defined on Android
1685   msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX);
1686 #else
1687   msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV);
1688 #endif
1689   msg.msg_control = nullptr;
1690   msg.msg_controllen = 0;
1691   msg.msg_flags = 0;
1692
1693   int msg_flags = MSG_DONTWAIT;
1694
1695 #ifdef MSG_NOSIGNAL // Linux-only
1696   msg_flags |= MSG_NOSIGNAL;
1697   if (isSet(flags, WriteFlags::CORK)) {
1698     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1699     // give it the rest of the data rather than immediately sending a partial
1700     // frame, even when TCP_NODELAY is enabled.
1701     msg_flags |= MSG_MORE;
1702   }
1703 #endif
1704   if (isSet(flags, WriteFlags::EOR)) {
1705     // marks that this is the last byte of a record (response)
1706     msg_flags |= MSG_EOR;
1707   }
1708   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1709   if (totalWritten < 0) {
1710     if (errno == EAGAIN) {
1711       // TCP buffer is full; we can't write any more data right now.
1712       *countWritten = 0;
1713       *partialWritten = 0;
1714       return 0;
1715     }
1716     // error
1717     *countWritten = 0;
1718     *partialWritten = 0;
1719     return -1;
1720   }
1721
1722   appBytesWritten_ += totalWritten;
1723
1724   uint32_t bytesWritten;
1725   uint32_t n;
1726   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1727     const iovec* v = vec + n;
1728     if (v->iov_len > bytesWritten) {
1729       // Partial write finished in the middle of this iovec
1730       *countWritten = n;
1731       *partialWritten = bytesWritten;
1732       return totalWritten;
1733     }
1734
1735     bytesWritten -= v->iov_len;
1736   }
1737
1738   assert(bytesWritten == 0);
1739   *countWritten = n;
1740   *partialWritten = 0;
1741   return totalWritten;
1742 }
1743
1744 /**
1745  * Re-register the EventHandler after eventFlags_ has changed.
1746  *
1747  * If an error occurs, fail() is called to move the socket into the error state
1748  * and call all currently installed callbacks.  After an error, the
1749  * AsyncSocket is completely unregistered.
1750  *
1751  * @return Returns true on succcess, or false on error.
1752  */
1753 bool AsyncSocket::updateEventRegistration() {
1754   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1755           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1756           << ", events=" << std::hex << eventFlags_;
1757   assert(eventBase_->isInEventBaseThread());
1758   if (eventFlags_ == EventHandler::NONE) {
1759     ioHandler_.unregisterHandler();
1760     return true;
1761   }
1762
1763   // Always register for persistent events, so we don't have to re-register
1764   // after being called back.
1765   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1766     eventFlags_ = EventHandler::NONE; // we're not registered after error
1767     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1768         withAddr("failed to update AsyncSocket event registration"));
1769     fail("updateEventRegistration", ex);
1770     return false;
1771   }
1772
1773   return true;
1774 }
1775
1776 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1777                                            uint16_t disable) {
1778   uint16_t oldFlags = eventFlags_;
1779   eventFlags_ |= enable;
1780   eventFlags_ &= ~disable;
1781   if (eventFlags_ == oldFlags) {
1782     return true;
1783   } else {
1784     return updateEventRegistration();
1785   }
1786 }
1787
1788 void AsyncSocket::startFail() {
1789   // startFail() should only be called once
1790   assert(state_ != StateEnum::ERROR);
1791   assert(getDestructorGuardCount() > 0);
1792   state_ = StateEnum::ERROR;
1793   // Ensure that SHUT_READ and SHUT_WRITE are set,
1794   // so all future attempts to read or write will be rejected
1795   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1796
1797   if (eventFlags_ != EventHandler::NONE) {
1798     eventFlags_ = EventHandler::NONE;
1799     ioHandler_.unregisterHandler();
1800   }
1801   writeTimeout_.cancelTimeout();
1802
1803   if (fd_ >= 0) {
1804     ioHandler_.changeHandlerFD(-1);
1805     doClose();
1806   }
1807 }
1808
1809 void AsyncSocket::finishFail() {
1810   assert(state_ == StateEnum::ERROR);
1811   assert(getDestructorGuardCount() > 0);
1812
1813   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1814                          withAddr("socket closing after error"));
1815   invokeConnectErr(ex);
1816   failAllWrites(ex);
1817
1818   if (readCallback_) {
1819     ReadCallback* callback = readCallback_;
1820     readCallback_ = nullptr;
1821     callback->readErr(ex);
1822   }
1823 }
1824
1825 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1826   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1827              << state_ << " host=" << addr_.describe()
1828              << "): failed in " << fn << "(): "
1829              << ex.what();
1830   startFail();
1831   finishFail();
1832 }
1833
1834 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1835   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1836                << state_ << " host=" << addr_.describe()
1837                << "): failed while connecting in " << fn << "(): "
1838                << ex.what();
1839   startFail();
1840
1841   invokeConnectErr(ex);
1842   finishFail();
1843 }
1844
1845 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1846   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1847                << state_ << " host=" << addr_.describe()
1848                << "): failed while reading in " << fn << "(): "
1849                << ex.what();
1850   startFail();
1851
1852   if (readCallback_ != nullptr) {
1853     ReadCallback* callback = readCallback_;
1854     readCallback_ = nullptr;
1855     callback->readErr(ex);
1856   }
1857
1858   finishFail();
1859 }
1860
1861 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1862   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1863                << state_ << " host=" << addr_.describe()
1864                << "): failed while writing in " << fn << "(): "
1865                << ex.what();
1866   startFail();
1867
1868   // Only invoke the first write callback, since the error occurred while
1869   // writing this request.  Let any other pending write callbacks be invoked in
1870   // finishFail().
1871   if (writeReqHead_ != nullptr) {
1872     WriteRequest* req = writeReqHead_;
1873     writeReqHead_ = req->getNext();
1874     WriteCallback* callback = req->getCallback();
1875     uint32_t bytesWritten = req->getTotalBytesWritten();
1876     req->destroy();
1877     if (callback) {
1878       callback->writeErr(bytesWritten, ex);
1879     }
1880   }
1881
1882   finishFail();
1883 }
1884
1885 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1886                              size_t bytesWritten,
1887                              const AsyncSocketException& ex) {
1888   // This version of failWrite() is used when the failure occurs before
1889   // we've added the callback to writeReqHead_.
1890   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1891              << state_ << " host=" << addr_.describe()
1892              <<"): failed while writing in " << fn << "(): "
1893              << ex.what();
1894   startFail();
1895
1896   if (callback != nullptr) {
1897     callback->writeErr(bytesWritten, ex);
1898   }
1899
1900   finishFail();
1901 }
1902
1903 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1904   // Invoke writeError() on all write callbacks.
1905   // This is used when writes are forcibly shutdown with write requests
1906   // pending, or when an error occurs with writes pending.
1907   while (writeReqHead_ != nullptr) {
1908     WriteRequest* req = writeReqHead_;
1909     writeReqHead_ = req->getNext();
1910     WriteCallback* callback = req->getCallback();
1911     if (callback) {
1912       callback->writeErr(req->getTotalBytesWritten(), ex);
1913     }
1914     req->destroy();
1915   }
1916 }
1917
1918 void AsyncSocket::invalidState(ConnectCallback* callback) {
1919   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1920              << "): connect() called in invalid state " << state_;
1921
1922   /*
1923    * The invalidState() methods don't use the normal failure mechanisms,
1924    * since we don't know what state we are in.  We don't want to call
1925    * startFail()/finishFail() recursively if we are already in the middle of
1926    * cleaning up.
1927    */
1928
1929   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1930                          "connect() called with socket in invalid state");
1931   connectEndTime_ = std::chrono::steady_clock::now();
1932   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1933     if (callback) {
1934       callback->connectErr(ex);
1935     }
1936   } else {
1937     // We can't use failConnect() here since connectCallback_
1938     // may already be set to another callback.  Invoke this ConnectCallback
1939     // here; any other connectCallback_ will be invoked in finishFail()
1940     startFail();
1941     if (callback) {
1942       callback->connectErr(ex);
1943     }
1944     finishFail();
1945   }
1946 }
1947
1948 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
1949   connectEndTime_ = std::chrono::steady_clock::now();
1950   if (connectCallback_) {
1951     ConnectCallback* callback = connectCallback_;
1952     connectCallback_ = nullptr;
1953     callback->connectErr(ex);
1954   }
1955 }
1956
1957 void AsyncSocket::invokeConnectSuccess() {
1958   connectEndTime_ = std::chrono::steady_clock::now();
1959   if (connectCallback_) {
1960     ConnectCallback* callback = connectCallback_;
1961     connectCallback_ = nullptr;
1962     callback->connectSuccess();
1963   }
1964 }
1965
1966 void AsyncSocket::invalidState(ReadCallback* callback) {
1967   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1968              << "): setReadCallback(" << callback
1969              << ") called in invalid state " << state_;
1970
1971   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1972                          "setReadCallback() called with socket in "
1973                          "invalid state");
1974   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1975     if (callback) {
1976       callback->readErr(ex);
1977     }
1978   } else {
1979     startFail();
1980     if (callback) {
1981       callback->readErr(ex);
1982     }
1983     finishFail();
1984   }
1985 }
1986
1987 void AsyncSocket::invalidState(WriteCallback* callback) {
1988   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1989              << "): write() called in invalid state " << state_;
1990
1991   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1992                          withAddr("write() called with socket in invalid state"));
1993   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1994     if (callback) {
1995       callback->writeErr(0, ex);
1996     }
1997   } else {
1998     startFail();
1999     if (callback) {
2000       callback->writeErr(0, ex);
2001     }
2002     finishFail();
2003   }
2004 }
2005
2006 void AsyncSocket::doClose() {
2007   if (fd_ == -1) return;
2008   if (shutdownSocketSet_) {
2009     shutdownSocketSet_->close(fd_);
2010   } else {
2011     ::close(fd_);
2012   }
2013   fd_ = -1;
2014 }
2015
2016 std::ostream& operator << (std::ostream& os,
2017                            const AsyncSocket::StateEnum& state) {
2018   os << static_cast<int>(state);
2019   return os;
2020 }
2021
2022 std::string AsyncSocket::withAddr(const std::string& s) {
2023   // Don't use addr_ directly because it may not be initialized
2024   // e.g. if constructed from fd
2025   folly::SocketAddress peer, local;
2026   try {
2027     getPeerAddress(&peer);
2028     getLocalAddress(&local);
2029   } catch (const std::exception&) {
2030     // ignore
2031   } catch (...) {
2032     // ignore
2033   }
2034   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2035 }
2036
2037 } // folly