Check readCallback before calling handleRead
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/SocketAddress.h>
21
22 #include <folly/io/IOBuf.h>
23 #include <folly/io/async/test/AsyncSocketTest.h>
24 #include <folly/io/async/test/Util.h>
25
26 #include <gtest/gtest.h>
27 #include <boost/scoped_array.hpp>
28 #include <iostream>
29 #include <unistd.h>
30 #include <fcntl.h>
31 #include <poll.h>
32 #include <sys/types.h>
33 #include <sys/socket.h>
34 #include <netinet/tcp.h>
35 #include <thread>
36
37 using namespace boost;
38
39 using std::string;
40 using std::vector;
41 using std::min;
42 using std::cerr;
43 using std::endl;
44 using std::unique_ptr;
45 using std::chrono::milliseconds;
46 using boost::scoped_array;
47
48 using namespace folly;
49
50 class DelayedWrite: public AsyncTimeout {
51  public:
52   DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
53       unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
54       bool cork, bool lastWrite = false):
55     AsyncTimeout(socket->getEventBase()),
56     socket_(socket),
57     bufs_(std::move(bufs)),
58     wcb_(wcb),
59     cork_(cork),
60     lastWrite_(lastWrite) {}
61
62  private:
63   void timeoutExpired() noexcept override {
64     WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
65     socket_->writeChain(wcb_, std::move(bufs_), flags);
66     if (lastWrite_) {
67       socket_->shutdownWrite();
68     }
69   }
70
71   std::shared_ptr<AsyncSocket> socket_;
72   unique_ptr<IOBuf> bufs_;
73   AsyncTransportWrapper::WriteCallback* wcb_;
74   bool cork_;
75   bool lastWrite_;
76 };
77
78 ///////////////////////////////////////////////////////////////////////////
79 // connect() tests
80 ///////////////////////////////////////////////////////////////////////////
81
82 /**
83  * Test connecting to a server
84  */
85 TEST(AsyncSocketTest, Connect) {
86   // Start listening on a local port
87   TestServer server;
88
89   // Connect using a AsyncSocket
90   EventBase evb;
91   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
92   ConnCallback cb;
93   socket->connect(&cb, server.getAddress(), 30);
94
95   evb.loop();
96
97   CHECK_EQ(cb.state, STATE_SUCCEEDED);
98 }
99
100 /**
101  * Test connecting to a server that isn't listening
102  */
103 TEST(AsyncSocketTest, ConnectRefused) {
104   EventBase evb;
105
106   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
107
108   // Hopefully nothing is actually listening on this address
109   folly::SocketAddress addr("127.0.0.1", 65535);
110   ConnCallback cb;
111   socket->connect(&cb, addr, 30);
112
113   evb.loop();
114
115   CHECK_EQ(cb.state, STATE_FAILED);
116   CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
117 }
118
119 /**
120  * Test connection timeout
121  */
122 TEST(AsyncSocketTest, ConnectTimeout) {
123   EventBase evb;
124
125   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
126
127   // Try connecting to server that won't respond.
128   //
129   // This depends somewhat on the network where this test is run.
130   // Hopefully this IP will be routable but unresponsive.
131   // (Alternatively, we could try listening on a local raw socket, but that
132   // normally requires root privileges.)
133   folly::SocketAddress addr("8.8.8.8", 65535);
134   ConnCallback cb;
135   socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
136
137   evb.loop();
138
139   CHECK_EQ(cb.state, STATE_FAILED);
140   CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
141
142   // Verify that we can still get the peer address after a timeout.
143   // Use case is if the client was created from a client pool, and we want
144   // to log which peer failed.
145   folly::SocketAddress peer;
146   socket->getPeerAddress(&peer);
147   CHECK_EQ(peer, addr);
148 }
149
150 /**
151  * Test writing immediately after connecting, without waiting for connect
152  * to finish.
153  */
154 TEST(AsyncSocketTest, ConnectAndWrite) {
155   TestServer server;
156
157   // connect()
158   EventBase evb;
159   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
160   ConnCallback ccb;
161   socket->connect(&ccb, server.getAddress(), 30);
162
163   // write()
164   char buf[128];
165   memset(buf, 'a', sizeof(buf));
166   WriteCallback wcb;
167   socket->write(&wcb, buf, sizeof(buf));
168
169   // Loop.  We don't bother accepting on the server socket yet.
170   // The kernel should be able to buffer the write request so it can succeed.
171   evb.loop();
172
173   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
174   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
175
176   // Make sure the server got a connection and received the data
177   socket->close();
178   server.verifyConnection(buf, sizeof(buf));
179 }
180
181 /**
182  * Test connecting using a nullptr connect callback.
183  */
184 TEST(AsyncSocketTest, ConnectNullCallback) {
185   TestServer server;
186
187   // connect()
188   EventBase evb;
189   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
190   socket->connect(nullptr, server.getAddress(), 30);
191
192   // write some data, just so we have some way of verifing
193   // that the socket works correctly after connecting
194   char buf[128];
195   memset(buf, 'a', sizeof(buf));
196   WriteCallback wcb;
197   socket->write(&wcb, buf, sizeof(buf));
198
199   evb.loop();
200
201   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
202
203   // Make sure the server got a connection and received the data
204   socket->close();
205   server.verifyConnection(buf, sizeof(buf));
206 }
207
208 /**
209  * Test calling both write() and close() immediately after connecting, without
210  * waiting for connect to finish.
211  *
212  * This exercises the STATE_CONNECTING_CLOSING code.
213  */
214 TEST(AsyncSocketTest, ConnectWriteAndClose) {
215   TestServer server;
216
217   // connect()
218   EventBase evb;
219   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
220   ConnCallback ccb;
221   socket->connect(&ccb, server.getAddress(), 30);
222
223   // write()
224   char buf[128];
225   memset(buf, 'a', sizeof(buf));
226   WriteCallback wcb;
227   socket->write(&wcb, buf, sizeof(buf));
228
229   // close()
230   socket->close();
231
232   // Loop.  We don't bother accepting on the server socket yet.
233   // The kernel should be able to buffer the write request so it can succeed.
234   evb.loop();
235
236   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
237   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
238
239   // Make sure the server got a connection and received the data
240   server.verifyConnection(buf, sizeof(buf));
241 }
242
243 /**
244  * Test calling close() immediately after connect()
245  */
246 TEST(AsyncSocketTest, ConnectAndClose) {
247   TestServer server;
248
249   // Connect using a AsyncSocket
250   EventBase evb;
251   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
252   ConnCallback ccb;
253   socket->connect(&ccb, server.getAddress(), 30);
254
255   // Hopefully the connect didn't succeed immediately.
256   // If it did, we can't exercise the close-while-connecting code path.
257   if (ccb.state == STATE_SUCCEEDED) {
258     LOG(INFO) << "connect() succeeded immediately; aborting test "
259                        "of close-during-connect behavior";
260     return;
261   }
262
263   socket->close();
264
265   // Loop, although there shouldn't be anything to do.
266   evb.loop();
267
268   // Make sure the connection was aborted
269   CHECK_EQ(ccb.state, STATE_FAILED);
270 }
271
272 /**
273  * Test calling closeNow() immediately after connect()
274  *
275  * This should be identical to the normal close behavior.
276  */
277 TEST(AsyncSocketTest, ConnectAndCloseNow) {
278   TestServer server;
279
280   // Connect using a AsyncSocket
281   EventBase evb;
282   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
283   ConnCallback ccb;
284   socket->connect(&ccb, server.getAddress(), 30);
285
286   // Hopefully the connect didn't succeed immediately.
287   // If it did, we can't exercise the close-while-connecting code path.
288   if (ccb.state == STATE_SUCCEEDED) {
289     LOG(INFO) << "connect() succeeded immediately; aborting test "
290                        "of closeNow()-during-connect behavior";
291     return;
292   }
293
294   socket->closeNow();
295
296   // Loop, although there shouldn't be anything to do.
297   evb.loop();
298
299   // Make sure the connection was aborted
300   CHECK_EQ(ccb.state, STATE_FAILED);
301 }
302
303 /**
304  * Test calling both write() and closeNow() immediately after connecting,
305  * without waiting for connect to finish.
306  *
307  * This should abort the pending write.
308  */
309 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
310   TestServer server;
311
312   // connect()
313   EventBase evb;
314   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
315   ConnCallback ccb;
316   socket->connect(&ccb, server.getAddress(), 30);
317
318   // Hopefully the connect didn't succeed immediately.
319   // If it did, we can't exercise the close-while-connecting code path.
320   if (ccb.state == STATE_SUCCEEDED) {
321     LOG(INFO) << "connect() succeeded immediately; aborting test "
322                        "of write-during-connect behavior";
323     return;
324   }
325
326   // write()
327   char buf[128];
328   memset(buf, 'a', sizeof(buf));
329   WriteCallback wcb;
330   socket->write(&wcb, buf, sizeof(buf));
331
332   // close()
333   socket->closeNow();
334
335   // Loop, although there shouldn't be anything to do.
336   evb.loop();
337
338   CHECK_EQ(ccb.state, STATE_FAILED);
339   CHECK_EQ(wcb.state, STATE_FAILED);
340 }
341
342 /**
343  * Test installing a read callback immediately, before connect() finishes.
344  */
345 TEST(AsyncSocketTest, ConnectAndRead) {
346   TestServer server;
347
348   // connect()
349   EventBase evb;
350   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
351   ConnCallback ccb;
352   socket->connect(&ccb, server.getAddress(), 30);
353
354   ReadCallback rcb;
355   socket->setReadCB(&rcb);
356
357   // Even though we haven't looped yet, we should be able to accept
358   // the connection and send data to it.
359   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
360   uint8_t buf[128];
361   memset(buf, 'a', sizeof(buf));
362   acceptedSocket->write(buf, sizeof(buf));
363   acceptedSocket->flush();
364   acceptedSocket->close();
365
366   // Loop, although there shouldn't be anything to do.
367   evb.loop();
368
369   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
370   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
371   CHECK_EQ(rcb.buffers.size(), 1);
372   CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
373   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
374 }
375
376 /**
377  * Test installing a read callback and then closing immediately before the
378  * connect attempt finishes.
379  */
380 TEST(AsyncSocketTest, ConnectReadAndClose) {
381   TestServer server;
382
383   // connect()
384   EventBase evb;
385   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
386   ConnCallback ccb;
387   socket->connect(&ccb, server.getAddress(), 30);
388
389   // Hopefully the connect didn't succeed immediately.
390   // If it did, we can't exercise the close-while-connecting code path.
391   if (ccb.state == STATE_SUCCEEDED) {
392     LOG(INFO) << "connect() succeeded immediately; aborting test "
393                        "of read-during-connect behavior";
394     return;
395   }
396
397   ReadCallback rcb;
398   socket->setReadCB(&rcb);
399
400   // close()
401   socket->close();
402
403   // Loop, although there shouldn't be anything to do.
404   evb.loop();
405
406   CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
407   CHECK_EQ(rcb.buffers.size(), 0);
408   CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
409 }
410
411 /**
412  * Test both writing and installing a read callback immediately,
413  * before connect() finishes.
414  */
415 TEST(AsyncSocketTest, ConnectWriteAndRead) {
416   TestServer server;
417
418   // connect()
419   EventBase evb;
420   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
421   ConnCallback ccb;
422   socket->connect(&ccb, server.getAddress(), 30);
423
424   // write()
425   char buf1[128];
426   memset(buf1, 'a', sizeof(buf1));
427   WriteCallback wcb;
428   socket->write(&wcb, buf1, sizeof(buf1));
429
430   // set a read callback
431   ReadCallback rcb;
432   socket->setReadCB(&rcb);
433
434   // Even though we haven't looped yet, we should be able to accept
435   // the connection and send data to it.
436   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
437   uint8_t buf2[128];
438   memset(buf2, 'b', sizeof(buf2));
439   acceptedSocket->write(buf2, sizeof(buf2));
440   acceptedSocket->flush();
441
442   // shut down the write half of acceptedSocket, so that the AsyncSocket
443   // will stop reading and we can break out of the event loop.
444   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
445
446   // Loop
447   evb.loop();
448
449   // Make sure the connect succeeded
450   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
451
452   // Make sure the AsyncSocket read the data written by the accepted socket
453   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
454   CHECK_EQ(rcb.buffers.size(), 1);
455   CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
456   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
457
458   // Close the AsyncSocket so we'll see EOF on acceptedSocket
459   socket->close();
460
461   // Make sure the accepted socket saw the data written by the AsyncSocket
462   uint8_t readbuf[sizeof(buf1)];
463   acceptedSocket->readAll(readbuf, sizeof(readbuf));
464   CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
465   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
466   CHECK_EQ(bytesRead, 0);
467 }
468
469 /**
470  * Test writing to the socket then shutting down writes before the connect
471  * attempt finishes.
472  */
473 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
474   TestServer server;
475
476   // connect()
477   EventBase evb;
478   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
479   ConnCallback ccb;
480   socket->connect(&ccb, server.getAddress(), 30);
481
482   // Hopefully the connect didn't succeed immediately.
483   // If it did, we can't exercise the write-while-connecting code path.
484   if (ccb.state == STATE_SUCCEEDED) {
485     LOG(INFO) << "connect() succeeded immediately; skipping test";
486     return;
487   }
488
489   // Ask to write some data
490   char wbuf[128];
491   memset(wbuf, 'a', sizeof(wbuf));
492   WriteCallback wcb;
493   socket->write(&wcb, wbuf, sizeof(wbuf));
494   socket->shutdownWrite();
495
496   // Shutdown writes
497   socket->shutdownWrite();
498
499   // Even though we haven't looped yet, we should be able to accept
500   // the connection.
501   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
502
503   // Since the connection is still in progress, there should be no data to
504   // read yet.  Verify that the accepted socket is not readable.
505   struct pollfd fds[1];
506   fds[0].fd = acceptedSocket->getSocketFD();
507   fds[0].events = POLLIN;
508   fds[0].revents = 0;
509   int rc = poll(fds, 1, 0);
510   CHECK_EQ(rc, 0);
511
512   // Write data to the accepted socket
513   uint8_t acceptedWbuf[192];
514   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
515   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
516   acceptedSocket->flush();
517
518   // Loop
519   evb.loop();
520
521   // The loop should have completed the connection, written the queued data,
522   // and shutdown writes on the socket.
523   //
524   // Check that the connection was completed successfully and that the write
525   // callback succeeded.
526   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
527   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
528
529   // Check that we can read the data that was written to the socket, and that
530   // we see an EOF, since its socket was half-shutdown.
531   uint8_t readbuf[sizeof(wbuf)];
532   acceptedSocket->readAll(readbuf, sizeof(readbuf));
533   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
534   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
535   CHECK_EQ(bytesRead, 0);
536
537   // Close the accepted socket.  This will cause it to see EOF
538   // and uninstall the read callback when we loop next.
539   acceptedSocket->close();
540
541   // Install a read callback, then loop again.
542   ReadCallback rcb;
543   socket->setReadCB(&rcb);
544   evb.loop();
545
546   // This loop should have read the data and seen the EOF
547   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
548   CHECK_EQ(rcb.buffers.size(), 1);
549   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
550   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
551                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
552 }
553
554 /**
555  * Test reading, writing, and shutting down writes before the connect attempt
556  * finishes.
557  */
558 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
559   TestServer server;
560
561   // connect()
562   EventBase evb;
563   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
564   ConnCallback ccb;
565   socket->connect(&ccb, server.getAddress(), 30);
566
567   // Hopefully the connect didn't succeed immediately.
568   // If it did, we can't exercise the write-while-connecting code path.
569   if (ccb.state == STATE_SUCCEEDED) {
570     LOG(INFO) << "connect() succeeded immediately; skipping test";
571     return;
572   }
573
574   // Install a read callback
575   ReadCallback rcb;
576   socket->setReadCB(&rcb);
577
578   // Ask to write some data
579   char wbuf[128];
580   memset(wbuf, 'a', sizeof(wbuf));
581   WriteCallback wcb;
582   socket->write(&wcb, wbuf, sizeof(wbuf));
583
584   // Shutdown writes
585   socket->shutdownWrite();
586
587   // Even though we haven't looped yet, we should be able to accept
588   // the connection.
589   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
590
591   // Since the connection is still in progress, there should be no data to
592   // read yet.  Verify that the accepted socket is not readable.
593   struct pollfd fds[1];
594   fds[0].fd = acceptedSocket->getSocketFD();
595   fds[0].events = POLLIN;
596   fds[0].revents = 0;
597   int rc = poll(fds, 1, 0);
598   CHECK_EQ(rc, 0);
599
600   // Write data to the accepted socket
601   uint8_t acceptedWbuf[192];
602   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
603   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
604   acceptedSocket->flush();
605   // Shutdown writes to the accepted socket.  This will cause it to see EOF
606   // and uninstall the read callback.
607   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
608
609   // Loop
610   evb.loop();
611
612   // The loop should have completed the connection, written the queued data,
613   // shutdown writes on the socket, read the data we wrote to it, and see the
614   // EOF.
615   //
616   // Check that the connection was completed successfully and that the read
617   // and write callbacks were invoked as expected.
618   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
619   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
620   CHECK_EQ(rcb.buffers.size(), 1);
621   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
622   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
623                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
624   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
625
626   // Check that we can read the data that was written to the socket, and that
627   // we see an EOF, since its socket was half-shutdown.
628   uint8_t readbuf[sizeof(wbuf)];
629   acceptedSocket->readAll(readbuf, sizeof(readbuf));
630   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
631   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
632   CHECK_EQ(bytesRead, 0);
633
634   // Fully close both sockets
635   acceptedSocket->close();
636   socket->close();
637 }
638
639 /**
640  * Test reading, writing, and calling shutdownWriteNow() before the
641  * connect attempt finishes.
642  */
643 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
644   TestServer server;
645
646   // connect()
647   EventBase evb;
648   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
649   ConnCallback ccb;
650   socket->connect(&ccb, server.getAddress(), 30);
651
652   // Hopefully the connect didn't succeed immediately.
653   // If it did, we can't exercise the write-while-connecting code path.
654   if (ccb.state == STATE_SUCCEEDED) {
655     LOG(INFO) << "connect() succeeded immediately; skipping test";
656     return;
657   }
658
659   // Install a read callback
660   ReadCallback rcb;
661   socket->setReadCB(&rcb);
662
663   // Ask to write some data
664   char wbuf[128];
665   memset(wbuf, 'a', sizeof(wbuf));
666   WriteCallback wcb;
667   socket->write(&wcb, wbuf, sizeof(wbuf));
668
669   // Shutdown writes immediately.
670   // This should immediately discard the data that we just tried to write.
671   socket->shutdownWriteNow();
672
673   // Verify that writeError() was invoked on the write callback.
674   CHECK_EQ(wcb.state, STATE_FAILED);
675   CHECK_EQ(wcb.bytesWritten, 0);
676
677   // Even though we haven't looped yet, we should be able to accept
678   // the connection.
679   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
680
681   // Since the connection is still in progress, there should be no data to
682   // read yet.  Verify that the accepted socket is not readable.
683   struct pollfd fds[1];
684   fds[0].fd = acceptedSocket->getSocketFD();
685   fds[0].events = POLLIN;
686   fds[0].revents = 0;
687   int rc = poll(fds, 1, 0);
688   CHECK_EQ(rc, 0);
689
690   // Write data to the accepted socket
691   uint8_t acceptedWbuf[192];
692   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
693   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
694   acceptedSocket->flush();
695   // Shutdown writes to the accepted socket.  This will cause it to see EOF
696   // and uninstall the read callback.
697   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
698
699   // Loop
700   evb.loop();
701
702   // The loop should have completed the connection, written the queued data,
703   // shutdown writes on the socket, read the data we wrote to it, and see the
704   // EOF.
705   //
706   // Check that the connection was completed successfully and that the read
707   // callback was invoked as expected.
708   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
709   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
710   CHECK_EQ(rcb.buffers.size(), 1);
711   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
712   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
713                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
714
715   // Since we used shutdownWriteNow(), it should have discarded all pending
716   // write data.  Verify we see an immediate EOF when reading from the accepted
717   // socket.
718   uint8_t readbuf[sizeof(wbuf)];
719   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
720   CHECK_EQ(bytesRead, 0);
721
722   // Fully close both sockets
723   acceptedSocket->close();
724   socket->close();
725 }
726
727 // Helper function for use in testConnectOptWrite()
728 // Temporarily disable the read callback
729 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
730   // Uninstall the read callback
731   socket->setReadCB(nullptr);
732   // Schedule the read callback to be reinstalled after 1ms
733   socket->getEventBase()->runInLoop(
734       std::bind(&AsyncSocket::setReadCB, socket, rcb));
735 }
736
737 /**
738  * Test connect+write, then have the connect callback perform another write.
739  *
740  * This tests interaction of the optimistic writing after connect with
741  * additional write attempts that occur in the connect callback.
742  */
743 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
744   TestServer server;
745   EventBase evb;
746   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
747
748   // connect()
749   ConnCallback ccb;
750   socket->connect(&ccb, server.getAddress(), 30);
751
752   // Hopefully the connect didn't succeed immediately.
753   // If it did, we can't exercise the optimistic write code path.
754   if (ccb.state == STATE_SUCCEEDED) {
755     LOG(INFO) << "connect() succeeded immediately; aborting test "
756                        "of optimistic write behavior";
757     return;
758   }
759
760   // Tell the connect callback to perform a write when the connect succeeds
761   WriteCallback wcb2;
762   scoped_array<char> buf2(new char[size2]);
763   memset(buf2.get(), 'b', size2);
764   if (size2 > 0) {
765     ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
766     // Tell the second write callback to close the connection when it is done
767     wcb2.successCallback = [&] { socket->closeNow(); };
768   }
769
770   // Schedule one write() immediately, before the connect finishes
771   scoped_array<char> buf1(new char[size1]);
772   memset(buf1.get(), 'a', size1);
773   WriteCallback wcb1;
774   if (size1 > 0) {
775     socket->write(&wcb1, buf1.get(), size1);
776   }
777
778   if (close) {
779     // immediately perform a close, before connect() completes
780     socket->close();
781   }
782
783   // Start reading from the other endpoint after 10ms.
784   // If we're using large buffers, we have to read so that the writes don't
785   // block forever.
786   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
787   ReadCallback rcb;
788   rcb.dataAvailableCallback = std::bind(tmpDisableReads,
789                                         acceptedSocket.get(), &rcb);
790   socket->getEventBase()->tryRunAfterDelay(
791       std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
792       10);
793
794   // Loop.  We don't bother accepting on the server socket yet.
795   // The kernel should be able to buffer the write request so it can succeed.
796   evb.loop();
797
798   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
799   if (size1 > 0) {
800     CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
801   }
802   if (size2 > 0) {
803     CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
804   }
805
806   socket->close();
807
808   // Make sure the read callback received all of the data
809   size_t bytesRead = 0;
810   for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
811        it != rcb.buffers.end();
812        ++it) {
813     size_t start = bytesRead;
814     bytesRead += it->length;
815     size_t end = bytesRead;
816     if (start < size1) {
817       size_t cmpLen = min(size1, end) - start;
818       CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
819     }
820     if (end > size1 && end <= size1 + size2) {
821       size_t itOffset;
822       size_t buf2Offset;
823       size_t cmpLen;
824       if (start >= size1) {
825         itOffset = 0;
826         buf2Offset = start - size1;
827         cmpLen = end - start;
828       } else {
829         itOffset = size1 - start;
830         buf2Offset = 0;
831         cmpLen = end - size1;
832       }
833       CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
834                                cmpLen),
835                         0);
836     }
837   }
838   CHECK_EQ(bytesRead, size1 + size2);
839 }
840
841 TEST(AsyncSocketTest, ConnectCallbackWrite) {
842   // Test using small writes that should both succeed immediately
843   testConnectOptWrite(100, 200);
844
845   // Test using a large buffer in the connect callback, that should block
846   const size_t largeSize = 8*1024*1024;
847   testConnectOptWrite(100, largeSize);
848
849   // Test using a large initial write
850   testConnectOptWrite(largeSize, 100);
851
852   // Test using two large buffers
853   testConnectOptWrite(largeSize, largeSize);
854
855   // Test a small write in the connect callback,
856   // but no immediate write before connect completes
857   testConnectOptWrite(0, 64);
858
859   // Test a large write in the connect callback,
860   // but no immediate write before connect completes
861   testConnectOptWrite(0, largeSize);
862
863   // Test connect, a small write, then immediately call close() before connect
864   // completes
865   testConnectOptWrite(211, 0, true);
866
867   // Test connect, a large immediate write (that will block), then immediately
868   // call close() before connect completes
869   testConnectOptWrite(largeSize, 0, true);
870 }
871
872 ///////////////////////////////////////////////////////////////////////////
873 // write() related tests
874 ///////////////////////////////////////////////////////////////////////////
875
876 /**
877  * Test writing using a nullptr callback
878  */
879 TEST(AsyncSocketTest, WriteNullCallback) {
880   TestServer server;
881
882   // connect()
883   EventBase evb;
884   std::shared_ptr<AsyncSocket> socket =
885     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
886   evb.loop(); // loop until the socket is connected
887
888   // write() with a nullptr callback
889   char buf[128];
890   memset(buf, 'a', sizeof(buf));
891   socket->write(nullptr, buf, sizeof(buf));
892
893   evb.loop(); // loop until the data is sent
894
895   // Make sure the server got a connection and received the data
896   socket->close();
897   server.verifyConnection(buf, sizeof(buf));
898 }
899
900 /**
901  * Test writing with a send timeout
902  */
903 TEST(AsyncSocketTest, WriteTimeout) {
904   TestServer server;
905
906   // connect()
907   EventBase evb;
908   std::shared_ptr<AsyncSocket> socket =
909     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
910   evb.loop(); // loop until the socket is connected
911
912   // write() a large chunk of data, with no-one on the other end reading
913   size_t writeLength = 8*1024*1024;
914   uint32_t timeout = 200;
915   socket->setSendTimeout(timeout);
916   scoped_array<char> buf(new char[writeLength]);
917   memset(buf.get(), 'a', writeLength);
918   WriteCallback wcb;
919   socket->write(&wcb, buf.get(), writeLength);
920
921   TimePoint start;
922   evb.loop();
923   TimePoint end;
924
925   // Make sure the write attempt timed out as requested
926   CHECK_EQ(wcb.state, STATE_FAILED);
927   CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
928
929   // Check that the write timed out within a reasonable period of time.
930   // We don't check for exactly the specified timeout, since AsyncSocket only
931   // times out when it hasn't made progress for that period of time.
932   //
933   // On linux, the first write sends a few hundred kb of data, then blocks for
934   // writability, and then unblocks again after 40ms and is able to write
935   // another smaller of data before blocking permanently.  Therefore it doesn't
936   // time out until 40ms + timeout.
937   //
938   // I haven't fully verified the cause of this, but I believe it probably
939   // occurs because the receiving end delays sending an ack for up to 40ms.
940   // (This is the default value for TCP_DELACK_MIN.)  Once the sender receives
941   // the ack, it can send some more data.  However, after that point the
942   // receiver's kernel buffer is full.  This 40ms delay happens even with
943   // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints.  However, the
944   // kernel may be automatically disabling TCP_QUICKACK after receiving some
945   // data.
946   //
947   // For now, we simply check that the timeout occurred within 160ms of
948   // the requested value.
949   T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
950 }
951
952 /**
953  * Test writing to a socket that the remote endpoint has closed
954  */
955 TEST(AsyncSocketTest, WritePipeError) {
956   TestServer server;
957
958   // connect()
959   EventBase evb;
960   std::shared_ptr<AsyncSocket> socket =
961     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
962   socket->setSendTimeout(1000);
963   evb.loop(); // loop until the socket is connected
964
965   // accept and immediately close the socket
966   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
967   acceptedSocket.reset();
968
969   // write() a large chunk of data
970   size_t writeLength = 8*1024*1024;
971   scoped_array<char> buf(new char[writeLength]);
972   memset(buf.get(), 'a', writeLength);
973   WriteCallback wcb;
974   socket->write(&wcb, buf.get(), writeLength);
975
976   evb.loop();
977
978   // Make sure the write failed.
979   // It would be nice if AsyncSocketException could convey the errno value,
980   // so that we could check for EPIPE
981   CHECK_EQ(wcb.state, STATE_FAILED);
982   CHECK_EQ(wcb.exception.getType(),
983                     AsyncSocketException::INTERNAL_ERROR);
984 }
985
986 /**
987  * Test writing a mix of simple buffers and IOBufs
988  */
989 TEST(AsyncSocketTest, WriteIOBuf) {
990   TestServer server;
991
992   // connect()
993   EventBase evb;
994   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
995   ConnCallback ccb;
996   socket->connect(&ccb, server.getAddress(), 30);
997
998   // Accept the connection
999   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1000   ReadCallback rcb;
1001   acceptedSocket->setReadCB(&rcb);
1002
1003   // Write a simple buffer to the socket
1004   size_t simpleBufLength = 5;
1005   char simpleBuf[simpleBufLength];
1006   memset(simpleBuf, 'a', simpleBufLength);
1007   WriteCallback wcb;
1008   socket->write(&wcb, simpleBuf, simpleBufLength);
1009
1010   // Write a single-element IOBuf chain
1011   size_t buf1Length = 7;
1012   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1013   memset(buf1->writableData(), 'b', buf1Length);
1014   buf1->append(buf1Length);
1015   unique_ptr<IOBuf> buf1Copy(buf1->clone());
1016   WriteCallback wcb2;
1017   socket->writeChain(&wcb2, std::move(buf1));
1018
1019   // Write a multiple-element IOBuf chain
1020   size_t buf2Length = 11;
1021   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1022   memset(buf2->writableData(), 'c', buf2Length);
1023   buf2->append(buf2Length);
1024   size_t buf3Length = 13;
1025   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1026   memset(buf3->writableData(), 'd', buf3Length);
1027   buf3->append(buf3Length);
1028   buf2->appendChain(std::move(buf3));
1029   unique_ptr<IOBuf> buf2Copy(buf2->clone());
1030   buf2Copy->coalesce();
1031   WriteCallback wcb3;
1032   socket->writeChain(&wcb3, std::move(buf2));
1033   socket->shutdownWrite();
1034
1035   // Let the reads and writes run to completion
1036   evb.loop();
1037
1038   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1039   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1040   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1041
1042   // Make sure the reader got the right data in the right order
1043   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1044   CHECK_EQ(rcb.buffers.size(), 1);
1045   CHECK_EQ(rcb.buffers[0].length,
1046       simpleBufLength + buf1Length + buf2Length + buf3Length);
1047   CHECK_EQ(
1048       memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1049   CHECK_EQ(
1050       memcmp(rcb.buffers[0].buffer + simpleBufLength,
1051           buf1Copy->data(), buf1Copy->length()), 0);
1052   CHECK_EQ(
1053       memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1054           buf2Copy->data(), buf2Copy->length()), 0);
1055
1056   acceptedSocket->close();
1057   socket->close();
1058 }
1059
1060 TEST(AsyncSocketTest, WriteIOBufCorked) {
1061   TestServer server;
1062
1063   // connect()
1064   EventBase evb;
1065   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1066   ConnCallback ccb;
1067   socket->connect(&ccb, server.getAddress(), 30);
1068
1069   // Accept the connection
1070   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1071   ReadCallback rcb;
1072   acceptedSocket->setReadCB(&rcb);
1073
1074   // Do three writes, 100ms apart, with the "cork" flag set
1075   // on the second write.  The reader should see the first write
1076   // arrive by itself, followed by the second and third writes
1077   // arriving together.
1078   size_t buf1Length = 5;
1079   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1080   memset(buf1->writableData(), 'a', buf1Length);
1081   buf1->append(buf1Length);
1082   size_t buf2Length = 7;
1083   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1084   memset(buf2->writableData(), 'b', buf2Length);
1085   buf2->append(buf2Length);
1086   size_t buf3Length = 11;
1087   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1088   memset(buf3->writableData(), 'c', buf3Length);
1089   buf3->append(buf3Length);
1090   WriteCallback wcb1;
1091   socket->writeChain(&wcb1, std::move(buf1));
1092   WriteCallback wcb2;
1093   DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1094   write2.scheduleTimeout(100);
1095   WriteCallback wcb3;
1096   DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1097   write3.scheduleTimeout(200);
1098
1099   evb.loop();
1100   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1101   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1102   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1103   if (wcb3.state != STATE_SUCCEEDED) {
1104     throw(wcb3.exception);
1105   }
1106   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1107
1108   // Make sure the reader got the data with the right grouping
1109   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1110   CHECK_EQ(rcb.buffers.size(), 2);
1111   CHECK_EQ(rcb.buffers[0].length, buf1Length);
1112   CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1113
1114   acceptedSocket->close();
1115   socket->close();
1116 }
1117
1118 /**
1119  * Test performing a zero-length write
1120  */
1121 TEST(AsyncSocketTest, ZeroLengthWrite) {
1122   TestServer server;
1123
1124   // connect()
1125   EventBase evb;
1126   std::shared_ptr<AsyncSocket> socket =
1127     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1128   evb.loop(); // loop until the socket is connected
1129
1130   auto acceptedSocket = server.acceptAsync(&evb);
1131   ReadCallback rcb;
1132   acceptedSocket->setReadCB(&rcb);
1133
1134   size_t len1 = 1024*1024;
1135   size_t len2 = 1024*1024;
1136   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1137   memset(buf.get(), 'a', len1);
1138   memset(buf.get(), 'b', len2);
1139
1140   WriteCallback wcb1;
1141   WriteCallback wcb2;
1142   WriteCallback wcb3;
1143   WriteCallback wcb4;
1144   socket->write(&wcb1, buf.get(), 0);
1145   socket->write(&wcb2, buf.get(), len1);
1146   socket->write(&wcb3, buf.get() + len1, 0);
1147   socket->write(&wcb4, buf.get() + len1, len2);
1148   socket->close();
1149
1150   evb.loop(); // loop until the data is sent
1151
1152   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1153   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1154   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1155   CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1156   rcb.verifyData(buf.get(), len1 + len2);
1157 }
1158
1159 TEST(AsyncSocketTest, ZeroLengthWritev) {
1160   TestServer server;
1161
1162   // connect()
1163   EventBase evb;
1164   std::shared_ptr<AsyncSocket> socket =
1165     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1166   evb.loop(); // loop until the socket is connected
1167
1168   auto acceptedSocket = server.acceptAsync(&evb);
1169   ReadCallback rcb;
1170   acceptedSocket->setReadCB(&rcb);
1171
1172   size_t len1 = 1024*1024;
1173   size_t len2 = 1024*1024;
1174   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1175   memset(buf.get(), 'a', len1);
1176   memset(buf.get(), 'b', len2);
1177
1178   WriteCallback wcb;
1179   size_t iovCount = 4;
1180   struct iovec iov[iovCount];
1181   iov[0].iov_base = buf.get();
1182   iov[0].iov_len = len1;
1183   iov[1].iov_base = buf.get() + len1;
1184   iov[1].iov_len = 0;
1185   iov[2].iov_base = buf.get() + len1;
1186   iov[2].iov_len = len2;
1187   iov[3].iov_base = buf.get() + len1 + len2;
1188   iov[3].iov_len = 0;
1189
1190   socket->writev(&wcb, iov, iovCount);
1191   socket->close();
1192   evb.loop(); // loop until the data is sent
1193
1194   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1195   rcb.verifyData(buf.get(), len1 + len2);
1196 }
1197
1198 ///////////////////////////////////////////////////////////////////////////
1199 // close() related tests
1200 ///////////////////////////////////////////////////////////////////////////
1201
1202 /**
1203  * Test calling close() with pending writes when the socket is already closing.
1204  */
1205 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1206   TestServer server;
1207
1208   // connect()
1209   EventBase evb;
1210   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1211   ConnCallback ccb;
1212   socket->connect(&ccb, server.getAddress(), 30);
1213
1214   // accept the socket on the server side
1215   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1216
1217   // Loop to ensure the connect has completed
1218   evb.loop();
1219
1220   // Make sure we are connected
1221   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1222
1223   // Schedule pending writes, until several write attempts have blocked
1224   char buf[128];
1225   memset(buf, 'a', sizeof(buf));
1226   typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1227   WriteCallbackVector writeCallbacks;
1228
1229   writeCallbacks.reserve(5);
1230   while (writeCallbacks.size() < 5) {
1231     std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1232
1233     socket->write(wcb.get(), buf, sizeof(buf));
1234     if (wcb->state == STATE_SUCCEEDED) {
1235       // Succeeded immediately.  Keep performing more writes
1236       continue;
1237     }
1238
1239     // This write is blocked.
1240     // Have the write callback call close() when writeError() is invoked
1241     wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1242     writeCallbacks.push_back(wcb);
1243   }
1244
1245   // Call closeNow() to immediately fail the pending writes
1246   socket->closeNow();
1247
1248   // Make sure writeError() was invoked on all of the pending write callbacks
1249   for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1250        it != writeCallbacks.end();
1251        ++it) {
1252     CHECK_EQ((*it)->state, STATE_FAILED);
1253   }
1254 }
1255
1256 ///////////////////////////////////////////////////////////////////////////
1257 // ImmediateRead related tests
1258 ///////////////////////////////////////////////////////////////////////////
1259
1260 /* AsyncSocket use to verify immediate read works */
1261 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1262  public:
1263   bool immediateReadCalled = false;
1264   explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1265  protected:
1266   virtual void checkForImmediateRead() noexcept override {
1267     immediateReadCalled = true;
1268     AsyncSocket::handleRead();
1269   }
1270 };
1271
1272 TEST(AsyncSocket, ConnectReadImmediateRead) {
1273   TestServer server;
1274
1275   const size_t maxBufferSz = 100;
1276   const size_t maxReadsPerEvent = 1;
1277   const size_t expectedDataSz = maxBufferSz * 3;
1278   char expectedData[expectedDataSz];
1279   memset(expectedData, 'j', expectedDataSz);
1280
1281   EventBase evb;
1282   ReadCallback rcb(maxBufferSz);
1283   AsyncSocketImmediateRead socket(&evb);
1284   socket.connect(nullptr, server.getAddress(), 30);
1285
1286   evb.loop(); // loop until the socket is connected
1287
1288   socket.setReadCB(&rcb);
1289   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1290   socket.immediateReadCalled = false;
1291
1292   auto acceptedSocket = server.acceptAsync(&evb);
1293
1294   ReadCallback rcbServer;
1295   WriteCallback wcbServer;
1296   rcbServer.dataAvailableCallback = [&]() {
1297     if (rcbServer.dataRead() == expectedDataSz) {
1298       // write back all data read
1299       rcbServer.verifyData(expectedData, expectedDataSz);
1300       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1301       acceptedSocket->close();
1302     }
1303   };
1304   acceptedSocket->setReadCB(&rcbServer);
1305
1306   // write data
1307   WriteCallback wcb1;
1308   socket.write(&wcb1, expectedData, expectedDataSz);
1309   evb.loop();
1310   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1311   rcb.verifyData(expectedData, expectedDataSz);
1312   CHECK_EQ(socket.immediateReadCalled, true);
1313 }
1314
1315 TEST(AsyncSocket, ConnectReadUninstallRead) {
1316   TestServer server;
1317
1318   const size_t maxBufferSz = 100;
1319   const size_t maxReadsPerEvent = 1;
1320   const size_t expectedDataSz = maxBufferSz * 3;
1321   char expectedData[expectedDataSz];
1322   memset(expectedData, 'k', expectedDataSz);
1323
1324   EventBase evb;
1325   ReadCallback rcb(maxBufferSz);
1326   AsyncSocketImmediateRead socket(&evb);
1327   socket.connect(nullptr, server.getAddress(), 30);
1328
1329   evb.loop(); // loop until the socket is connected
1330
1331   socket.setReadCB(&rcb);
1332   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1333   socket.immediateReadCalled = false;
1334
1335   auto acceptedSocket = server.acceptAsync(&evb);
1336
1337   ReadCallback rcbServer;
1338   WriteCallback wcbServer;
1339   rcbServer.dataAvailableCallback = [&]() {
1340     if (rcbServer.dataRead() == expectedDataSz) {
1341       // write back all data read
1342       rcbServer.verifyData(expectedData, expectedDataSz);
1343       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1344       acceptedSocket->close();
1345     }
1346   };
1347   acceptedSocket->setReadCB(&rcbServer);
1348
1349   rcb.dataAvailableCallback = [&]() {
1350     // we read data and reset readCB
1351     socket.setReadCB(nullptr);
1352   };
1353
1354   // write data
1355   WriteCallback wcb;
1356   socket.write(&wcb, expectedData, expectedDataSz);
1357   evb.loop();
1358   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1359
1360   /* we shoud've only read maxBufferSz data since readCallback_
1361    * was reset in dataAvailableCallback */
1362   CHECK_EQ(rcb.dataRead(), maxBufferSz);
1363   CHECK_EQ(socket.immediateReadCalled, false);
1364 }
1365
1366 // TODO:
1367 // - Test connect() and have the connect callback set the read callback
1368 // - Test connect() and have the connect callback unset the read callback
1369 // - Test reading/writing/closing/destroying the socket in the connect callback
1370 // - Test reading/writing/closing/destroying the socket in the read callback
1371 // - Test reading/writing/closing/destroying the socket in the write callback
1372 // - Test one-way shutdown behavior
1373 // - Test changing the EventBase
1374 //
1375 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1376 //   in connectSuccess(), readDataAvailable(), writeSuccess()
1377
1378
1379 ///////////////////////////////////////////////////////////////////////////
1380 // AsyncServerSocket tests
1381 ///////////////////////////////////////////////////////////////////////////
1382
1383 /**
1384  * Helper AcceptCallback class for the test code
1385  * It records the callbacks that were invoked, and also supports calling
1386  * generic std::function objects in each callback.
1387  */
1388 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1389  public:
1390   enum EventType {
1391     TYPE_START,
1392     TYPE_ACCEPT,
1393     TYPE_ERROR,
1394     TYPE_STOP
1395   };
1396   struct EventInfo {
1397     EventInfo(int fd, const folly::SocketAddress& addr)
1398       : type(TYPE_ACCEPT),
1399         fd(fd),
1400         address(addr),
1401         errorMsg() {}
1402     explicit EventInfo(const std::string& msg)
1403       : type(TYPE_ERROR),
1404         fd(-1),
1405         address(),
1406         errorMsg(msg) {}
1407     explicit EventInfo(EventType et)
1408       : type(et),
1409         fd(-1),
1410         address(),
1411         errorMsg() {}
1412
1413     EventType type;
1414     int fd;  // valid for TYPE_ACCEPT
1415     folly::SocketAddress address;  // valid for TYPE_ACCEPT
1416     string errorMsg;  // valid for TYPE_ERROR
1417   };
1418   typedef std::deque<EventInfo> EventList;
1419
1420   TestAcceptCallback()
1421     : connectionAcceptedFn_(),
1422       acceptErrorFn_(),
1423       acceptStoppedFn_(),
1424       events_() {}
1425
1426   std::deque<EventInfo>* getEvents() {
1427     return &events_;
1428   }
1429
1430   void setConnectionAcceptedFn(
1431       const std::function<void(int, const folly::SocketAddress&)>& fn) {
1432     connectionAcceptedFn_ = fn;
1433   }
1434   void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1435     acceptErrorFn_ = fn;
1436   }
1437   void setAcceptStartedFn(const std::function<void()>& fn) {
1438     acceptStartedFn_ = fn;
1439   }
1440   void setAcceptStoppedFn(const std::function<void()>& fn) {
1441     acceptStoppedFn_ = fn;
1442   }
1443
1444   void connectionAccepted(int fd, const folly::SocketAddress& clientAddr)
1445       noexcept {
1446     events_.emplace_back(fd, clientAddr);
1447
1448     if (connectionAcceptedFn_) {
1449       connectionAcceptedFn_(fd, clientAddr);
1450     }
1451   }
1452   void acceptError(const std::exception& ex) noexcept {
1453     events_.emplace_back(ex.what());
1454
1455     if (acceptErrorFn_) {
1456       acceptErrorFn_(ex);
1457     }
1458   }
1459   void acceptStarted() noexcept {
1460     events_.emplace_back(TYPE_START);
1461
1462     if (acceptStartedFn_) {
1463       acceptStartedFn_();
1464     }
1465   }
1466   void acceptStopped() noexcept {
1467     events_.emplace_back(TYPE_STOP);
1468
1469     if (acceptStoppedFn_) {
1470       acceptStoppedFn_();
1471     }
1472   }
1473
1474  private:
1475   std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1476   std::function<void(const std::exception&)> acceptErrorFn_;
1477   std::function<void()> acceptStartedFn_;
1478   std::function<void()> acceptStoppedFn_;
1479
1480   std::deque<EventInfo> events_;
1481 };
1482
1483 /**
1484  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1485  */
1486 TEST(AsyncSocketTest, ServerAcceptOptions) {
1487   EventBase eventBase;
1488
1489   // Create a server socket
1490   std::shared_ptr<AsyncServerSocket> serverSocket(
1491       AsyncServerSocket::newSocket(&eventBase));
1492   serverSocket->bind(0);
1493   serverSocket->listen(16);
1494   folly::SocketAddress serverAddress;
1495   serverSocket->getAddress(&serverAddress);
1496
1497   // Add a callback to accept one connection then stop the loop
1498   TestAcceptCallback acceptCallback;
1499   acceptCallback.setConnectionAcceptedFn(
1500     [&](int fd, const folly::SocketAddress& addr) {
1501       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1502     });
1503   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1504     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1505   });
1506   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1507   serverSocket->startAccepting();
1508
1509   // Connect to the server socket
1510   std::shared_ptr<AsyncSocket> socket(
1511       AsyncSocket::newSocket(&eventBase, serverAddress));
1512
1513   eventBase.loop();
1514
1515   // Verify that the server accepted a connection
1516   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1517   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1518                     TestAcceptCallback::TYPE_START);
1519   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1520                     TestAcceptCallback::TYPE_ACCEPT);
1521   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1522                     TestAcceptCallback::TYPE_STOP);
1523   int fd = acceptCallback.getEvents()->at(1).fd;
1524
1525   // The accepted connection should already be in non-blocking mode
1526   int flags = fcntl(fd, F_GETFL, 0);
1527   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1528
1529 #ifndef TCP_NOPUSH
1530   // The accepted connection should already have TCP_NODELAY set
1531   int value;
1532   socklen_t valueLength = sizeof(value);
1533   int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1534   CHECK_EQ(rc, 0);
1535   CHECK_EQ(value, 1);
1536 #endif
1537 }
1538
1539 /**
1540  * Test AsyncServerSocket::removeAcceptCallback()
1541  */
1542 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1543   // Create a new AsyncServerSocket
1544   EventBase eventBase;
1545   std::shared_ptr<AsyncServerSocket> serverSocket(
1546       AsyncServerSocket::newSocket(&eventBase));
1547   serverSocket->bind(0);
1548   serverSocket->listen(16);
1549   folly::SocketAddress serverAddress;
1550   serverSocket->getAddress(&serverAddress);
1551
1552   // Add several accept callbacks
1553   TestAcceptCallback cb1;
1554   TestAcceptCallback cb2;
1555   TestAcceptCallback cb3;
1556   TestAcceptCallback cb4;
1557   TestAcceptCallback cb5;
1558   TestAcceptCallback cb6;
1559   TestAcceptCallback cb7;
1560
1561   // Test having callbacks remove other callbacks before them on the list,
1562   // after them on the list, or removing themselves.
1563   //
1564   // Have callback 2 remove callback 3 and callback 5 the first time it is
1565   // called.
1566   int cb2Count = 0;
1567   cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1568       std::shared_ptr<AsyncSocket> sock2(
1569         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1570       });
1571   cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1572     });
1573   cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1574       std::shared_ptr<AsyncSocket> sock3(
1575         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1576     });
1577   cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1578   std::shared_ptr<AsyncSocket> sock5(
1579       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1580
1581     });
1582   cb2.setConnectionAcceptedFn(
1583     [&](int fd, const folly::SocketAddress& addr) {
1584       if (cb2Count == 0) {
1585         serverSocket->removeAcceptCallback(&cb3, nullptr);
1586         serverSocket->removeAcceptCallback(&cb5, nullptr);
1587       }
1588       ++cb2Count;
1589     });
1590   // Have callback 6 remove callback 4 the first time it is called,
1591   // and destroy the server socket the second time it is called
1592   int cb6Count = 0;
1593   cb6.setConnectionAcceptedFn(
1594     [&](int fd, const folly::SocketAddress& addr) {
1595       if (cb6Count == 0) {
1596         serverSocket->removeAcceptCallback(&cb4, nullptr);
1597         std::shared_ptr<AsyncSocket> sock6(
1598           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1599         std::shared_ptr<AsyncSocket> sock7(
1600           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1601         std::shared_ptr<AsyncSocket> sock8(
1602           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1603
1604       } else {
1605         serverSocket.reset();
1606       }
1607       ++cb6Count;
1608     });
1609   // Have callback 7 remove itself
1610   cb7.setConnectionAcceptedFn(
1611     [&](int fd, const folly::SocketAddress& addr) {
1612       serverSocket->removeAcceptCallback(&cb7, nullptr);
1613     });
1614
1615   serverSocket->addAcceptCallback(&cb1, nullptr);
1616   serverSocket->addAcceptCallback(&cb2, nullptr);
1617   serverSocket->addAcceptCallback(&cb3, nullptr);
1618   serverSocket->addAcceptCallback(&cb4, nullptr);
1619   serverSocket->addAcceptCallback(&cb5, nullptr);
1620   serverSocket->addAcceptCallback(&cb6, nullptr);
1621   serverSocket->addAcceptCallback(&cb7, nullptr);
1622   serverSocket->startAccepting();
1623
1624   // Make several connections to the socket
1625   std::shared_ptr<AsyncSocket> sock1(
1626       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1627   std::shared_ptr<AsyncSocket> sock4(
1628       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1629
1630   // Loop until we are stopped
1631   eventBase.loop();
1632
1633   // Check to make sure that the expected callbacks were invoked.
1634   //
1635   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1636   // the AcceptCallbacks in round-robin fashion, in the order that they were
1637   // added.  The code is implemented this way right now, but the API doesn't
1638   // explicitly require it be done this way.  If we change the code not to be
1639   // exactly round robin in the future, we can simplify the test checks here.
1640   // (We'll also need to update the termination code, since we expect cb6 to
1641   // get called twice to terminate the loop.)
1642   CHECK_EQ(cb1.getEvents()->size(), 4);
1643   CHECK_EQ(cb1.getEvents()->at(0).type,
1644                     TestAcceptCallback::TYPE_START);
1645   CHECK_EQ(cb1.getEvents()->at(1).type,
1646                     TestAcceptCallback::TYPE_ACCEPT);
1647   CHECK_EQ(cb1.getEvents()->at(2).type,
1648                     TestAcceptCallback::TYPE_ACCEPT);
1649   CHECK_EQ(cb1.getEvents()->at(3).type,
1650                     TestAcceptCallback::TYPE_STOP);
1651
1652   CHECK_EQ(cb2.getEvents()->size(), 4);
1653   CHECK_EQ(cb2.getEvents()->at(0).type,
1654                     TestAcceptCallback::TYPE_START);
1655   CHECK_EQ(cb2.getEvents()->at(1).type,
1656                     TestAcceptCallback::TYPE_ACCEPT);
1657   CHECK_EQ(cb2.getEvents()->at(2).type,
1658                     TestAcceptCallback::TYPE_ACCEPT);
1659   CHECK_EQ(cb2.getEvents()->at(3).type,
1660                     TestAcceptCallback::TYPE_STOP);
1661
1662   CHECK_EQ(cb3.getEvents()->size(), 2);
1663   CHECK_EQ(cb3.getEvents()->at(0).type,
1664                     TestAcceptCallback::TYPE_START);
1665   CHECK_EQ(cb3.getEvents()->at(1).type,
1666                     TestAcceptCallback::TYPE_STOP);
1667
1668   CHECK_EQ(cb4.getEvents()->size(), 3);
1669   CHECK_EQ(cb4.getEvents()->at(0).type,
1670                     TestAcceptCallback::TYPE_START);
1671   CHECK_EQ(cb4.getEvents()->at(1).type,
1672                     TestAcceptCallback::TYPE_ACCEPT);
1673   CHECK_EQ(cb4.getEvents()->at(2).type,
1674                     TestAcceptCallback::TYPE_STOP);
1675
1676   CHECK_EQ(cb5.getEvents()->size(), 2);
1677   CHECK_EQ(cb5.getEvents()->at(0).type,
1678                     TestAcceptCallback::TYPE_START);
1679   CHECK_EQ(cb5.getEvents()->at(1).type,
1680                     TestAcceptCallback::TYPE_STOP);
1681
1682   CHECK_EQ(cb6.getEvents()->size(), 4);
1683   CHECK_EQ(cb6.getEvents()->at(0).type,
1684                     TestAcceptCallback::TYPE_START);
1685   CHECK_EQ(cb6.getEvents()->at(1).type,
1686                     TestAcceptCallback::TYPE_ACCEPT);
1687   CHECK_EQ(cb6.getEvents()->at(2).type,
1688                     TestAcceptCallback::TYPE_ACCEPT);
1689   CHECK_EQ(cb6.getEvents()->at(3).type,
1690                     TestAcceptCallback::TYPE_STOP);
1691
1692   CHECK_EQ(cb7.getEvents()->size(), 3);
1693   CHECK_EQ(cb7.getEvents()->at(0).type,
1694                     TestAcceptCallback::TYPE_START);
1695   CHECK_EQ(cb7.getEvents()->at(1).type,
1696                     TestAcceptCallback::TYPE_ACCEPT);
1697   CHECK_EQ(cb7.getEvents()->at(2).type,
1698                     TestAcceptCallback::TYPE_STOP);
1699 }
1700
1701 /**
1702  * Test AsyncServerSocket::removeAcceptCallback()
1703  */
1704 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1705   // Create a new AsyncServerSocket
1706   EventBase eventBase;
1707   std::shared_ptr<AsyncServerSocket> serverSocket(
1708       AsyncServerSocket::newSocket(&eventBase));
1709   serverSocket->bind(0);
1710   serverSocket->listen(16);
1711   folly::SocketAddress serverAddress;
1712   serverSocket->getAddress(&serverAddress);
1713
1714   // Add several accept callbacks
1715   TestAcceptCallback cb1;
1716   auto thread_id = pthread_self();
1717   cb1.setAcceptStartedFn([&](){
1718     CHECK_NE(thread_id, pthread_self());
1719     thread_id = pthread_self();
1720   });
1721   cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1722     CHECK_EQ(thread_id, pthread_self());
1723     serverSocket->removeAcceptCallback(&cb1, nullptr);
1724   });
1725   cb1.setAcceptStoppedFn([&](){
1726     CHECK_EQ(thread_id, pthread_self());
1727   });
1728
1729   // Test having callbacks remove other callbacks before them on the list,
1730   serverSocket->addAcceptCallback(&cb1, nullptr);
1731   serverSocket->startAccepting();
1732
1733   // Make several connections to the socket
1734   std::shared_ptr<AsyncSocket> sock1(
1735       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1736
1737   // Loop in another thread
1738   auto other = std::thread([&](){
1739     eventBase.loop();
1740   });
1741   other.join();
1742
1743   // Check to make sure that the expected callbacks were invoked.
1744   //
1745   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1746   // the AcceptCallbacks in round-robin fashion, in the order that they were
1747   // added.  The code is implemented this way right now, but the API doesn't
1748   // explicitly require it be done this way.  If we change the code not to be
1749   // exactly round robin in the future, we can simplify the test checks here.
1750   // (We'll also need to update the termination code, since we expect cb6 to
1751   // get called twice to terminate the loop.)
1752   CHECK_EQ(cb1.getEvents()->size(), 3);
1753   CHECK_EQ(cb1.getEvents()->at(0).type,
1754                     TestAcceptCallback::TYPE_START);
1755   CHECK_EQ(cb1.getEvents()->at(1).type,
1756                     TestAcceptCallback::TYPE_ACCEPT);
1757   CHECK_EQ(cb1.getEvents()->at(2).type,
1758                     TestAcceptCallback::TYPE_STOP);
1759
1760 }
1761
1762 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1763   // Add a callback to accept one connection then stop accepting
1764   TestAcceptCallback acceptCallback;
1765   acceptCallback.setConnectionAcceptedFn(
1766     [&](int fd, const folly::SocketAddress& addr) {
1767       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1768     });
1769   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1770     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1771   });
1772   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1773   serverSocket->startAccepting();
1774
1775   // Connect to the server socket
1776   EventBase* eventBase = serverSocket->getEventBase();
1777   folly::SocketAddress serverAddress;
1778   serverSocket->getAddress(&serverAddress);
1779   AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1780
1781   // Loop to process all events
1782   eventBase->loop();
1783
1784   // Verify that the server accepted a connection
1785   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1786   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1787                     TestAcceptCallback::TYPE_START);
1788   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1789                     TestAcceptCallback::TYPE_ACCEPT);
1790   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1791                     TestAcceptCallback::TYPE_STOP);
1792 }
1793
1794 /* Verify that we don't leak sockets if we are destroyed()
1795  * and there are still writes pending
1796  *
1797  * If destroy() only calls close() instead of closeNow(),
1798  * it would shutdown(writes) on the socket, but it would
1799  * never be close()'d, and the socket would leak
1800  */
1801 TEST(AsyncSocketTest, DestroyCloseTest) {
1802   TestServer server;
1803
1804   // connect()
1805   EventBase clientEB;
1806   EventBase serverEB;
1807   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1808   ConnCallback ccb;
1809   socket->connect(&ccb, server.getAddress(), 30);
1810
1811   // Accept the connection
1812   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
1813   ReadCallback rcb;
1814   acceptedSocket->setReadCB(&rcb);
1815
1816   // Write a large buffer to the socket that is larger than kernel buffer
1817   size_t simpleBufLength = 5000000;
1818   char* simpleBuf = new char[simpleBufLength];
1819   memset(simpleBuf, 'a', simpleBufLength);
1820   WriteCallback wcb;
1821
1822   // Let the reads and writes run to completion
1823   int fd = acceptedSocket->getFd();
1824
1825   acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
1826   socket.reset();
1827   acceptedSocket.reset();
1828
1829   // Test that server socket was closed
1830   ssize_t sz = read(fd, simpleBuf, simpleBufLength);
1831   CHECK_EQ(sz, -1);
1832   CHECK_EQ(errno, 9);
1833   delete[] simpleBuf;
1834 }
1835
1836 /**
1837  * Test AsyncServerSocket::useExistingSocket()
1838  */
1839 TEST(AsyncSocketTest, ServerExistingSocket) {
1840   EventBase eventBase;
1841
1842   // Test creating a socket, and letting AsyncServerSocket bind and listen
1843   {
1844     // Manually create a socket
1845     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1846     ASSERT_GE(fd, 0);
1847
1848     // Create a server socket
1849     AsyncServerSocket::UniquePtr serverSocket(
1850         new AsyncServerSocket(&eventBase));
1851     serverSocket->useExistingSocket(fd);
1852     folly::SocketAddress address;
1853     serverSocket->getAddress(&address);
1854     address.setPort(0);
1855     serverSocket->bind(address);
1856     serverSocket->listen(16);
1857
1858     // Make sure the socket works
1859     serverSocketSanityTest(serverSocket.get());
1860   }
1861
1862   // Test creating a socket and binding manually,
1863   // then letting AsyncServerSocket listen
1864   {
1865     // Manually create a socket
1866     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1867     ASSERT_GE(fd, 0);
1868     // bind
1869     struct sockaddr_in addr;
1870     addr.sin_family = AF_INET;
1871     addr.sin_port = 0;
1872     addr.sin_addr.s_addr = INADDR_ANY;
1873     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1874                              sizeof(addr)), 0);
1875     // Look up the address that we bound to
1876     folly::SocketAddress boundAddress;
1877     boundAddress.setFromLocalAddress(fd);
1878
1879     // Create a server socket
1880     AsyncServerSocket::UniquePtr serverSocket(
1881         new AsyncServerSocket(&eventBase));
1882     serverSocket->useExistingSocket(fd);
1883     serverSocket->listen(16);
1884
1885     // Make sure AsyncServerSocket reports the same address that we bound to
1886     folly::SocketAddress serverSocketAddress;
1887     serverSocket->getAddress(&serverSocketAddress);
1888     CHECK_EQ(boundAddress, serverSocketAddress);
1889
1890     // Make sure the socket works
1891     serverSocketSanityTest(serverSocket.get());
1892   }
1893
1894   // Test creating a socket, binding and listening manually,
1895   // then giving it to AsyncServerSocket
1896   {
1897     // Manually create a socket
1898     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1899     ASSERT_GE(fd, 0);
1900     // bind
1901     struct sockaddr_in addr;
1902     addr.sin_family = AF_INET;
1903     addr.sin_port = 0;
1904     addr.sin_addr.s_addr = INADDR_ANY;
1905     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1906                              sizeof(addr)), 0);
1907     // Look up the address that we bound to
1908     folly::SocketAddress boundAddress;
1909     boundAddress.setFromLocalAddress(fd);
1910     // listen
1911     CHECK_EQ(listen(fd, 16), 0);
1912
1913     // Create a server socket
1914     AsyncServerSocket::UniquePtr serverSocket(
1915         new AsyncServerSocket(&eventBase));
1916     serverSocket->useExistingSocket(fd);
1917
1918     // Make sure AsyncServerSocket reports the same address that we bound to
1919     folly::SocketAddress serverSocketAddress;
1920     serverSocket->getAddress(&serverSocketAddress);
1921     CHECK_EQ(boundAddress, serverSocketAddress);
1922
1923     // Make sure the socket works
1924     serverSocketSanityTest(serverSocket.get());
1925   }
1926 }
1927
1928 TEST(AsyncSocketTest, UnixDomainSocketTest) {
1929   EventBase eventBase;
1930
1931   // Create a server socket
1932   std::shared_ptr<AsyncServerSocket> serverSocket(
1933       AsyncServerSocket::newSocket(&eventBase));
1934   string path(1, 0);
1935   path.append("/anonymous");
1936   folly::SocketAddress serverAddress;
1937   serverAddress.setFromPath(path);
1938   serverSocket->bind(serverAddress);
1939   serverSocket->listen(16);
1940
1941   // Add a callback to accept one connection then stop the loop
1942   TestAcceptCallback acceptCallback;
1943   acceptCallback.setConnectionAcceptedFn(
1944     [&](int fd, const folly::SocketAddress& addr) {
1945       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1946     });
1947   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1948     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1949   });
1950   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1951   serverSocket->startAccepting();
1952
1953   // Connect to the server socket
1954   std::shared_ptr<AsyncSocket> socket(
1955       AsyncSocket::newSocket(&eventBase, serverAddress));
1956
1957   eventBase.loop();
1958
1959   // Verify that the server accepted a connection
1960   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1961   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1962                     TestAcceptCallback::TYPE_START);
1963   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1964                     TestAcceptCallback::TYPE_ACCEPT);
1965   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1966                     TestAcceptCallback::TYPE_STOP);
1967   int fd = acceptCallback.getEvents()->at(1).fd;
1968
1969   // The accepted connection should already be in non-blocking mode
1970   int flags = fcntl(fd, F_GETFL, 0);
1971   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1972 }