Move AsyncSocket tests from thrift to folly
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/SocketAddress.h>
21
22 #include <folly/io/IOBuf.h>
23 #include <folly/io/async/test/BlockingSocket.h>
24 #include <folly/io/async/test/Util.h>
25
26 #include <gtest/gtest.h>
27 #include <boost/scoped_array.hpp>
28 #include <iostream>
29 #include <unistd.h>
30 #include <fcntl.h>
31 #include <poll.h>
32 #include <sys/types.h>
33 #include <sys/socket.h>
34 #include <netinet/tcp.h>
35 #include <thread>
36
37 using namespace boost;
38
39 using std::string;
40 using std::vector;
41 using std::min;
42 using std::cerr;
43 using std::endl;
44 using std::unique_ptr;
45 using std::chrono::milliseconds;
46 using boost::scoped_array;
47
48 using namespace folly;
49
50 enum StateEnum {
51   STATE_WAITING,
52   STATE_SUCCEEDED,
53   STATE_FAILED
54 };
55
56 typedef std::function<void()> VoidCallback;
57
58
59 class ConnCallback : public AsyncSocket::ConnectCallback {
60  public:
61   ConnCallback()
62     : state(STATE_WAITING)
63     , exception(AsyncSocketException::UNKNOWN, "none") {}
64
65   void connectSuccess() noexcept override {
66     state = STATE_SUCCEEDED;
67     if (successCallback) {
68       successCallback();
69     }
70   }
71
72   void connectErr(const AsyncSocketException& ex) noexcept override {
73     state = STATE_FAILED;
74     exception = ex;
75     if (errorCallback) {
76       errorCallback();
77     }
78   }
79
80   StateEnum state;
81   AsyncSocketException exception;
82   VoidCallback successCallback;
83   VoidCallback errorCallback;
84 };
85
86 class WriteCallback : public AsyncTransportWrapper::WriteCallback {
87  public:
88   WriteCallback()
89     : state(STATE_WAITING)
90     , bytesWritten(0)
91     , exception(AsyncSocketException::UNKNOWN, "none") {}
92
93   void writeSuccess() noexcept override {
94     state = STATE_SUCCEEDED;
95     if (successCallback) {
96       successCallback();
97     }
98   }
99
100   void writeErr(size_t bytesWritten,
101                 const AsyncSocketException& ex) noexcept override {
102     state = STATE_FAILED;
103     this->bytesWritten = bytesWritten;
104     exception = ex;
105     if (errorCallback) {
106       errorCallback();
107     }
108   }
109
110   StateEnum state;
111   size_t bytesWritten;
112   AsyncSocketException exception;
113   VoidCallback successCallback;
114   VoidCallback errorCallback;
115 };
116
117 class ReadCallback : public AsyncTransportWrapper::ReadCallback {
118  public:
119   ReadCallback()
120     : state(STATE_WAITING)
121     , exception(AsyncSocketException::UNKNOWN, "none")
122     , buffers() {}
123
124   ~ReadCallback() {
125     for (vector<Buffer>::iterator it = buffers.begin();
126          it != buffers.end();
127          ++it) {
128       it->free();
129     }
130     currentBuffer.free();
131   }
132
133   void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
134     if (!currentBuffer.buffer) {
135       currentBuffer.allocate(4096);
136     }
137     *bufReturn = currentBuffer.buffer;
138     *lenReturn = currentBuffer.length;
139   }
140
141   void readDataAvailable(size_t len) noexcept override {
142     currentBuffer.length = len;
143     buffers.push_back(currentBuffer);
144     currentBuffer.reset();
145     if (dataAvailableCallback) {
146       dataAvailableCallback();
147     }
148   }
149
150   void readEOF() noexcept override {
151     state = STATE_SUCCEEDED;
152   }
153
154   void readErr(const AsyncSocketException& ex) noexcept override {
155     state = STATE_FAILED;
156     exception = ex;
157   }
158
159   void verifyData(const char* expected, size_t expectedLen) const {
160     size_t offset = 0;
161     for (size_t idx = 0; idx < buffers.size(); ++idx) {
162       const auto& buf = buffers[idx];
163       size_t cmpLen = std::min(buf.length, expectedLen - offset);
164       CHECK_EQ(memcmp(buf.buffer, expected + offset, cmpLen), 0);
165       CHECK_EQ(cmpLen, buf.length);
166       offset += cmpLen;
167     }
168     CHECK_EQ(offset, expectedLen);
169   }
170
171   class Buffer {
172    public:
173     Buffer() : buffer(nullptr), length(0) {}
174     Buffer(char* buf, size_t len) : buffer(buf), length(len) {}
175
176     void reset() {
177       buffer = nullptr;
178       length = 0;
179     }
180     void allocate(size_t length) {
181       assert(buffer == nullptr);
182       this->buffer = static_cast<char*>(malloc(length));
183       this->length = length;
184     }
185     void free() {
186       ::free(buffer);
187       reset();
188     }
189
190     char* buffer;
191     size_t length;
192   };
193
194   StateEnum state;
195   AsyncSocketException exception;
196   vector<Buffer> buffers;
197   Buffer currentBuffer;
198   VoidCallback dataAvailableCallback;
199 };
200
201 class ReadVerifier {
202 };
203
204 class TestServer {
205  public:
206   // Create a TestServer.
207   // This immediately starts listening on an ephemeral port.
208   TestServer()
209     : fd_(-1) {
210     fd_ = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
211     if (fd_ < 0) {
212       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
213                                 "failed to create test server socket", errno);
214     }
215     if (fcntl(fd_, F_SETFL, O_NONBLOCK) != 0) {
216       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
217                                 "failed to put test server socket in "
218                                 "non-blocking mode", errno);
219     }
220     if (listen(fd_, 10) != 0) {
221       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
222                                 "failed to listen on test server socket",
223                                 errno);
224     }
225
226     address_.setFromLocalAddress(fd_);
227     // The local address will contain 0.0.0.0.
228     // Change it to 127.0.0.1, so it can be used to connect to the server
229     address_.setFromIpPort("127.0.0.1", address_.getPort());
230   }
231
232   // Get the address for connecting to the server
233   const folly::SocketAddress& getAddress() const {
234     return address_;
235   }
236
237   int acceptFD(int timeout=50) {
238     struct pollfd pfd;
239     pfd.fd = fd_;
240     pfd.events = POLLIN;
241     int ret = poll(&pfd, 1, timeout);
242     if (ret == 0) {
243       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
244                                 "test server accept() timed out");
245     } else if (ret < 0) {
246       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
247                                 "test server accept() poll failed", errno);
248     }
249
250     int acceptedFd = ::accept(fd_, nullptr, nullptr);
251     if (acceptedFd < 0) {
252       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
253                                 "test server accept() failed", errno);
254     }
255
256     return acceptedFd;
257   }
258
259   std::shared_ptr<BlockingSocket> accept(int timeout=50) {
260     int fd = acceptFD(timeout);
261     return std::shared_ptr<BlockingSocket>(new BlockingSocket(fd));
262   }
263
264   std::shared_ptr<AsyncSocket> acceptAsync(EventBase* evb, int timeout=50) {
265     int fd = acceptFD(timeout);
266     return AsyncSocket::newSocket(evb, fd);
267   }
268
269   /**
270    * Accept a connection, read data from it, and verify that it matches the
271    * data in the specified buffer.
272    */
273   void verifyConnection(const char* buf, size_t len) {
274     // accept a connection
275     std::shared_ptr<BlockingSocket> acceptedSocket = accept();
276     // read the data and compare it to the specified buffer
277     scoped_array<uint8_t> readbuf(new uint8_t[len]);
278     acceptedSocket->readAll(readbuf.get(), len);
279     CHECK_EQ(memcmp(buf, readbuf.get(), len), 0);
280     // make sure we get EOF next
281     uint32_t bytesRead = acceptedSocket->read(readbuf.get(), len);
282     CHECK_EQ(bytesRead, 0);
283   }
284
285  private:
286   int fd_;
287   folly::SocketAddress address_;
288 };
289
290 class DelayedWrite: public AsyncTimeout {
291  public:
292   DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
293       unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
294       bool cork, bool lastWrite = false):
295     AsyncTimeout(socket->getEventBase()),
296     socket_(socket),
297     bufs_(std::move(bufs)),
298     wcb_(wcb),
299     cork_(cork),
300     lastWrite_(lastWrite) {}
301
302  private:
303   void timeoutExpired() noexcept override {
304     WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
305     socket_->writeChain(wcb_, std::move(bufs_), flags);
306     if (lastWrite_) {
307       socket_->shutdownWrite();
308     }
309   }
310
311   std::shared_ptr<AsyncSocket> socket_;
312   unique_ptr<IOBuf> bufs_;
313   AsyncTransportWrapper::WriteCallback* wcb_;
314   bool cork_;
315   bool lastWrite_;
316 };
317
318 ///////////////////////////////////////////////////////////////////////////
319 // connect() tests
320 ///////////////////////////////////////////////////////////////////////////
321
322 /**
323  * Test connecting to a server
324  */
325 TEST(AsyncSocketTest, Connect) {
326   // Start listening on a local port
327   TestServer server;
328
329   // Connect using a AsyncSocket
330   EventBase evb;
331   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
332   ConnCallback cb;
333   socket->connect(&cb, server.getAddress(), 30);
334
335   evb.loop();
336
337   CHECK_EQ(cb.state, STATE_SUCCEEDED);
338 }
339
340 /**
341  * Test connecting to a server that isn't listening
342  */
343 TEST(AsyncSocketTest, ConnectRefused) {
344   EventBase evb;
345
346   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
347
348   // Hopefully nothing is actually listening on this address
349   folly::SocketAddress addr("127.0.0.1", 65535);
350   ConnCallback cb;
351   socket->connect(&cb, addr, 30);
352
353   evb.loop();
354
355   CHECK_EQ(cb.state, STATE_FAILED);
356   CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
357 }
358
359 /**
360  * Test connection timeout
361  */
362 TEST(AsyncSocketTest, ConnectTimeout) {
363   EventBase evb;
364
365   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
366
367   // Try connecting to server that won't respond.
368   //
369   // This depends somewhat on the network where this test is run.
370   // Hopefully this IP will be routable but unresponsive.
371   // (Alternatively, we could try listening on a local raw socket, but that
372   // normally requires root privileges.)
373   folly::SocketAddress addr("8.8.8.8", 65535);
374   ConnCallback cb;
375   socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
376
377   evb.loop();
378
379   CHECK_EQ(cb.state, STATE_FAILED);
380   CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
381
382   // Verify that we can still get the peer address after a timeout.
383   // Use case is if the client was created from a client pool, and we want
384   // to log which peer failed.
385   folly::SocketAddress peer;
386   socket->getPeerAddress(&peer);
387   CHECK_EQ(peer, addr);
388 }
389
390 /**
391  * Test writing immediately after connecting, without waiting for connect
392  * to finish.
393  */
394 TEST(AsyncSocketTest, ConnectAndWrite) {
395   TestServer server;
396
397   // connect()
398   EventBase evb;
399   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
400   ConnCallback ccb;
401   socket->connect(&ccb, server.getAddress(), 30);
402
403   // write()
404   char buf[128];
405   memset(buf, 'a', sizeof(buf));
406   WriteCallback wcb;
407   socket->write(&wcb, buf, sizeof(buf));
408
409   // Loop.  We don't bother accepting on the server socket yet.
410   // The kernel should be able to buffer the write request so it can succeed.
411   evb.loop();
412
413   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
414   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
415
416   // Make sure the server got a connection and received the data
417   socket->close();
418   server.verifyConnection(buf, sizeof(buf));
419 }
420
421 /**
422  * Test connecting using a nullptr connect callback.
423  */
424 TEST(AsyncSocketTest, ConnectNullCallback) {
425   TestServer server;
426
427   // connect()
428   EventBase evb;
429   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
430   socket->connect(nullptr, server.getAddress(), 30);
431
432   // write some data, just so we have some way of verifing
433   // that the socket works correctly after connecting
434   char buf[128];
435   memset(buf, 'a', sizeof(buf));
436   WriteCallback wcb;
437   socket->write(&wcb, buf, sizeof(buf));
438
439   evb.loop();
440
441   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
442
443   // Make sure the server got a connection and received the data
444   socket->close();
445   server.verifyConnection(buf, sizeof(buf));
446 }
447
448 /**
449  * Test calling both write() and close() immediately after connecting, without
450  * waiting for connect to finish.
451  *
452  * This exercises the STATE_CONNECTING_CLOSING code.
453  */
454 TEST(AsyncSocketTest, ConnectWriteAndClose) {
455   TestServer server;
456
457   // connect()
458   EventBase evb;
459   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
460   ConnCallback ccb;
461   socket->connect(&ccb, server.getAddress(), 30);
462
463   // write()
464   char buf[128];
465   memset(buf, 'a', sizeof(buf));
466   WriteCallback wcb;
467   socket->write(&wcb, buf, sizeof(buf));
468
469   // close()
470   socket->close();
471
472   // Loop.  We don't bother accepting on the server socket yet.
473   // The kernel should be able to buffer the write request so it can succeed.
474   evb.loop();
475
476   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
477   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
478
479   // Make sure the server got a connection and received the data
480   server.verifyConnection(buf, sizeof(buf));
481 }
482
483 /**
484  * Test calling close() immediately after connect()
485  */
486 TEST(AsyncSocketTest, ConnectAndClose) {
487   TestServer server;
488
489   // Connect using a AsyncSocket
490   EventBase evb;
491   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
492   ConnCallback ccb;
493   socket->connect(&ccb, server.getAddress(), 30);
494
495   // Hopefully the connect didn't succeed immediately.
496   // If it did, we can't exercise the close-while-connecting code path.
497   if (ccb.state == STATE_SUCCEEDED) {
498     LOG(INFO) << "connect() succeeded immediately; aborting test "
499                        "of close-during-connect behavior";
500     return;
501   }
502
503   socket->close();
504
505   // Loop, although there shouldn't be anything to do.
506   evb.loop();
507
508   // Make sure the connection was aborted
509   CHECK_EQ(ccb.state, STATE_FAILED);
510 }
511
512 /**
513  * Test calling closeNow() immediately after connect()
514  *
515  * This should be identical to the normal close behavior.
516  */
517 TEST(AsyncSocketTest, ConnectAndCloseNow) {
518   TestServer server;
519
520   // Connect using a AsyncSocket
521   EventBase evb;
522   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
523   ConnCallback ccb;
524   socket->connect(&ccb, server.getAddress(), 30);
525
526   // Hopefully the connect didn't succeed immediately.
527   // If it did, we can't exercise the close-while-connecting code path.
528   if (ccb.state == STATE_SUCCEEDED) {
529     LOG(INFO) << "connect() succeeded immediately; aborting test "
530                        "of closeNow()-during-connect behavior";
531     return;
532   }
533
534   socket->closeNow();
535
536   // Loop, although there shouldn't be anything to do.
537   evb.loop();
538
539   // Make sure the connection was aborted
540   CHECK_EQ(ccb.state, STATE_FAILED);
541 }
542
543 /**
544  * Test calling both write() and closeNow() immediately after connecting,
545  * without waiting for connect to finish.
546  *
547  * This should abort the pending write.
548  */
549 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
550   TestServer server;
551
552   // connect()
553   EventBase evb;
554   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
555   ConnCallback ccb;
556   socket->connect(&ccb, server.getAddress(), 30);
557
558   // Hopefully the connect didn't succeed immediately.
559   // If it did, we can't exercise the close-while-connecting code path.
560   if (ccb.state == STATE_SUCCEEDED) {
561     LOG(INFO) << "connect() succeeded immediately; aborting test "
562                        "of write-during-connect behavior";
563     return;
564   }
565
566   // write()
567   char buf[128];
568   memset(buf, 'a', sizeof(buf));
569   WriteCallback wcb;
570   socket->write(&wcb, buf, sizeof(buf));
571
572   // close()
573   socket->closeNow();
574
575   // Loop, although there shouldn't be anything to do.
576   evb.loop();
577
578   CHECK_EQ(ccb.state, STATE_FAILED);
579   CHECK_EQ(wcb.state, STATE_FAILED);
580 }
581
582 /**
583  * Test installing a read callback immediately, before connect() finishes.
584  */
585 TEST(AsyncSocketTest, ConnectAndRead) {
586   TestServer server;
587
588   // connect()
589   EventBase evb;
590   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
591   ConnCallback ccb;
592   socket->connect(&ccb, server.getAddress(), 30);
593
594   ReadCallback rcb;
595   socket->setReadCB(&rcb);
596
597   // Even though we haven't looped yet, we should be able to accept
598   // the connection and send data to it.
599   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
600   uint8_t buf[128];
601   memset(buf, 'a', sizeof(buf));
602   acceptedSocket->write(buf, sizeof(buf));
603   acceptedSocket->flush();
604   acceptedSocket->close();
605
606   // Loop, although there shouldn't be anything to do.
607   evb.loop();
608
609   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
610   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
611   CHECK_EQ(rcb.buffers.size(), 1);
612   CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
613   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
614 }
615
616 /**
617  * Test installing a read callback and then closing immediately before the
618  * connect attempt finishes.
619  */
620 TEST(AsyncSocketTest, ConnectReadAndClose) {
621   TestServer server;
622
623   // connect()
624   EventBase evb;
625   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
626   ConnCallback ccb;
627   socket->connect(&ccb, server.getAddress(), 30);
628
629   // Hopefully the connect didn't succeed immediately.
630   // If it did, we can't exercise the close-while-connecting code path.
631   if (ccb.state == STATE_SUCCEEDED) {
632     LOG(INFO) << "connect() succeeded immediately; aborting test "
633                        "of read-during-connect behavior";
634     return;
635   }
636
637   ReadCallback rcb;
638   socket->setReadCB(&rcb);
639
640   // close()
641   socket->close();
642
643   // Loop, although there shouldn't be anything to do.
644   evb.loop();
645
646   CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
647   CHECK_EQ(rcb.buffers.size(), 0);
648   CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
649 }
650
651 /**
652  * Test both writing and installing a read callback immediately,
653  * before connect() finishes.
654  */
655 TEST(AsyncSocketTest, ConnectWriteAndRead) {
656   TestServer server;
657
658   // connect()
659   EventBase evb;
660   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
661   ConnCallback ccb;
662   socket->connect(&ccb, server.getAddress(), 30);
663
664   // write()
665   char buf1[128];
666   memset(buf1, 'a', sizeof(buf1));
667   WriteCallback wcb;
668   socket->write(&wcb, buf1, sizeof(buf1));
669
670   // set a read callback
671   ReadCallback rcb;
672   socket->setReadCB(&rcb);
673
674   // Even though we haven't looped yet, we should be able to accept
675   // the connection and send data to it.
676   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
677   uint8_t buf2[128];
678   memset(buf2, 'b', sizeof(buf2));
679   acceptedSocket->write(buf2, sizeof(buf2));
680   acceptedSocket->flush();
681
682   // shut down the write half of acceptedSocket, so that the AsyncSocket
683   // will stop reading and we can break out of the event loop.
684   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
685
686   // Loop
687   evb.loop();
688
689   // Make sure the connect succeeded
690   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
691
692   // Make sure the AsyncSocket read the data written by the accepted socket
693   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
694   CHECK_EQ(rcb.buffers.size(), 1);
695   CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
696   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
697
698   // Close the AsyncSocket so we'll see EOF on acceptedSocket
699   socket->close();
700
701   // Make sure the accepted socket saw the data written by the AsyncSocket
702   uint8_t readbuf[sizeof(buf1)];
703   acceptedSocket->readAll(readbuf, sizeof(readbuf));
704   CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
705   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
706   CHECK_EQ(bytesRead, 0);
707 }
708
709 /**
710  * Test writing to the socket then shutting down writes before the connect
711  * attempt finishes.
712  */
713 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
714   TestServer server;
715
716   // connect()
717   EventBase evb;
718   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
719   ConnCallback ccb;
720   socket->connect(&ccb, server.getAddress(), 30);
721
722   // Hopefully the connect didn't succeed immediately.
723   // If it did, we can't exercise the write-while-connecting code path.
724   if (ccb.state == STATE_SUCCEEDED) {
725     LOG(INFO) << "connect() succeeded immediately; skipping test";
726     return;
727   }
728
729   // Ask to write some data
730   char wbuf[128];
731   memset(wbuf, 'a', sizeof(wbuf));
732   WriteCallback wcb;
733   socket->write(&wcb, wbuf, sizeof(wbuf));
734   socket->shutdownWrite();
735
736   // Shutdown writes
737   socket->shutdownWrite();
738
739   // Even though we haven't looped yet, we should be able to accept
740   // the connection.
741   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
742
743   // Since the connection is still in progress, there should be no data to
744   // read yet.  Verify that the accepted socket is not readable.
745   struct pollfd fds[1];
746   fds[0].fd = acceptedSocket->getSocketFD();
747   fds[0].events = POLLIN;
748   fds[0].revents = 0;
749   int rc = poll(fds, 1, 0);
750   CHECK_EQ(rc, 0);
751
752   // Write data to the accepted socket
753   uint8_t acceptedWbuf[192];
754   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
755   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
756   acceptedSocket->flush();
757
758   // Loop
759   evb.loop();
760
761   // The loop should have completed the connection, written the queued data,
762   // and shutdown writes on the socket.
763   //
764   // Check that the connection was completed successfully and that the write
765   // callback succeeded.
766   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
767   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
768
769   // Check that we can read the data that was written to the socket, and that
770   // we see an EOF, since its socket was half-shutdown.
771   uint8_t readbuf[sizeof(wbuf)];
772   acceptedSocket->readAll(readbuf, sizeof(readbuf));
773   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
774   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
775   CHECK_EQ(bytesRead, 0);
776
777   // Close the accepted socket.  This will cause it to see EOF
778   // and uninstall the read callback when we loop next.
779   acceptedSocket->close();
780
781   // Install a read callback, then loop again.
782   ReadCallback rcb;
783   socket->setReadCB(&rcb);
784   evb.loop();
785
786   // This loop should have read the data and seen the EOF
787   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
788   CHECK_EQ(rcb.buffers.size(), 1);
789   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
790   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
791                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
792 }
793
794 /**
795  * Test reading, writing, and shutting down writes before the connect attempt
796  * finishes.
797  */
798 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
799   TestServer server;
800
801   // connect()
802   EventBase evb;
803   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
804   ConnCallback ccb;
805   socket->connect(&ccb, server.getAddress(), 30);
806
807   // Hopefully the connect didn't succeed immediately.
808   // If it did, we can't exercise the write-while-connecting code path.
809   if (ccb.state == STATE_SUCCEEDED) {
810     LOG(INFO) << "connect() succeeded immediately; skipping test";
811     return;
812   }
813
814   // Install a read callback
815   ReadCallback rcb;
816   socket->setReadCB(&rcb);
817
818   // Ask to write some data
819   char wbuf[128];
820   memset(wbuf, 'a', sizeof(wbuf));
821   WriteCallback wcb;
822   socket->write(&wcb, wbuf, sizeof(wbuf));
823
824   // Shutdown writes
825   socket->shutdownWrite();
826
827   // Even though we haven't looped yet, we should be able to accept
828   // the connection.
829   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
830
831   // Since the connection is still in progress, there should be no data to
832   // read yet.  Verify that the accepted socket is not readable.
833   struct pollfd fds[1];
834   fds[0].fd = acceptedSocket->getSocketFD();
835   fds[0].events = POLLIN;
836   fds[0].revents = 0;
837   int rc = poll(fds, 1, 0);
838   CHECK_EQ(rc, 0);
839
840   // Write data to the accepted socket
841   uint8_t acceptedWbuf[192];
842   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
843   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
844   acceptedSocket->flush();
845   // Shutdown writes to the accepted socket.  This will cause it to see EOF
846   // and uninstall the read callback.
847   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
848
849   // Loop
850   evb.loop();
851
852   // The loop should have completed the connection, written the queued data,
853   // shutdown writes on the socket, read the data we wrote to it, and see the
854   // EOF.
855   //
856   // Check that the connection was completed successfully and that the read
857   // and write callbacks were invoked as expected.
858   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
859   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
860   CHECK_EQ(rcb.buffers.size(), 1);
861   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
862   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
863                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
864   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
865
866   // Check that we can read the data that was written to the socket, and that
867   // we see an EOF, since its socket was half-shutdown.
868   uint8_t readbuf[sizeof(wbuf)];
869   acceptedSocket->readAll(readbuf, sizeof(readbuf));
870   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
871   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
872   CHECK_EQ(bytesRead, 0);
873
874   // Fully close both sockets
875   acceptedSocket->close();
876   socket->close();
877 }
878
879 /**
880  * Test reading, writing, and calling shutdownWriteNow() before the
881  * connect attempt finishes.
882  */
883 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
884   TestServer server;
885
886   // connect()
887   EventBase evb;
888   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
889   ConnCallback ccb;
890   socket->connect(&ccb, server.getAddress(), 30);
891
892   // Hopefully the connect didn't succeed immediately.
893   // If it did, we can't exercise the write-while-connecting code path.
894   if (ccb.state == STATE_SUCCEEDED) {
895     LOG(INFO) << "connect() succeeded immediately; skipping test";
896     return;
897   }
898
899   // Install a read callback
900   ReadCallback rcb;
901   socket->setReadCB(&rcb);
902
903   // Ask to write some data
904   char wbuf[128];
905   memset(wbuf, 'a', sizeof(wbuf));
906   WriteCallback wcb;
907   socket->write(&wcb, wbuf, sizeof(wbuf));
908
909   // Shutdown writes immediately.
910   // This should immediately discard the data that we just tried to write.
911   socket->shutdownWriteNow();
912
913   // Verify that writeError() was invoked on the write callback.
914   CHECK_EQ(wcb.state, STATE_FAILED);
915   CHECK_EQ(wcb.bytesWritten, 0);
916
917   // Even though we haven't looped yet, we should be able to accept
918   // the connection.
919   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
920
921   // Since the connection is still in progress, there should be no data to
922   // read yet.  Verify that the accepted socket is not readable.
923   struct pollfd fds[1];
924   fds[0].fd = acceptedSocket->getSocketFD();
925   fds[0].events = POLLIN;
926   fds[0].revents = 0;
927   int rc = poll(fds, 1, 0);
928   CHECK_EQ(rc, 0);
929
930   // Write data to the accepted socket
931   uint8_t acceptedWbuf[192];
932   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
933   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
934   acceptedSocket->flush();
935   // Shutdown writes to the accepted socket.  This will cause it to see EOF
936   // and uninstall the read callback.
937   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
938
939   // Loop
940   evb.loop();
941
942   // The loop should have completed the connection, written the queued data,
943   // shutdown writes on the socket, read the data we wrote to it, and see the
944   // EOF.
945   //
946   // Check that the connection was completed successfully and that the read
947   // callback was invoked as expected.
948   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
949   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
950   CHECK_EQ(rcb.buffers.size(), 1);
951   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
952   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
953                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
954
955   // Since we used shutdownWriteNow(), it should have discarded all pending
956   // write data.  Verify we see an immediate EOF when reading from the accepted
957   // socket.
958   uint8_t readbuf[sizeof(wbuf)];
959   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
960   CHECK_EQ(bytesRead, 0);
961
962   // Fully close both sockets
963   acceptedSocket->close();
964   socket->close();
965 }
966
967 // Helper function for use in testConnectOptWrite()
968 // Temporarily disable the read callback
969 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
970   // Uninstall the read callback
971   socket->setReadCB(nullptr);
972   // Schedule the read callback to be reinstalled after 1ms
973   socket->getEventBase()->runInLoop(
974       std::bind(&AsyncSocket::setReadCB, socket, rcb));
975 }
976
977 /**
978  * Test connect+write, then have the connect callback perform another write.
979  *
980  * This tests interaction of the optimistic writing after connect with
981  * additional write attempts that occur in the connect callback.
982  */
983 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
984   TestServer server;
985   EventBase evb;
986   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
987
988   // connect()
989   ConnCallback ccb;
990   socket->connect(&ccb, server.getAddress(), 30);
991
992   // Hopefully the connect didn't succeed immediately.
993   // If it did, we can't exercise the optimistic write code path.
994   if (ccb.state == STATE_SUCCEEDED) {
995     LOG(INFO) << "connect() succeeded immediately; aborting test "
996                        "of optimistic write behavior";
997     return;
998   }
999
1000   // Tell the connect callback to perform a write when the connect succeeds
1001   WriteCallback wcb2;
1002   scoped_array<char> buf2(new char[size2]);
1003   memset(buf2.get(), 'b', size2);
1004   if (size2 > 0) {
1005     ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
1006     // Tell the second write callback to close the connection when it is done
1007     wcb2.successCallback = [&] { socket->closeNow(); };
1008   }
1009
1010   // Schedule one write() immediately, before the connect finishes
1011   scoped_array<char> buf1(new char[size1]);
1012   memset(buf1.get(), 'a', size1);
1013   WriteCallback wcb1;
1014   if (size1 > 0) {
1015     socket->write(&wcb1, buf1.get(), size1);
1016   }
1017
1018   if (close) {
1019     // immediately perform a close, before connect() completes
1020     socket->close();
1021   }
1022
1023   // Start reading from the other endpoint after 10ms.
1024   // If we're using large buffers, we have to read so that the writes don't
1025   // block forever.
1026   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1027   ReadCallback rcb;
1028   rcb.dataAvailableCallback = std::bind(tmpDisableReads,
1029                                         acceptedSocket.get(), &rcb);
1030   socket->getEventBase()->tryRunAfterDelay(
1031       std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
1032       10);
1033
1034   // Loop.  We don't bother accepting on the server socket yet.
1035   // The kernel should be able to buffer the write request so it can succeed.
1036   evb.loop();
1037
1038   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1039   if (size1 > 0) {
1040     CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1041   }
1042   if (size2 > 0) {
1043     CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1044   }
1045
1046   socket->close();
1047
1048   // Make sure the read callback received all of the data
1049   size_t bytesRead = 0;
1050   for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
1051        it != rcb.buffers.end();
1052        ++it) {
1053     size_t start = bytesRead;
1054     bytesRead += it->length;
1055     size_t end = bytesRead;
1056     if (start < size1) {
1057       size_t cmpLen = min(size1, end) - start;
1058       CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
1059     }
1060     if (end > size1 && end <= size1 + size2) {
1061       size_t itOffset;
1062       size_t buf2Offset;
1063       size_t cmpLen;
1064       if (start >= size1) {
1065         itOffset = 0;
1066         buf2Offset = start - size1;
1067         cmpLen = end - start;
1068       } else {
1069         itOffset = size1 - start;
1070         buf2Offset = 0;
1071         cmpLen = end - size1;
1072       }
1073       CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
1074                                cmpLen),
1075                         0);
1076     }
1077   }
1078   CHECK_EQ(bytesRead, size1 + size2);
1079 }
1080
1081 TEST(AsyncSocketTest, ConnectCallbackWrite) {
1082   // Test using small writes that should both succeed immediately
1083   testConnectOptWrite(100, 200);
1084
1085   // Test using a large buffer in the connect callback, that should block
1086   const size_t largeSize = 8*1024*1024;
1087   testConnectOptWrite(100, largeSize);
1088
1089   // Test using a large initial write
1090   testConnectOptWrite(largeSize, 100);
1091
1092   // Test using two large buffers
1093   testConnectOptWrite(largeSize, largeSize);
1094
1095   // Test a small write in the connect callback,
1096   // but no immediate write before connect completes
1097   testConnectOptWrite(0, 64);
1098
1099   // Test a large write in the connect callback,
1100   // but no immediate write before connect completes
1101   testConnectOptWrite(0, largeSize);
1102
1103   // Test connect, a small write, then immediately call close() before connect
1104   // completes
1105   testConnectOptWrite(211, 0, true);
1106
1107   // Test connect, a large immediate write (that will block), then immediately
1108   // call close() before connect completes
1109   testConnectOptWrite(largeSize, 0, true);
1110 }
1111
1112 ///////////////////////////////////////////////////////////////////////////
1113 // write() related tests
1114 ///////////////////////////////////////////////////////////////////////////
1115
1116 /**
1117  * Test writing using a nullptr callback
1118  */
1119 TEST(AsyncSocketTest, WriteNullCallback) {
1120   TestServer server;
1121
1122   // connect()
1123   EventBase evb;
1124   std::shared_ptr<AsyncSocket> socket =
1125     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1126   evb.loop(); // loop until the socket is connected
1127
1128   // write() with a nullptr callback
1129   char buf[128];
1130   memset(buf, 'a', sizeof(buf));
1131   socket->write(nullptr, buf, sizeof(buf));
1132
1133   evb.loop(); // loop until the data is sent
1134
1135   // Make sure the server got a connection and received the data
1136   socket->close();
1137   server.verifyConnection(buf, sizeof(buf));
1138 }
1139
1140 /**
1141  * Test writing with a send timeout
1142  */
1143 TEST(AsyncSocketTest, WriteTimeout) {
1144   TestServer server;
1145
1146   // connect()
1147   EventBase evb;
1148   std::shared_ptr<AsyncSocket> socket =
1149     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1150   evb.loop(); // loop until the socket is connected
1151
1152   // write() a large chunk of data, with no-one on the other end reading
1153   size_t writeLength = 8*1024*1024;
1154   uint32_t timeout = 200;
1155   socket->setSendTimeout(timeout);
1156   scoped_array<char> buf(new char[writeLength]);
1157   memset(buf.get(), 'a', writeLength);
1158   WriteCallback wcb;
1159   socket->write(&wcb, buf.get(), writeLength);
1160
1161   TimePoint start;
1162   evb.loop();
1163   TimePoint end;
1164
1165   // Make sure the write attempt timed out as requested
1166   CHECK_EQ(wcb.state, STATE_FAILED);
1167   CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
1168
1169   // Check that the write timed out within a reasonable period of time.
1170   // We don't check for exactly the specified timeout, since AsyncSocket only
1171   // times out when it hasn't made progress for that period of time.
1172   //
1173   // On linux, the first write sends a few hundred kb of data, then blocks for
1174   // writability, and then unblocks again after 40ms and is able to write
1175   // another smaller of data before blocking permanently.  Therefore it doesn't
1176   // time out until 40ms + timeout.
1177   //
1178   // I haven't fully verified the cause of this, but I believe it probably
1179   // occurs because the receiving end delays sending an ack for up to 40ms.
1180   // (This is the default value for TCP_DELACK_MIN.)  Once the sender receives
1181   // the ack, it can send some more data.  However, after that point the
1182   // receiver's kernel buffer is full.  This 40ms delay happens even with
1183   // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints.  However, the
1184   // kernel may be automatically disabling TCP_QUICKACK after receiving some
1185   // data.
1186   //
1187   // For now, we simply check that the timeout occurred within 160ms of
1188   // the requested value.
1189   T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1190 }
1191
1192 /**
1193  * Test writing to a socket that the remote endpoint has closed
1194  */
1195 TEST(AsyncSocketTest, WritePipeError) {
1196   TestServer server;
1197
1198   // connect()
1199   EventBase evb;
1200   std::shared_ptr<AsyncSocket> socket =
1201     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1202   socket->setSendTimeout(1000);
1203   evb.loop(); // loop until the socket is connected
1204
1205   // accept and immediately close the socket
1206   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1207   acceptedSocket.reset();
1208
1209   // write() a large chunk of data
1210   size_t writeLength = 8*1024*1024;
1211   scoped_array<char> buf(new char[writeLength]);
1212   memset(buf.get(), 'a', writeLength);
1213   WriteCallback wcb;
1214   socket->write(&wcb, buf.get(), writeLength);
1215
1216   evb.loop();
1217
1218   // Make sure the write failed.
1219   // It would be nice if AsyncSocketException could convey the errno value,
1220   // so that we could check for EPIPE
1221   CHECK_EQ(wcb.state, STATE_FAILED);
1222   CHECK_EQ(wcb.exception.getType(),
1223                     AsyncSocketException::INTERNAL_ERROR);
1224 }
1225
1226 /**
1227  * Test writing a mix of simple buffers and IOBufs
1228  */
1229 TEST(AsyncSocketTest, WriteIOBuf) {
1230   TestServer server;
1231
1232   // connect()
1233   EventBase evb;
1234   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1235   ConnCallback ccb;
1236   socket->connect(&ccb, server.getAddress(), 30);
1237
1238   // Accept the connection
1239   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1240   ReadCallback rcb;
1241   acceptedSocket->setReadCB(&rcb);
1242
1243   // Write a simple buffer to the socket
1244   size_t simpleBufLength = 5;
1245   char simpleBuf[simpleBufLength];
1246   memset(simpleBuf, 'a', simpleBufLength);
1247   WriteCallback wcb;
1248   socket->write(&wcb, simpleBuf, simpleBufLength);
1249
1250   // Write a single-element IOBuf chain
1251   size_t buf1Length = 7;
1252   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1253   memset(buf1->writableData(), 'b', buf1Length);
1254   buf1->append(buf1Length);
1255   unique_ptr<IOBuf> buf1Copy(buf1->clone());
1256   WriteCallback wcb2;
1257   socket->writeChain(&wcb2, std::move(buf1));
1258
1259   // Write a multiple-element IOBuf chain
1260   size_t buf2Length = 11;
1261   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1262   memset(buf2->writableData(), 'c', buf2Length);
1263   buf2->append(buf2Length);
1264   size_t buf3Length = 13;
1265   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1266   memset(buf3->writableData(), 'd', buf3Length);
1267   buf3->append(buf3Length);
1268   buf2->appendChain(std::move(buf3));
1269   unique_ptr<IOBuf> buf2Copy(buf2->clone());
1270   buf2Copy->coalesce();
1271   WriteCallback wcb3;
1272   socket->writeChain(&wcb3, std::move(buf2));
1273   socket->shutdownWrite();
1274
1275   // Let the reads and writes run to completion
1276   evb.loop();
1277
1278   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1279   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1280   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1281
1282   // Make sure the reader got the right data in the right order
1283   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1284   CHECK_EQ(rcb.buffers.size(), 1);
1285   CHECK_EQ(rcb.buffers[0].length,
1286       simpleBufLength + buf1Length + buf2Length + buf3Length);
1287   CHECK_EQ(
1288       memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1289   CHECK_EQ(
1290       memcmp(rcb.buffers[0].buffer + simpleBufLength,
1291           buf1Copy->data(), buf1Copy->length()), 0);
1292   CHECK_EQ(
1293       memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1294           buf2Copy->data(), buf2Copy->length()), 0);
1295
1296   acceptedSocket->close();
1297   socket->close();
1298 }
1299
1300 TEST(AsyncSocketTest, WriteIOBufCorked) {
1301   TestServer server;
1302
1303   // connect()
1304   EventBase evb;
1305   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1306   ConnCallback ccb;
1307   socket->connect(&ccb, server.getAddress(), 30);
1308
1309   // Accept the connection
1310   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1311   ReadCallback rcb;
1312   acceptedSocket->setReadCB(&rcb);
1313
1314   // Do three writes, 100ms apart, with the "cork" flag set
1315   // on the second write.  The reader should see the first write
1316   // arrive by itself, followed by the second and third writes
1317   // arriving together.
1318   size_t buf1Length = 5;
1319   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1320   memset(buf1->writableData(), 'a', buf1Length);
1321   buf1->append(buf1Length);
1322   size_t buf2Length = 7;
1323   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1324   memset(buf2->writableData(), 'b', buf2Length);
1325   buf2->append(buf2Length);
1326   size_t buf3Length = 11;
1327   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1328   memset(buf3->writableData(), 'c', buf3Length);
1329   buf3->append(buf3Length);
1330   WriteCallback wcb1;
1331   socket->writeChain(&wcb1, std::move(buf1));
1332   WriteCallback wcb2;
1333   DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1334   write2.scheduleTimeout(100);
1335   WriteCallback wcb3;
1336   DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1337   write3.scheduleTimeout(200);
1338
1339   evb.loop();
1340   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1341   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1342   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1343   if (wcb3.state != STATE_SUCCEEDED) {
1344     throw(wcb3.exception);
1345   }
1346   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1347
1348   // Make sure the reader got the data with the right grouping
1349   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1350   CHECK_EQ(rcb.buffers.size(), 2);
1351   CHECK_EQ(rcb.buffers[0].length, buf1Length);
1352   CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1353
1354   acceptedSocket->close();
1355   socket->close();
1356 }
1357
1358 /**
1359  * Test performing a zero-length write
1360  */
1361 TEST(AsyncSocketTest, ZeroLengthWrite) {
1362   TestServer server;
1363
1364   // connect()
1365   EventBase evb;
1366   std::shared_ptr<AsyncSocket> socket =
1367     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1368   evb.loop(); // loop until the socket is connected
1369
1370   auto acceptedSocket = server.acceptAsync(&evb);
1371   ReadCallback rcb;
1372   acceptedSocket->setReadCB(&rcb);
1373
1374   size_t len1 = 1024*1024;
1375   size_t len2 = 1024*1024;
1376   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1377   memset(buf.get(), 'a', len1);
1378   memset(buf.get(), 'b', len2);
1379
1380   WriteCallback wcb1;
1381   WriteCallback wcb2;
1382   WriteCallback wcb3;
1383   WriteCallback wcb4;
1384   socket->write(&wcb1, buf.get(), 0);
1385   socket->write(&wcb2, buf.get(), len1);
1386   socket->write(&wcb3, buf.get() + len1, 0);
1387   socket->write(&wcb4, buf.get() + len1, len2);
1388   socket->close();
1389
1390   evb.loop(); // loop until the data is sent
1391
1392   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1393   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1394   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1395   CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1396   rcb.verifyData(buf.get(), len1 + len2);
1397 }
1398
1399 TEST(AsyncSocketTest, ZeroLengthWritev) {
1400   TestServer server;
1401
1402   // connect()
1403   EventBase evb;
1404   std::shared_ptr<AsyncSocket> socket =
1405     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1406   evb.loop(); // loop until the socket is connected
1407
1408   auto acceptedSocket = server.acceptAsync(&evb);
1409   ReadCallback rcb;
1410   acceptedSocket->setReadCB(&rcb);
1411
1412   size_t len1 = 1024*1024;
1413   size_t len2 = 1024*1024;
1414   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1415   memset(buf.get(), 'a', len1);
1416   memset(buf.get(), 'b', len2);
1417
1418   WriteCallback wcb;
1419   size_t iovCount = 4;
1420   struct iovec iov[iovCount];
1421   iov[0].iov_base = buf.get();
1422   iov[0].iov_len = len1;
1423   iov[1].iov_base = buf.get() + len1;
1424   iov[1].iov_len = 0;
1425   iov[2].iov_base = buf.get() + len1;
1426   iov[2].iov_len = len2;
1427   iov[3].iov_base = buf.get() + len1 + len2;
1428   iov[3].iov_len = 0;
1429
1430   socket->writev(&wcb, iov, iovCount);
1431   socket->close();
1432   evb.loop(); // loop until the data is sent
1433
1434   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1435   rcb.verifyData(buf.get(), len1 + len2);
1436 }
1437
1438 ///////////////////////////////////////////////////////////////////////////
1439 // close() related tests
1440 ///////////////////////////////////////////////////////////////////////////
1441
1442 /**
1443  * Test calling close() with pending writes when the socket is already closing.
1444  */
1445 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1446   TestServer server;
1447
1448   // connect()
1449   EventBase evb;
1450   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1451   ConnCallback ccb;
1452   socket->connect(&ccb, server.getAddress(), 30);
1453
1454   // accept the socket on the server side
1455   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1456
1457   // Loop to ensure the connect has completed
1458   evb.loop();
1459
1460   // Make sure we are connected
1461   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1462
1463   // Schedule pending writes, until several write attempts have blocked
1464   char buf[128];
1465   memset(buf, 'a', sizeof(buf));
1466   typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1467   WriteCallbackVector writeCallbacks;
1468
1469   writeCallbacks.reserve(5);
1470   while (writeCallbacks.size() < 5) {
1471     std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1472
1473     socket->write(wcb.get(), buf, sizeof(buf));
1474     if (wcb->state == STATE_SUCCEEDED) {
1475       // Succeeded immediately.  Keep performing more writes
1476       continue;
1477     }
1478
1479     // This write is blocked.
1480     // Have the write callback call close() when writeError() is invoked
1481     wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1482     writeCallbacks.push_back(wcb);
1483   }
1484
1485   // Call closeNow() to immediately fail the pending writes
1486   socket->closeNow();
1487
1488   // Make sure writeError() was invoked on all of the pending write callbacks
1489   for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1490        it != writeCallbacks.end();
1491        ++it) {
1492     CHECK_EQ((*it)->state, STATE_FAILED);
1493   }
1494 }
1495
1496
1497 // TODO:
1498 // - Test connect() and have the connect callback set the read callback
1499 // - Test connect() and have the connect callback unset the read callback
1500 // - Test reading/writing/closing/destroying the socket in the connect callback
1501 // - Test reading/writing/closing/destroying the socket in the read callback
1502 // - Test reading/writing/closing/destroying the socket in the write callback
1503 // - Test one-way shutdown behavior
1504 // - Test changing the EventBase
1505 //
1506 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1507 //   in connectSuccess(), readDataAvailable(), writeSuccess()
1508
1509
1510 ///////////////////////////////////////////////////////////////////////////
1511 // AsyncServerSocket tests
1512 ///////////////////////////////////////////////////////////////////////////
1513
1514 /**
1515  * Helper AcceptCallback class for the test code
1516  * It records the callbacks that were invoked, and also supports calling
1517  * generic std::function objects in each callback.
1518  */
1519 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1520  public:
1521   enum EventType {
1522     TYPE_START,
1523     TYPE_ACCEPT,
1524     TYPE_ERROR,
1525     TYPE_STOP
1526   };
1527   struct EventInfo {
1528     EventInfo(int fd, const folly::SocketAddress& addr)
1529       : type(TYPE_ACCEPT),
1530         fd(fd),
1531         address(addr),
1532         errorMsg() {}
1533     explicit EventInfo(const std::string& msg)
1534       : type(TYPE_ERROR),
1535         fd(-1),
1536         address(),
1537         errorMsg(msg) {}
1538     explicit EventInfo(EventType et)
1539       : type(et),
1540         fd(-1),
1541         address(),
1542         errorMsg() {}
1543
1544     EventType type;
1545     int fd;  // valid for TYPE_ACCEPT
1546     folly::SocketAddress address;  // valid for TYPE_ACCEPT
1547     string errorMsg;  // valid for TYPE_ERROR
1548   };
1549   typedef std::deque<EventInfo> EventList;
1550
1551   TestAcceptCallback()
1552     : connectionAcceptedFn_(),
1553       acceptErrorFn_(),
1554       acceptStoppedFn_(),
1555       events_() {}
1556
1557   std::deque<EventInfo>* getEvents() {
1558     return &events_;
1559   }
1560
1561   void setConnectionAcceptedFn(
1562       const std::function<void(int, const folly::SocketAddress&)>& fn) {
1563     connectionAcceptedFn_ = fn;
1564   }
1565   void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1566     acceptErrorFn_ = fn;
1567   }
1568   void setAcceptStartedFn(const std::function<void()>& fn) {
1569     acceptStartedFn_ = fn;
1570   }
1571   void setAcceptStoppedFn(const std::function<void()>& fn) {
1572     acceptStoppedFn_ = fn;
1573   }
1574
1575   void connectionAccepted(int fd, const folly::SocketAddress& clientAddr)
1576       noexcept {
1577     events_.push_back(EventInfo(fd, clientAddr));
1578
1579     if (connectionAcceptedFn_) {
1580       connectionAcceptedFn_(fd, clientAddr);
1581     }
1582   }
1583   void acceptError(const std::exception& ex) noexcept {
1584     events_.push_back(EventInfo(ex.what()));
1585
1586     if (acceptErrorFn_) {
1587       acceptErrorFn_(ex);
1588     }
1589   }
1590   void acceptStarted() noexcept {
1591     events_.push_back(EventInfo(TYPE_START));
1592
1593     if (acceptStartedFn_) {
1594       acceptStartedFn_();
1595     }
1596   }
1597   void acceptStopped() noexcept {
1598     events_.push_back(EventInfo(TYPE_STOP));
1599
1600     if (acceptStoppedFn_) {
1601       acceptStoppedFn_();
1602     }
1603   }
1604
1605  private:
1606   std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1607   std::function<void(const std::exception&)> acceptErrorFn_;
1608   std::function<void()> acceptStartedFn_;
1609   std::function<void()> acceptStoppedFn_;
1610
1611   std::deque<EventInfo> events_;
1612 };
1613
1614 /**
1615  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1616  */
1617 TEST(AsyncSocketTest, ServerAcceptOptions) {
1618   EventBase eventBase;
1619
1620   // Create a server socket
1621   std::shared_ptr<AsyncServerSocket> serverSocket(
1622       AsyncServerSocket::newSocket(&eventBase));
1623   serverSocket->bind(0);
1624   serverSocket->listen(16);
1625   folly::SocketAddress serverAddress;
1626   serverSocket->getAddress(&serverAddress);
1627
1628   // Add a callback to accept one connection then stop the loop
1629   TestAcceptCallback acceptCallback;
1630   acceptCallback.setConnectionAcceptedFn(
1631     [&](int fd, const folly::SocketAddress& addr) {
1632       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1633     });
1634   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1635     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1636   });
1637   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1638   serverSocket->startAccepting();
1639
1640   // Connect to the server socket
1641   std::shared_ptr<AsyncSocket> socket(
1642       AsyncSocket::newSocket(&eventBase, serverAddress));
1643
1644   eventBase.loop();
1645
1646   // Verify that the server accepted a connection
1647   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1648   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1649                     TestAcceptCallback::TYPE_START);
1650   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1651                     TestAcceptCallback::TYPE_ACCEPT);
1652   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1653                     TestAcceptCallback::TYPE_STOP);
1654   int fd = acceptCallback.getEvents()->at(1).fd;
1655
1656   // The accepted connection should already be in non-blocking mode
1657   int flags = fcntl(fd, F_GETFL, 0);
1658   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1659
1660 #ifndef TCP_NOPUSH
1661   // The accepted connection should already have TCP_NODELAY set
1662   int value;
1663   socklen_t valueLength = sizeof(value);
1664   int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1665   CHECK_EQ(rc, 0);
1666   CHECK_EQ(value, 1);
1667 #endif
1668 }
1669
1670 /**
1671  * Test AsyncServerSocket::removeAcceptCallback()
1672  */
1673 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1674   // Create a new AsyncServerSocket
1675   EventBase eventBase;
1676   std::shared_ptr<AsyncServerSocket> serverSocket(
1677       AsyncServerSocket::newSocket(&eventBase));
1678   serverSocket->bind(0);
1679   serverSocket->listen(16);
1680   folly::SocketAddress serverAddress;
1681   serverSocket->getAddress(&serverAddress);
1682
1683   // Add several accept callbacks
1684   TestAcceptCallback cb1;
1685   TestAcceptCallback cb2;
1686   TestAcceptCallback cb3;
1687   TestAcceptCallback cb4;
1688   TestAcceptCallback cb5;
1689   TestAcceptCallback cb6;
1690   TestAcceptCallback cb7;
1691
1692   // Test having callbacks remove other callbacks before them on the list,
1693   // after them on the list, or removing themselves.
1694   //
1695   // Have callback 2 remove callback 3 and callback 5 the first time it is
1696   // called.
1697   int cb2Count = 0;
1698   cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1699       std::shared_ptr<AsyncSocket> sock2(
1700         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1701       });
1702   cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1703     });
1704   cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1705       std::shared_ptr<AsyncSocket> sock3(
1706         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1707     });
1708   cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1709   std::shared_ptr<AsyncSocket> sock5(
1710       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1711
1712     });
1713   cb2.setConnectionAcceptedFn(
1714     [&](int fd, const folly::SocketAddress& addr) {
1715       if (cb2Count == 0) {
1716         serverSocket->removeAcceptCallback(&cb3, nullptr);
1717         serverSocket->removeAcceptCallback(&cb5, nullptr);
1718       }
1719       ++cb2Count;
1720     });
1721   // Have callback 6 remove callback 4 the first time it is called,
1722   // and destroy the server socket the second time it is called
1723   int cb6Count = 0;
1724   cb6.setConnectionAcceptedFn(
1725     [&](int fd, const folly::SocketAddress& addr) {
1726       if (cb6Count == 0) {
1727         serverSocket->removeAcceptCallback(&cb4, nullptr);
1728         std::shared_ptr<AsyncSocket> sock6(
1729           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1730         std::shared_ptr<AsyncSocket> sock7(
1731           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1732         std::shared_ptr<AsyncSocket> sock8(
1733           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1734
1735       } else {
1736         serverSocket.reset();
1737       }
1738       ++cb6Count;
1739     });
1740   // Have callback 7 remove itself
1741   cb7.setConnectionAcceptedFn(
1742     [&](int fd, const folly::SocketAddress& addr) {
1743       serverSocket->removeAcceptCallback(&cb7, nullptr);
1744     });
1745
1746   serverSocket->addAcceptCallback(&cb1, nullptr);
1747   serverSocket->addAcceptCallback(&cb2, nullptr);
1748   serverSocket->addAcceptCallback(&cb3, nullptr);
1749   serverSocket->addAcceptCallback(&cb4, nullptr);
1750   serverSocket->addAcceptCallback(&cb5, nullptr);
1751   serverSocket->addAcceptCallback(&cb6, nullptr);
1752   serverSocket->addAcceptCallback(&cb7, nullptr);
1753   serverSocket->startAccepting();
1754
1755   // Make several connections to the socket
1756   std::shared_ptr<AsyncSocket> sock1(
1757       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1758   std::shared_ptr<AsyncSocket> sock4(
1759       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1760
1761   // Loop until we are stopped
1762   eventBase.loop();
1763
1764   // Check to make sure that the expected callbacks were invoked.
1765   //
1766   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1767   // the AcceptCallbacks in round-robin fashion, in the order that they were
1768   // added.  The code is implemented this way right now, but the API doesn't
1769   // explicitly require it be done this way.  If we change the code not to be
1770   // exactly round robin in the future, we can simplify the test checks here.
1771   // (We'll also need to update the termination code, since we expect cb6 to
1772   // get called twice to terminate the loop.)
1773   CHECK_EQ(cb1.getEvents()->size(), 4);
1774   CHECK_EQ(cb1.getEvents()->at(0).type,
1775                     TestAcceptCallback::TYPE_START);
1776   CHECK_EQ(cb1.getEvents()->at(1).type,
1777                     TestAcceptCallback::TYPE_ACCEPT);
1778   CHECK_EQ(cb1.getEvents()->at(2).type,
1779                     TestAcceptCallback::TYPE_ACCEPT);
1780   CHECK_EQ(cb1.getEvents()->at(3).type,
1781                     TestAcceptCallback::TYPE_STOP);
1782
1783   CHECK_EQ(cb2.getEvents()->size(), 4);
1784   CHECK_EQ(cb2.getEvents()->at(0).type,
1785                     TestAcceptCallback::TYPE_START);
1786   CHECK_EQ(cb2.getEvents()->at(1).type,
1787                     TestAcceptCallback::TYPE_ACCEPT);
1788   CHECK_EQ(cb2.getEvents()->at(2).type,
1789                     TestAcceptCallback::TYPE_ACCEPT);
1790   CHECK_EQ(cb2.getEvents()->at(3).type,
1791                     TestAcceptCallback::TYPE_STOP);
1792
1793   CHECK_EQ(cb3.getEvents()->size(), 2);
1794   CHECK_EQ(cb3.getEvents()->at(0).type,
1795                     TestAcceptCallback::TYPE_START);
1796   CHECK_EQ(cb3.getEvents()->at(1).type,
1797                     TestAcceptCallback::TYPE_STOP);
1798
1799   CHECK_EQ(cb4.getEvents()->size(), 3);
1800   CHECK_EQ(cb4.getEvents()->at(0).type,
1801                     TestAcceptCallback::TYPE_START);
1802   CHECK_EQ(cb4.getEvents()->at(1).type,
1803                     TestAcceptCallback::TYPE_ACCEPT);
1804   CHECK_EQ(cb4.getEvents()->at(2).type,
1805                     TestAcceptCallback::TYPE_STOP);
1806
1807   CHECK_EQ(cb5.getEvents()->size(), 2);
1808   CHECK_EQ(cb5.getEvents()->at(0).type,
1809                     TestAcceptCallback::TYPE_START);
1810   CHECK_EQ(cb5.getEvents()->at(1).type,
1811                     TestAcceptCallback::TYPE_STOP);
1812
1813   CHECK_EQ(cb6.getEvents()->size(), 4);
1814   CHECK_EQ(cb6.getEvents()->at(0).type,
1815                     TestAcceptCallback::TYPE_START);
1816   CHECK_EQ(cb6.getEvents()->at(1).type,
1817                     TestAcceptCallback::TYPE_ACCEPT);
1818   CHECK_EQ(cb6.getEvents()->at(2).type,
1819                     TestAcceptCallback::TYPE_ACCEPT);
1820   CHECK_EQ(cb6.getEvents()->at(3).type,
1821                     TestAcceptCallback::TYPE_STOP);
1822
1823   CHECK_EQ(cb7.getEvents()->size(), 3);
1824   CHECK_EQ(cb7.getEvents()->at(0).type,
1825                     TestAcceptCallback::TYPE_START);
1826   CHECK_EQ(cb7.getEvents()->at(1).type,
1827                     TestAcceptCallback::TYPE_ACCEPT);
1828   CHECK_EQ(cb7.getEvents()->at(2).type,
1829                     TestAcceptCallback::TYPE_STOP);
1830 }
1831
1832 /**
1833  * Test AsyncServerSocket::removeAcceptCallback()
1834  */
1835 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1836   // Create a new AsyncServerSocket
1837   EventBase eventBase;
1838   std::shared_ptr<AsyncServerSocket> serverSocket(
1839       AsyncServerSocket::newSocket(&eventBase));
1840   serverSocket->bind(0);
1841   serverSocket->listen(16);
1842   folly::SocketAddress serverAddress;
1843   serverSocket->getAddress(&serverAddress);
1844
1845   // Add several accept callbacks
1846   TestAcceptCallback cb1;
1847   auto thread_id = pthread_self();
1848   cb1.setAcceptStartedFn([&](){
1849     CHECK_NE(thread_id, pthread_self());
1850     thread_id = pthread_self();
1851   });
1852   cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1853     CHECK_EQ(thread_id, pthread_self());
1854     serverSocket->removeAcceptCallback(&cb1, nullptr);
1855   });
1856   cb1.setAcceptStoppedFn([&](){
1857     CHECK_EQ(thread_id, pthread_self());
1858   });
1859
1860   // Test having callbacks remove other callbacks before them on the list,
1861   serverSocket->addAcceptCallback(&cb1, nullptr);
1862   serverSocket->startAccepting();
1863
1864   // Make several connections to the socket
1865   std::shared_ptr<AsyncSocket> sock1(
1866       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1867
1868   // Loop in another thread
1869   auto other = std::thread([&](){
1870     eventBase.loop();
1871   });
1872   other.join();
1873
1874   // Check to make sure that the expected callbacks were invoked.
1875   //
1876   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1877   // the AcceptCallbacks in round-robin fashion, in the order that they were
1878   // added.  The code is implemented this way right now, but the API doesn't
1879   // explicitly require it be done this way.  If we change the code not to be
1880   // exactly round robin in the future, we can simplify the test checks here.
1881   // (We'll also need to update the termination code, since we expect cb6 to
1882   // get called twice to terminate the loop.)
1883   CHECK_EQ(cb1.getEvents()->size(), 3);
1884   CHECK_EQ(cb1.getEvents()->at(0).type,
1885                     TestAcceptCallback::TYPE_START);
1886   CHECK_EQ(cb1.getEvents()->at(1).type,
1887                     TestAcceptCallback::TYPE_ACCEPT);
1888   CHECK_EQ(cb1.getEvents()->at(2).type,
1889                     TestAcceptCallback::TYPE_STOP);
1890
1891 }
1892
1893 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1894   // Add a callback to accept one connection then stop accepting
1895   TestAcceptCallback acceptCallback;
1896   acceptCallback.setConnectionAcceptedFn(
1897     [&](int fd, const folly::SocketAddress& addr) {
1898       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1899     });
1900   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1901     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1902   });
1903   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1904   serverSocket->startAccepting();
1905
1906   // Connect to the server socket
1907   EventBase* eventBase = serverSocket->getEventBase();
1908   folly::SocketAddress serverAddress;
1909   serverSocket->getAddress(&serverAddress);
1910   AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1911
1912   // Loop to process all events
1913   eventBase->loop();
1914
1915   // Verify that the server accepted a connection
1916   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1917   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1918                     TestAcceptCallback::TYPE_START);
1919   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1920                     TestAcceptCallback::TYPE_ACCEPT);
1921   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1922                     TestAcceptCallback::TYPE_STOP);
1923 }
1924
1925 /* Verify that we don't leak sockets if we are destroyed()
1926  * and there are still writes pending
1927  *
1928  * If destroy() only calls close() instead of closeNow(),
1929  * it would shutdown(writes) on the socket, but it would
1930  * never be close()'d, and the socket would leak
1931  */
1932 TEST(AsyncSocketTest, DestroyCloseTest) {
1933   TestServer server;
1934
1935   // connect()
1936   EventBase clientEB;
1937   EventBase serverEB;
1938   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1939   ConnCallback ccb;
1940   socket->connect(&ccb, server.getAddress(), 30);
1941
1942   // Accept the connection
1943   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
1944   ReadCallback rcb;
1945   acceptedSocket->setReadCB(&rcb);
1946
1947   // Write a large buffer to the socket that is larger than kernel buffer
1948   size_t simpleBufLength = 5000000;
1949   char* simpleBuf = new char[simpleBufLength];
1950   memset(simpleBuf, 'a', simpleBufLength);
1951   WriteCallback wcb;
1952
1953   // Let the reads and writes run to completion
1954   int fd = acceptedSocket->getFd();
1955
1956   acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
1957   socket.reset();
1958   acceptedSocket.reset();
1959
1960   // Test that server socket was closed
1961   ssize_t sz = read(fd, simpleBuf, simpleBufLength);
1962   CHECK_EQ(sz, -1);
1963   CHECK_EQ(errno, 9);
1964   delete[] simpleBuf;
1965 }
1966
1967 /**
1968  * Test AsyncServerSocket::useExistingSocket()
1969  */
1970 TEST(AsyncSocketTest, ServerExistingSocket) {
1971   EventBase eventBase;
1972
1973   // Test creating a socket, and letting AsyncServerSocket bind and listen
1974   {
1975     // Manually create a socket
1976     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1977     ASSERT_GE(fd, 0);
1978
1979     // Create a server socket
1980     AsyncServerSocket::UniquePtr serverSocket(
1981         new AsyncServerSocket(&eventBase));
1982     serverSocket->useExistingSocket(fd);
1983     folly::SocketAddress address;
1984     serverSocket->getAddress(&address);
1985     address.setPort(0);
1986     serverSocket->bind(address);
1987     serverSocket->listen(16);
1988
1989     // Make sure the socket works
1990     serverSocketSanityTest(serverSocket.get());
1991   }
1992
1993   // Test creating a socket and binding manually,
1994   // then letting AsyncServerSocket listen
1995   {
1996     // Manually create a socket
1997     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1998     ASSERT_GE(fd, 0);
1999     // bind
2000     struct sockaddr_in addr;
2001     addr.sin_family = AF_INET;
2002     addr.sin_port = 0;
2003     addr.sin_addr.s_addr = INADDR_ANY;
2004     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2005                              sizeof(addr)), 0);
2006     // Look up the address that we bound to
2007     folly::SocketAddress boundAddress;
2008     boundAddress.setFromLocalAddress(fd);
2009
2010     // Create a server socket
2011     AsyncServerSocket::UniquePtr serverSocket(
2012         new AsyncServerSocket(&eventBase));
2013     serverSocket->useExistingSocket(fd);
2014     serverSocket->listen(16);
2015
2016     // Make sure AsyncServerSocket reports the same address that we bound to
2017     folly::SocketAddress serverSocketAddress;
2018     serverSocket->getAddress(&serverSocketAddress);
2019     CHECK_EQ(boundAddress, serverSocketAddress);
2020
2021     // Make sure the socket works
2022     serverSocketSanityTest(serverSocket.get());
2023   }
2024
2025   // Test creating a socket, binding and listening manually,
2026   // then giving it to AsyncServerSocket
2027   {
2028     // Manually create a socket
2029     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2030     ASSERT_GE(fd, 0);
2031     // bind
2032     struct sockaddr_in addr;
2033     addr.sin_family = AF_INET;
2034     addr.sin_port = 0;
2035     addr.sin_addr.s_addr = INADDR_ANY;
2036     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2037                              sizeof(addr)), 0);
2038     // Look up the address that we bound to
2039     folly::SocketAddress boundAddress;
2040     boundAddress.setFromLocalAddress(fd);
2041     // listen
2042     CHECK_EQ(listen(fd, 16), 0);
2043
2044     // Create a server socket
2045     AsyncServerSocket::UniquePtr serverSocket(
2046         new AsyncServerSocket(&eventBase));
2047     serverSocket->useExistingSocket(fd);
2048
2049     // Make sure AsyncServerSocket reports the same address that we bound to
2050     folly::SocketAddress serverSocketAddress;
2051     serverSocket->getAddress(&serverSocketAddress);
2052     CHECK_EQ(boundAddress, serverSocketAddress);
2053
2054     // Make sure the socket works
2055     serverSocketSanityTest(serverSocket.get());
2056   }
2057 }
2058
2059 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2060   EventBase eventBase;
2061
2062   // Create a server socket
2063   std::shared_ptr<AsyncServerSocket> serverSocket(
2064       AsyncServerSocket::newSocket(&eventBase));
2065   string path(1, 0);
2066   path.append("/anonymous");
2067   folly::SocketAddress serverAddress;
2068   serverAddress.setFromPath(path);
2069   serverSocket->bind(serverAddress);
2070   serverSocket->listen(16);
2071
2072   // Add a callback to accept one connection then stop the loop
2073   TestAcceptCallback acceptCallback;
2074   acceptCallback.setConnectionAcceptedFn(
2075     [&](int fd, const folly::SocketAddress& addr) {
2076       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2077     });
2078   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
2079     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2080   });
2081   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2082   serverSocket->startAccepting();
2083
2084   // Connect to the server socket
2085   std::shared_ptr<AsyncSocket> socket(
2086       AsyncSocket::newSocket(&eventBase, serverAddress));
2087
2088   eventBase.loop();
2089
2090   // Verify that the server accepted a connection
2091   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2092   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2093                     TestAcceptCallback::TYPE_START);
2094   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2095                     TestAcceptCallback::TYPE_ACCEPT);
2096   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2097                     TestAcceptCallback::TYPE_STOP);
2098   int fd = acceptCallback.getEvents()->at(1).fd;
2099
2100   // The accepted connection should already be in non-blocking mode
2101   int flags = fcntl(fd, F_GETFL, 0);
2102   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2103 }