Stop abusing errno
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23 #include <folly/portability/SysUio.h>
24
25 #include <poll.h>
26 #include <errno.h>
27 #include <limits.h>
28 #include <unistd.h>
29 #include <thread>
30 #include <fcntl.h>
31 #include <sys/types.h>
32 #include <sys/socket.h>
33 #include <netinet/in.h>
34 #include <netinet/tcp.h>
35 #include <boost/preprocessor/control/if.hpp>
36
37 using std::string;
38 using std::unique_ptr;
39
40 namespace folly {
41
42 // static members initializers
43 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
44
45 const AsyncSocketException socketClosedLocallyEx(
46     AsyncSocketException::END_OF_FILE, "socket closed locally");
47 const AsyncSocketException socketShutdownForWritesEx(
48     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
49
50 // TODO: It might help performance to provide a version of BytesWriteRequest that
51 // users could derive from, so we can avoid the extra allocation for each call
52 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
53 // protocols are currently templatized for transports.
54 //
55 // We would need the version for external users where they provide the iovec
56 // storage space, and only our internal version would allocate it at the end of
57 // the WriteRequest.
58
59 /* The default WriteRequest implementation, used for write(), writev() and
60  * writeChain()
61  *
62  * A new BytesWriteRequest operation is allocated on the heap for all write
63  * operations that cannot be completed immediately.
64  */
65 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
66  public:
67   static BytesWriteRequest* newRequest(AsyncSocket* socket,
68                                        WriteCallback* callback,
69                                        const iovec* ops,
70                                        uint32_t opCount,
71                                        uint32_t partialWritten,
72                                        uint32_t bytesWritten,
73                                        unique_ptr<IOBuf>&& ioBuf,
74                                        WriteFlags flags) {
75     assert(opCount > 0);
76     // Since we put a variable size iovec array at the end
77     // of each BytesWriteRequest, we have to manually allocate the memory.
78     void* buf = malloc(sizeof(BytesWriteRequest) +
79                        (opCount * sizeof(struct iovec)));
80     if (buf == nullptr) {
81       throw std::bad_alloc();
82     }
83
84     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
85                                       partialWritten, bytesWritten,
86                                       std::move(ioBuf), flags);
87   }
88
89   void destroy() override {
90     this->~BytesWriteRequest();
91     free(this);
92   }
93
94   WriteResult performWrite() override {
95     WriteFlags writeFlags = flags_;
96     if (getNext() != nullptr) {
97       writeFlags = writeFlags | WriteFlags::CORK;
98     }
99     return socket_->performWrite(
100         getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
101   }
102
103   bool isComplete() override {
104     return opsWritten_ == getOpCount();
105   }
106
107   void consume() override {
108     // Advance opIndex_ forward by opsWritten_
109     opIndex_ += opsWritten_;
110     assert(opIndex_ < opCount_);
111
112     // If we've finished writing any IOBufs, release them
113     if (ioBuf_) {
114       for (uint32_t i = opsWritten_; i != 0; --i) {
115         assert(ioBuf_);
116         ioBuf_ = ioBuf_->pop();
117       }
118     }
119
120     // Move partialBytes_ forward into the current iovec buffer
121     struct iovec* currentOp = writeOps_ + opIndex_;
122     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
123     currentOp->iov_base =
124       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
125     currentOp->iov_len -= partialBytes_;
126
127     // Increment the totalBytesWritten_ count by bytesWritten_;
128     totalBytesWritten_ += bytesWritten_;
129   }
130
131  private:
132   BytesWriteRequest(AsyncSocket* socket,
133                     WriteCallback* callback,
134                     const struct iovec* ops,
135                     uint32_t opCount,
136                     uint32_t partialBytes,
137                     uint32_t bytesWritten,
138                     unique_ptr<IOBuf>&& ioBuf,
139                     WriteFlags flags)
140     : AsyncSocket::WriteRequest(socket, callback)
141     , opCount_(opCount)
142     , opIndex_(0)
143     , flags_(flags)
144     , ioBuf_(std::move(ioBuf))
145     , opsWritten_(0)
146     , partialBytes_(partialBytes)
147     , bytesWritten_(bytesWritten) {
148     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
149   }
150
151   // private destructor, to ensure callers use destroy()
152   ~BytesWriteRequest() override = default;
153
154   const struct iovec* getOps() const {
155     assert(opCount_ > opIndex_);
156     return writeOps_ + opIndex_;
157   }
158
159   uint32_t getOpCount() const {
160     assert(opCount_ > opIndex_);
161     return opCount_ - opIndex_;
162   }
163
164   uint32_t opCount_;            ///< number of entries in writeOps_
165   uint32_t opIndex_;            ///< current index into writeOps_
166   WriteFlags flags_;            ///< set for WriteFlags
167   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
168
169   // for consume(), how much we wrote on the last write
170   uint32_t opsWritten_;         ///< complete ops written
171   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
172   ssize_t bytesWritten_;        ///< bytes written altogether
173
174   struct iovec writeOps_[];     ///< write operation(s) list
175 };
176
177 AsyncSocket::AsyncSocket()
178   : eventBase_(nullptr)
179   , writeTimeout_(this, nullptr)
180   , ioHandler_(this, nullptr)
181   , immediateReadHandler_(this) {
182   VLOG(5) << "new AsyncSocket()";
183   init();
184 }
185
186 AsyncSocket::AsyncSocket(EventBase* evb)
187   : eventBase_(evb)
188   , writeTimeout_(this, evb)
189   , ioHandler_(this, evb)
190   , immediateReadHandler_(this) {
191   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
192   init();
193 }
194
195 AsyncSocket::AsyncSocket(EventBase* evb,
196                            const folly::SocketAddress& address,
197                            uint32_t connectTimeout)
198   : AsyncSocket(evb) {
199   connect(nullptr, address, connectTimeout);
200 }
201
202 AsyncSocket::AsyncSocket(EventBase* evb,
203                            const std::string& ip,
204                            uint16_t port,
205                            uint32_t connectTimeout)
206   : AsyncSocket(evb) {
207   connect(nullptr, ip, port, connectTimeout);
208 }
209
210 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
211   : eventBase_(evb)
212   , writeTimeout_(this, evb)
213   , ioHandler_(this, evb, fd)
214   , immediateReadHandler_(this) {
215   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
216           << fd << ")";
217   init();
218   fd_ = fd;
219   setCloseOnExec();
220   state_ = StateEnum::ESTABLISHED;
221 }
222
223 // init() method, since constructor forwarding isn't supported in most
224 // compilers yet.
225 void AsyncSocket::init() {
226   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
227   shutdownFlags_ = 0;
228   state_ = StateEnum::UNINIT;
229   eventFlags_ = EventHandler::NONE;
230   fd_ = -1;
231   sendTimeout_ = 0;
232   maxReadsPerEvent_ = 16;
233   connectCallback_ = nullptr;
234   readCallback_ = nullptr;
235   writeReqHead_ = nullptr;
236   writeReqTail_ = nullptr;
237   shutdownSocketSet_ = nullptr;
238   appBytesWritten_ = 0;
239   appBytesReceived_ = 0;
240 }
241
242 AsyncSocket::~AsyncSocket() {
243   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
244           << ", evb=" << eventBase_ << ", fd=" << fd_
245           << ", state=" << state_ << ")";
246 }
247
248 void AsyncSocket::destroy() {
249   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
250           << ", fd=" << fd_ << ", state=" << state_;
251   // When destroy is called, close the socket immediately
252   closeNow();
253
254   // Then call DelayedDestruction::destroy() to take care of
255   // whether or not we need immediate or delayed destruction
256   DelayedDestruction::destroy();
257 }
258
259 int AsyncSocket::detachFd() {
260   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
261           << ", evb=" << eventBase_ << ", state=" << state_
262           << ", events=" << std::hex << eventFlags_ << ")";
263   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
264   // actually close the descriptor.
265   if (shutdownSocketSet_) {
266     shutdownSocketSet_->remove(fd_);
267   }
268   int fd = fd_;
269   fd_ = -1;
270   // Call closeNow() to invoke all pending callbacks with an error.
271   closeNow();
272   // Update the EventHandler to stop using this fd.
273   // This can only be done after closeNow() unregisters the handler.
274   ioHandler_.changeHandlerFD(-1);
275   return fd;
276 }
277
278 const folly::SocketAddress& AsyncSocket::anyAddress() {
279   static const folly::SocketAddress anyAddress =
280     folly::SocketAddress("0.0.0.0", 0);
281   return anyAddress;
282 }
283
284 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
285   if (shutdownSocketSet_ == newSS) {
286     return;
287   }
288   if (shutdownSocketSet_ && fd_ != -1) {
289     shutdownSocketSet_->remove(fd_);
290   }
291   shutdownSocketSet_ = newSS;
292   if (shutdownSocketSet_ && fd_ != -1) {
293     shutdownSocketSet_->add(fd_);
294   }
295 }
296
297 void AsyncSocket::setCloseOnExec() {
298   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
299   if (rv != 0) {
300     auto errnoCopy = errno;
301     throw AsyncSocketException(
302         AsyncSocketException::INTERNAL_ERROR,
303         withAddr("failed to set close-on-exec flag"),
304         errnoCopy);
305   }
306 }
307
308 void AsyncSocket::connect(ConnectCallback* callback,
309                            const folly::SocketAddress& address,
310                            int timeout,
311                            const OptionMap &options,
312                            const folly::SocketAddress& bindAddr) noexcept {
313   DestructorGuard dg(this);
314   assert(eventBase_->isInEventBaseThread());
315
316   addr_ = address;
317
318   // Make sure we're in the uninitialized state
319   if (state_ != StateEnum::UNINIT) {
320     return invalidState(callback);
321   }
322
323   connectTimeout_ = std::chrono::milliseconds(timeout);
324   connectStartTime_ = std::chrono::steady_clock::now();
325   // Make connect end time at least >= connectStartTime.
326   connectEndTime_ = connectStartTime_;
327
328   assert(fd_ == -1);
329   state_ = StateEnum::CONNECTING;
330   connectCallback_ = callback;
331
332   sockaddr_storage addrStorage;
333   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
334
335   try {
336     // Create the socket
337     // Technically the first parameter should actually be a protocol family
338     // constant (PF_xxx) rather than an address family (AF_xxx), but the
339     // distinction is mainly just historical.  In pretty much all
340     // implementations the PF_foo and AF_foo constants are identical.
341     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
342     if (fd_ < 0) {
343       auto errnoCopy = errno;
344       throw AsyncSocketException(
345           AsyncSocketException::INTERNAL_ERROR,
346           withAddr("failed to create socket"),
347           errnoCopy);
348     }
349     if (shutdownSocketSet_) {
350       shutdownSocketSet_->add(fd_);
351     }
352     ioHandler_.changeHandlerFD(fd_);
353
354     setCloseOnExec();
355
356     // Put the socket in non-blocking mode
357     int flags = fcntl(fd_, F_GETFL, 0);
358     if (flags == -1) {
359       auto errnoCopy = errno;
360       throw AsyncSocketException(
361           AsyncSocketException::INTERNAL_ERROR,
362           withAddr("failed to get socket flags"),
363           errnoCopy);
364     }
365     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
366     if (rv == -1) {
367       auto errnoCopy = errno;
368       throw AsyncSocketException(
369           AsyncSocketException::INTERNAL_ERROR,
370           withAddr("failed to put socket in non-blocking mode"),
371           errnoCopy);
372     }
373
374 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
375     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
376     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
377     if (rv == -1) {
378       auto errnoCopy = errno;
379       throw AsyncSocketException(
380           AsyncSocketException::INTERNAL_ERROR,
381           "failed to enable F_SETNOSIGPIPE on socket",
382           errnoCopy);
383     }
384 #endif
385
386     // By default, turn on TCP_NODELAY
387     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
388     // setNoDelay() will log an error message if it fails.
389     if (address.getFamily() != AF_UNIX) {
390       (void)setNoDelay(true);
391     }
392
393     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
394             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
395
396     // bind the socket
397     if (bindAddr != anyAddress()) {
398       int one = 1;
399       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
400         auto errnoCopy = errno;
401         doClose();
402         throw AsyncSocketException(
403             AsyncSocketException::NOT_OPEN,
404             "failed to setsockopt prior to bind on " + bindAddr.describe(),
405             errnoCopy);
406       }
407
408       bindAddr.getAddress(&addrStorage);
409
410       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
411         auto errnoCopy = errno;
412         doClose();
413         throw AsyncSocketException(
414             AsyncSocketException::NOT_OPEN,
415             "failed to bind to async socket: " + bindAddr.describe(),
416             errnoCopy);
417       }
418     }
419
420     // Apply the additional options if any.
421     for (const auto& opt: options) {
422       int rv = opt.first.apply(fd_, opt.second);
423       if (rv != 0) {
424         auto errnoCopy = errno;
425         throw AsyncSocketException(
426             AsyncSocketException::INTERNAL_ERROR,
427             withAddr("failed to set socket option"),
428             errnoCopy);
429       }
430     }
431
432     // Perform the connect()
433     address.getAddress(&addrStorage);
434
435     rv = ::connect(fd_, saddr, address.getActualSize());
436     if (rv < 0) {
437       auto errnoCopy = errno;
438       if (errnoCopy == EINPROGRESS) {
439         // Connection in progress.
440         if (timeout > 0) {
441           // Start a timer in case the connection takes too long.
442           if (!writeTimeout_.scheduleTimeout(timeout)) {
443             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
444                 withAddr("failed to schedule AsyncSocket connect timeout"));
445           }
446         }
447
448         // Register for write events, so we'll
449         // be notified when the connection finishes/fails.
450         // Note that we don't register for a persistent event here.
451         assert(eventFlags_ == EventHandler::NONE);
452         eventFlags_ = EventHandler::WRITE;
453         if (!ioHandler_.registerHandler(eventFlags_)) {
454           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
455               withAddr("failed to register AsyncSocket connect handler"));
456         }
457         return;
458       } else {
459         throw AsyncSocketException(
460             AsyncSocketException::NOT_OPEN,
461             "connect failed (immediately)",
462             errnoCopy);
463       }
464     }
465
466     // If we're still here the connect() succeeded immediately.
467     // Fall through to call the callback outside of this try...catch block
468   } catch (const AsyncSocketException& ex) {
469     return failConnect(__func__, ex);
470   } catch (const std::exception& ex) {
471     // shouldn't happen, but handle it just in case
472     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
473                << "): unexpected " << typeid(ex).name() << " exception: "
474                << ex.what();
475     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
476                             withAddr(string("unexpected exception: ") +
477                                      ex.what()));
478     return failConnect(__func__, tex);
479   }
480
481   // The connection succeeded immediately
482   // The read callback may not have been set yet, and no writes may be pending
483   // yet, so we don't have to register for any events at the moment.
484   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
485   assert(readCallback_ == nullptr);
486   assert(writeReqHead_ == nullptr);
487   state_ = StateEnum::ESTABLISHED;
488   invokeConnectSuccess();
489 }
490
491 void AsyncSocket::connect(ConnectCallback* callback,
492                            const string& ip, uint16_t port,
493                            int timeout,
494                            const OptionMap &options) noexcept {
495   DestructorGuard dg(this);
496   try {
497     connectCallback_ = callback;
498     connect(callback, folly::SocketAddress(ip, port), timeout, options);
499   } catch (const std::exception& ex) {
500     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
501                             ex.what());
502     return failConnect(__func__, tex);
503   }
504 }
505
506 void AsyncSocket::cancelConnect() {
507   connectCallback_ = nullptr;
508   if (state_ == StateEnum::CONNECTING) {
509     closeNow();
510   }
511 }
512
513 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
514   sendTimeout_ = milliseconds;
515   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
516
517   // If we are currently pending on write requests, immediately update
518   // writeTimeout_ with the new value.
519   if ((eventFlags_ & EventHandler::WRITE) &&
520       (state_ != StateEnum::CONNECTING)) {
521     assert(state_ == StateEnum::ESTABLISHED);
522     assert((shutdownFlags_ & SHUT_WRITE) == 0);
523     if (sendTimeout_ > 0) {
524       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
525         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
526             withAddr("failed to reschedule send timeout in setSendTimeout"));
527         return failWrite(__func__, ex);
528       }
529     } else {
530       writeTimeout_.cancelTimeout();
531     }
532   }
533 }
534
535 void AsyncSocket::setReadCB(ReadCallback *callback) {
536   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
537           << ", callback=" << callback << ", state=" << state_;
538
539   // Short circuit if callback is the same as the existing readCallback_.
540   //
541   // Note that this is needed for proper functioning during some cleanup cases.
542   // During cleanup we allow setReadCallback(nullptr) to be called even if the
543   // read callback is already unset and we have been detached from an event
544   // base.  This check prevents us from asserting
545   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
546   if (callback == readCallback_) {
547     return;
548   }
549
550   /* We are removing a read callback */
551   if (callback == nullptr &&
552       immediateReadHandler_.isLoopCallbackScheduled()) {
553     immediateReadHandler_.cancelLoopCallback();
554   }
555
556   if (shutdownFlags_ & SHUT_READ) {
557     // Reads have already been shut down on this socket.
558     //
559     // Allow setReadCallback(nullptr) to be called in this case, but don't
560     // allow a new callback to be set.
561     //
562     // For example, setReadCallback(nullptr) can happen after an error if we
563     // invoke some other error callback before invoking readError().  The other
564     // error callback that is invoked first may go ahead and clear the read
565     // callback before we get a chance to invoke readError().
566     if (callback != nullptr) {
567       return invalidState(callback);
568     }
569     assert((eventFlags_ & EventHandler::READ) == 0);
570     readCallback_ = nullptr;
571     return;
572   }
573
574   DestructorGuard dg(this);
575   assert(eventBase_->isInEventBaseThread());
576
577   switch ((StateEnum)state_) {
578     case StateEnum::CONNECTING:
579       // For convenience, we allow the read callback to be set while we are
580       // still connecting.  We just store the callback for now.  Once the
581       // connection completes we'll register for read events.
582       readCallback_ = callback;
583       return;
584     case StateEnum::ESTABLISHED:
585     {
586       readCallback_ = callback;
587       uint16_t oldFlags = eventFlags_;
588       if (readCallback_) {
589         eventFlags_ |= EventHandler::READ;
590       } else {
591         eventFlags_ &= ~EventHandler::READ;
592       }
593
594       // Update our registration if our flags have changed
595       if (eventFlags_ != oldFlags) {
596         // We intentionally ignore the return value here.
597         // updateEventRegistration() will move us into the error state if it
598         // fails, and we don't need to do anything else here afterwards.
599         (void)updateEventRegistration();
600       }
601
602       if (readCallback_) {
603         checkForImmediateRead();
604       }
605       return;
606     }
607     case StateEnum::CLOSED:
608     case StateEnum::ERROR:
609       // We should never reach here.  SHUT_READ should always be set
610       // if we are in STATE_CLOSED or STATE_ERROR.
611       assert(false);
612       return invalidState(callback);
613     case StateEnum::UNINIT:
614       // We do not allow setReadCallback() to be called before we start
615       // connecting.
616       return invalidState(callback);
617   }
618
619   // We don't put a default case in the switch statement, so that the compiler
620   // will warn us to update the switch statement if a new state is added.
621   return invalidState(callback);
622 }
623
624 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
625   return readCallback_;
626 }
627
628 void AsyncSocket::write(WriteCallback* callback,
629                          const void* buf, size_t bytes, WriteFlags flags) {
630   iovec op;
631   op.iov_base = const_cast<void*>(buf);
632   op.iov_len = bytes;
633   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
634 }
635
636 void AsyncSocket::writev(WriteCallback* callback,
637                           const iovec* vec,
638                           size_t count,
639                           WriteFlags flags) {
640   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
641 }
642
643 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
644                               WriteFlags flags) {
645   constexpr size_t kSmallSizeMax = 64;
646   size_t count = buf->countChainElements();
647   if (count <= kSmallSizeMax) {
648     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
649     writeChainImpl(callback, vec, count, std::move(buf), flags);
650   } else {
651     iovec* vec = new iovec[count];
652     writeChainImpl(callback, vec, count, std::move(buf), flags);
653     delete[] vec;
654   }
655 }
656
657 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
658     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
659   size_t veclen = buf->fillIov(vec, count);
660   writeImpl(callback, vec, veclen, std::move(buf), flags);
661 }
662
663 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
664                              size_t count, unique_ptr<IOBuf>&& buf,
665                              WriteFlags flags) {
666   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
667           << ", callback=" << callback << ", count=" << count
668           << ", state=" << state_;
669   DestructorGuard dg(this);
670   unique_ptr<IOBuf>ioBuf(std::move(buf));
671   assert(eventBase_->isInEventBaseThread());
672
673   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
674     // No new writes may be performed after the write side of the socket has
675     // been shutdown.
676     //
677     // We could just call callback->writeError() here to fail just this write.
678     // However, fail hard and use invalidState() to fail all outstanding
679     // callbacks and move the socket into the error state.  There's most likely
680     // a bug in the caller's code, so we abort everything rather than trying to
681     // proceed as best we can.
682     return invalidState(callback);
683   }
684
685   uint32_t countWritten = 0;
686   uint32_t partialWritten = 0;
687   int bytesWritten = 0;
688   bool mustRegister = false;
689   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
690     if (writeReqHead_ == nullptr) {
691       // If we are established and there are no other writes pending,
692       // we can attempt to perform the write immediately.
693       assert(writeReqTail_ == nullptr);
694       assert((eventFlags_ & EventHandler::WRITE) == 0);
695
696       auto writeResult =
697           performWrite(vec, count, flags, &countWritten, &partialWritten);
698       bytesWritten = writeResult.writeReturn;
699       if (bytesWritten < 0) {
700         auto errnoCopy = errno;
701         if (writeResult.exception) {
702           return failWrite(__func__, callback, 0, *writeResult.exception);
703         }
704         AsyncSocketException ex(
705             AsyncSocketException::INTERNAL_ERROR,
706             withAddr("writev failed"),
707             errnoCopy);
708         return failWrite(__func__, callback, 0, ex);
709       } else if (countWritten == count) {
710         // We successfully wrote everything.
711         // Invoke the callback and return.
712         if (callback) {
713           callback->writeSuccess();
714         }
715         return;
716       } else { // continue writing the next writeReq
717         if (bufferCallback_) {
718           bufferCallback_->onEgressBuffered();
719         }
720       }
721       mustRegister = true;
722     }
723   } else if (!connecting()) {
724     // Invalid state for writing
725     return invalidState(callback);
726   }
727
728   // Create a new WriteRequest to add to the queue
729   WriteRequest* req;
730   try {
731     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
732                                         count - countWritten, partialWritten,
733                                         bytesWritten, std::move(ioBuf), flags);
734   } catch (const std::exception& ex) {
735     // we mainly expect to catch std::bad_alloc here
736     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
737         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
738     return failWrite(__func__, callback, bytesWritten, tex);
739   }
740   req->consume();
741   if (writeReqTail_ == nullptr) {
742     assert(writeReqHead_ == nullptr);
743     writeReqHead_ = writeReqTail_ = req;
744   } else {
745     writeReqTail_->append(req);
746     writeReqTail_ = req;
747   }
748
749   // Register for write events if are established and not currently
750   // waiting on write events
751   if (mustRegister) {
752     assert(state_ == StateEnum::ESTABLISHED);
753     assert((eventFlags_ & EventHandler::WRITE) == 0);
754     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
755       assert(state_ == StateEnum::ERROR);
756       return;
757     }
758     if (sendTimeout_ > 0) {
759       // Schedule a timeout to fire if the write takes too long.
760       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
761         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
762                                withAddr("failed to schedule send timeout"));
763         return failWrite(__func__, ex);
764       }
765     }
766   }
767 }
768
769 void AsyncSocket::writeRequest(WriteRequest* req) {
770   if (writeReqTail_ == nullptr) {
771     assert(writeReqHead_ == nullptr);
772     writeReqHead_ = writeReqTail_ = req;
773     req->start();
774   } else {
775     writeReqTail_->append(req);
776     writeReqTail_ = req;
777   }
778 }
779
780 void AsyncSocket::close() {
781   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
782           << ", state=" << state_ << ", shutdownFlags="
783           << std::hex << (int) shutdownFlags_;
784
785   // close() is only different from closeNow() when there are pending writes
786   // that need to drain before we can close.  In all other cases, just call
787   // closeNow().
788   //
789   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
790   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
791   // is still running.  (e.g., If there are multiple pending writes, and we
792   // call writeError() on the first one, it may call close().  In this case we
793   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
794   // writes will still be in the queue.)
795   //
796   // We only need to drain pending writes if we are still in STATE_CONNECTING
797   // or STATE_ESTABLISHED
798   if ((writeReqHead_ == nullptr) ||
799       !(state_ == StateEnum::CONNECTING ||
800       state_ == StateEnum::ESTABLISHED)) {
801     closeNow();
802     return;
803   }
804
805   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
806   // destroyed until close() returns.
807   DestructorGuard dg(this);
808   assert(eventBase_->isInEventBaseThread());
809
810   // Since there are write requests pending, we have to set the
811   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
812   // connect finishes and we finish writing these requests.
813   //
814   // Set SHUT_READ to indicate that reads are shut down, and set the
815   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
816   // pending writes complete.
817   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
818
819   // If a read callback is set, invoke readEOF() immediately to inform it that
820   // the socket has been closed and no more data can be read.
821   if (readCallback_) {
822     // Disable reads if they are enabled
823     if (!updateEventRegistration(0, EventHandler::READ)) {
824       // We're now in the error state; callbacks have been cleaned up
825       assert(state_ == StateEnum::ERROR);
826       assert(readCallback_ == nullptr);
827     } else {
828       ReadCallback* callback = readCallback_;
829       readCallback_ = nullptr;
830       callback->readEOF();
831     }
832   }
833 }
834
835 void AsyncSocket::closeNow() {
836   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
837           << ", state=" << state_ << ", shutdownFlags="
838           << std::hex << (int) shutdownFlags_;
839   DestructorGuard dg(this);
840   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
841
842   switch (state_) {
843     case StateEnum::ESTABLISHED:
844     case StateEnum::CONNECTING:
845     {
846       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
847       state_ = StateEnum::CLOSED;
848
849       // If the write timeout was set, cancel it.
850       writeTimeout_.cancelTimeout();
851
852       // If we are registered for I/O events, unregister.
853       if (eventFlags_ != EventHandler::NONE) {
854         eventFlags_ = EventHandler::NONE;
855         if (!updateEventRegistration()) {
856           // We will have been moved into the error state.
857           assert(state_ == StateEnum::ERROR);
858           return;
859         }
860       }
861
862       if (immediateReadHandler_.isLoopCallbackScheduled()) {
863         immediateReadHandler_.cancelLoopCallback();
864       }
865
866       if (fd_ >= 0) {
867         ioHandler_.changeHandlerFD(-1);
868         doClose();
869       }
870
871       invokeConnectErr(socketClosedLocallyEx);
872
873       failAllWrites(socketClosedLocallyEx);
874
875       if (readCallback_) {
876         ReadCallback* callback = readCallback_;
877         readCallback_ = nullptr;
878         callback->readEOF();
879       }
880       return;
881     }
882     case StateEnum::CLOSED:
883       // Do nothing.  It's possible that we are being called recursively
884       // from inside a callback that we invoked inside another call to close()
885       // that is still running.
886       return;
887     case StateEnum::ERROR:
888       // Do nothing.  The error handling code has performed (or is performing)
889       // cleanup.
890       return;
891     case StateEnum::UNINIT:
892       assert(eventFlags_ == EventHandler::NONE);
893       assert(connectCallback_ == nullptr);
894       assert(readCallback_ == nullptr);
895       assert(writeReqHead_ == nullptr);
896       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
897       state_ = StateEnum::CLOSED;
898       return;
899   }
900
901   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
902               << ") called in unknown state " << state_;
903 }
904
905 void AsyncSocket::closeWithReset() {
906   // Enable SO_LINGER, with the linger timeout set to 0.
907   // This will trigger a TCP reset when we close the socket.
908   if (fd_ >= 0) {
909     struct linger optLinger = {1, 0};
910     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
911       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
912               << "on " << fd_ << ": errno=" << errno;
913     }
914   }
915
916   // Then let closeNow() take care of the rest
917   closeNow();
918 }
919
920 void AsyncSocket::shutdownWrite() {
921   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
922           << ", state=" << state_ << ", shutdownFlags="
923           << std::hex << (int) shutdownFlags_;
924
925   // If there are no pending writes, shutdownWrite() is identical to
926   // shutdownWriteNow().
927   if (writeReqHead_ == nullptr) {
928     shutdownWriteNow();
929     return;
930   }
931
932   assert(eventBase_->isInEventBaseThread());
933
934   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
935   // shutdown will be performed once all writes complete.
936   shutdownFlags_ |= SHUT_WRITE_PENDING;
937 }
938
939 void AsyncSocket::shutdownWriteNow() {
940   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
941           << ", fd=" << fd_ << ", state=" << state_
942           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
943
944   if (shutdownFlags_ & SHUT_WRITE) {
945     // Writes are already shutdown; nothing else to do.
946     return;
947   }
948
949   // If SHUT_READ is already set, just call closeNow() to completely
950   // close the socket.  This can happen if close() was called with writes
951   // pending, and then shutdownWriteNow() is called before all pending writes
952   // complete.
953   if (shutdownFlags_ & SHUT_READ) {
954     closeNow();
955     return;
956   }
957
958   DestructorGuard dg(this);
959   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
960
961   switch (static_cast<StateEnum>(state_)) {
962     case StateEnum::ESTABLISHED:
963     {
964       shutdownFlags_ |= SHUT_WRITE;
965
966       // If the write timeout was set, cancel it.
967       writeTimeout_.cancelTimeout();
968
969       // If we are registered for write events, unregister.
970       if (!updateEventRegistration(0, EventHandler::WRITE)) {
971         // We will have been moved into the error state.
972         assert(state_ == StateEnum::ERROR);
973         return;
974       }
975
976       // Shutdown writes on the file descriptor
977       ::shutdown(fd_, SHUT_WR);
978
979       // Immediately fail all write requests
980       failAllWrites(socketShutdownForWritesEx);
981       return;
982     }
983     case StateEnum::CONNECTING:
984     {
985       // Set the SHUT_WRITE_PENDING flag.
986       // When the connection completes, it will check this flag,
987       // shutdown the write half of the socket, and then set SHUT_WRITE.
988       shutdownFlags_ |= SHUT_WRITE_PENDING;
989
990       // Immediately fail all write requests
991       failAllWrites(socketShutdownForWritesEx);
992       return;
993     }
994     case StateEnum::UNINIT:
995       // Callers normally shouldn't call shutdownWriteNow() before the socket
996       // even starts connecting.  Nonetheless, go ahead and set
997       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
998       // immediately shut down the write side of the socket.
999       shutdownFlags_ |= SHUT_WRITE_PENDING;
1000       return;
1001     case StateEnum::CLOSED:
1002     case StateEnum::ERROR:
1003       // We should never get here.  SHUT_WRITE should always be set
1004       // in STATE_CLOSED and STATE_ERROR.
1005       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
1006                  << ", fd=" << fd_ << ") in unexpected state " << state_
1007                  << " with SHUT_WRITE not set ("
1008                  << std::hex << (int) shutdownFlags_ << ")";
1009       assert(false);
1010       return;
1011   }
1012
1013   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
1014               << fd_ << ") called in unknown state " << state_;
1015 }
1016
1017 bool AsyncSocket::readable() const {
1018   if (fd_ == -1) {
1019     return false;
1020   }
1021   struct pollfd fds[1];
1022   fds[0].fd = fd_;
1023   fds[0].events = POLLIN;
1024   fds[0].revents = 0;
1025   int rc = poll(fds, 1, 0);
1026   return rc == 1;
1027 }
1028
1029 bool AsyncSocket::isPending() const {
1030   return ioHandler_.isPending();
1031 }
1032
1033 bool AsyncSocket::hangup() const {
1034   if (fd_ == -1) {
1035     // sanity check, no one should ask for hangup if we are not connected.
1036     assert(false);
1037     return false;
1038   }
1039 #ifdef POLLRDHUP // Linux-only
1040   struct pollfd fds[1];
1041   fds[0].fd = fd_;
1042   fds[0].events = POLLRDHUP|POLLHUP;
1043   fds[0].revents = 0;
1044   poll(fds, 1, 0);
1045   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1046 #else
1047   return false;
1048 #endif
1049 }
1050
1051 bool AsyncSocket::good() const {
1052   return ((state_ == StateEnum::CONNECTING ||
1053           state_ == StateEnum::ESTABLISHED) &&
1054           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1055 }
1056
1057 bool AsyncSocket::error() const {
1058   return (state_ == StateEnum::ERROR);
1059 }
1060
1061 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1062   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1063           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1064           << ", state=" << state_ << ", events="
1065           << std::hex << eventFlags_ << ")";
1066   assert(eventBase_ == nullptr);
1067   assert(eventBase->isInEventBaseThread());
1068
1069   eventBase_ = eventBase;
1070   ioHandler_.attachEventBase(eventBase);
1071   writeTimeout_.attachEventBase(eventBase);
1072 }
1073
1074 void AsyncSocket::detachEventBase() {
1075   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1076           << ", old evb=" << eventBase_ << ", state=" << state_
1077           << ", events=" << std::hex << eventFlags_ << ")";
1078   assert(eventBase_ != nullptr);
1079   assert(eventBase_->isInEventBaseThread());
1080
1081   eventBase_ = nullptr;
1082   ioHandler_.detachEventBase();
1083   writeTimeout_.detachEventBase();
1084 }
1085
1086 bool AsyncSocket::isDetachable() const {
1087   DCHECK(eventBase_ != nullptr);
1088   DCHECK(eventBase_->isInEventBaseThread());
1089
1090   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1091 }
1092
1093 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1094   if (!localAddr_.isInitialized()) {
1095     localAddr_.setFromLocalAddress(fd_);
1096   }
1097   *address = localAddr_;
1098 }
1099
1100 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1101   if (!addr_.isInitialized()) {
1102     addr_.setFromPeerAddress(fd_);
1103   }
1104   *address = addr_;
1105 }
1106
1107 int AsyncSocket::setNoDelay(bool noDelay) {
1108   if (fd_ < 0) {
1109     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1110                << this << "(state=" << state_ << ")";
1111     return EINVAL;
1112
1113   }
1114
1115   int value = noDelay ? 1 : 0;
1116   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1117     int errnoCopy = errno;
1118     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1119             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1120             << strerror(errnoCopy);
1121     return errnoCopy;
1122   }
1123
1124   return 0;
1125 }
1126
1127 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1128
1129   #ifndef TCP_CONGESTION
1130   #define TCP_CONGESTION  13
1131   #endif
1132
1133   if (fd_ < 0) {
1134     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1135                << "socket " << this << "(state=" << state_ << ")";
1136     return EINVAL;
1137
1138   }
1139
1140   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1141         cname.length() + 1) != 0) {
1142     int errnoCopy = errno;
1143     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1144             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1145             << strerror(errnoCopy);
1146     return errnoCopy;
1147   }
1148
1149   return 0;
1150 }
1151
1152 int AsyncSocket::setQuickAck(bool quickack) {
1153   if (fd_ < 0) {
1154     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1155                << this << "(state=" << state_ << ")";
1156     return EINVAL;
1157
1158   }
1159
1160 #ifdef TCP_QUICKACK // Linux-only
1161   int value = quickack ? 1 : 0;
1162   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1163     int errnoCopy = errno;
1164     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1165             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1166             << strerror(errnoCopy);
1167     return errnoCopy;
1168   }
1169
1170   return 0;
1171 #else
1172   return ENOSYS;
1173 #endif
1174 }
1175
1176 int AsyncSocket::setSendBufSize(size_t bufsize) {
1177   if (fd_ < 0) {
1178     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1179                << this << "(state=" << state_ << ")";
1180     return EINVAL;
1181   }
1182
1183   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1184     int errnoCopy = errno;
1185     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1186             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1187             << strerror(errnoCopy);
1188     return errnoCopy;
1189   }
1190
1191   return 0;
1192 }
1193
1194 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1195   if (fd_ < 0) {
1196     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1197                << this << "(state=" << state_ << ")";
1198     return EINVAL;
1199   }
1200
1201   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1202     int errnoCopy = errno;
1203     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1204             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1205             << strerror(errnoCopy);
1206     return errnoCopy;
1207   }
1208
1209   return 0;
1210 }
1211
1212 int AsyncSocket::setTCPProfile(int profd) {
1213   if (fd_ < 0) {
1214     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1215                << this << "(state=" << state_ << ")";
1216     return EINVAL;
1217   }
1218
1219   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1220     int errnoCopy = errno;
1221     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1222             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1223             << strerror(errnoCopy);
1224     return errnoCopy;
1225   }
1226
1227   return 0;
1228 }
1229
1230 void AsyncSocket::ioReady(uint16_t events) noexcept {
1231   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1232           << ", events=" << std::hex << events << ", state=" << state_;
1233   DestructorGuard dg(this);
1234   assert(events & EventHandler::READ_WRITE);
1235   assert(eventBase_->isInEventBaseThread());
1236
1237   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1238   if (relevantEvents == EventHandler::READ) {
1239     handleRead();
1240   } else if (relevantEvents == EventHandler::WRITE) {
1241     handleWrite();
1242   } else if (relevantEvents == EventHandler::READ_WRITE) {
1243     EventBase* originalEventBase = eventBase_;
1244     // If both read and write events are ready, process writes first.
1245     handleWrite();
1246
1247     // Return now if handleWrite() detached us from our EventBase
1248     if (eventBase_ != originalEventBase) {
1249       return;
1250     }
1251
1252     // Only call handleRead() if a read callback is still installed.
1253     // (It's possible that the read callback was uninstalled during
1254     // handleWrite().)
1255     if (readCallback_) {
1256       handleRead();
1257     }
1258   } else {
1259     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1260                << std::hex << events << "(this=" << this << ")";
1261     abort();
1262   }
1263 }
1264
1265 AsyncSocket::ReadResult
1266 AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) {
1267   VLOG(5) << "AsyncSocket::performRead() this=" << this << ", buf=" << *buf
1268           << ", buflen=" << *buflen;
1269
1270   int recvFlags = 0;
1271   if (peek_) {
1272     recvFlags |= MSG_PEEK;
1273   }
1274
1275   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1276   if (bytes < 0) {
1277     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1278       // No more data to read right now.
1279       return ReadResult(READ_BLOCKING);
1280     } else {
1281       return ReadResult(READ_ERROR);
1282     }
1283   } else {
1284     appBytesReceived_ += bytes;
1285     return ReadResult(bytes);
1286   }
1287 }
1288
1289 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1290   // no matter what, buffer should be preapared for non-ssl socket
1291   CHECK(readCallback_);
1292   readCallback_->getReadBuffer(buf, buflen);
1293 }
1294
1295 void AsyncSocket::handleRead() noexcept {
1296   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1297           << ", state=" << state_;
1298   assert(state_ == StateEnum::ESTABLISHED);
1299   assert((shutdownFlags_ & SHUT_READ) == 0);
1300   assert(readCallback_ != nullptr);
1301   assert(eventFlags_ & EventHandler::READ);
1302
1303   // Loop until:
1304   // - a read attempt would block
1305   // - readCallback_ is uninstalled
1306   // - the number of loop iterations exceeds the optional maximum
1307   // - this AsyncSocket is moved to another EventBase
1308   //
1309   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1310   // which is why need to check for it here.
1311   //
1312   // The last bullet point is slightly subtle.  readDataAvailable() may also
1313   // detach this socket from this EventBase.  However, before
1314   // readDataAvailable() returns another thread may pick it up, attach it to
1315   // a different EventBase, and install another readCallback_.  We need to
1316   // exit immediately after readDataAvailable() returns if the eventBase_ has
1317   // changed.  (The caller must perform some sort of locking to transfer the
1318   // AsyncSocket between threads properly.  This will be sufficient to ensure
1319   // that this thread sees the updated eventBase_ variable after
1320   // readDataAvailable() returns.)
1321   uint16_t numReads = 0;
1322   EventBase* originalEventBase = eventBase_;
1323   while (readCallback_ && eventBase_ == originalEventBase) {
1324     // Get the buffer to read into.
1325     void* buf = nullptr;
1326     size_t buflen = 0, offset = 0;
1327     try {
1328       prepareReadBuffer(&buf, &buflen);
1329       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1330     } catch (const AsyncSocketException& ex) {
1331       return failRead(__func__, ex);
1332     } catch (const std::exception& ex) {
1333       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1334                               string("ReadCallback::getReadBuffer() "
1335                                      "threw exception: ") +
1336                               ex.what());
1337       return failRead(__func__, tex);
1338     } catch (...) {
1339       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1340                              "ReadCallback::getReadBuffer() threw "
1341                              "non-exception type");
1342       return failRead(__func__, ex);
1343     }
1344     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1345       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1346                              "ReadCallback::getReadBuffer() returned "
1347                              "empty buffer");
1348       return failRead(__func__, ex);
1349     }
1350
1351     // Perform the read
1352     auto readResult = performRead(&buf, &buflen, &offset);
1353     auto bytesRead = readResult.readReturn;
1354     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1355             << bytesRead << " bytes";
1356     if (bytesRead > 0) {
1357       if (!isBufferMovable_) {
1358         readCallback_->readDataAvailable(bytesRead);
1359       } else {
1360         CHECK(kOpenSslModeMoveBufferOwnership);
1361         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1362                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1363                 << ", offset=" << offset;
1364         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1365         readBuf->trimStart(offset);
1366         readBuf->trimEnd(buflen - offset - bytesRead);
1367         readCallback_->readBufferAvailable(std::move(readBuf));
1368       }
1369
1370       // Fall through and continue around the loop if the read
1371       // completely filled the available buffer.
1372       // Note that readCallback_ may have been uninstalled or changed inside
1373       // readDataAvailable().
1374       if (size_t(bytesRead) < buflen) {
1375         return;
1376       }
1377     } else if (bytesRead == READ_BLOCKING) {
1378         // No more data to read right now.
1379         return;
1380     } else if (bytesRead == READ_ERROR) {
1381       readErr_ = READ_ERROR;
1382       if (readResult.exception) {
1383         return failRead(__func__, *readResult.exception);
1384       }
1385       auto errnoCopy = errno;
1386       AsyncSocketException ex(
1387           AsyncSocketException::INTERNAL_ERROR,
1388           withAddr("recv() failed"),
1389           errnoCopy);
1390       return failRead(__func__, ex);
1391     } else {
1392       assert(bytesRead == READ_EOF);
1393       readErr_ = READ_EOF;
1394       // EOF
1395       shutdownFlags_ |= SHUT_READ;
1396       if (!updateEventRegistration(0, EventHandler::READ)) {
1397         // we've already been moved into STATE_ERROR
1398         assert(state_ == StateEnum::ERROR);
1399         assert(readCallback_ == nullptr);
1400         return;
1401       }
1402
1403       ReadCallback* callback = readCallback_;
1404       readCallback_ = nullptr;
1405       callback->readEOF();
1406       return;
1407     }
1408     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1409       if (readCallback_ != nullptr) {
1410         // We might still have data in the socket.
1411         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1412         scheduleImmediateRead();
1413       }
1414       return;
1415     }
1416   }
1417 }
1418
1419 /**
1420  * This function attempts to write as much data as possible, until no more data
1421  * can be written.
1422  *
1423  * - If it sends all available data, it unregisters for write events, and stops
1424  *   the writeTimeout_.
1425  *
1426  * - If not all of the data can be sent immediately, it reschedules
1427  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1428  *   registered for write events.
1429  */
1430 void AsyncSocket::handleWrite() noexcept {
1431   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1432           << ", state=" << state_;
1433   if (state_ == StateEnum::CONNECTING) {
1434     handleConnect();
1435     return;
1436   }
1437
1438   // Normal write
1439   assert(state_ == StateEnum::ESTABLISHED);
1440   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1441   assert(writeReqHead_ != nullptr);
1442
1443   // Loop until we run out of write requests,
1444   // or until this socket is moved to another EventBase.
1445   // (See the comment in handleRead() explaining how this can happen.)
1446   EventBase* originalEventBase = eventBase_;
1447   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1448     auto writeResult = writeReqHead_->performWrite();
1449     if (writeResult.writeReturn < 0) {
1450       if (writeResult.exception) {
1451         return failWrite(__func__, *writeResult.exception);
1452       }
1453       auto errnoCopy = errno;
1454       AsyncSocketException ex(
1455           AsyncSocketException::INTERNAL_ERROR,
1456           withAddr("writev() failed"),
1457           errnoCopy);
1458       return failWrite(__func__, ex);
1459     } else if (writeReqHead_->isComplete()) {
1460       // We finished this request
1461       WriteRequest* req = writeReqHead_;
1462       writeReqHead_ = req->getNext();
1463
1464       if (writeReqHead_ == nullptr) {
1465         writeReqTail_ = nullptr;
1466         // This is the last write request.
1467         // Unregister for write events and cancel the send timer
1468         // before we invoke the callback.  We have to update the state properly
1469         // before calling the callback, since it may want to detach us from
1470         // the EventBase.
1471         if (eventFlags_ & EventHandler::WRITE) {
1472           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1473             assert(state_ == StateEnum::ERROR);
1474             return;
1475           }
1476           // Stop the send timeout
1477           writeTimeout_.cancelTimeout();
1478         }
1479         assert(!writeTimeout_.isScheduled());
1480
1481         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1482         // we finish sending the last write request.
1483         //
1484         // We have to do this before invoking writeSuccess(), since
1485         // writeSuccess() may detach us from our EventBase.
1486         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1487           assert(connectCallback_ == nullptr);
1488           shutdownFlags_ |= SHUT_WRITE;
1489
1490           if (shutdownFlags_ & SHUT_READ) {
1491             // Reads have already been shutdown.  Fully close the socket and
1492             // move to STATE_CLOSED.
1493             //
1494             // Note: This code currently moves us to STATE_CLOSED even if
1495             // close() hasn't ever been called.  This can occur if we have
1496             // received EOF from the peer and shutdownWrite() has been called
1497             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1498             // case, until close() is actually called?  I can't think of a
1499             // reason why we would need to do so.  No other operations besides
1500             // calling close() or destroying the socket can be performed at
1501             // this point.
1502             assert(readCallback_ == nullptr);
1503             state_ = StateEnum::CLOSED;
1504             if (fd_ >= 0) {
1505               ioHandler_.changeHandlerFD(-1);
1506               doClose();
1507             }
1508           } else {
1509             // Reads are still enabled, so we are only doing a half-shutdown
1510             ::shutdown(fd_, SHUT_WR);
1511           }
1512         }
1513       }
1514
1515       // Invoke the callback
1516       WriteCallback* callback = req->getCallback();
1517       req->destroy();
1518       if (callback) {
1519         callback->writeSuccess();
1520       }
1521       // We'll continue around the loop, trying to write another request
1522     } else {
1523       // Partial write.
1524       if (bufferCallback_) {
1525         bufferCallback_->onEgressBuffered();
1526       }
1527       writeReqHead_->consume();
1528       // Stop after a partial write; it's highly likely that a subsequent write
1529       // attempt will just return EAGAIN.
1530       //
1531       // Ensure that we are registered for write events.
1532       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1533         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1534           assert(state_ == StateEnum::ERROR);
1535           return;
1536         }
1537       }
1538
1539       // Reschedule the send timeout, since we have made some write progress.
1540       if (sendTimeout_ > 0) {
1541         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1542           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1543               withAddr("failed to reschedule write timeout"));
1544           return failWrite(__func__, ex);
1545         }
1546       }
1547       return;
1548     }
1549   }
1550   if (!writeReqHead_ && bufferCallback_) {
1551     bufferCallback_->onEgressBufferCleared();
1552   }
1553 }
1554
1555 void AsyncSocket::checkForImmediateRead() noexcept {
1556   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1557   // (However, note that some subclasses do override this method.)
1558   //
1559   // Simply calling handleRead() here would be bad, as this would call
1560   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1561   // buffer even though no data may be available.  This would waste lots of
1562   // memory, since the buffer will sit around unused until the socket actually
1563   // becomes readable.
1564   //
1565   // Checking if the socket is readable now also seems like it would probably
1566   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1567   // would just waste an extra system call.  Even if it is readable, waiting to
1568   // find out from libevent on the next event loop doesn't seem that bad.
1569 }
1570
1571 void AsyncSocket::handleInitialReadWrite() noexcept {
1572   // Our callers should already be holding a DestructorGuard, but grab
1573   // one here just to make sure, in case one of our calling code paths ever
1574   // changes.
1575   DestructorGuard dg(this);
1576
1577   // If we have a readCallback_, make sure we enable read events.  We
1578   // may already be registered for reads if connectSuccess() set
1579   // the read calback.
1580   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1581     assert(state_ == StateEnum::ESTABLISHED);
1582     assert((shutdownFlags_ & SHUT_READ) == 0);
1583     if (!updateEventRegistration(EventHandler::READ, 0)) {
1584       assert(state_ == StateEnum::ERROR);
1585       return;
1586     }
1587     checkForImmediateRead();
1588   } else if (readCallback_ == nullptr) {
1589     // Unregister for read events.
1590     updateEventRegistration(0, EventHandler::READ);
1591   }
1592
1593   // If we have write requests pending, try to send them immediately.
1594   // Since we just finished accepting, there is a very good chance that we can
1595   // write without blocking.
1596   //
1597   // However, we only process them if EventHandler::WRITE is not already set,
1598   // which means that we're already blocked on a write attempt.  (This can
1599   // happen if connectSuccess() called write() before returning.)
1600   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1601     // Call handleWrite() to perform write processing.
1602     handleWrite();
1603   } else if (writeReqHead_ == nullptr) {
1604     // Unregister for write event.
1605     updateEventRegistration(0, EventHandler::WRITE);
1606   }
1607 }
1608
1609 void AsyncSocket::handleConnect() noexcept {
1610   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1611           << ", state=" << state_;
1612   assert(state_ == StateEnum::CONNECTING);
1613   // SHUT_WRITE can never be set while we are still connecting;
1614   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1615   // finishes
1616   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1617
1618   // In case we had a connect timeout, cancel the timeout
1619   writeTimeout_.cancelTimeout();
1620   // We don't use a persistent registration when waiting on a connect event,
1621   // so we have been automatically unregistered now.  Update eventFlags_ to
1622   // reflect reality.
1623   assert(eventFlags_ == EventHandler::WRITE);
1624   eventFlags_ = EventHandler::NONE;
1625
1626   // Call getsockopt() to check if the connect succeeded
1627   int error;
1628   socklen_t len = sizeof(error);
1629   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1630   if (rv != 0) {
1631     auto errnoCopy = errno;
1632     AsyncSocketException ex(
1633         AsyncSocketException::INTERNAL_ERROR,
1634         withAddr("error calling getsockopt() after connect"),
1635         errnoCopy);
1636     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1637                << fd_ << " host=" << addr_.describe()
1638                << ") exception:" << ex.what();
1639     return failConnect(__func__, ex);
1640   }
1641
1642   if (error != 0) {
1643     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1644                            "connect failed", error);
1645     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1646             << fd_ << " host=" << addr_.describe()
1647             << ") exception: " << ex.what();
1648     return failConnect(__func__, ex);
1649   }
1650
1651   // Move into STATE_ESTABLISHED
1652   state_ = StateEnum::ESTABLISHED;
1653
1654   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1655   // perform, immediately shutdown the write half of the socket.
1656   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1657     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1658     // are still connecting we just abort the connect rather than waiting for
1659     // it to complete.
1660     assert((shutdownFlags_ & SHUT_READ) == 0);
1661     ::shutdown(fd_, SHUT_WR);
1662     shutdownFlags_ |= SHUT_WRITE;
1663   }
1664
1665   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1666           << "successfully connected; state=" << state_;
1667
1668   // Remember the EventBase we are attached to, before we start invoking any
1669   // callbacks (since the callbacks may call detachEventBase()).
1670   EventBase* originalEventBase = eventBase_;
1671
1672   invokeConnectSuccess();
1673   // Note that the connect callback may have changed our state.
1674   // (set or unset the read callback, called write(), closed the socket, etc.)
1675   // The following code needs to handle these situations correctly.
1676   //
1677   // If the socket has been closed, readCallback_ and writeReqHead_ will
1678   // always be nullptr, so that will prevent us from trying to read or write.
1679   //
1680   // The main thing to check for is if eventBase_ is still originalEventBase.
1681   // If not, we have been detached from this event base, so we shouldn't
1682   // perform any more operations.
1683   if (eventBase_ != originalEventBase) {
1684     return;
1685   }
1686
1687   handleInitialReadWrite();
1688 }
1689
1690 void AsyncSocket::timeoutExpired() noexcept {
1691   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1692           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1693   DestructorGuard dg(this);
1694   assert(eventBase_->isInEventBaseThread());
1695
1696   if (state_ == StateEnum::CONNECTING) {
1697     // connect() timed out
1698     // Unregister for I/O events.
1699     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1700                            "connect timed out");
1701     failConnect(__func__, ex);
1702   } else {
1703     // a normal write operation timed out
1704     assert(state_ == StateEnum::ESTABLISHED);
1705     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1706     failWrite(__func__, ex);
1707   }
1708 }
1709
1710 AsyncSocket::WriteResult AsyncSocket::performWrite(
1711     const iovec* vec,
1712     uint32_t count,
1713     WriteFlags flags,
1714     uint32_t* countWritten,
1715     uint32_t* partialWritten) {
1716   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1717   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1718   // (since it may terminate the program if the main program doesn't explicitly
1719   // ignore it).
1720   struct msghdr msg;
1721   msg.msg_name = nullptr;
1722   msg.msg_namelen = 0;
1723   msg.msg_iov = const_cast<iovec *>(vec);
1724   msg.msg_iovlen = std::min<size_t>(count, kIovMax);
1725   msg.msg_control = nullptr;
1726   msg.msg_controllen = 0;
1727   msg.msg_flags = 0;
1728
1729   int msg_flags = MSG_DONTWAIT;
1730
1731 #ifdef MSG_NOSIGNAL // Linux-only
1732   msg_flags |= MSG_NOSIGNAL;
1733   if (isSet(flags, WriteFlags::CORK)) {
1734     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1735     // give it the rest of the data rather than immediately sending a partial
1736     // frame, even when TCP_NODELAY is enabled.
1737     msg_flags |= MSG_MORE;
1738   }
1739 #endif
1740   if (isSet(flags, WriteFlags::EOR)) {
1741     // marks that this is the last byte of a record (response)
1742     msg_flags |= MSG_EOR;
1743   }
1744   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1745   if (totalWritten < 0) {
1746     if (errno == EAGAIN) {
1747       // TCP buffer is full; we can't write any more data right now.
1748       *countWritten = 0;
1749       *partialWritten = 0;
1750       return WriteResult(0);
1751     }
1752     // error
1753     *countWritten = 0;
1754     *partialWritten = 0;
1755     return WriteResult(WRITE_ERROR);
1756   }
1757
1758   appBytesWritten_ += totalWritten;
1759
1760   uint32_t bytesWritten;
1761   uint32_t n;
1762   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1763     const iovec* v = vec + n;
1764     if (v->iov_len > bytesWritten) {
1765       // Partial write finished in the middle of this iovec
1766       *countWritten = n;
1767       *partialWritten = bytesWritten;
1768       return WriteResult(totalWritten);
1769     }
1770
1771     bytesWritten -= v->iov_len;
1772   }
1773
1774   assert(bytesWritten == 0);
1775   *countWritten = n;
1776   *partialWritten = 0;
1777   return WriteResult(totalWritten);
1778 }
1779
1780 /**
1781  * Re-register the EventHandler after eventFlags_ has changed.
1782  *
1783  * If an error occurs, fail() is called to move the socket into the error state
1784  * and call all currently installed callbacks.  After an error, the
1785  * AsyncSocket is completely unregistered.
1786  *
1787  * @return Returns true on succcess, or false on error.
1788  */
1789 bool AsyncSocket::updateEventRegistration() {
1790   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1791           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1792           << ", events=" << std::hex << eventFlags_;
1793   assert(eventBase_->isInEventBaseThread());
1794   if (eventFlags_ == EventHandler::NONE) {
1795     ioHandler_.unregisterHandler();
1796     return true;
1797   }
1798
1799   // Always register for persistent events, so we don't have to re-register
1800   // after being called back.
1801   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1802     eventFlags_ = EventHandler::NONE; // we're not registered after error
1803     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1804         withAddr("failed to update AsyncSocket event registration"));
1805     fail("updateEventRegistration", ex);
1806     return false;
1807   }
1808
1809   return true;
1810 }
1811
1812 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1813                                            uint16_t disable) {
1814   uint16_t oldFlags = eventFlags_;
1815   eventFlags_ |= enable;
1816   eventFlags_ &= ~disable;
1817   if (eventFlags_ == oldFlags) {
1818     return true;
1819   } else {
1820     return updateEventRegistration();
1821   }
1822 }
1823
1824 void AsyncSocket::startFail() {
1825   // startFail() should only be called once
1826   assert(state_ != StateEnum::ERROR);
1827   assert(getDestructorGuardCount() > 0);
1828   state_ = StateEnum::ERROR;
1829   // Ensure that SHUT_READ and SHUT_WRITE are set,
1830   // so all future attempts to read or write will be rejected
1831   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1832
1833   if (eventFlags_ != EventHandler::NONE) {
1834     eventFlags_ = EventHandler::NONE;
1835     ioHandler_.unregisterHandler();
1836   }
1837   writeTimeout_.cancelTimeout();
1838
1839   if (fd_ >= 0) {
1840     ioHandler_.changeHandlerFD(-1);
1841     doClose();
1842   }
1843 }
1844
1845 void AsyncSocket::finishFail() {
1846   assert(state_ == StateEnum::ERROR);
1847   assert(getDestructorGuardCount() > 0);
1848
1849   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1850                          withAddr("socket closing after error"));
1851   invokeConnectErr(ex);
1852   failAllWrites(ex);
1853
1854   if (readCallback_) {
1855     ReadCallback* callback = readCallback_;
1856     readCallback_ = nullptr;
1857     callback->readErr(ex);
1858   }
1859 }
1860
1861 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1862   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1863              << state_ << " host=" << addr_.describe()
1864              << "): failed in " << fn << "(): "
1865              << ex.what();
1866   startFail();
1867   finishFail();
1868 }
1869
1870 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1871   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1872                << state_ << " host=" << addr_.describe()
1873                << "): failed while connecting in " << fn << "(): "
1874                << ex.what();
1875   startFail();
1876
1877   invokeConnectErr(ex);
1878   finishFail();
1879 }
1880
1881 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1882   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1883                << state_ << " host=" << addr_.describe()
1884                << "): failed while reading in " << fn << "(): "
1885                << ex.what();
1886   startFail();
1887
1888   if (readCallback_ != nullptr) {
1889     ReadCallback* callback = readCallback_;
1890     readCallback_ = nullptr;
1891     callback->readErr(ex);
1892   }
1893
1894   finishFail();
1895 }
1896
1897 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1898   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1899                << state_ << " host=" << addr_.describe()
1900                << "): failed while writing in " << fn << "(): "
1901                << ex.what();
1902   startFail();
1903
1904   // Only invoke the first write callback, since the error occurred while
1905   // writing this request.  Let any other pending write callbacks be invoked in
1906   // finishFail().
1907   if (writeReqHead_ != nullptr) {
1908     WriteRequest* req = writeReqHead_;
1909     writeReqHead_ = req->getNext();
1910     WriteCallback* callback = req->getCallback();
1911     uint32_t bytesWritten = req->getTotalBytesWritten();
1912     req->destroy();
1913     if (callback) {
1914       callback->writeErr(bytesWritten, ex);
1915     }
1916   }
1917
1918   finishFail();
1919 }
1920
1921 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1922                              size_t bytesWritten,
1923                              const AsyncSocketException& ex) {
1924   // This version of failWrite() is used when the failure occurs before
1925   // we've added the callback to writeReqHead_.
1926   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1927              << state_ << " host=" << addr_.describe()
1928              <<"): failed while writing in " << fn << "(): "
1929              << ex.what();
1930   startFail();
1931
1932   if (callback != nullptr) {
1933     callback->writeErr(bytesWritten, ex);
1934   }
1935
1936   finishFail();
1937 }
1938
1939 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1940   // Invoke writeError() on all write callbacks.
1941   // This is used when writes are forcibly shutdown with write requests
1942   // pending, or when an error occurs with writes pending.
1943   while (writeReqHead_ != nullptr) {
1944     WriteRequest* req = writeReqHead_;
1945     writeReqHead_ = req->getNext();
1946     WriteCallback* callback = req->getCallback();
1947     if (callback) {
1948       callback->writeErr(req->getTotalBytesWritten(), ex);
1949     }
1950     req->destroy();
1951   }
1952 }
1953
1954 void AsyncSocket::invalidState(ConnectCallback* callback) {
1955   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1956              << "): connect() called in invalid state " << state_;
1957
1958   /*
1959    * The invalidState() methods don't use the normal failure mechanisms,
1960    * since we don't know what state we are in.  We don't want to call
1961    * startFail()/finishFail() recursively if we are already in the middle of
1962    * cleaning up.
1963    */
1964
1965   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1966                          "connect() called with socket in invalid state");
1967   connectEndTime_ = std::chrono::steady_clock::now();
1968   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1969     if (callback) {
1970       callback->connectErr(ex);
1971     }
1972   } else {
1973     // We can't use failConnect() here since connectCallback_
1974     // may already be set to another callback.  Invoke this ConnectCallback
1975     // here; any other connectCallback_ will be invoked in finishFail()
1976     startFail();
1977     if (callback) {
1978       callback->connectErr(ex);
1979     }
1980     finishFail();
1981   }
1982 }
1983
1984 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
1985   connectEndTime_ = std::chrono::steady_clock::now();
1986   if (connectCallback_) {
1987     ConnectCallback* callback = connectCallback_;
1988     connectCallback_ = nullptr;
1989     callback->connectErr(ex);
1990   }
1991 }
1992
1993 void AsyncSocket::invokeConnectSuccess() {
1994   connectEndTime_ = std::chrono::steady_clock::now();
1995   if (connectCallback_) {
1996     ConnectCallback* callback = connectCallback_;
1997     connectCallback_ = nullptr;
1998     callback->connectSuccess();
1999   }
2000 }
2001
2002 void AsyncSocket::invalidState(ReadCallback* callback) {
2003   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2004              << "): setReadCallback(" << callback
2005              << ") called in invalid state " << state_;
2006
2007   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2008                          "setReadCallback() called with socket in "
2009                          "invalid state");
2010   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2011     if (callback) {
2012       callback->readErr(ex);
2013     }
2014   } else {
2015     startFail();
2016     if (callback) {
2017       callback->readErr(ex);
2018     }
2019     finishFail();
2020   }
2021 }
2022
2023 void AsyncSocket::invalidState(WriteCallback* callback) {
2024   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2025              << "): write() called in invalid state " << state_;
2026
2027   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2028                          withAddr("write() called with socket in invalid state"));
2029   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2030     if (callback) {
2031       callback->writeErr(0, ex);
2032     }
2033   } else {
2034     startFail();
2035     if (callback) {
2036       callback->writeErr(0, ex);
2037     }
2038     finishFail();
2039   }
2040 }
2041
2042 void AsyncSocket::doClose() {
2043   if (fd_ == -1) return;
2044   if (shutdownSocketSet_) {
2045     shutdownSocketSet_->close(fd_);
2046   } else {
2047     ::close(fd_);
2048   }
2049   fd_ = -1;
2050 }
2051
2052 std::ostream& operator << (std::ostream& os,
2053                            const AsyncSocket::StateEnum& state) {
2054   os << static_cast<int>(state);
2055   return os;
2056 }
2057
2058 std::string AsyncSocket::withAddr(const std::string& s) {
2059   // Don't use addr_ directly because it may not be initialized
2060   // e.g. if constructed from fd
2061   folly::SocketAddress peer, local;
2062   try {
2063     getPeerAddress(&peer);
2064     getLocalAddress(&local);
2065   } catch (const std::exception&) {
2066     // ignore
2067   } catch (...) {
2068     // ignore
2069   }
2070   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2071 }
2072
2073 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2074   bufferCallback_ = cb;
2075 }
2076
2077 } // folly