Fix Build: IPv6: SocketAddressTest.SetFromStrings and AsyncSocketTest.ConnectTimeout...
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/SocketAddress.h>
21
22 #include <folly/io/IOBuf.h>
23 #include <folly/io/async/test/AsyncSocketTest.h>
24 #include <folly/io/async/test/Util.h>
25 #include <folly/test/SocketAddressTestHelper.h>
26
27 #include <gtest/gtest.h>
28 #include <boost/scoped_array.hpp>
29 #include <iostream>
30 #include <unistd.h>
31 #include <fcntl.h>
32 #include <poll.h>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netinet/tcp.h>
36 #include <thread>
37
38 using namespace boost;
39
40 using std::string;
41 using std::vector;
42 using std::min;
43 using std::cerr;
44 using std::endl;
45 using std::unique_ptr;
46 using std::chrono::milliseconds;
47 using boost::scoped_array;
48
49 using namespace folly;
50
51 class DelayedWrite: public AsyncTimeout {
52  public:
53   DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
54       unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
55       bool cork, bool lastWrite = false):
56     AsyncTimeout(socket->getEventBase()),
57     socket_(socket),
58     bufs_(std::move(bufs)),
59     wcb_(wcb),
60     cork_(cork),
61     lastWrite_(lastWrite) {}
62
63  private:
64   void timeoutExpired() noexcept override {
65     WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
66     socket_->writeChain(wcb_, std::move(bufs_), flags);
67     if (lastWrite_) {
68       socket_->shutdownWrite();
69     }
70   }
71
72   std::shared_ptr<AsyncSocket> socket_;
73   unique_ptr<IOBuf> bufs_;
74   AsyncTransportWrapper::WriteCallback* wcb_;
75   bool cork_;
76   bool lastWrite_;
77 };
78
79 ///////////////////////////////////////////////////////////////////////////
80 // connect() tests
81 ///////////////////////////////////////////////////////////////////////////
82
83 /**
84  * Test connecting to a server
85  */
86 TEST(AsyncSocketTest, Connect) {
87   // Start listening on a local port
88   TestServer server;
89
90   // Connect using a AsyncSocket
91   EventBase evb;
92   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
93   ConnCallback cb;
94   socket->connect(&cb, server.getAddress(), 30);
95
96   evb.loop();
97
98   CHECK_EQ(cb.state, STATE_SUCCEEDED);
99 }
100
101 /**
102  * Test connecting to a server that isn't listening
103  */
104 TEST(AsyncSocketTest, ConnectRefused) {
105   EventBase evb;
106
107   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
108
109   // Hopefully nothing is actually listening on this address
110   folly::SocketAddress addr("127.0.0.1", 65535);
111   ConnCallback cb;
112   socket->connect(&cb, addr, 30);
113
114   evb.loop();
115
116   CHECK_EQ(cb.state, STATE_FAILED);
117   CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
118 }
119
120 /**
121  * Test connection timeout
122  */
123 TEST(AsyncSocketTest, ConnectTimeout) {
124   EventBase evb;
125
126   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
127
128   // Try connecting to server that won't respond.
129   //
130   // This depends somewhat on the network where this test is run.
131   // Hopefully this IP will be routable but unresponsive.
132   // (Alternatively, we could try listening on a local raw socket, but that
133   // normally requires root privileges.)
134   auto host =
135       SocketAddressTestHelper::isIPv6Enabled() ?
136       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
137       SocketAddressTestHelper::isIPv4Enabled() ?
138       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
139       nullptr;
140   SocketAddress addr(host, 65535);
141   ConnCallback cb;
142   socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
143
144   evb.loop();
145
146   CHECK_EQ(cb.state, STATE_FAILED);
147   CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
148
149   // Verify that we can still get the peer address after a timeout.
150   // Use case is if the client was created from a client pool, and we want
151   // to log which peer failed.
152   folly::SocketAddress peer;
153   socket->getPeerAddress(&peer);
154   CHECK_EQ(peer, addr);
155 }
156
157 /**
158  * Test writing immediately after connecting, without waiting for connect
159  * to finish.
160  */
161 TEST(AsyncSocketTest, ConnectAndWrite) {
162   TestServer server;
163
164   // connect()
165   EventBase evb;
166   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
167   ConnCallback ccb;
168   socket->connect(&ccb, server.getAddress(), 30);
169
170   // write()
171   char buf[128];
172   memset(buf, 'a', sizeof(buf));
173   WriteCallback wcb;
174   socket->write(&wcb, buf, sizeof(buf));
175
176   // Loop.  We don't bother accepting on the server socket yet.
177   // The kernel should be able to buffer the write request so it can succeed.
178   evb.loop();
179
180   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
181   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
182
183   // Make sure the server got a connection and received the data
184   socket->close();
185   server.verifyConnection(buf, sizeof(buf));
186 }
187
188 /**
189  * Test connecting using a nullptr connect callback.
190  */
191 TEST(AsyncSocketTest, ConnectNullCallback) {
192   TestServer server;
193
194   // connect()
195   EventBase evb;
196   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
197   socket->connect(nullptr, server.getAddress(), 30);
198
199   // write some data, just so we have some way of verifing
200   // that the socket works correctly after connecting
201   char buf[128];
202   memset(buf, 'a', sizeof(buf));
203   WriteCallback wcb;
204   socket->write(&wcb, buf, sizeof(buf));
205
206   evb.loop();
207
208   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
209
210   // Make sure the server got a connection and received the data
211   socket->close();
212   server.verifyConnection(buf, sizeof(buf));
213 }
214
215 /**
216  * Test calling both write() and close() immediately after connecting, without
217  * waiting for connect to finish.
218  *
219  * This exercises the STATE_CONNECTING_CLOSING code.
220  */
221 TEST(AsyncSocketTest, ConnectWriteAndClose) {
222   TestServer server;
223
224   // connect()
225   EventBase evb;
226   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
227   ConnCallback ccb;
228   socket->connect(&ccb, server.getAddress(), 30);
229
230   // write()
231   char buf[128];
232   memset(buf, 'a', sizeof(buf));
233   WriteCallback wcb;
234   socket->write(&wcb, buf, sizeof(buf));
235
236   // close()
237   socket->close();
238
239   // Loop.  We don't bother accepting on the server socket yet.
240   // The kernel should be able to buffer the write request so it can succeed.
241   evb.loop();
242
243   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
244   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
245
246   // Make sure the server got a connection and received the data
247   server.verifyConnection(buf, sizeof(buf));
248 }
249
250 /**
251  * Test calling close() immediately after connect()
252  */
253 TEST(AsyncSocketTest, ConnectAndClose) {
254   TestServer server;
255
256   // Connect using a AsyncSocket
257   EventBase evb;
258   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
259   ConnCallback ccb;
260   socket->connect(&ccb, server.getAddress(), 30);
261
262   // Hopefully the connect didn't succeed immediately.
263   // If it did, we can't exercise the close-while-connecting code path.
264   if (ccb.state == STATE_SUCCEEDED) {
265     LOG(INFO) << "connect() succeeded immediately; aborting test "
266                        "of close-during-connect behavior";
267     return;
268   }
269
270   socket->close();
271
272   // Loop, although there shouldn't be anything to do.
273   evb.loop();
274
275   // Make sure the connection was aborted
276   CHECK_EQ(ccb.state, STATE_FAILED);
277 }
278
279 /**
280  * Test calling closeNow() immediately after connect()
281  *
282  * This should be identical to the normal close behavior.
283  */
284 TEST(AsyncSocketTest, ConnectAndCloseNow) {
285   TestServer server;
286
287   // Connect using a AsyncSocket
288   EventBase evb;
289   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
290   ConnCallback ccb;
291   socket->connect(&ccb, server.getAddress(), 30);
292
293   // Hopefully the connect didn't succeed immediately.
294   // If it did, we can't exercise the close-while-connecting code path.
295   if (ccb.state == STATE_SUCCEEDED) {
296     LOG(INFO) << "connect() succeeded immediately; aborting test "
297                        "of closeNow()-during-connect behavior";
298     return;
299   }
300
301   socket->closeNow();
302
303   // Loop, although there shouldn't be anything to do.
304   evb.loop();
305
306   // Make sure the connection was aborted
307   CHECK_EQ(ccb.state, STATE_FAILED);
308 }
309
310 /**
311  * Test calling both write() and closeNow() immediately after connecting,
312  * without waiting for connect to finish.
313  *
314  * This should abort the pending write.
315  */
316 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
317   TestServer server;
318
319   // connect()
320   EventBase evb;
321   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
322   ConnCallback ccb;
323   socket->connect(&ccb, server.getAddress(), 30);
324
325   // Hopefully the connect didn't succeed immediately.
326   // If it did, we can't exercise the close-while-connecting code path.
327   if (ccb.state == STATE_SUCCEEDED) {
328     LOG(INFO) << "connect() succeeded immediately; aborting test "
329                        "of write-during-connect behavior";
330     return;
331   }
332
333   // write()
334   char buf[128];
335   memset(buf, 'a', sizeof(buf));
336   WriteCallback wcb;
337   socket->write(&wcb, buf, sizeof(buf));
338
339   // close()
340   socket->closeNow();
341
342   // Loop, although there shouldn't be anything to do.
343   evb.loop();
344
345   CHECK_EQ(ccb.state, STATE_FAILED);
346   CHECK_EQ(wcb.state, STATE_FAILED);
347 }
348
349 /**
350  * Test installing a read callback immediately, before connect() finishes.
351  */
352 TEST(AsyncSocketTest, ConnectAndRead) {
353   TestServer server;
354
355   // connect()
356   EventBase evb;
357   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
358   ConnCallback ccb;
359   socket->connect(&ccb, server.getAddress(), 30);
360
361   ReadCallback rcb;
362   socket->setReadCB(&rcb);
363
364   // Even though we haven't looped yet, we should be able to accept
365   // the connection and send data to it.
366   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
367   uint8_t buf[128];
368   memset(buf, 'a', sizeof(buf));
369   acceptedSocket->write(buf, sizeof(buf));
370   acceptedSocket->flush();
371   acceptedSocket->close();
372
373   // Loop, although there shouldn't be anything to do.
374   evb.loop();
375
376   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
377   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
378   CHECK_EQ(rcb.buffers.size(), 1);
379   CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
380   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
381 }
382
383 /**
384  * Test installing a read callback and then closing immediately before the
385  * connect attempt finishes.
386  */
387 TEST(AsyncSocketTest, ConnectReadAndClose) {
388   TestServer server;
389
390   // connect()
391   EventBase evb;
392   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
393   ConnCallback ccb;
394   socket->connect(&ccb, server.getAddress(), 30);
395
396   // Hopefully the connect didn't succeed immediately.
397   // If it did, we can't exercise the close-while-connecting code path.
398   if (ccb.state == STATE_SUCCEEDED) {
399     LOG(INFO) << "connect() succeeded immediately; aborting test "
400                        "of read-during-connect behavior";
401     return;
402   }
403
404   ReadCallback rcb;
405   socket->setReadCB(&rcb);
406
407   // close()
408   socket->close();
409
410   // Loop, although there shouldn't be anything to do.
411   evb.loop();
412
413   CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
414   CHECK_EQ(rcb.buffers.size(), 0);
415   CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
416 }
417
418 /**
419  * Test both writing and installing a read callback immediately,
420  * before connect() finishes.
421  */
422 TEST(AsyncSocketTest, ConnectWriteAndRead) {
423   TestServer server;
424
425   // connect()
426   EventBase evb;
427   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
428   ConnCallback ccb;
429   socket->connect(&ccb, server.getAddress(), 30);
430
431   // write()
432   char buf1[128];
433   memset(buf1, 'a', sizeof(buf1));
434   WriteCallback wcb;
435   socket->write(&wcb, buf1, sizeof(buf1));
436
437   // set a read callback
438   ReadCallback rcb;
439   socket->setReadCB(&rcb);
440
441   // Even though we haven't looped yet, we should be able to accept
442   // the connection and send data to it.
443   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
444   uint8_t buf2[128];
445   memset(buf2, 'b', sizeof(buf2));
446   acceptedSocket->write(buf2, sizeof(buf2));
447   acceptedSocket->flush();
448
449   // shut down the write half of acceptedSocket, so that the AsyncSocket
450   // will stop reading and we can break out of the event loop.
451   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
452
453   // Loop
454   evb.loop();
455
456   // Make sure the connect succeeded
457   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
458
459   // Make sure the AsyncSocket read the data written by the accepted socket
460   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
461   CHECK_EQ(rcb.buffers.size(), 1);
462   CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
463   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
464
465   // Close the AsyncSocket so we'll see EOF on acceptedSocket
466   socket->close();
467
468   // Make sure the accepted socket saw the data written by the AsyncSocket
469   uint8_t readbuf[sizeof(buf1)];
470   acceptedSocket->readAll(readbuf, sizeof(readbuf));
471   CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
472   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
473   CHECK_EQ(bytesRead, 0);
474 }
475
476 /**
477  * Test writing to the socket then shutting down writes before the connect
478  * attempt finishes.
479  */
480 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
481   TestServer server;
482
483   // connect()
484   EventBase evb;
485   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
486   ConnCallback ccb;
487   socket->connect(&ccb, server.getAddress(), 30);
488
489   // Hopefully the connect didn't succeed immediately.
490   // If it did, we can't exercise the write-while-connecting code path.
491   if (ccb.state == STATE_SUCCEEDED) {
492     LOG(INFO) << "connect() succeeded immediately; skipping test";
493     return;
494   }
495
496   // Ask to write some data
497   char wbuf[128];
498   memset(wbuf, 'a', sizeof(wbuf));
499   WriteCallback wcb;
500   socket->write(&wcb, wbuf, sizeof(wbuf));
501   socket->shutdownWrite();
502
503   // Shutdown writes
504   socket->shutdownWrite();
505
506   // Even though we haven't looped yet, we should be able to accept
507   // the connection.
508   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
509
510   // Since the connection is still in progress, there should be no data to
511   // read yet.  Verify that the accepted socket is not readable.
512   struct pollfd fds[1];
513   fds[0].fd = acceptedSocket->getSocketFD();
514   fds[0].events = POLLIN;
515   fds[0].revents = 0;
516   int rc = poll(fds, 1, 0);
517   CHECK_EQ(rc, 0);
518
519   // Write data to the accepted socket
520   uint8_t acceptedWbuf[192];
521   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
522   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
523   acceptedSocket->flush();
524
525   // Loop
526   evb.loop();
527
528   // The loop should have completed the connection, written the queued data,
529   // and shutdown writes on the socket.
530   //
531   // Check that the connection was completed successfully and that the write
532   // callback succeeded.
533   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
534   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
535
536   // Check that we can read the data that was written to the socket, and that
537   // we see an EOF, since its socket was half-shutdown.
538   uint8_t readbuf[sizeof(wbuf)];
539   acceptedSocket->readAll(readbuf, sizeof(readbuf));
540   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
541   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
542   CHECK_EQ(bytesRead, 0);
543
544   // Close the accepted socket.  This will cause it to see EOF
545   // and uninstall the read callback when we loop next.
546   acceptedSocket->close();
547
548   // Install a read callback, then loop again.
549   ReadCallback rcb;
550   socket->setReadCB(&rcb);
551   evb.loop();
552
553   // This loop should have read the data and seen the EOF
554   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
555   CHECK_EQ(rcb.buffers.size(), 1);
556   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
557   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
558                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
559 }
560
561 /**
562  * Test reading, writing, and shutting down writes before the connect attempt
563  * finishes.
564  */
565 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
566   TestServer server;
567
568   // connect()
569   EventBase evb;
570   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
571   ConnCallback ccb;
572   socket->connect(&ccb, server.getAddress(), 30);
573
574   // Hopefully the connect didn't succeed immediately.
575   // If it did, we can't exercise the write-while-connecting code path.
576   if (ccb.state == STATE_SUCCEEDED) {
577     LOG(INFO) << "connect() succeeded immediately; skipping test";
578     return;
579   }
580
581   // Install a read callback
582   ReadCallback rcb;
583   socket->setReadCB(&rcb);
584
585   // Ask to write some data
586   char wbuf[128];
587   memset(wbuf, 'a', sizeof(wbuf));
588   WriteCallback wcb;
589   socket->write(&wcb, wbuf, sizeof(wbuf));
590
591   // Shutdown writes
592   socket->shutdownWrite();
593
594   // Even though we haven't looped yet, we should be able to accept
595   // the connection.
596   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
597
598   // Since the connection is still in progress, there should be no data to
599   // read yet.  Verify that the accepted socket is not readable.
600   struct pollfd fds[1];
601   fds[0].fd = acceptedSocket->getSocketFD();
602   fds[0].events = POLLIN;
603   fds[0].revents = 0;
604   int rc = poll(fds, 1, 0);
605   CHECK_EQ(rc, 0);
606
607   // Write data to the accepted socket
608   uint8_t acceptedWbuf[192];
609   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
610   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
611   acceptedSocket->flush();
612   // Shutdown writes to the accepted socket.  This will cause it to see EOF
613   // and uninstall the read callback.
614   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
615
616   // Loop
617   evb.loop();
618
619   // The loop should have completed the connection, written the queued data,
620   // shutdown writes on the socket, read the data we wrote to it, and see the
621   // EOF.
622   //
623   // Check that the connection was completed successfully and that the read
624   // and write callbacks were invoked as expected.
625   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
626   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
627   CHECK_EQ(rcb.buffers.size(), 1);
628   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
629   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
630                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
631   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
632
633   // Check that we can read the data that was written to the socket, and that
634   // we see an EOF, since its socket was half-shutdown.
635   uint8_t readbuf[sizeof(wbuf)];
636   acceptedSocket->readAll(readbuf, sizeof(readbuf));
637   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
638   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
639   CHECK_EQ(bytesRead, 0);
640
641   // Fully close both sockets
642   acceptedSocket->close();
643   socket->close();
644 }
645
646 /**
647  * Test reading, writing, and calling shutdownWriteNow() before the
648  * connect attempt finishes.
649  */
650 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
651   TestServer server;
652
653   // connect()
654   EventBase evb;
655   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
656   ConnCallback ccb;
657   socket->connect(&ccb, server.getAddress(), 30);
658
659   // Hopefully the connect didn't succeed immediately.
660   // If it did, we can't exercise the write-while-connecting code path.
661   if (ccb.state == STATE_SUCCEEDED) {
662     LOG(INFO) << "connect() succeeded immediately; skipping test";
663     return;
664   }
665
666   // Install a read callback
667   ReadCallback rcb;
668   socket->setReadCB(&rcb);
669
670   // Ask to write some data
671   char wbuf[128];
672   memset(wbuf, 'a', sizeof(wbuf));
673   WriteCallback wcb;
674   socket->write(&wcb, wbuf, sizeof(wbuf));
675
676   // Shutdown writes immediately.
677   // This should immediately discard the data that we just tried to write.
678   socket->shutdownWriteNow();
679
680   // Verify that writeError() was invoked on the write callback.
681   CHECK_EQ(wcb.state, STATE_FAILED);
682   CHECK_EQ(wcb.bytesWritten, 0);
683
684   // Even though we haven't looped yet, we should be able to accept
685   // the connection.
686   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
687
688   // Since the connection is still in progress, there should be no data to
689   // read yet.  Verify that the accepted socket is not readable.
690   struct pollfd fds[1];
691   fds[0].fd = acceptedSocket->getSocketFD();
692   fds[0].events = POLLIN;
693   fds[0].revents = 0;
694   int rc = poll(fds, 1, 0);
695   CHECK_EQ(rc, 0);
696
697   // Write data to the accepted socket
698   uint8_t acceptedWbuf[192];
699   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
700   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
701   acceptedSocket->flush();
702   // Shutdown writes to the accepted socket.  This will cause it to see EOF
703   // and uninstall the read callback.
704   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
705
706   // Loop
707   evb.loop();
708
709   // The loop should have completed the connection, written the queued data,
710   // shutdown writes on the socket, read the data we wrote to it, and see the
711   // EOF.
712   //
713   // Check that the connection was completed successfully and that the read
714   // callback was invoked as expected.
715   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
716   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
717   CHECK_EQ(rcb.buffers.size(), 1);
718   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
719   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
720                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
721
722   // Since we used shutdownWriteNow(), it should have discarded all pending
723   // write data.  Verify we see an immediate EOF when reading from the accepted
724   // socket.
725   uint8_t readbuf[sizeof(wbuf)];
726   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
727   CHECK_EQ(bytesRead, 0);
728
729   // Fully close both sockets
730   acceptedSocket->close();
731   socket->close();
732 }
733
734 // Helper function for use in testConnectOptWrite()
735 // Temporarily disable the read callback
736 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
737   // Uninstall the read callback
738   socket->setReadCB(nullptr);
739   // Schedule the read callback to be reinstalled after 1ms
740   socket->getEventBase()->runInLoop(
741       std::bind(&AsyncSocket::setReadCB, socket, rcb));
742 }
743
744 /**
745  * Test connect+write, then have the connect callback perform another write.
746  *
747  * This tests interaction of the optimistic writing after connect with
748  * additional write attempts that occur in the connect callback.
749  */
750 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
751   TestServer server;
752   EventBase evb;
753   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
754
755   // connect()
756   ConnCallback ccb;
757   socket->connect(&ccb, server.getAddress(), 30);
758
759   // Hopefully the connect didn't succeed immediately.
760   // If it did, we can't exercise the optimistic write code path.
761   if (ccb.state == STATE_SUCCEEDED) {
762     LOG(INFO) << "connect() succeeded immediately; aborting test "
763                        "of optimistic write behavior";
764     return;
765   }
766
767   // Tell the connect callback to perform a write when the connect succeeds
768   WriteCallback wcb2;
769   scoped_array<char> buf2(new char[size2]);
770   memset(buf2.get(), 'b', size2);
771   if (size2 > 0) {
772     ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
773     // Tell the second write callback to close the connection when it is done
774     wcb2.successCallback = [&] { socket->closeNow(); };
775   }
776
777   // Schedule one write() immediately, before the connect finishes
778   scoped_array<char> buf1(new char[size1]);
779   memset(buf1.get(), 'a', size1);
780   WriteCallback wcb1;
781   if (size1 > 0) {
782     socket->write(&wcb1, buf1.get(), size1);
783   }
784
785   if (close) {
786     // immediately perform a close, before connect() completes
787     socket->close();
788   }
789
790   // Start reading from the other endpoint after 10ms.
791   // If we're using large buffers, we have to read so that the writes don't
792   // block forever.
793   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
794   ReadCallback rcb;
795   rcb.dataAvailableCallback = std::bind(tmpDisableReads,
796                                         acceptedSocket.get(), &rcb);
797   socket->getEventBase()->tryRunAfterDelay(
798       std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
799       10);
800
801   // Loop.  We don't bother accepting on the server socket yet.
802   // The kernel should be able to buffer the write request so it can succeed.
803   evb.loop();
804
805   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
806   if (size1 > 0) {
807     CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
808   }
809   if (size2 > 0) {
810     CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
811   }
812
813   socket->close();
814
815   // Make sure the read callback received all of the data
816   size_t bytesRead = 0;
817   for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
818        it != rcb.buffers.end();
819        ++it) {
820     size_t start = bytesRead;
821     bytesRead += it->length;
822     size_t end = bytesRead;
823     if (start < size1) {
824       size_t cmpLen = min(size1, end) - start;
825       CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
826     }
827     if (end > size1 && end <= size1 + size2) {
828       size_t itOffset;
829       size_t buf2Offset;
830       size_t cmpLen;
831       if (start >= size1) {
832         itOffset = 0;
833         buf2Offset = start - size1;
834         cmpLen = end - start;
835       } else {
836         itOffset = size1 - start;
837         buf2Offset = 0;
838         cmpLen = end - size1;
839       }
840       CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
841                                cmpLen),
842                         0);
843     }
844   }
845   CHECK_EQ(bytesRead, size1 + size2);
846 }
847
848 TEST(AsyncSocketTest, ConnectCallbackWrite) {
849   // Test using small writes that should both succeed immediately
850   testConnectOptWrite(100, 200);
851
852   // Test using a large buffer in the connect callback, that should block
853   const size_t largeSize = 8*1024*1024;
854   testConnectOptWrite(100, largeSize);
855
856   // Test using a large initial write
857   testConnectOptWrite(largeSize, 100);
858
859   // Test using two large buffers
860   testConnectOptWrite(largeSize, largeSize);
861
862   // Test a small write in the connect callback,
863   // but no immediate write before connect completes
864   testConnectOptWrite(0, 64);
865
866   // Test a large write in the connect callback,
867   // but no immediate write before connect completes
868   testConnectOptWrite(0, largeSize);
869
870   // Test connect, a small write, then immediately call close() before connect
871   // completes
872   testConnectOptWrite(211, 0, true);
873
874   // Test connect, a large immediate write (that will block), then immediately
875   // call close() before connect completes
876   testConnectOptWrite(largeSize, 0, true);
877 }
878
879 ///////////////////////////////////////////////////////////////////////////
880 // write() related tests
881 ///////////////////////////////////////////////////////////////////////////
882
883 /**
884  * Test writing using a nullptr callback
885  */
886 TEST(AsyncSocketTest, WriteNullCallback) {
887   TestServer server;
888
889   // connect()
890   EventBase evb;
891   std::shared_ptr<AsyncSocket> socket =
892     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
893   evb.loop(); // loop until the socket is connected
894
895   // write() with a nullptr callback
896   char buf[128];
897   memset(buf, 'a', sizeof(buf));
898   socket->write(nullptr, buf, sizeof(buf));
899
900   evb.loop(); // loop until the data is sent
901
902   // Make sure the server got a connection and received the data
903   socket->close();
904   server.verifyConnection(buf, sizeof(buf));
905 }
906
907 /**
908  * Test writing with a send timeout
909  */
910 TEST(AsyncSocketTest, WriteTimeout) {
911   TestServer server;
912
913   // connect()
914   EventBase evb;
915   std::shared_ptr<AsyncSocket> socket =
916     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
917   evb.loop(); // loop until the socket is connected
918
919   // write() a large chunk of data, with no-one on the other end reading
920   size_t writeLength = 8*1024*1024;
921   uint32_t timeout = 200;
922   socket->setSendTimeout(timeout);
923   scoped_array<char> buf(new char[writeLength]);
924   memset(buf.get(), 'a', writeLength);
925   WriteCallback wcb;
926   socket->write(&wcb, buf.get(), writeLength);
927
928   TimePoint start;
929   evb.loop();
930   TimePoint end;
931
932   // Make sure the write attempt timed out as requested
933   CHECK_EQ(wcb.state, STATE_FAILED);
934   CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
935
936   // Check that the write timed out within a reasonable period of time.
937   // We don't check for exactly the specified timeout, since AsyncSocket only
938   // times out when it hasn't made progress for that period of time.
939   //
940   // On linux, the first write sends a few hundred kb of data, then blocks for
941   // writability, and then unblocks again after 40ms and is able to write
942   // another smaller of data before blocking permanently.  Therefore it doesn't
943   // time out until 40ms + timeout.
944   //
945   // I haven't fully verified the cause of this, but I believe it probably
946   // occurs because the receiving end delays sending an ack for up to 40ms.
947   // (This is the default value for TCP_DELACK_MIN.)  Once the sender receives
948   // the ack, it can send some more data.  However, after that point the
949   // receiver's kernel buffer is full.  This 40ms delay happens even with
950   // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints.  However, the
951   // kernel may be automatically disabling TCP_QUICKACK after receiving some
952   // data.
953   //
954   // For now, we simply check that the timeout occurred within 160ms of
955   // the requested value.
956   T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
957 }
958
959 /**
960  * Test writing to a socket that the remote endpoint has closed
961  */
962 TEST(AsyncSocketTest, WritePipeError) {
963   TestServer server;
964
965   // connect()
966   EventBase evb;
967   std::shared_ptr<AsyncSocket> socket =
968     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
969   socket->setSendTimeout(1000);
970   evb.loop(); // loop until the socket is connected
971
972   // accept and immediately close the socket
973   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
974   acceptedSocket.reset();
975
976   // write() a large chunk of data
977   size_t writeLength = 8*1024*1024;
978   scoped_array<char> buf(new char[writeLength]);
979   memset(buf.get(), 'a', writeLength);
980   WriteCallback wcb;
981   socket->write(&wcb, buf.get(), writeLength);
982
983   evb.loop();
984
985   // Make sure the write failed.
986   // It would be nice if AsyncSocketException could convey the errno value,
987   // so that we could check for EPIPE
988   CHECK_EQ(wcb.state, STATE_FAILED);
989   CHECK_EQ(wcb.exception.getType(),
990                     AsyncSocketException::INTERNAL_ERROR);
991 }
992
993 /**
994  * Test writing a mix of simple buffers and IOBufs
995  */
996 TEST(AsyncSocketTest, WriteIOBuf) {
997   TestServer server;
998
999   // connect()
1000   EventBase evb;
1001   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1002   ConnCallback ccb;
1003   socket->connect(&ccb, server.getAddress(), 30);
1004
1005   // Accept the connection
1006   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1007   ReadCallback rcb;
1008   acceptedSocket->setReadCB(&rcb);
1009
1010   // Write a simple buffer to the socket
1011   size_t simpleBufLength = 5;
1012   char simpleBuf[simpleBufLength];
1013   memset(simpleBuf, 'a', simpleBufLength);
1014   WriteCallback wcb;
1015   socket->write(&wcb, simpleBuf, simpleBufLength);
1016
1017   // Write a single-element IOBuf chain
1018   size_t buf1Length = 7;
1019   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1020   memset(buf1->writableData(), 'b', buf1Length);
1021   buf1->append(buf1Length);
1022   unique_ptr<IOBuf> buf1Copy(buf1->clone());
1023   WriteCallback wcb2;
1024   socket->writeChain(&wcb2, std::move(buf1));
1025
1026   // Write a multiple-element IOBuf chain
1027   size_t buf2Length = 11;
1028   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1029   memset(buf2->writableData(), 'c', buf2Length);
1030   buf2->append(buf2Length);
1031   size_t buf3Length = 13;
1032   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1033   memset(buf3->writableData(), 'd', buf3Length);
1034   buf3->append(buf3Length);
1035   buf2->appendChain(std::move(buf3));
1036   unique_ptr<IOBuf> buf2Copy(buf2->clone());
1037   buf2Copy->coalesce();
1038   WriteCallback wcb3;
1039   socket->writeChain(&wcb3, std::move(buf2));
1040   socket->shutdownWrite();
1041
1042   // Let the reads and writes run to completion
1043   evb.loop();
1044
1045   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1046   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1047   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1048
1049   // Make sure the reader got the right data in the right order
1050   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1051   CHECK_EQ(rcb.buffers.size(), 1);
1052   CHECK_EQ(rcb.buffers[0].length,
1053       simpleBufLength + buf1Length + buf2Length + buf3Length);
1054   CHECK_EQ(
1055       memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1056   CHECK_EQ(
1057       memcmp(rcb.buffers[0].buffer + simpleBufLength,
1058           buf1Copy->data(), buf1Copy->length()), 0);
1059   CHECK_EQ(
1060       memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1061           buf2Copy->data(), buf2Copy->length()), 0);
1062
1063   acceptedSocket->close();
1064   socket->close();
1065 }
1066
1067 TEST(AsyncSocketTest, WriteIOBufCorked) {
1068   TestServer server;
1069
1070   // connect()
1071   EventBase evb;
1072   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1073   ConnCallback ccb;
1074   socket->connect(&ccb, server.getAddress(), 30);
1075
1076   // Accept the connection
1077   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1078   ReadCallback rcb;
1079   acceptedSocket->setReadCB(&rcb);
1080
1081   // Do three writes, 100ms apart, with the "cork" flag set
1082   // on the second write.  The reader should see the first write
1083   // arrive by itself, followed by the second and third writes
1084   // arriving together.
1085   size_t buf1Length = 5;
1086   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1087   memset(buf1->writableData(), 'a', buf1Length);
1088   buf1->append(buf1Length);
1089   size_t buf2Length = 7;
1090   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1091   memset(buf2->writableData(), 'b', buf2Length);
1092   buf2->append(buf2Length);
1093   size_t buf3Length = 11;
1094   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1095   memset(buf3->writableData(), 'c', buf3Length);
1096   buf3->append(buf3Length);
1097   WriteCallback wcb1;
1098   socket->writeChain(&wcb1, std::move(buf1));
1099   WriteCallback wcb2;
1100   DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1101   write2.scheduleTimeout(100);
1102   WriteCallback wcb3;
1103   DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1104   write3.scheduleTimeout(200);
1105
1106   evb.loop();
1107   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1108   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1109   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1110   if (wcb3.state != STATE_SUCCEEDED) {
1111     throw(wcb3.exception);
1112   }
1113   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1114
1115   // Make sure the reader got the data with the right grouping
1116   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1117   CHECK_EQ(rcb.buffers.size(), 2);
1118   CHECK_EQ(rcb.buffers[0].length, buf1Length);
1119   CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1120
1121   acceptedSocket->close();
1122   socket->close();
1123 }
1124
1125 /**
1126  * Test performing a zero-length write
1127  */
1128 TEST(AsyncSocketTest, ZeroLengthWrite) {
1129   TestServer server;
1130
1131   // connect()
1132   EventBase evb;
1133   std::shared_ptr<AsyncSocket> socket =
1134     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1135   evb.loop(); // loop until the socket is connected
1136
1137   auto acceptedSocket = server.acceptAsync(&evb);
1138   ReadCallback rcb;
1139   acceptedSocket->setReadCB(&rcb);
1140
1141   size_t len1 = 1024*1024;
1142   size_t len2 = 1024*1024;
1143   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1144   memset(buf.get(), 'a', len1);
1145   memset(buf.get(), 'b', len2);
1146
1147   WriteCallback wcb1;
1148   WriteCallback wcb2;
1149   WriteCallback wcb3;
1150   WriteCallback wcb4;
1151   socket->write(&wcb1, buf.get(), 0);
1152   socket->write(&wcb2, buf.get(), len1);
1153   socket->write(&wcb3, buf.get() + len1, 0);
1154   socket->write(&wcb4, buf.get() + len1, len2);
1155   socket->close();
1156
1157   evb.loop(); // loop until the data is sent
1158
1159   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1160   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1161   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1162   CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1163   rcb.verifyData(buf.get(), len1 + len2);
1164 }
1165
1166 TEST(AsyncSocketTest, ZeroLengthWritev) {
1167   TestServer server;
1168
1169   // connect()
1170   EventBase evb;
1171   std::shared_ptr<AsyncSocket> socket =
1172     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1173   evb.loop(); // loop until the socket is connected
1174
1175   auto acceptedSocket = server.acceptAsync(&evb);
1176   ReadCallback rcb;
1177   acceptedSocket->setReadCB(&rcb);
1178
1179   size_t len1 = 1024*1024;
1180   size_t len2 = 1024*1024;
1181   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1182   memset(buf.get(), 'a', len1);
1183   memset(buf.get(), 'b', len2);
1184
1185   WriteCallback wcb;
1186   size_t iovCount = 4;
1187   struct iovec iov[iovCount];
1188   iov[0].iov_base = buf.get();
1189   iov[0].iov_len = len1;
1190   iov[1].iov_base = buf.get() + len1;
1191   iov[1].iov_len = 0;
1192   iov[2].iov_base = buf.get() + len1;
1193   iov[2].iov_len = len2;
1194   iov[3].iov_base = buf.get() + len1 + len2;
1195   iov[3].iov_len = 0;
1196
1197   socket->writev(&wcb, iov, iovCount);
1198   socket->close();
1199   evb.loop(); // loop until the data is sent
1200
1201   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1202   rcb.verifyData(buf.get(), len1 + len2);
1203 }
1204
1205 ///////////////////////////////////////////////////////////////////////////
1206 // close() related tests
1207 ///////////////////////////////////////////////////////////////////////////
1208
1209 /**
1210  * Test calling close() with pending writes when the socket is already closing.
1211  */
1212 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1213   TestServer server;
1214
1215   // connect()
1216   EventBase evb;
1217   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1218   ConnCallback ccb;
1219   socket->connect(&ccb, server.getAddress(), 30);
1220
1221   // accept the socket on the server side
1222   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1223
1224   // Loop to ensure the connect has completed
1225   evb.loop();
1226
1227   // Make sure we are connected
1228   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1229
1230   // Schedule pending writes, until several write attempts have blocked
1231   char buf[128];
1232   memset(buf, 'a', sizeof(buf));
1233   typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1234   WriteCallbackVector writeCallbacks;
1235
1236   writeCallbacks.reserve(5);
1237   while (writeCallbacks.size() < 5) {
1238     std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1239
1240     socket->write(wcb.get(), buf, sizeof(buf));
1241     if (wcb->state == STATE_SUCCEEDED) {
1242       // Succeeded immediately.  Keep performing more writes
1243       continue;
1244     }
1245
1246     // This write is blocked.
1247     // Have the write callback call close() when writeError() is invoked
1248     wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1249     writeCallbacks.push_back(wcb);
1250   }
1251
1252   // Call closeNow() to immediately fail the pending writes
1253   socket->closeNow();
1254
1255   // Make sure writeError() was invoked on all of the pending write callbacks
1256   for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1257        it != writeCallbacks.end();
1258        ++it) {
1259     CHECK_EQ((*it)->state, STATE_FAILED);
1260   }
1261 }
1262
1263 ///////////////////////////////////////////////////////////////////////////
1264 // ImmediateRead related tests
1265 ///////////////////////////////////////////////////////////////////////////
1266
1267 /* AsyncSocket use to verify immediate read works */
1268 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1269  public:
1270   bool immediateReadCalled = false;
1271   explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1272  protected:
1273   void checkForImmediateRead() noexcept override {
1274     immediateReadCalled = true;
1275     AsyncSocket::handleRead();
1276   }
1277 };
1278
1279 TEST(AsyncSocket, ConnectReadImmediateRead) {
1280   TestServer server;
1281
1282   const size_t maxBufferSz = 100;
1283   const size_t maxReadsPerEvent = 1;
1284   const size_t expectedDataSz = maxBufferSz * 3;
1285   char expectedData[expectedDataSz];
1286   memset(expectedData, 'j', expectedDataSz);
1287
1288   EventBase evb;
1289   ReadCallback rcb(maxBufferSz);
1290   AsyncSocketImmediateRead socket(&evb);
1291   socket.connect(nullptr, server.getAddress(), 30);
1292
1293   evb.loop(); // loop until the socket is connected
1294
1295   socket.setReadCB(&rcb);
1296   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1297   socket.immediateReadCalled = false;
1298
1299   auto acceptedSocket = server.acceptAsync(&evb);
1300
1301   ReadCallback rcbServer;
1302   WriteCallback wcbServer;
1303   rcbServer.dataAvailableCallback = [&]() {
1304     if (rcbServer.dataRead() == expectedDataSz) {
1305       // write back all data read
1306       rcbServer.verifyData(expectedData, expectedDataSz);
1307       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1308       acceptedSocket->close();
1309     }
1310   };
1311   acceptedSocket->setReadCB(&rcbServer);
1312
1313   // write data
1314   WriteCallback wcb1;
1315   socket.write(&wcb1, expectedData, expectedDataSz);
1316   evb.loop();
1317   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1318   rcb.verifyData(expectedData, expectedDataSz);
1319   CHECK_EQ(socket.immediateReadCalled, true);
1320 }
1321
1322 TEST(AsyncSocket, ConnectReadUninstallRead) {
1323   TestServer server;
1324
1325   const size_t maxBufferSz = 100;
1326   const size_t maxReadsPerEvent = 1;
1327   const size_t expectedDataSz = maxBufferSz * 3;
1328   char expectedData[expectedDataSz];
1329   memset(expectedData, 'k', expectedDataSz);
1330
1331   EventBase evb;
1332   ReadCallback rcb(maxBufferSz);
1333   AsyncSocketImmediateRead socket(&evb);
1334   socket.connect(nullptr, server.getAddress(), 30);
1335
1336   evb.loop(); // loop until the socket is connected
1337
1338   socket.setReadCB(&rcb);
1339   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1340   socket.immediateReadCalled = false;
1341
1342   auto acceptedSocket = server.acceptAsync(&evb);
1343
1344   ReadCallback rcbServer;
1345   WriteCallback wcbServer;
1346   rcbServer.dataAvailableCallback = [&]() {
1347     if (rcbServer.dataRead() == expectedDataSz) {
1348       // write back all data read
1349       rcbServer.verifyData(expectedData, expectedDataSz);
1350       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1351       acceptedSocket->close();
1352     }
1353   };
1354   acceptedSocket->setReadCB(&rcbServer);
1355
1356   rcb.dataAvailableCallback = [&]() {
1357     // we read data and reset readCB
1358     socket.setReadCB(nullptr);
1359   };
1360
1361   // write data
1362   WriteCallback wcb;
1363   socket.write(&wcb, expectedData, expectedDataSz);
1364   evb.loop();
1365   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1366
1367   /* we shoud've only read maxBufferSz data since readCallback_
1368    * was reset in dataAvailableCallback */
1369   CHECK_EQ(rcb.dataRead(), maxBufferSz);
1370   CHECK_EQ(socket.immediateReadCalled, false);
1371 }
1372
1373 // TODO:
1374 // - Test connect() and have the connect callback set the read callback
1375 // - Test connect() and have the connect callback unset the read callback
1376 // - Test reading/writing/closing/destroying the socket in the connect callback
1377 // - Test reading/writing/closing/destroying the socket in the read callback
1378 // - Test reading/writing/closing/destroying the socket in the write callback
1379 // - Test one-way shutdown behavior
1380 // - Test changing the EventBase
1381 //
1382 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1383 //   in connectSuccess(), readDataAvailable(), writeSuccess()
1384
1385
1386 ///////////////////////////////////////////////////////////////////////////
1387 // AsyncServerSocket tests
1388 ///////////////////////////////////////////////////////////////////////////
1389
1390 /**
1391  * Helper AcceptCallback class for the test code
1392  * It records the callbacks that were invoked, and also supports calling
1393  * generic std::function objects in each callback.
1394  */
1395 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1396  public:
1397   enum EventType {
1398     TYPE_START,
1399     TYPE_ACCEPT,
1400     TYPE_ERROR,
1401     TYPE_STOP
1402   };
1403   struct EventInfo {
1404     EventInfo(int fd, const folly::SocketAddress& addr)
1405       : type(TYPE_ACCEPT),
1406         fd(fd),
1407         address(addr),
1408         errorMsg() {}
1409     explicit EventInfo(const std::string& msg)
1410       : type(TYPE_ERROR),
1411         fd(-1),
1412         address(),
1413         errorMsg(msg) {}
1414     explicit EventInfo(EventType et)
1415       : type(et),
1416         fd(-1),
1417         address(),
1418         errorMsg() {}
1419
1420     EventType type;
1421     int fd;  // valid for TYPE_ACCEPT
1422     folly::SocketAddress address;  // valid for TYPE_ACCEPT
1423     string errorMsg;  // valid for TYPE_ERROR
1424   };
1425   typedef std::deque<EventInfo> EventList;
1426
1427   TestAcceptCallback()
1428     : connectionAcceptedFn_(),
1429       acceptErrorFn_(),
1430       acceptStoppedFn_(),
1431       events_() {}
1432
1433   std::deque<EventInfo>* getEvents() {
1434     return &events_;
1435   }
1436
1437   void setConnectionAcceptedFn(
1438       const std::function<void(int, const folly::SocketAddress&)>& fn) {
1439     connectionAcceptedFn_ = fn;
1440   }
1441   void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1442     acceptErrorFn_ = fn;
1443   }
1444   void setAcceptStartedFn(const std::function<void()>& fn) {
1445     acceptStartedFn_ = fn;
1446   }
1447   void setAcceptStoppedFn(const std::function<void()>& fn) {
1448     acceptStoppedFn_ = fn;
1449   }
1450
1451   void connectionAccepted(
1452       int fd, const folly::SocketAddress& clientAddr) noexcept override {
1453     events_.emplace_back(fd, clientAddr);
1454
1455     if (connectionAcceptedFn_) {
1456       connectionAcceptedFn_(fd, clientAddr);
1457     }
1458   }
1459   void acceptError(const std::exception& ex) noexcept override {
1460     events_.emplace_back(ex.what());
1461
1462     if (acceptErrorFn_) {
1463       acceptErrorFn_(ex);
1464     }
1465   }
1466   void acceptStarted() noexcept override {
1467     events_.emplace_back(TYPE_START);
1468
1469     if (acceptStartedFn_) {
1470       acceptStartedFn_();
1471     }
1472   }
1473   void acceptStopped() noexcept override {
1474     events_.emplace_back(TYPE_STOP);
1475
1476     if (acceptStoppedFn_) {
1477       acceptStoppedFn_();
1478     }
1479   }
1480
1481  private:
1482   std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1483   std::function<void(const std::exception&)> acceptErrorFn_;
1484   std::function<void()> acceptStartedFn_;
1485   std::function<void()> acceptStoppedFn_;
1486
1487   std::deque<EventInfo> events_;
1488 };
1489
1490 /**
1491  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1492  */
1493 TEST(AsyncSocketTest, ServerAcceptOptions) {
1494   EventBase eventBase;
1495
1496   // Create a server socket
1497   std::shared_ptr<AsyncServerSocket> serverSocket(
1498       AsyncServerSocket::newSocket(&eventBase));
1499   serverSocket->bind(0);
1500   serverSocket->listen(16);
1501   folly::SocketAddress serverAddress;
1502   serverSocket->getAddress(&serverAddress);
1503
1504   // Add a callback to accept one connection then stop the loop
1505   TestAcceptCallback acceptCallback;
1506   acceptCallback.setConnectionAcceptedFn(
1507     [&](int fd, const folly::SocketAddress& addr) {
1508       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1509     });
1510   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1511     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1512   });
1513   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1514   serverSocket->startAccepting();
1515
1516   // Connect to the server socket
1517   std::shared_ptr<AsyncSocket> socket(
1518       AsyncSocket::newSocket(&eventBase, serverAddress));
1519
1520   eventBase.loop();
1521
1522   // Verify that the server accepted a connection
1523   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1524   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1525                     TestAcceptCallback::TYPE_START);
1526   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1527                     TestAcceptCallback::TYPE_ACCEPT);
1528   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1529                     TestAcceptCallback::TYPE_STOP);
1530   int fd = acceptCallback.getEvents()->at(1).fd;
1531
1532   // The accepted connection should already be in non-blocking mode
1533   int flags = fcntl(fd, F_GETFL, 0);
1534   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1535
1536 #ifndef TCP_NOPUSH
1537   // The accepted connection should already have TCP_NODELAY set
1538   int value;
1539   socklen_t valueLength = sizeof(value);
1540   int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1541   CHECK_EQ(rc, 0);
1542   CHECK_EQ(value, 1);
1543 #endif
1544 }
1545
1546 /**
1547  * Test AsyncServerSocket::removeAcceptCallback()
1548  */
1549 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1550   // Create a new AsyncServerSocket
1551   EventBase eventBase;
1552   std::shared_ptr<AsyncServerSocket> serverSocket(
1553       AsyncServerSocket::newSocket(&eventBase));
1554   serverSocket->bind(0);
1555   serverSocket->listen(16);
1556   folly::SocketAddress serverAddress;
1557   serverSocket->getAddress(&serverAddress);
1558
1559   // Add several accept callbacks
1560   TestAcceptCallback cb1;
1561   TestAcceptCallback cb2;
1562   TestAcceptCallback cb3;
1563   TestAcceptCallback cb4;
1564   TestAcceptCallback cb5;
1565   TestAcceptCallback cb6;
1566   TestAcceptCallback cb7;
1567
1568   // Test having callbacks remove other callbacks before them on the list,
1569   // after them on the list, or removing themselves.
1570   //
1571   // Have callback 2 remove callback 3 and callback 5 the first time it is
1572   // called.
1573   int cb2Count = 0;
1574   cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1575       std::shared_ptr<AsyncSocket> sock2(
1576         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1577       });
1578   cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1579     });
1580   cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1581       std::shared_ptr<AsyncSocket> sock3(
1582         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1583     });
1584   cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1585   std::shared_ptr<AsyncSocket> sock5(
1586       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1587
1588     });
1589   cb2.setConnectionAcceptedFn(
1590     [&](int fd, const folly::SocketAddress& addr) {
1591       if (cb2Count == 0) {
1592         serverSocket->removeAcceptCallback(&cb3, nullptr);
1593         serverSocket->removeAcceptCallback(&cb5, nullptr);
1594       }
1595       ++cb2Count;
1596     });
1597   // Have callback 6 remove callback 4 the first time it is called,
1598   // and destroy the server socket the second time it is called
1599   int cb6Count = 0;
1600   cb6.setConnectionAcceptedFn(
1601     [&](int fd, const folly::SocketAddress& addr) {
1602       if (cb6Count == 0) {
1603         serverSocket->removeAcceptCallback(&cb4, nullptr);
1604         std::shared_ptr<AsyncSocket> sock6(
1605           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1606         std::shared_ptr<AsyncSocket> sock7(
1607           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1608         std::shared_ptr<AsyncSocket> sock8(
1609           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1610
1611       } else {
1612         serverSocket.reset();
1613       }
1614       ++cb6Count;
1615     });
1616   // Have callback 7 remove itself
1617   cb7.setConnectionAcceptedFn(
1618     [&](int fd, const folly::SocketAddress& addr) {
1619       serverSocket->removeAcceptCallback(&cb7, nullptr);
1620     });
1621
1622   serverSocket->addAcceptCallback(&cb1, nullptr);
1623   serverSocket->addAcceptCallback(&cb2, nullptr);
1624   serverSocket->addAcceptCallback(&cb3, nullptr);
1625   serverSocket->addAcceptCallback(&cb4, nullptr);
1626   serverSocket->addAcceptCallback(&cb5, nullptr);
1627   serverSocket->addAcceptCallback(&cb6, nullptr);
1628   serverSocket->addAcceptCallback(&cb7, nullptr);
1629   serverSocket->startAccepting();
1630
1631   // Make several connections to the socket
1632   std::shared_ptr<AsyncSocket> sock1(
1633       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1634   std::shared_ptr<AsyncSocket> sock4(
1635       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1636
1637   // Loop until we are stopped
1638   eventBase.loop();
1639
1640   // Check to make sure that the expected callbacks were invoked.
1641   //
1642   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1643   // the AcceptCallbacks in round-robin fashion, in the order that they were
1644   // added.  The code is implemented this way right now, but the API doesn't
1645   // explicitly require it be done this way.  If we change the code not to be
1646   // exactly round robin in the future, we can simplify the test checks here.
1647   // (We'll also need to update the termination code, since we expect cb6 to
1648   // get called twice to terminate the loop.)
1649   CHECK_EQ(cb1.getEvents()->size(), 4);
1650   CHECK_EQ(cb1.getEvents()->at(0).type,
1651                     TestAcceptCallback::TYPE_START);
1652   CHECK_EQ(cb1.getEvents()->at(1).type,
1653                     TestAcceptCallback::TYPE_ACCEPT);
1654   CHECK_EQ(cb1.getEvents()->at(2).type,
1655                     TestAcceptCallback::TYPE_ACCEPT);
1656   CHECK_EQ(cb1.getEvents()->at(3).type,
1657                     TestAcceptCallback::TYPE_STOP);
1658
1659   CHECK_EQ(cb2.getEvents()->size(), 4);
1660   CHECK_EQ(cb2.getEvents()->at(0).type,
1661                     TestAcceptCallback::TYPE_START);
1662   CHECK_EQ(cb2.getEvents()->at(1).type,
1663                     TestAcceptCallback::TYPE_ACCEPT);
1664   CHECK_EQ(cb2.getEvents()->at(2).type,
1665                     TestAcceptCallback::TYPE_ACCEPT);
1666   CHECK_EQ(cb2.getEvents()->at(3).type,
1667                     TestAcceptCallback::TYPE_STOP);
1668
1669   CHECK_EQ(cb3.getEvents()->size(), 2);
1670   CHECK_EQ(cb3.getEvents()->at(0).type,
1671                     TestAcceptCallback::TYPE_START);
1672   CHECK_EQ(cb3.getEvents()->at(1).type,
1673                     TestAcceptCallback::TYPE_STOP);
1674
1675   CHECK_EQ(cb4.getEvents()->size(), 3);
1676   CHECK_EQ(cb4.getEvents()->at(0).type,
1677                     TestAcceptCallback::TYPE_START);
1678   CHECK_EQ(cb4.getEvents()->at(1).type,
1679                     TestAcceptCallback::TYPE_ACCEPT);
1680   CHECK_EQ(cb4.getEvents()->at(2).type,
1681                     TestAcceptCallback::TYPE_STOP);
1682
1683   CHECK_EQ(cb5.getEvents()->size(), 2);
1684   CHECK_EQ(cb5.getEvents()->at(0).type,
1685                     TestAcceptCallback::TYPE_START);
1686   CHECK_EQ(cb5.getEvents()->at(1).type,
1687                     TestAcceptCallback::TYPE_STOP);
1688
1689   CHECK_EQ(cb6.getEvents()->size(), 4);
1690   CHECK_EQ(cb6.getEvents()->at(0).type,
1691                     TestAcceptCallback::TYPE_START);
1692   CHECK_EQ(cb6.getEvents()->at(1).type,
1693                     TestAcceptCallback::TYPE_ACCEPT);
1694   CHECK_EQ(cb6.getEvents()->at(2).type,
1695                     TestAcceptCallback::TYPE_ACCEPT);
1696   CHECK_EQ(cb6.getEvents()->at(3).type,
1697                     TestAcceptCallback::TYPE_STOP);
1698
1699   CHECK_EQ(cb7.getEvents()->size(), 3);
1700   CHECK_EQ(cb7.getEvents()->at(0).type,
1701                     TestAcceptCallback::TYPE_START);
1702   CHECK_EQ(cb7.getEvents()->at(1).type,
1703                     TestAcceptCallback::TYPE_ACCEPT);
1704   CHECK_EQ(cb7.getEvents()->at(2).type,
1705                     TestAcceptCallback::TYPE_STOP);
1706 }
1707
1708 /**
1709  * Test AsyncServerSocket::removeAcceptCallback()
1710  */
1711 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1712   // Create a new AsyncServerSocket
1713   EventBase eventBase;
1714   std::shared_ptr<AsyncServerSocket> serverSocket(
1715       AsyncServerSocket::newSocket(&eventBase));
1716   serverSocket->bind(0);
1717   serverSocket->listen(16);
1718   folly::SocketAddress serverAddress;
1719   serverSocket->getAddress(&serverAddress);
1720
1721   // Add several accept callbacks
1722   TestAcceptCallback cb1;
1723   auto thread_id = pthread_self();
1724   cb1.setAcceptStartedFn([&](){
1725     CHECK_NE(thread_id, pthread_self());
1726     thread_id = pthread_self();
1727   });
1728   cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1729     CHECK_EQ(thread_id, pthread_self());
1730     serverSocket->removeAcceptCallback(&cb1, nullptr);
1731   });
1732   cb1.setAcceptStoppedFn([&](){
1733     CHECK_EQ(thread_id, pthread_self());
1734   });
1735
1736   // Test having callbacks remove other callbacks before them on the list,
1737   serverSocket->addAcceptCallback(&cb1, nullptr);
1738   serverSocket->startAccepting();
1739
1740   // Make several connections to the socket
1741   std::shared_ptr<AsyncSocket> sock1(
1742       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1743
1744   // Loop in another thread
1745   auto other = std::thread([&](){
1746     eventBase.loop();
1747   });
1748   other.join();
1749
1750   // Check to make sure that the expected callbacks were invoked.
1751   //
1752   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1753   // the AcceptCallbacks in round-robin fashion, in the order that they were
1754   // added.  The code is implemented this way right now, but the API doesn't
1755   // explicitly require it be done this way.  If we change the code not to be
1756   // exactly round robin in the future, we can simplify the test checks here.
1757   // (We'll also need to update the termination code, since we expect cb6 to
1758   // get called twice to terminate the loop.)
1759   CHECK_EQ(cb1.getEvents()->size(), 3);
1760   CHECK_EQ(cb1.getEvents()->at(0).type,
1761                     TestAcceptCallback::TYPE_START);
1762   CHECK_EQ(cb1.getEvents()->at(1).type,
1763                     TestAcceptCallback::TYPE_ACCEPT);
1764   CHECK_EQ(cb1.getEvents()->at(2).type,
1765                     TestAcceptCallback::TYPE_STOP);
1766
1767 }
1768
1769 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1770   // Add a callback to accept one connection then stop accepting
1771   TestAcceptCallback acceptCallback;
1772   acceptCallback.setConnectionAcceptedFn(
1773     [&](int fd, const folly::SocketAddress& addr) {
1774       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1775     });
1776   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1777     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1778   });
1779   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1780   serverSocket->startAccepting();
1781
1782   // Connect to the server socket
1783   EventBase* eventBase = serverSocket->getEventBase();
1784   folly::SocketAddress serverAddress;
1785   serverSocket->getAddress(&serverAddress);
1786   AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1787
1788   // Loop to process all events
1789   eventBase->loop();
1790
1791   // Verify that the server accepted a connection
1792   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1793   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1794                     TestAcceptCallback::TYPE_START);
1795   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1796                     TestAcceptCallback::TYPE_ACCEPT);
1797   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1798                     TestAcceptCallback::TYPE_STOP);
1799 }
1800
1801 /* Verify that we don't leak sockets if we are destroyed()
1802  * and there are still writes pending
1803  *
1804  * If destroy() only calls close() instead of closeNow(),
1805  * it would shutdown(writes) on the socket, but it would
1806  * never be close()'d, and the socket would leak
1807  */
1808 TEST(AsyncSocketTest, DestroyCloseTest) {
1809   TestServer server;
1810
1811   // connect()
1812   EventBase clientEB;
1813   EventBase serverEB;
1814   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1815   ConnCallback ccb;
1816   socket->connect(&ccb, server.getAddress(), 30);
1817
1818   // Accept the connection
1819   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
1820   ReadCallback rcb;
1821   acceptedSocket->setReadCB(&rcb);
1822
1823   // Write a large buffer to the socket that is larger than kernel buffer
1824   size_t simpleBufLength = 5000000;
1825   char* simpleBuf = new char[simpleBufLength];
1826   memset(simpleBuf, 'a', simpleBufLength);
1827   WriteCallback wcb;
1828
1829   // Let the reads and writes run to completion
1830   int fd = acceptedSocket->getFd();
1831
1832   acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
1833   socket.reset();
1834   acceptedSocket.reset();
1835
1836   // Test that server socket was closed
1837   ssize_t sz = read(fd, simpleBuf, simpleBufLength);
1838   CHECK_EQ(sz, -1);
1839   CHECK_EQ(errno, 9);
1840   delete[] simpleBuf;
1841 }
1842
1843 /**
1844  * Test AsyncServerSocket::useExistingSocket()
1845  */
1846 TEST(AsyncSocketTest, ServerExistingSocket) {
1847   EventBase eventBase;
1848
1849   // Test creating a socket, and letting AsyncServerSocket bind and listen
1850   {
1851     // Manually create a socket
1852     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1853     ASSERT_GE(fd, 0);
1854
1855     // Create a server socket
1856     AsyncServerSocket::UniquePtr serverSocket(
1857         new AsyncServerSocket(&eventBase));
1858     serverSocket->useExistingSocket(fd);
1859     folly::SocketAddress address;
1860     serverSocket->getAddress(&address);
1861     address.setPort(0);
1862     serverSocket->bind(address);
1863     serverSocket->listen(16);
1864
1865     // Make sure the socket works
1866     serverSocketSanityTest(serverSocket.get());
1867   }
1868
1869   // Test creating a socket and binding manually,
1870   // then letting AsyncServerSocket listen
1871   {
1872     // Manually create a socket
1873     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1874     ASSERT_GE(fd, 0);
1875     // bind
1876     struct sockaddr_in addr;
1877     addr.sin_family = AF_INET;
1878     addr.sin_port = 0;
1879     addr.sin_addr.s_addr = INADDR_ANY;
1880     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1881                              sizeof(addr)), 0);
1882     // Look up the address that we bound to
1883     folly::SocketAddress boundAddress;
1884     boundAddress.setFromLocalAddress(fd);
1885
1886     // Create a server socket
1887     AsyncServerSocket::UniquePtr serverSocket(
1888         new AsyncServerSocket(&eventBase));
1889     serverSocket->useExistingSocket(fd);
1890     serverSocket->listen(16);
1891
1892     // Make sure AsyncServerSocket reports the same address that we bound to
1893     folly::SocketAddress serverSocketAddress;
1894     serverSocket->getAddress(&serverSocketAddress);
1895     CHECK_EQ(boundAddress, serverSocketAddress);
1896
1897     // Make sure the socket works
1898     serverSocketSanityTest(serverSocket.get());
1899   }
1900
1901   // Test creating a socket, binding and listening manually,
1902   // then giving it to AsyncServerSocket
1903   {
1904     // Manually create a socket
1905     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1906     ASSERT_GE(fd, 0);
1907     // bind
1908     struct sockaddr_in addr;
1909     addr.sin_family = AF_INET;
1910     addr.sin_port = 0;
1911     addr.sin_addr.s_addr = INADDR_ANY;
1912     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1913                              sizeof(addr)), 0);
1914     // Look up the address that we bound to
1915     folly::SocketAddress boundAddress;
1916     boundAddress.setFromLocalAddress(fd);
1917     // listen
1918     CHECK_EQ(listen(fd, 16), 0);
1919
1920     // Create a server socket
1921     AsyncServerSocket::UniquePtr serverSocket(
1922         new AsyncServerSocket(&eventBase));
1923     serverSocket->useExistingSocket(fd);
1924
1925     // Make sure AsyncServerSocket reports the same address that we bound to
1926     folly::SocketAddress serverSocketAddress;
1927     serverSocket->getAddress(&serverSocketAddress);
1928     CHECK_EQ(boundAddress, serverSocketAddress);
1929
1930     // Make sure the socket works
1931     serverSocketSanityTest(serverSocket.get());
1932   }
1933 }
1934
1935 TEST(AsyncSocketTest, UnixDomainSocketTest) {
1936   EventBase eventBase;
1937
1938   // Create a server socket
1939   std::shared_ptr<AsyncServerSocket> serverSocket(
1940       AsyncServerSocket::newSocket(&eventBase));
1941   string path(1, 0);
1942   path.append("/anonymous");
1943   folly::SocketAddress serverAddress;
1944   serverAddress.setFromPath(path);
1945   serverSocket->bind(serverAddress);
1946   serverSocket->listen(16);
1947
1948   // Add a callback to accept one connection then stop the loop
1949   TestAcceptCallback acceptCallback;
1950   acceptCallback.setConnectionAcceptedFn(
1951     [&](int fd, const folly::SocketAddress& addr) {
1952       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1953     });
1954   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1955     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1956   });
1957   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1958   serverSocket->startAccepting();
1959
1960   // Connect to the server socket
1961   std::shared_ptr<AsyncSocket> socket(
1962       AsyncSocket::newSocket(&eventBase, serverAddress));
1963
1964   eventBase.loop();
1965
1966   // Verify that the server accepted a connection
1967   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1968   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1969                     TestAcceptCallback::TYPE_START);
1970   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1971                     TestAcceptCallback::TYPE_ACCEPT);
1972   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1973                     TestAcceptCallback::TYPE_STOP);
1974   int fd = acceptCallback.getEvents()->at(1).fd;
1975
1976   // The accepted connection should already be in non-blocking mode
1977   int flags = fcntl(fd, F_GETFL, 0);
1978   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1979 }