Remove persistentCork in native land
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23 #include <folly/portability/SysUio.h>
24
25 #include <poll.h>
26 #include <errno.h>
27 #include <limits.h>
28 #include <unistd.h>
29 #include <thread>
30 #include <fcntl.h>
31 #include <sys/types.h>
32 #include <sys/socket.h>
33 #include <netinet/in.h>
34 #include <netinet/tcp.h>
35 #include <boost/preprocessor/control/if.hpp>
36
37 using std::string;
38 using std::unique_ptr;
39
40 namespace folly {
41
42 // static members initializers
43 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
44
45 const AsyncSocketException socketClosedLocallyEx(
46     AsyncSocketException::END_OF_FILE, "socket closed locally");
47 const AsyncSocketException socketShutdownForWritesEx(
48     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
49
50 // TODO: It might help performance to provide a version of BytesWriteRequest that
51 // users could derive from, so we can avoid the extra allocation for each call
52 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
53 // protocols are currently templatized for transports.
54 //
55 // We would need the version for external users where they provide the iovec
56 // storage space, and only our internal version would allocate it at the end of
57 // the WriteRequest.
58
59 /* The default WriteRequest implementation, used for write(), writev() and
60  * writeChain()
61  *
62  * A new BytesWriteRequest operation is allocated on the heap for all write
63  * operations that cannot be completed immediately.
64  */
65 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
66  public:
67   static BytesWriteRequest* newRequest(AsyncSocket* socket,
68                                        WriteCallback* callback,
69                                        const iovec* ops,
70                                        uint32_t opCount,
71                                        uint32_t partialWritten,
72                                        uint32_t bytesWritten,
73                                        unique_ptr<IOBuf>&& ioBuf,
74                                        WriteFlags flags) {
75     assert(opCount > 0);
76     // Since we put a variable size iovec array at the end
77     // of each BytesWriteRequest, we have to manually allocate the memory.
78     void* buf = malloc(sizeof(BytesWriteRequest) +
79                        (opCount * sizeof(struct iovec)));
80     if (buf == nullptr) {
81       throw std::bad_alloc();
82     }
83
84     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
85                                       partialWritten, bytesWritten,
86                                       std::move(ioBuf), flags);
87   }
88
89   void destroy() override {
90     this->~BytesWriteRequest();
91     free(this);
92   }
93
94   bool performWrite() override {
95     WriteFlags writeFlags = flags_;
96     if (getNext() != nullptr) {
97       writeFlags = writeFlags | WriteFlags::CORK;
98     }
99     bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
100                                           &opsWritten_, &partialBytes_);
101     return bytesWritten_ >= 0;
102   }
103
104   bool isComplete() override {
105     return opsWritten_ == getOpCount();
106   }
107
108   void consume() override {
109     // Advance opIndex_ forward by opsWritten_
110     opIndex_ += opsWritten_;
111     assert(opIndex_ < opCount_);
112
113     // If we've finished writing any IOBufs, release them
114     if (ioBuf_) {
115       for (uint32_t i = opsWritten_; i != 0; --i) {
116         assert(ioBuf_);
117         ioBuf_ = ioBuf_->pop();
118       }
119     }
120
121     // Move partialBytes_ forward into the current iovec buffer
122     struct iovec* currentOp = writeOps_ + opIndex_;
123     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
124     currentOp->iov_base =
125       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
126     currentOp->iov_len -= partialBytes_;
127
128     // Increment the totalBytesWritten_ count by bytesWritten_;
129     totalBytesWritten_ += bytesWritten_;
130   }
131
132  private:
133   BytesWriteRequest(AsyncSocket* socket,
134                     WriteCallback* callback,
135                     const struct iovec* ops,
136                     uint32_t opCount,
137                     uint32_t partialBytes,
138                     uint32_t bytesWritten,
139                     unique_ptr<IOBuf>&& ioBuf,
140                     WriteFlags flags)
141     : AsyncSocket::WriteRequest(socket, callback)
142     , opCount_(opCount)
143     , opIndex_(0)
144     , flags_(flags)
145     , ioBuf_(std::move(ioBuf))
146     , opsWritten_(0)
147     , partialBytes_(partialBytes)
148     , bytesWritten_(bytesWritten) {
149     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
150   }
151
152   // private destructor, to ensure callers use destroy()
153   ~BytesWriteRequest() override = default;
154
155   const struct iovec* getOps() const {
156     assert(opCount_ > opIndex_);
157     return writeOps_ + opIndex_;
158   }
159
160   uint32_t getOpCount() const {
161     assert(opCount_ > opIndex_);
162     return opCount_ - opIndex_;
163   }
164
165   uint32_t opCount_;            ///< number of entries in writeOps_
166   uint32_t opIndex_;            ///< current index into writeOps_
167   WriteFlags flags_;            ///< set for WriteFlags
168   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
169
170   // for consume(), how much we wrote on the last write
171   uint32_t opsWritten_;         ///< complete ops written
172   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
173   ssize_t bytesWritten_;        ///< bytes written altogether
174
175   struct iovec writeOps_[];     ///< write operation(s) list
176 };
177
178 AsyncSocket::AsyncSocket()
179   : eventBase_(nullptr)
180   , writeTimeout_(this, nullptr)
181   , ioHandler_(this, nullptr)
182   , immediateReadHandler_(this) {
183   VLOG(5) << "new AsyncSocket()";
184   init();
185 }
186
187 AsyncSocket::AsyncSocket(EventBase* evb)
188   : eventBase_(evb)
189   , writeTimeout_(this, evb)
190   , ioHandler_(this, evb)
191   , immediateReadHandler_(this) {
192   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
193   init();
194 }
195
196 AsyncSocket::AsyncSocket(EventBase* evb,
197                            const folly::SocketAddress& address,
198                            uint32_t connectTimeout)
199   : AsyncSocket(evb) {
200   connect(nullptr, address, connectTimeout);
201 }
202
203 AsyncSocket::AsyncSocket(EventBase* evb,
204                            const std::string& ip,
205                            uint16_t port,
206                            uint32_t connectTimeout)
207   : AsyncSocket(evb) {
208   connect(nullptr, ip, port, connectTimeout);
209 }
210
211 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
212   : eventBase_(evb)
213   , writeTimeout_(this, evb)
214   , ioHandler_(this, evb, fd)
215   , immediateReadHandler_(this) {
216   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
217           << fd << ")";
218   init();
219   fd_ = fd;
220   setCloseOnExec();
221   state_ = StateEnum::ESTABLISHED;
222 }
223
224 // init() method, since constructor forwarding isn't supported in most
225 // compilers yet.
226 void AsyncSocket::init() {
227   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
228   shutdownFlags_ = 0;
229   state_ = StateEnum::UNINIT;
230   eventFlags_ = EventHandler::NONE;
231   fd_ = -1;
232   sendTimeout_ = 0;
233   maxReadsPerEvent_ = 16;
234   connectCallback_ = nullptr;
235   readCallback_ = nullptr;
236   writeReqHead_ = nullptr;
237   writeReqTail_ = nullptr;
238   shutdownSocketSet_ = nullptr;
239   appBytesWritten_ = 0;
240   appBytesReceived_ = 0;
241 }
242
243 AsyncSocket::~AsyncSocket() {
244   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
245           << ", evb=" << eventBase_ << ", fd=" << fd_
246           << ", state=" << state_ << ")";
247 }
248
249 void AsyncSocket::destroy() {
250   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
251           << ", fd=" << fd_ << ", state=" << state_;
252   // When destroy is called, close the socket immediately
253   closeNow();
254
255   // Then call DelayedDestruction::destroy() to take care of
256   // whether or not we need immediate or delayed destruction
257   DelayedDestruction::destroy();
258 }
259
260 int AsyncSocket::detachFd() {
261   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
262           << ", evb=" << eventBase_ << ", state=" << state_
263           << ", events=" << std::hex << eventFlags_ << ")";
264   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
265   // actually close the descriptor.
266   if (shutdownSocketSet_) {
267     shutdownSocketSet_->remove(fd_);
268   }
269   int fd = fd_;
270   fd_ = -1;
271   // Call closeNow() to invoke all pending callbacks with an error.
272   closeNow();
273   // Update the EventHandler to stop using this fd.
274   // This can only be done after closeNow() unregisters the handler.
275   ioHandler_.changeHandlerFD(-1);
276   return fd;
277 }
278
279 const folly::SocketAddress& AsyncSocket::anyAddress() {
280   static const folly::SocketAddress anyAddress =
281     folly::SocketAddress("0.0.0.0", 0);
282   return anyAddress;
283 }
284
285 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
286   if (shutdownSocketSet_ == newSS) {
287     return;
288   }
289   if (shutdownSocketSet_ && fd_ != -1) {
290     shutdownSocketSet_->remove(fd_);
291   }
292   shutdownSocketSet_ = newSS;
293   if (shutdownSocketSet_ && fd_ != -1) {
294     shutdownSocketSet_->add(fd_);
295   }
296 }
297
298 void AsyncSocket::setCloseOnExec() {
299   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
300   if (rv != 0) {
301     throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
302                                withAddr("failed to set close-on-exec flag"),
303                                errno);
304   }
305 }
306
307 void AsyncSocket::connect(ConnectCallback* callback,
308                            const folly::SocketAddress& address,
309                            int timeout,
310                            const OptionMap &options,
311                            const folly::SocketAddress& bindAddr) noexcept {
312   DestructorGuard dg(this);
313   assert(eventBase_->isInEventBaseThread());
314
315   addr_ = address;
316
317   // Make sure we're in the uninitialized state
318   if (state_ != StateEnum::UNINIT) {
319     return invalidState(callback);
320   }
321
322   connectStartTime_ = std::chrono::steady_clock::now();
323   // Make connect end time at least >= connectStartTime.
324   connectEndTime_ = connectStartTime_;
325
326   assert(fd_ == -1);
327   state_ = StateEnum::CONNECTING;
328   connectCallback_ = callback;
329
330   sockaddr_storage addrStorage;
331   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
332
333   try {
334     // Create the socket
335     // Technically the first parameter should actually be a protocol family
336     // constant (PF_xxx) rather than an address family (AF_xxx), but the
337     // distinction is mainly just historical.  In pretty much all
338     // implementations the PF_foo and AF_foo constants are identical.
339     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
340     if (fd_ < 0) {
341       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
342                                 withAddr("failed to create socket"), errno);
343     }
344     if (shutdownSocketSet_) {
345       shutdownSocketSet_->add(fd_);
346     }
347     ioHandler_.changeHandlerFD(fd_);
348
349     setCloseOnExec();
350
351     // Put the socket in non-blocking mode
352     int flags = fcntl(fd_, F_GETFL, 0);
353     if (flags == -1) {
354       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
355                                 withAddr("failed to get socket flags"), errno);
356     }
357     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
358     if (rv == -1) {
359       throw AsyncSocketException(
360           AsyncSocketException::INTERNAL_ERROR,
361           withAddr("failed to put socket in non-blocking mode"),
362           errno);
363     }
364
365 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
366     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
367     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
368     if (rv == -1) {
369       throw AsyncSocketException(
370           AsyncSocketException::INTERNAL_ERROR,
371           "failed to enable F_SETNOSIGPIPE on socket",
372           errno);
373     }
374 #endif
375
376     // By default, turn on TCP_NODELAY
377     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
378     // setNoDelay() will log an error message if it fails.
379     if (address.getFamily() != AF_UNIX) {
380       (void)setNoDelay(true);
381     }
382
383     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
384             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
385
386     // bind the socket
387     if (bindAddr != anyAddress()) {
388       int one = 1;
389       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
390         doClose();
391         throw AsyncSocketException(
392           AsyncSocketException::NOT_OPEN,
393           "failed to setsockopt prior to bind on " + bindAddr.describe(),
394           errno);
395       }
396
397       bindAddr.getAddress(&addrStorage);
398
399       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
400         doClose();
401         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
402                                   "failed to bind to async socket: " +
403                                   bindAddr.describe(),
404                                   errno);
405       }
406     }
407
408     // Apply the additional options if any.
409     for (const auto& opt: options) {
410       int rv = opt.first.apply(fd_, opt.second);
411       if (rv != 0) {
412         throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
413                                   withAddr("failed to set socket option"),
414                                   errno);
415       }
416     }
417
418     // Perform the connect()
419     address.getAddress(&addrStorage);
420
421     rv = ::connect(fd_, saddr, address.getActualSize());
422     if (rv < 0) {
423       if (errno == EINPROGRESS) {
424         // Connection in progress.
425         if (timeout > 0) {
426           // Start a timer in case the connection takes too long.
427           if (!writeTimeout_.scheduleTimeout(timeout)) {
428             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
429                 withAddr("failed to schedule AsyncSocket connect timeout"));
430           }
431         }
432
433         // Register for write events, so we'll
434         // be notified when the connection finishes/fails.
435         // Note that we don't register for a persistent event here.
436         assert(eventFlags_ == EventHandler::NONE);
437         eventFlags_ = EventHandler::WRITE;
438         if (!ioHandler_.registerHandler(eventFlags_)) {
439           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
440               withAddr("failed to register AsyncSocket connect handler"));
441         }
442         return;
443       } else {
444         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
445                                   "connect failed (immediately)", errno);
446       }
447     }
448
449     // If we're still here the connect() succeeded immediately.
450     // Fall through to call the callback outside of this try...catch block
451   } catch (const AsyncSocketException& ex) {
452     return failConnect(__func__, ex);
453   } catch (const std::exception& ex) {
454     // shouldn't happen, but handle it just in case
455     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
456                << "): unexpected " << typeid(ex).name() << " exception: "
457                << ex.what();
458     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
459                             withAddr(string("unexpected exception: ") +
460                                      ex.what()));
461     return failConnect(__func__, tex);
462   }
463
464   // The connection succeeded immediately
465   // The read callback may not have been set yet, and no writes may be pending
466   // yet, so we don't have to register for any events at the moment.
467   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
468   assert(readCallback_ == nullptr);
469   assert(writeReqHead_ == nullptr);
470   state_ = StateEnum::ESTABLISHED;
471   invokeConnectSuccess();
472 }
473
474 void AsyncSocket::connect(ConnectCallback* callback,
475                            const string& ip, uint16_t port,
476                            int timeout,
477                            const OptionMap &options) noexcept {
478   DestructorGuard dg(this);
479   try {
480     connectCallback_ = callback;
481     connect(callback, folly::SocketAddress(ip, port), timeout, options);
482   } catch (const std::exception& ex) {
483     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
484                             ex.what());
485     return failConnect(__func__, tex);
486   }
487 }
488
489 void AsyncSocket::cancelConnect() {
490   connectCallback_ = nullptr;
491   if (state_ == StateEnum::CONNECTING) {
492     closeNow();
493   }
494 }
495
496 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
497   sendTimeout_ = milliseconds;
498   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
499
500   // If we are currently pending on write requests, immediately update
501   // writeTimeout_ with the new value.
502   if ((eventFlags_ & EventHandler::WRITE) &&
503       (state_ != StateEnum::CONNECTING)) {
504     assert(state_ == StateEnum::ESTABLISHED);
505     assert((shutdownFlags_ & SHUT_WRITE) == 0);
506     if (sendTimeout_ > 0) {
507       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
508         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
509             withAddr("failed to reschedule send timeout in setSendTimeout"));
510         return failWrite(__func__, ex);
511       }
512     } else {
513       writeTimeout_.cancelTimeout();
514     }
515   }
516 }
517
518 void AsyncSocket::setReadCB(ReadCallback *callback) {
519   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
520           << ", callback=" << callback << ", state=" << state_;
521
522   // Short circuit if callback is the same as the existing readCallback_.
523   //
524   // Note that this is needed for proper functioning during some cleanup cases.
525   // During cleanup we allow setReadCallback(nullptr) to be called even if the
526   // read callback is already unset and we have been detached from an event
527   // base.  This check prevents us from asserting
528   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
529   if (callback == readCallback_) {
530     return;
531   }
532
533   /* We are removing a read callback */
534   if (callback == nullptr &&
535       immediateReadHandler_.isLoopCallbackScheduled()) {
536     immediateReadHandler_.cancelLoopCallback();
537   }
538
539   if (shutdownFlags_ & SHUT_READ) {
540     // Reads have already been shut down on this socket.
541     //
542     // Allow setReadCallback(nullptr) to be called in this case, but don't
543     // allow a new callback to be set.
544     //
545     // For example, setReadCallback(nullptr) can happen after an error if we
546     // invoke some other error callback before invoking readError().  The other
547     // error callback that is invoked first may go ahead and clear the read
548     // callback before we get a chance to invoke readError().
549     if (callback != nullptr) {
550       return invalidState(callback);
551     }
552     assert((eventFlags_ & EventHandler::READ) == 0);
553     readCallback_ = nullptr;
554     return;
555   }
556
557   DestructorGuard dg(this);
558   assert(eventBase_->isInEventBaseThread());
559
560   switch ((StateEnum)state_) {
561     case StateEnum::CONNECTING:
562       // For convenience, we allow the read callback to be set while we are
563       // still connecting.  We just store the callback for now.  Once the
564       // connection completes we'll register for read events.
565       readCallback_ = callback;
566       return;
567     case StateEnum::ESTABLISHED:
568     {
569       readCallback_ = callback;
570       uint16_t oldFlags = eventFlags_;
571       if (readCallback_) {
572         eventFlags_ |= EventHandler::READ;
573       } else {
574         eventFlags_ &= ~EventHandler::READ;
575       }
576
577       // Update our registration if our flags have changed
578       if (eventFlags_ != oldFlags) {
579         // We intentionally ignore the return value here.
580         // updateEventRegistration() will move us into the error state if it
581         // fails, and we don't need to do anything else here afterwards.
582         (void)updateEventRegistration();
583       }
584
585       if (readCallback_) {
586         checkForImmediateRead();
587       }
588       return;
589     }
590     case StateEnum::CLOSED:
591     case StateEnum::ERROR:
592       // We should never reach here.  SHUT_READ should always be set
593       // if we are in STATE_CLOSED or STATE_ERROR.
594       assert(false);
595       return invalidState(callback);
596     case StateEnum::UNINIT:
597       // We do not allow setReadCallback() to be called before we start
598       // connecting.
599       return invalidState(callback);
600   }
601
602   // We don't put a default case in the switch statement, so that the compiler
603   // will warn us to update the switch statement if a new state is added.
604   return invalidState(callback);
605 }
606
607 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
608   return readCallback_;
609 }
610
611 void AsyncSocket::write(WriteCallback* callback,
612                          const void* buf, size_t bytes, WriteFlags flags) {
613   iovec op;
614   op.iov_base = const_cast<void*>(buf);
615   op.iov_len = bytes;
616   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
617 }
618
619 void AsyncSocket::writev(WriteCallback* callback,
620                           const iovec* vec,
621                           size_t count,
622                           WriteFlags flags) {
623   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
624 }
625
626 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
627                               WriteFlags flags) {
628   constexpr size_t kSmallSizeMax = 64;
629   size_t count = buf->countChainElements();
630   if (count <= kSmallSizeMax) {
631     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
632     writeChainImpl(callback, vec, count, std::move(buf), flags);
633   } else {
634     iovec* vec = new iovec[count];
635     writeChainImpl(callback, vec, count, std::move(buf), flags);
636     delete[] vec;
637   }
638 }
639
640 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
641     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
642   size_t veclen = buf->fillIov(vec, count);
643   writeImpl(callback, vec, veclen, std::move(buf), flags);
644 }
645
646 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
647                              size_t count, unique_ptr<IOBuf>&& buf,
648                              WriteFlags flags) {
649   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
650           << ", callback=" << callback << ", count=" << count
651           << ", state=" << state_;
652   DestructorGuard dg(this);
653   unique_ptr<IOBuf>ioBuf(std::move(buf));
654   assert(eventBase_->isInEventBaseThread());
655
656   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
657     // No new writes may be performed after the write side of the socket has
658     // been shutdown.
659     //
660     // We could just call callback->writeError() here to fail just this write.
661     // However, fail hard and use invalidState() to fail all outstanding
662     // callbacks and move the socket into the error state.  There's most likely
663     // a bug in the caller's code, so we abort everything rather than trying to
664     // proceed as best we can.
665     return invalidState(callback);
666   }
667
668   uint32_t countWritten = 0;
669   uint32_t partialWritten = 0;
670   int bytesWritten = 0;
671   bool mustRegister = false;
672   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
673     if (writeReqHead_ == nullptr) {
674       // If we are established and there are no other writes pending,
675       // we can attempt to perform the write immediately.
676       assert(writeReqTail_ == nullptr);
677       assert((eventFlags_ & EventHandler::WRITE) == 0);
678
679       bytesWritten = performWrite(vec, count, flags,
680                                   &countWritten, &partialWritten);
681       if (bytesWritten < 0) {
682         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
683                                withAddr("writev failed"), errno);
684         return failWrite(__func__, callback, 0, ex);
685       } else if (countWritten == count) {
686         // We successfully wrote everything.
687         // Invoke the callback and return.
688         if (callback) {
689           callback->writeSuccess();
690         }
691         return;
692       } else { // continue writing the next writeReq
693         if (bufferCallback_) {
694           bufferCallback_->onEgressBuffered();
695         }
696       }
697       mustRegister = true;
698     }
699   } else if (!connecting()) {
700     // Invalid state for writing
701     return invalidState(callback);
702   }
703
704   // Create a new WriteRequest to add to the queue
705   WriteRequest* req;
706   try {
707     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
708                                         count - countWritten, partialWritten,
709                                         bytesWritten, std::move(ioBuf), flags);
710   } catch (const std::exception& ex) {
711     // we mainly expect to catch std::bad_alloc here
712     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
713         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
714     return failWrite(__func__, callback, bytesWritten, tex);
715   }
716   req->consume();
717   if (writeReqTail_ == nullptr) {
718     assert(writeReqHead_ == nullptr);
719     writeReqHead_ = writeReqTail_ = req;
720   } else {
721     writeReqTail_->append(req);
722     writeReqTail_ = req;
723   }
724
725   // Register for write events if are established and not currently
726   // waiting on write events
727   if (mustRegister) {
728     assert(state_ == StateEnum::ESTABLISHED);
729     assert((eventFlags_ & EventHandler::WRITE) == 0);
730     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
731       assert(state_ == StateEnum::ERROR);
732       return;
733     }
734     if (sendTimeout_ > 0) {
735       // Schedule a timeout to fire if the write takes too long.
736       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
737         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
738                                withAddr("failed to schedule send timeout"));
739         return failWrite(__func__, ex);
740       }
741     }
742   }
743 }
744
745 void AsyncSocket::writeRequest(WriteRequest* req) {
746   if (writeReqTail_ == nullptr) {
747     assert(writeReqHead_ == nullptr);
748     writeReqHead_ = writeReqTail_ = req;
749     req->start();
750   } else {
751     writeReqTail_->append(req);
752     writeReqTail_ = req;
753   }
754 }
755
756 void AsyncSocket::close() {
757   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
758           << ", state=" << state_ << ", shutdownFlags="
759           << std::hex << (int) shutdownFlags_;
760
761   // close() is only different from closeNow() when there are pending writes
762   // that need to drain before we can close.  In all other cases, just call
763   // closeNow().
764   //
765   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
766   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
767   // is still running.  (e.g., If there are multiple pending writes, and we
768   // call writeError() on the first one, it may call close().  In this case we
769   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
770   // writes will still be in the queue.)
771   //
772   // We only need to drain pending writes if we are still in STATE_CONNECTING
773   // or STATE_ESTABLISHED
774   if ((writeReqHead_ == nullptr) ||
775       !(state_ == StateEnum::CONNECTING ||
776       state_ == StateEnum::ESTABLISHED)) {
777     closeNow();
778     return;
779   }
780
781   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
782   // destroyed until close() returns.
783   DestructorGuard dg(this);
784   assert(eventBase_->isInEventBaseThread());
785
786   // Since there are write requests pending, we have to set the
787   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
788   // connect finishes and we finish writing these requests.
789   //
790   // Set SHUT_READ to indicate that reads are shut down, and set the
791   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
792   // pending writes complete.
793   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
794
795   // If a read callback is set, invoke readEOF() immediately to inform it that
796   // the socket has been closed and no more data can be read.
797   if (readCallback_) {
798     // Disable reads if they are enabled
799     if (!updateEventRegistration(0, EventHandler::READ)) {
800       // We're now in the error state; callbacks have been cleaned up
801       assert(state_ == StateEnum::ERROR);
802       assert(readCallback_ == nullptr);
803     } else {
804       ReadCallback* callback = readCallback_;
805       readCallback_ = nullptr;
806       callback->readEOF();
807     }
808   }
809 }
810
811 void AsyncSocket::closeNow() {
812   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
813           << ", state=" << state_ << ", shutdownFlags="
814           << std::hex << (int) shutdownFlags_;
815   DestructorGuard dg(this);
816   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
817
818   switch (state_) {
819     case StateEnum::ESTABLISHED:
820     case StateEnum::CONNECTING:
821     {
822       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
823       state_ = StateEnum::CLOSED;
824
825       // If the write timeout was set, cancel it.
826       writeTimeout_.cancelTimeout();
827
828       // If we are registered for I/O events, unregister.
829       if (eventFlags_ != EventHandler::NONE) {
830         eventFlags_ = EventHandler::NONE;
831         if (!updateEventRegistration()) {
832           // We will have been moved into the error state.
833           assert(state_ == StateEnum::ERROR);
834           return;
835         }
836       }
837
838       if (immediateReadHandler_.isLoopCallbackScheduled()) {
839         immediateReadHandler_.cancelLoopCallback();
840       }
841
842       if (fd_ >= 0) {
843         ioHandler_.changeHandlerFD(-1);
844         doClose();
845       }
846
847       invokeConnectErr(socketClosedLocallyEx);
848
849       failAllWrites(socketClosedLocallyEx);
850
851       if (readCallback_) {
852         ReadCallback* callback = readCallback_;
853         readCallback_ = nullptr;
854         callback->readEOF();
855       }
856       return;
857     }
858     case StateEnum::CLOSED:
859       // Do nothing.  It's possible that we are being called recursively
860       // from inside a callback that we invoked inside another call to close()
861       // that is still running.
862       return;
863     case StateEnum::ERROR:
864       // Do nothing.  The error handling code has performed (or is performing)
865       // cleanup.
866       return;
867     case StateEnum::UNINIT:
868       assert(eventFlags_ == EventHandler::NONE);
869       assert(connectCallback_ == nullptr);
870       assert(readCallback_ == nullptr);
871       assert(writeReqHead_ == nullptr);
872       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
873       state_ = StateEnum::CLOSED;
874       return;
875   }
876
877   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
878               << ") called in unknown state " << state_;
879 }
880
881 void AsyncSocket::closeWithReset() {
882   // Enable SO_LINGER, with the linger timeout set to 0.
883   // This will trigger a TCP reset when we close the socket.
884   if (fd_ >= 0) {
885     struct linger optLinger = {1, 0};
886     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
887       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
888               << "on " << fd_ << ": errno=" << errno;
889     }
890   }
891
892   // Then let closeNow() take care of the rest
893   closeNow();
894 }
895
896 void AsyncSocket::shutdownWrite() {
897   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
898           << ", state=" << state_ << ", shutdownFlags="
899           << std::hex << (int) shutdownFlags_;
900
901   // If there are no pending writes, shutdownWrite() is identical to
902   // shutdownWriteNow().
903   if (writeReqHead_ == nullptr) {
904     shutdownWriteNow();
905     return;
906   }
907
908   assert(eventBase_->isInEventBaseThread());
909
910   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
911   // shutdown will be performed once all writes complete.
912   shutdownFlags_ |= SHUT_WRITE_PENDING;
913 }
914
915 void AsyncSocket::shutdownWriteNow() {
916   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
917           << ", fd=" << fd_ << ", state=" << state_
918           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
919
920   if (shutdownFlags_ & SHUT_WRITE) {
921     // Writes are already shutdown; nothing else to do.
922     return;
923   }
924
925   // If SHUT_READ is already set, just call closeNow() to completely
926   // close the socket.  This can happen if close() was called with writes
927   // pending, and then shutdownWriteNow() is called before all pending writes
928   // complete.
929   if (shutdownFlags_ & SHUT_READ) {
930     closeNow();
931     return;
932   }
933
934   DestructorGuard dg(this);
935   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
936
937   switch (static_cast<StateEnum>(state_)) {
938     case StateEnum::ESTABLISHED:
939     {
940       shutdownFlags_ |= SHUT_WRITE;
941
942       // If the write timeout was set, cancel it.
943       writeTimeout_.cancelTimeout();
944
945       // If we are registered for write events, unregister.
946       if (!updateEventRegistration(0, EventHandler::WRITE)) {
947         // We will have been moved into the error state.
948         assert(state_ == StateEnum::ERROR);
949         return;
950       }
951
952       // Shutdown writes on the file descriptor
953       ::shutdown(fd_, SHUT_WR);
954
955       // Immediately fail all write requests
956       failAllWrites(socketShutdownForWritesEx);
957       return;
958     }
959     case StateEnum::CONNECTING:
960     {
961       // Set the SHUT_WRITE_PENDING flag.
962       // When the connection completes, it will check this flag,
963       // shutdown the write half of the socket, and then set SHUT_WRITE.
964       shutdownFlags_ |= SHUT_WRITE_PENDING;
965
966       // Immediately fail all write requests
967       failAllWrites(socketShutdownForWritesEx);
968       return;
969     }
970     case StateEnum::UNINIT:
971       // Callers normally shouldn't call shutdownWriteNow() before the socket
972       // even starts connecting.  Nonetheless, go ahead and set
973       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
974       // immediately shut down the write side of the socket.
975       shutdownFlags_ |= SHUT_WRITE_PENDING;
976       return;
977     case StateEnum::CLOSED:
978     case StateEnum::ERROR:
979       // We should never get here.  SHUT_WRITE should always be set
980       // in STATE_CLOSED and STATE_ERROR.
981       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
982                  << ", fd=" << fd_ << ") in unexpected state " << state_
983                  << " with SHUT_WRITE not set ("
984                  << std::hex << (int) shutdownFlags_ << ")";
985       assert(false);
986       return;
987   }
988
989   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
990               << fd_ << ") called in unknown state " << state_;
991 }
992
993 bool AsyncSocket::readable() const {
994   if (fd_ == -1) {
995     return false;
996   }
997   struct pollfd fds[1];
998   fds[0].fd = fd_;
999   fds[0].events = POLLIN;
1000   fds[0].revents = 0;
1001   int rc = poll(fds, 1, 0);
1002   return rc == 1;
1003 }
1004
1005 bool AsyncSocket::isPending() const {
1006   return ioHandler_.isPending();
1007 }
1008
1009 bool AsyncSocket::hangup() const {
1010   if (fd_ == -1) {
1011     // sanity check, no one should ask for hangup if we are not connected.
1012     assert(false);
1013     return false;
1014   }
1015 #ifdef POLLRDHUP // Linux-only
1016   struct pollfd fds[1];
1017   fds[0].fd = fd_;
1018   fds[0].events = POLLRDHUP|POLLHUP;
1019   fds[0].revents = 0;
1020   poll(fds, 1, 0);
1021   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1022 #else
1023   return false;
1024 #endif
1025 }
1026
1027 bool AsyncSocket::good() const {
1028   return ((state_ == StateEnum::CONNECTING ||
1029           state_ == StateEnum::ESTABLISHED) &&
1030           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1031 }
1032
1033 bool AsyncSocket::error() const {
1034   return (state_ == StateEnum::ERROR);
1035 }
1036
1037 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1038   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1039           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1040           << ", state=" << state_ << ", events="
1041           << std::hex << eventFlags_ << ")";
1042   assert(eventBase_ == nullptr);
1043   assert(eventBase->isInEventBaseThread());
1044
1045   eventBase_ = eventBase;
1046   ioHandler_.attachEventBase(eventBase);
1047   writeTimeout_.attachEventBase(eventBase);
1048 }
1049
1050 void AsyncSocket::detachEventBase() {
1051   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1052           << ", old evb=" << eventBase_ << ", state=" << state_
1053           << ", events=" << std::hex << eventFlags_ << ")";
1054   assert(eventBase_ != nullptr);
1055   assert(eventBase_->isInEventBaseThread());
1056
1057   eventBase_ = nullptr;
1058   ioHandler_.detachEventBase();
1059   writeTimeout_.detachEventBase();
1060 }
1061
1062 bool AsyncSocket::isDetachable() const {
1063   DCHECK(eventBase_ != nullptr);
1064   DCHECK(eventBase_->isInEventBaseThread());
1065
1066   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1067 }
1068
1069 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1070   if (!localAddr_.isInitialized()) {
1071     localAddr_.setFromLocalAddress(fd_);
1072   }
1073   *address = localAddr_;
1074 }
1075
1076 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1077   if (!addr_.isInitialized()) {
1078     addr_.setFromPeerAddress(fd_);
1079   }
1080   *address = addr_;
1081 }
1082
1083 int AsyncSocket::setNoDelay(bool noDelay) {
1084   if (fd_ < 0) {
1085     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1086                << this << "(state=" << state_ << ")";
1087     return EINVAL;
1088
1089   }
1090
1091   int value = noDelay ? 1 : 0;
1092   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1093     int errnoCopy = errno;
1094     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1095             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1096             << strerror(errnoCopy);
1097     return errnoCopy;
1098   }
1099
1100   return 0;
1101 }
1102
1103 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1104
1105   #ifndef TCP_CONGESTION
1106   #define TCP_CONGESTION  13
1107   #endif
1108
1109   if (fd_ < 0) {
1110     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1111                << "socket " << this << "(state=" << state_ << ")";
1112     return EINVAL;
1113
1114   }
1115
1116   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1117         cname.length() + 1) != 0) {
1118     int errnoCopy = errno;
1119     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1120             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1121             << strerror(errnoCopy);
1122     return errnoCopy;
1123   }
1124
1125   return 0;
1126 }
1127
1128 int AsyncSocket::setQuickAck(bool quickack) {
1129   if (fd_ < 0) {
1130     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1131                << this << "(state=" << state_ << ")";
1132     return EINVAL;
1133
1134   }
1135
1136 #ifdef TCP_QUICKACK // Linux-only
1137   int value = quickack ? 1 : 0;
1138   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1139     int errnoCopy = errno;
1140     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1141             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1142             << strerror(errnoCopy);
1143     return errnoCopy;
1144   }
1145
1146   return 0;
1147 #else
1148   return ENOSYS;
1149 #endif
1150 }
1151
1152 int AsyncSocket::setSendBufSize(size_t bufsize) {
1153   if (fd_ < 0) {
1154     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1155                << this << "(state=" << state_ << ")";
1156     return EINVAL;
1157   }
1158
1159   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1160     int errnoCopy = errno;
1161     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1162             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1163             << strerror(errnoCopy);
1164     return errnoCopy;
1165   }
1166
1167   return 0;
1168 }
1169
1170 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1171   if (fd_ < 0) {
1172     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1173                << this << "(state=" << state_ << ")";
1174     return EINVAL;
1175   }
1176
1177   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1178     int errnoCopy = errno;
1179     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1180             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1181             << strerror(errnoCopy);
1182     return errnoCopy;
1183   }
1184
1185   return 0;
1186 }
1187
1188 int AsyncSocket::setTCPProfile(int profd) {
1189   if (fd_ < 0) {
1190     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1191                << this << "(state=" << state_ << ")";
1192     return EINVAL;
1193   }
1194
1195   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1196     int errnoCopy = errno;
1197     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1198             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1199             << strerror(errnoCopy);
1200     return errnoCopy;
1201   }
1202
1203   return 0;
1204 }
1205
1206 void AsyncSocket::ioReady(uint16_t events) noexcept {
1207   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1208           << ", events=" << std::hex << events << ", state=" << state_;
1209   DestructorGuard dg(this);
1210   assert(events & EventHandler::READ_WRITE);
1211   assert(eventBase_->isInEventBaseThread());
1212
1213   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1214   if (relevantEvents == EventHandler::READ) {
1215     handleRead();
1216   } else if (relevantEvents == EventHandler::WRITE) {
1217     handleWrite();
1218   } else if (relevantEvents == EventHandler::READ_WRITE) {
1219     EventBase* originalEventBase = eventBase_;
1220     // If both read and write events are ready, process writes first.
1221     handleWrite();
1222
1223     // Return now if handleWrite() detached us from our EventBase
1224     if (eventBase_ != originalEventBase) {
1225       return;
1226     }
1227
1228     // Only call handleRead() if a read callback is still installed.
1229     // (It's possible that the read callback was uninstalled during
1230     // handleWrite().)
1231     if (readCallback_) {
1232       handleRead();
1233     }
1234   } else {
1235     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1236                << std::hex << events << "(this=" << this << ")";
1237     abort();
1238   }
1239 }
1240
1241 ssize_t AsyncSocket::performRead(void** buf,
1242                                  size_t* buflen,
1243                                  size_t* /* offset */) {
1244   VLOG(5) << "AsyncSocket::performRead() this=" << this
1245           << ", buf=" << *buf << ", buflen=" << *buflen;
1246
1247   int recvFlags = 0;
1248   if (peek_) {
1249     recvFlags |= MSG_PEEK;
1250   }
1251
1252   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1253   if (bytes < 0) {
1254     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1255       // No more data to read right now.
1256       return READ_BLOCKING;
1257     } else {
1258       return READ_ERROR;
1259     }
1260   } else {
1261     appBytesReceived_ += bytes;
1262     return bytes;
1263   }
1264 }
1265
1266 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1267   // no matter what, buffer should be preapared for non-ssl socket
1268   CHECK(readCallback_);
1269   readCallback_->getReadBuffer(buf, buflen);
1270 }
1271
1272 void AsyncSocket::handleRead() noexcept {
1273   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1274           << ", state=" << state_;
1275   assert(state_ == StateEnum::ESTABLISHED);
1276   assert((shutdownFlags_ & SHUT_READ) == 0);
1277   assert(readCallback_ != nullptr);
1278   assert(eventFlags_ & EventHandler::READ);
1279
1280   // Loop until:
1281   // - a read attempt would block
1282   // - readCallback_ is uninstalled
1283   // - the number of loop iterations exceeds the optional maximum
1284   // - this AsyncSocket is moved to another EventBase
1285   //
1286   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1287   // which is why need to check for it here.
1288   //
1289   // The last bullet point is slightly subtle.  readDataAvailable() may also
1290   // detach this socket from this EventBase.  However, before
1291   // readDataAvailable() returns another thread may pick it up, attach it to
1292   // a different EventBase, and install another readCallback_.  We need to
1293   // exit immediately after readDataAvailable() returns if the eventBase_ has
1294   // changed.  (The caller must perform some sort of locking to transfer the
1295   // AsyncSocket between threads properly.  This will be sufficient to ensure
1296   // that this thread sees the updated eventBase_ variable after
1297   // readDataAvailable() returns.)
1298   uint16_t numReads = 0;
1299   EventBase* originalEventBase = eventBase_;
1300   while (readCallback_ && eventBase_ == originalEventBase) {
1301     // Get the buffer to read into.
1302     void* buf = nullptr;
1303     size_t buflen = 0, offset = 0;
1304     try {
1305       prepareReadBuffer(&buf, &buflen);
1306       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1307     } catch (const AsyncSocketException& ex) {
1308       return failRead(__func__, ex);
1309     } catch (const std::exception& ex) {
1310       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1311                               string("ReadCallback::getReadBuffer() "
1312                                      "threw exception: ") +
1313                               ex.what());
1314       return failRead(__func__, tex);
1315     } catch (...) {
1316       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1317                              "ReadCallback::getReadBuffer() threw "
1318                              "non-exception type");
1319       return failRead(__func__, ex);
1320     }
1321     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1322       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1323                              "ReadCallback::getReadBuffer() returned "
1324                              "empty buffer");
1325       return failRead(__func__, ex);
1326     }
1327
1328     // Perform the read
1329     ssize_t bytesRead = performRead(&buf, &buflen, &offset);
1330     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1331             << bytesRead << " bytes";
1332     if (bytesRead > 0) {
1333       if (!isBufferMovable_) {
1334         readCallback_->readDataAvailable(bytesRead);
1335       } else {
1336         CHECK(kOpenSslModeMoveBufferOwnership);
1337         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1338                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1339                 << ", offset=" << offset;
1340         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1341         readBuf->trimStart(offset);
1342         readBuf->trimEnd(buflen - offset - bytesRead);
1343         readCallback_->readBufferAvailable(std::move(readBuf));
1344       }
1345
1346       // Fall through and continue around the loop if the read
1347       // completely filled the available buffer.
1348       // Note that readCallback_ may have been uninstalled or changed inside
1349       // readDataAvailable().
1350       if (size_t(bytesRead) < buflen) {
1351         return;
1352       }
1353     } else if (bytesRead == READ_BLOCKING) {
1354         // No more data to read right now.
1355         return;
1356     } else if (bytesRead == READ_ERROR) {
1357       readErr_ = READ_ERROR;
1358       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1359                              withAddr("recv() failed"), errno);
1360       return failRead(__func__, ex);
1361     } else {
1362       assert(bytesRead == READ_EOF);
1363       readErr_ = READ_EOF;
1364       // EOF
1365       shutdownFlags_ |= SHUT_READ;
1366       if (!updateEventRegistration(0, EventHandler::READ)) {
1367         // we've already been moved into STATE_ERROR
1368         assert(state_ == StateEnum::ERROR);
1369         assert(readCallback_ == nullptr);
1370         return;
1371       }
1372
1373       ReadCallback* callback = readCallback_;
1374       readCallback_ = nullptr;
1375       callback->readEOF();
1376       return;
1377     }
1378     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1379       if (readCallback_ != nullptr) {
1380         // We might still have data in the socket.
1381         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1382         scheduleImmediateRead();
1383       }
1384       return;
1385     }
1386   }
1387 }
1388
1389 /**
1390  * This function attempts to write as much data as possible, until no more data
1391  * can be written.
1392  *
1393  * - If it sends all available data, it unregisters for write events, and stops
1394  *   the writeTimeout_.
1395  *
1396  * - If not all of the data can be sent immediately, it reschedules
1397  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1398  *   registered for write events.
1399  */
1400 void AsyncSocket::handleWrite() noexcept {
1401   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1402           << ", state=" << state_;
1403   if (state_ == StateEnum::CONNECTING) {
1404     handleConnect();
1405     return;
1406   }
1407
1408   // Normal write
1409   assert(state_ == StateEnum::ESTABLISHED);
1410   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1411   assert(writeReqHead_ != nullptr);
1412
1413   // Loop until we run out of write requests,
1414   // or until this socket is moved to another EventBase.
1415   // (See the comment in handleRead() explaining how this can happen.)
1416   EventBase* originalEventBase = eventBase_;
1417   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1418     if (!writeReqHead_->performWrite()) {
1419       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1420                              withAddr("writev() failed"), errno);
1421       return failWrite(__func__, ex);
1422     } else if (writeReqHead_->isComplete()) {
1423       // We finished this request
1424       WriteRequest* req = writeReqHead_;
1425       writeReqHead_ = req->getNext();
1426
1427       if (writeReqHead_ == nullptr) {
1428         writeReqTail_ = nullptr;
1429         // This is the last write request.
1430         // Unregister for write events and cancel the send timer
1431         // before we invoke the callback.  We have to update the state properly
1432         // before calling the callback, since it may want to detach us from
1433         // the EventBase.
1434         if (eventFlags_ & EventHandler::WRITE) {
1435           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1436             assert(state_ == StateEnum::ERROR);
1437             return;
1438           }
1439           // Stop the send timeout
1440           writeTimeout_.cancelTimeout();
1441         }
1442         assert(!writeTimeout_.isScheduled());
1443
1444         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1445         // we finish sending the last write request.
1446         //
1447         // We have to do this before invoking writeSuccess(), since
1448         // writeSuccess() may detach us from our EventBase.
1449         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1450           assert(connectCallback_ == nullptr);
1451           shutdownFlags_ |= SHUT_WRITE;
1452
1453           if (shutdownFlags_ & SHUT_READ) {
1454             // Reads have already been shutdown.  Fully close the socket and
1455             // move to STATE_CLOSED.
1456             //
1457             // Note: This code currently moves us to STATE_CLOSED even if
1458             // close() hasn't ever been called.  This can occur if we have
1459             // received EOF from the peer and shutdownWrite() has been called
1460             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1461             // case, until close() is actually called?  I can't think of a
1462             // reason why we would need to do so.  No other operations besides
1463             // calling close() or destroying the socket can be performed at
1464             // this point.
1465             assert(readCallback_ == nullptr);
1466             state_ = StateEnum::CLOSED;
1467             if (fd_ >= 0) {
1468               ioHandler_.changeHandlerFD(-1);
1469               doClose();
1470             }
1471           } else {
1472             // Reads are still enabled, so we are only doing a half-shutdown
1473             ::shutdown(fd_, SHUT_WR);
1474           }
1475         }
1476       }
1477
1478       // Invoke the callback
1479       WriteCallback* callback = req->getCallback();
1480       req->destroy();
1481       if (callback) {
1482         callback->writeSuccess();
1483       }
1484       // We'll continue around the loop, trying to write another request
1485     } else {
1486       // Partial write.
1487       if (bufferCallback_) {
1488         bufferCallback_->onEgressBuffered();
1489       }
1490       writeReqHead_->consume();
1491       // Stop after a partial write; it's highly likely that a subsequent write
1492       // attempt will just return EAGAIN.
1493       //
1494       // Ensure that we are registered for write events.
1495       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1496         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1497           assert(state_ == StateEnum::ERROR);
1498           return;
1499         }
1500       }
1501
1502       // Reschedule the send timeout, since we have made some write progress.
1503       if (sendTimeout_ > 0) {
1504         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1505           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1506               withAddr("failed to reschedule write timeout"));
1507           return failWrite(__func__, ex);
1508         }
1509       }
1510       return;
1511     }
1512   }
1513   if (!writeReqHead_ && bufferCallback_) {
1514     bufferCallback_->onEgressBufferCleared();
1515   }
1516 }
1517
1518 void AsyncSocket::checkForImmediateRead() noexcept {
1519   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1520   // (However, note that some subclasses do override this method.)
1521   //
1522   // Simply calling handleRead() here would be bad, as this would call
1523   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1524   // buffer even though no data may be available.  This would waste lots of
1525   // memory, since the buffer will sit around unused until the socket actually
1526   // becomes readable.
1527   //
1528   // Checking if the socket is readable now also seems like it would probably
1529   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1530   // would just waste an extra system call.  Even if it is readable, waiting to
1531   // find out from libevent on the next event loop doesn't seem that bad.
1532 }
1533
1534 void AsyncSocket::handleInitialReadWrite() noexcept {
1535   // Our callers should already be holding a DestructorGuard, but grab
1536   // one here just to make sure, in case one of our calling code paths ever
1537   // changes.
1538   DestructorGuard dg(this);
1539
1540   // If we have a readCallback_, make sure we enable read events.  We
1541   // may already be registered for reads if connectSuccess() set
1542   // the read calback.
1543   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1544     assert(state_ == StateEnum::ESTABLISHED);
1545     assert((shutdownFlags_ & SHUT_READ) == 0);
1546     if (!updateEventRegistration(EventHandler::READ, 0)) {
1547       assert(state_ == StateEnum::ERROR);
1548       return;
1549     }
1550     checkForImmediateRead();
1551   } else if (readCallback_ == nullptr) {
1552     // Unregister for read events.
1553     updateEventRegistration(0, EventHandler::READ);
1554   }
1555
1556   // If we have write requests pending, try to send them immediately.
1557   // Since we just finished accepting, there is a very good chance that we can
1558   // write without blocking.
1559   //
1560   // However, we only process them if EventHandler::WRITE is not already set,
1561   // which means that we're already blocked on a write attempt.  (This can
1562   // happen if connectSuccess() called write() before returning.)
1563   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1564     // Call handleWrite() to perform write processing.
1565     handleWrite();
1566   } else if (writeReqHead_ == nullptr) {
1567     // Unregister for write event.
1568     updateEventRegistration(0, EventHandler::WRITE);
1569   }
1570 }
1571
1572 void AsyncSocket::handleConnect() noexcept {
1573   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1574           << ", state=" << state_;
1575   assert(state_ == StateEnum::CONNECTING);
1576   // SHUT_WRITE can never be set while we are still connecting;
1577   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1578   // finishes
1579   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1580
1581   // In case we had a connect timeout, cancel the timeout
1582   writeTimeout_.cancelTimeout();
1583   // We don't use a persistent registration when waiting on a connect event,
1584   // so we have been automatically unregistered now.  Update eventFlags_ to
1585   // reflect reality.
1586   assert(eventFlags_ == EventHandler::WRITE);
1587   eventFlags_ = EventHandler::NONE;
1588
1589   // Call getsockopt() to check if the connect succeeded
1590   int error;
1591   socklen_t len = sizeof(error);
1592   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1593   if (rv != 0) {
1594     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1595                            withAddr("error calling getsockopt() after connect"),
1596                            errno);
1597     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1598                << fd_ << " host=" << addr_.describe()
1599                << ") exception:" << ex.what();
1600     return failConnect(__func__, ex);
1601   }
1602
1603   if (error != 0) {
1604     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1605                            "connect failed", error);
1606     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1607             << fd_ << " host=" << addr_.describe()
1608             << ") exception: " << ex.what();
1609     return failConnect(__func__, ex);
1610   }
1611
1612   // Move into STATE_ESTABLISHED
1613   state_ = StateEnum::ESTABLISHED;
1614
1615   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1616   // perform, immediately shutdown the write half of the socket.
1617   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1618     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1619     // are still connecting we just abort the connect rather than waiting for
1620     // it to complete.
1621     assert((shutdownFlags_ & SHUT_READ) == 0);
1622     ::shutdown(fd_, SHUT_WR);
1623     shutdownFlags_ |= SHUT_WRITE;
1624   }
1625
1626   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1627           << "successfully connected; state=" << state_;
1628
1629   // Remember the EventBase we are attached to, before we start invoking any
1630   // callbacks (since the callbacks may call detachEventBase()).
1631   EventBase* originalEventBase = eventBase_;
1632
1633   invokeConnectSuccess();
1634   // Note that the connect callback may have changed our state.
1635   // (set or unset the read callback, called write(), closed the socket, etc.)
1636   // The following code needs to handle these situations correctly.
1637   //
1638   // If the socket has been closed, readCallback_ and writeReqHead_ will
1639   // always be nullptr, so that will prevent us from trying to read or write.
1640   //
1641   // The main thing to check for is if eventBase_ is still originalEventBase.
1642   // If not, we have been detached from this event base, so we shouldn't
1643   // perform any more operations.
1644   if (eventBase_ != originalEventBase) {
1645     return;
1646   }
1647
1648   handleInitialReadWrite();
1649 }
1650
1651 void AsyncSocket::timeoutExpired() noexcept {
1652   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1653           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1654   DestructorGuard dg(this);
1655   assert(eventBase_->isInEventBaseThread());
1656
1657   if (state_ == StateEnum::CONNECTING) {
1658     // connect() timed out
1659     // Unregister for I/O events.
1660     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1661                            "connect timed out");
1662     failConnect(__func__, ex);
1663   } else {
1664     // a normal write operation timed out
1665     assert(state_ == StateEnum::ESTABLISHED);
1666     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1667     failWrite(__func__, ex);
1668   }
1669 }
1670
1671 ssize_t AsyncSocket::performWrite(const iovec* vec,
1672                                    uint32_t count,
1673                                    WriteFlags flags,
1674                                    uint32_t* countWritten,
1675                                    uint32_t* partialWritten) {
1676   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1677   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1678   // (since it may terminate the program if the main program doesn't explicitly
1679   // ignore it).
1680   struct msghdr msg;
1681   msg.msg_name = nullptr;
1682   msg.msg_namelen = 0;
1683   msg.msg_iov = const_cast<iovec *>(vec);
1684   msg.msg_iovlen = std::min<size_t>(count, kIovMax);
1685   msg.msg_control = nullptr;
1686   msg.msg_controllen = 0;
1687   msg.msg_flags = 0;
1688
1689   int msg_flags = MSG_DONTWAIT;
1690
1691 #ifdef MSG_NOSIGNAL // Linux-only
1692   msg_flags |= MSG_NOSIGNAL;
1693   if (isSet(flags, WriteFlags::CORK)) {
1694     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1695     // give it the rest of the data rather than immediately sending a partial
1696     // frame, even when TCP_NODELAY is enabled.
1697     msg_flags |= MSG_MORE;
1698   }
1699 #endif
1700   if (isSet(flags, WriteFlags::EOR)) {
1701     // marks that this is the last byte of a record (response)
1702     msg_flags |= MSG_EOR;
1703   }
1704   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1705   if (totalWritten < 0) {
1706     if (errno == EAGAIN) {
1707       // TCP buffer is full; we can't write any more data right now.
1708       *countWritten = 0;
1709       *partialWritten = 0;
1710       return 0;
1711     }
1712     // error
1713     *countWritten = 0;
1714     *partialWritten = 0;
1715     return -1;
1716   }
1717
1718   appBytesWritten_ += totalWritten;
1719
1720   uint32_t bytesWritten;
1721   uint32_t n;
1722   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1723     const iovec* v = vec + n;
1724     if (v->iov_len > bytesWritten) {
1725       // Partial write finished in the middle of this iovec
1726       *countWritten = n;
1727       *partialWritten = bytesWritten;
1728       return totalWritten;
1729     }
1730
1731     bytesWritten -= v->iov_len;
1732   }
1733
1734   assert(bytesWritten == 0);
1735   *countWritten = n;
1736   *partialWritten = 0;
1737   return totalWritten;
1738 }
1739
1740 /**
1741  * Re-register the EventHandler after eventFlags_ has changed.
1742  *
1743  * If an error occurs, fail() is called to move the socket into the error state
1744  * and call all currently installed callbacks.  After an error, the
1745  * AsyncSocket is completely unregistered.
1746  *
1747  * @return Returns true on succcess, or false on error.
1748  */
1749 bool AsyncSocket::updateEventRegistration() {
1750   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1751           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1752           << ", events=" << std::hex << eventFlags_;
1753   assert(eventBase_->isInEventBaseThread());
1754   if (eventFlags_ == EventHandler::NONE) {
1755     ioHandler_.unregisterHandler();
1756     return true;
1757   }
1758
1759   // Always register for persistent events, so we don't have to re-register
1760   // after being called back.
1761   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1762     eventFlags_ = EventHandler::NONE; // we're not registered after error
1763     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1764         withAddr("failed to update AsyncSocket event registration"));
1765     fail("updateEventRegistration", ex);
1766     return false;
1767   }
1768
1769   return true;
1770 }
1771
1772 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1773                                            uint16_t disable) {
1774   uint16_t oldFlags = eventFlags_;
1775   eventFlags_ |= enable;
1776   eventFlags_ &= ~disable;
1777   if (eventFlags_ == oldFlags) {
1778     return true;
1779   } else {
1780     return updateEventRegistration();
1781   }
1782 }
1783
1784 void AsyncSocket::startFail() {
1785   // startFail() should only be called once
1786   assert(state_ != StateEnum::ERROR);
1787   assert(getDestructorGuardCount() > 0);
1788   state_ = StateEnum::ERROR;
1789   // Ensure that SHUT_READ and SHUT_WRITE are set,
1790   // so all future attempts to read or write will be rejected
1791   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1792
1793   if (eventFlags_ != EventHandler::NONE) {
1794     eventFlags_ = EventHandler::NONE;
1795     ioHandler_.unregisterHandler();
1796   }
1797   writeTimeout_.cancelTimeout();
1798
1799   if (fd_ >= 0) {
1800     ioHandler_.changeHandlerFD(-1);
1801     doClose();
1802   }
1803 }
1804
1805 void AsyncSocket::finishFail() {
1806   assert(state_ == StateEnum::ERROR);
1807   assert(getDestructorGuardCount() > 0);
1808
1809   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1810                          withAddr("socket closing after error"));
1811   invokeConnectErr(ex);
1812   failAllWrites(ex);
1813
1814   if (readCallback_) {
1815     ReadCallback* callback = readCallback_;
1816     readCallback_ = nullptr;
1817     callback->readErr(ex);
1818   }
1819 }
1820
1821 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1822   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1823              << state_ << " host=" << addr_.describe()
1824              << "): failed in " << fn << "(): "
1825              << ex.what();
1826   startFail();
1827   finishFail();
1828 }
1829
1830 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1831   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1832                << state_ << " host=" << addr_.describe()
1833                << "): failed while connecting in " << fn << "(): "
1834                << ex.what();
1835   startFail();
1836
1837   invokeConnectErr(ex);
1838   finishFail();
1839 }
1840
1841 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1842   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1843                << state_ << " host=" << addr_.describe()
1844                << "): failed while reading in " << fn << "(): "
1845                << ex.what();
1846   startFail();
1847
1848   if (readCallback_ != nullptr) {
1849     ReadCallback* callback = readCallback_;
1850     readCallback_ = nullptr;
1851     callback->readErr(ex);
1852   }
1853
1854   finishFail();
1855 }
1856
1857 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1858   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1859                << state_ << " host=" << addr_.describe()
1860                << "): failed while writing in " << fn << "(): "
1861                << ex.what();
1862   startFail();
1863
1864   // Only invoke the first write callback, since the error occurred while
1865   // writing this request.  Let any other pending write callbacks be invoked in
1866   // finishFail().
1867   if (writeReqHead_ != nullptr) {
1868     WriteRequest* req = writeReqHead_;
1869     writeReqHead_ = req->getNext();
1870     WriteCallback* callback = req->getCallback();
1871     uint32_t bytesWritten = req->getTotalBytesWritten();
1872     req->destroy();
1873     if (callback) {
1874       callback->writeErr(bytesWritten, ex);
1875     }
1876   }
1877
1878   finishFail();
1879 }
1880
1881 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1882                              size_t bytesWritten,
1883                              const AsyncSocketException& ex) {
1884   // This version of failWrite() is used when the failure occurs before
1885   // we've added the callback to writeReqHead_.
1886   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1887              << state_ << " host=" << addr_.describe()
1888              <<"): failed while writing in " << fn << "(): "
1889              << ex.what();
1890   startFail();
1891
1892   if (callback != nullptr) {
1893     callback->writeErr(bytesWritten, ex);
1894   }
1895
1896   finishFail();
1897 }
1898
1899 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1900   // Invoke writeError() on all write callbacks.
1901   // This is used when writes are forcibly shutdown with write requests
1902   // pending, or when an error occurs with writes pending.
1903   while (writeReqHead_ != nullptr) {
1904     WriteRequest* req = writeReqHead_;
1905     writeReqHead_ = req->getNext();
1906     WriteCallback* callback = req->getCallback();
1907     if (callback) {
1908       callback->writeErr(req->getTotalBytesWritten(), ex);
1909     }
1910     req->destroy();
1911   }
1912 }
1913
1914 void AsyncSocket::invalidState(ConnectCallback* callback) {
1915   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1916              << "): connect() called in invalid state " << state_;
1917
1918   /*
1919    * The invalidState() methods don't use the normal failure mechanisms,
1920    * since we don't know what state we are in.  We don't want to call
1921    * startFail()/finishFail() recursively if we are already in the middle of
1922    * cleaning up.
1923    */
1924
1925   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1926                          "connect() called with socket in invalid state");
1927   connectEndTime_ = std::chrono::steady_clock::now();
1928   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1929     if (callback) {
1930       callback->connectErr(ex);
1931     }
1932   } else {
1933     // We can't use failConnect() here since connectCallback_
1934     // may already be set to another callback.  Invoke this ConnectCallback
1935     // here; any other connectCallback_ will be invoked in finishFail()
1936     startFail();
1937     if (callback) {
1938       callback->connectErr(ex);
1939     }
1940     finishFail();
1941   }
1942 }
1943
1944 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
1945   connectEndTime_ = std::chrono::steady_clock::now();
1946   if (connectCallback_) {
1947     ConnectCallback* callback = connectCallback_;
1948     connectCallback_ = nullptr;
1949     callback->connectErr(ex);
1950   }
1951 }
1952
1953 void AsyncSocket::invokeConnectSuccess() {
1954   connectEndTime_ = std::chrono::steady_clock::now();
1955   if (connectCallback_) {
1956     ConnectCallback* callback = connectCallback_;
1957     connectCallback_ = nullptr;
1958     callback->connectSuccess();
1959   }
1960 }
1961
1962 void AsyncSocket::invalidState(ReadCallback* callback) {
1963   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1964              << "): setReadCallback(" << callback
1965              << ") called in invalid state " << state_;
1966
1967   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1968                          "setReadCallback() called with socket in "
1969                          "invalid state");
1970   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1971     if (callback) {
1972       callback->readErr(ex);
1973     }
1974   } else {
1975     startFail();
1976     if (callback) {
1977       callback->readErr(ex);
1978     }
1979     finishFail();
1980   }
1981 }
1982
1983 void AsyncSocket::invalidState(WriteCallback* callback) {
1984   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1985              << "): write() called in invalid state " << state_;
1986
1987   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1988                          withAddr("write() called with socket in invalid state"));
1989   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1990     if (callback) {
1991       callback->writeErr(0, ex);
1992     }
1993   } else {
1994     startFail();
1995     if (callback) {
1996       callback->writeErr(0, ex);
1997     }
1998     finishFail();
1999   }
2000 }
2001
2002 void AsyncSocket::doClose() {
2003   if (fd_ == -1) return;
2004   if (shutdownSocketSet_) {
2005     shutdownSocketSet_->close(fd_);
2006   } else {
2007     ::close(fd_);
2008   }
2009   fd_ = -1;
2010 }
2011
2012 std::ostream& operator << (std::ostream& os,
2013                            const AsyncSocket::StateEnum& state) {
2014   os << static_cast<int>(state);
2015   return os;
2016 }
2017
2018 std::string AsyncSocket::withAddr(const std::string& s) {
2019   // Don't use addr_ directly because it may not be initialized
2020   // e.g. if constructed from fd
2021   folly::SocketAddress peer, local;
2022   try {
2023     getPeerAddress(&peer);
2024     getLocalAddress(&local);
2025   } catch (const std::exception&) {
2026     // ignore
2027   } catch (...) {
2028     // ignore
2029   }
2030   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2031 }
2032
2033 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2034   bufferCallback_ = cb;
2035 }
2036
2037 } // folly