Adds writer test case for RCU
[folly.git] / folly / io / ShutdownSocketSet.cpp
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/ShutdownSocketSet.h>
18
19 #include <chrono>
20 #include <thread>
21
22 #include <glog/logging.h>
23
24 #include <folly/FileUtil.h>
25 #include <folly/Singleton.h>
26 #include <folly/portability/Sockets.h>
27
28 namespace folly {
29
30 namespace {
31 struct PrivateTag {};
32 folly::Singleton<folly::ShutdownSocketSet, PrivateTag> singleton;
33 } // namespace
34
35 ShutdownSocketSet::ShutdownSocketSet(int maxFd)
36     : maxFd_(maxFd),
37       data_(static_cast<std::atomic<uint8_t>*>(
38           folly::checkedCalloc(size_t(maxFd), sizeof(std::atomic<uint8_t>)))),
39       nullFile_("/dev/null", O_RDWR) {}
40
41 std::shared_ptr<ShutdownSocketSet> ShutdownSocketSet::getInstance() {
42   return singleton.try_get();
43 }
44
45 void ShutdownSocketSet::add(int fd) {
46   // Silently ignore any fds >= maxFd_, very unlikely
47   DCHECK_GE(fd, 0);
48   if (fd >= maxFd_) {
49     return;
50   }
51
52   auto& sref = data_[size_t(fd)];
53   uint8_t prevState = FREE;
54   CHECK(sref.compare_exchange_strong(
55       prevState, IN_USE, std::memory_order_relaxed))
56       << "Invalid prev state for fd " << fd << ": " << int(prevState);
57 }
58
59 void ShutdownSocketSet::remove(int fd) {
60   DCHECK_GE(fd, 0);
61   if (fd >= maxFd_) {
62     return;
63   }
64
65   auto& sref = data_[size_t(fd)];
66   uint8_t prevState = 0;
67
68   prevState = sref.load(std::memory_order_relaxed);
69   do {
70     switch (prevState) {
71       case IN_SHUTDOWN:
72         std::this_thread::sleep_for(std::chrono::milliseconds(1));
73         prevState = sref.load(std::memory_order_relaxed);
74         continue;
75       case FREE:
76         LOG(FATAL) << "Invalid prev state for fd " << fd << ": "
77                    << int(prevState);
78     }
79   } while (
80       !sref.compare_exchange_weak(prevState, FREE, std::memory_order_relaxed));
81 }
82
83 int ShutdownSocketSet::close(int fd) {
84   DCHECK_GE(fd, 0);
85   if (fd >= maxFd_) {
86     return folly::closeNoInt(fd);
87   }
88
89   auto& sref = data_[size_t(fd)];
90   uint8_t prevState = sref.load(std::memory_order_relaxed);
91   uint8_t newState = 0;
92
93   do {
94     switch (prevState) {
95       case IN_USE:
96       case SHUT_DOWN:
97         newState = FREE;
98         break;
99       case IN_SHUTDOWN:
100         newState = MUST_CLOSE;
101         break;
102       default:
103         LOG(FATAL) << "Invalid prev state for fd " << fd << ": "
104                    << int(prevState);
105     }
106   } while (!sref.compare_exchange_weak(
107       prevState, newState, std::memory_order_relaxed));
108
109   return newState == FREE ? folly::closeNoInt(fd) : 0;
110 }
111
112 void ShutdownSocketSet::shutdown(int fd, bool abortive) {
113   DCHECK_GE(fd, 0);
114   if (fd >= maxFd_) {
115     doShutdown(fd, abortive);
116     return;
117   }
118
119   auto& sref = data_[size_t(fd)];
120   uint8_t prevState = IN_USE;
121   if (!sref.compare_exchange_strong(
122           prevState, IN_SHUTDOWN, std::memory_order_relaxed)) {
123     return;
124   }
125
126   doShutdown(fd, abortive);
127
128   prevState = IN_SHUTDOWN;
129   if (sref.compare_exchange_strong(
130           prevState, SHUT_DOWN, std::memory_order_relaxed)) {
131     return;
132   }
133
134   CHECK_EQ(prevState, MUST_CLOSE)
135     << "Invalid prev state for fd " << fd << ": " << int(prevState);
136
137   folly::closeNoInt(fd);  // ignore errors, nothing to do
138
139   CHECK(
140       sref.compare_exchange_strong(prevState, FREE, std::memory_order_relaxed))
141       << "Invalid prev state for fd " << fd << ": " << int(prevState);
142 }
143
144 void ShutdownSocketSet::shutdownAll(bool abortive) {
145   for (int i = 0; i < maxFd_; ++i) {
146     auto& sref = data_[size_t(i)];
147     if (sref.load(std::memory_order_relaxed) == IN_USE) {
148       shutdown(i, abortive);
149     }
150   }
151 }
152
153 void ShutdownSocketSet::doShutdown(int fd, bool abortive) {
154   // shutdown() the socket first, to awaken any threads blocked on the fd
155   // (subsequent IO will fail because it's been shutdown); close()ing the
156   // socket does not wake up blockers, see
157   // http://stackoverflow.com/a/3624545/1736339
158   folly::shutdownNoInt(fd, SHUT_RDWR);
159
160   // If abortive shutdown is desired, we'll set the SO_LINGER option on
161   // the socket with a timeout of 0; this will cause RST to be sent on
162   // close.
163   if (abortive) {
164     struct linger l = {1, 0};
165     if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) != 0) {
166       // Probably not a socket, ignore.
167       return;
168     }
169   }
170
171   // We can't close() the socket, as that would be dangerous; a new file
172   // could be opened and get the same file descriptor, and then code assuming
173   // the old fd would do IO in the wrong place. We'll (atomically) dup2
174   // /dev/null onto the fd instead.
175   folly::dup2NoInt(nullFile_.fd(), fd);
176 }
177
178 } // namespace folly