2017
[folly.git] / folly / fibers / Baton.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "Baton.h"
17
18 #include <chrono>
19
20 #include <folly/detail/MemoryIdler.h>
21 #include <folly/fibers/FiberManagerInternal.h>
22 #include <folly/portability/Asm.h>
23
24 namespace folly {
25 namespace fibers {
26
27 void Baton::wait() {
28   wait([]() {});
29 }
30
31 void Baton::wait(TimeoutHandler& timeoutHandler) {
32   auto timeoutFunc = [this, &timeoutHandler] {
33     if (!try_wait()) {
34       postHelper(TIMEOUT);
35     }
36     timeoutHandler.timeoutPtr_ = 0;
37   };
38   timeoutHandler.timeoutFunc_ = std::ref(timeoutFunc);
39   timeoutHandler.fiberManager_ = FiberManager::getFiberManagerUnsafe();
40   wait();
41   timeoutHandler.cancelTimeout();
42 }
43
44 bool Baton::timed_wait(TimeoutController::Duration timeout) {
45   return timed_wait(timeout, []() {});
46 }
47
48 void Baton::waitThread() {
49   if (spinWaitForEarlyPost()) {
50     assert(waitingFiber_.load(std::memory_order_acquire) == POSTED);
51     return;
52   }
53
54   auto fiber = waitingFiber_.load();
55
56   if (LIKELY(
57           fiber == NO_WAITER &&
58           waitingFiber_.compare_exchange_strong(fiber, THREAD_WAITING))) {
59     do {
60       folly::detail::MemoryIdler::futexWait(
61           futex_.futex, uint32_t(THREAD_WAITING));
62       fiber = waitingFiber_.load(std::memory_order_acquire);
63     } while (fiber == THREAD_WAITING);
64   }
65
66   if (LIKELY(fiber == POSTED)) {
67     return;
68   }
69
70   // Handle errors
71   if (fiber == TIMEOUT) {
72     throw std::logic_error("Thread baton can't have timeout status");
73   }
74   if (fiber == THREAD_WAITING) {
75     throw std::logic_error("Other thread is already waiting on this baton");
76   }
77   throw std::logic_error("Other fiber is already waiting on this baton");
78 }
79
80 bool Baton::spinWaitForEarlyPost() {
81   static_assert(
82       PreBlockAttempts > 0,
83       "isn't this assert clearer than an uninitialized variable warning?");
84   for (int i = 0; i < PreBlockAttempts; ++i) {
85     if (try_wait()) {
86       // hooray!
87       return true;
88     }
89     // The pause instruction is the polite way to spin, but it doesn't
90     // actually affect correctness to omit it if we don't have it.
91     // Pausing donates the full capabilities of the current core to
92     // its other hyperthreads for a dozen cycles or so
93     asm_volatile_pause();
94   }
95
96   return false;
97 }
98
99 bool Baton::timedWaitThread(TimeoutController::Duration timeout) {
100   if (spinWaitForEarlyPost()) {
101     assert(waitingFiber_.load(std::memory_order_acquire) == POSTED);
102     return true;
103   }
104
105   auto fiber = waitingFiber_.load();
106
107   if (LIKELY(
108           fiber == NO_WAITER &&
109           waitingFiber_.compare_exchange_strong(fiber, THREAD_WAITING))) {
110     auto deadline = TimeoutController::Clock::now() + timeout;
111     do {
112       const auto wait_rv =
113           futex_.futex.futexWaitUntil(uint32_t(THREAD_WAITING), deadline);
114       if (wait_rv == folly::detail::FutexResult::TIMEDOUT) {
115         return false;
116       }
117       fiber = waitingFiber_.load(std::memory_order_relaxed);
118     } while (fiber == THREAD_WAITING);
119   }
120
121   if (LIKELY(fiber == POSTED)) {
122     return true;
123   }
124
125   // Handle errors
126   if (fiber == TIMEOUT) {
127     throw std::logic_error("Thread baton can't have timeout status");
128   }
129   if (fiber == THREAD_WAITING) {
130     throw std::logic_error("Other thread is already waiting on this baton");
131   }
132   throw std::logic_error("Other fiber is already waiting on this baton");
133 }
134
135 void Baton::post() {
136   postHelper(POSTED);
137 }
138
139 void Baton::postHelper(intptr_t new_value) {
140   auto fiber = waitingFiber_.load();
141
142   do {
143     if (fiber == THREAD_WAITING) {
144       assert(new_value == POSTED);
145
146       return postThread();
147     }
148
149     if (fiber == POSTED || fiber == TIMEOUT) {
150       return;
151     }
152   } while (!waitingFiber_.compare_exchange_weak(fiber, new_value));
153
154   if (fiber != NO_WAITER) {
155     reinterpret_cast<Fiber*>(fiber)->resume();
156   }
157 }
158
159 bool Baton::try_wait() {
160   auto state = waitingFiber_.load();
161   return state == POSTED;
162 }
163
164 void Baton::postThread() {
165   auto expected = THREAD_WAITING;
166
167   if (!waitingFiber_.compare_exchange_strong(expected, POSTED)) {
168     return;
169   }
170
171   futex_.futex.futexWake(1);
172 }
173
174 void Baton::reset() {
175   waitingFiber_.store(NO_WAITER, std::memory_order_relaxed);
176   ;
177 }
178
179 void Baton::TimeoutHandler::scheduleTimeout(
180     TimeoutController::Duration timeout) {
181   assert(fiberManager_ != nullptr);
182   assert(timeoutFunc_ != nullptr);
183   assert(timeoutPtr_ == 0);
184
185   if (timeout.count() > 0) {
186     timeoutPtr_ =
187         fiberManager_->timeoutManager_->registerTimeout(timeoutFunc_, timeout);
188   }
189 }
190
191 void Baton::TimeoutHandler::cancelTimeout() {
192   if (timeoutPtr_) {
193     fiberManager_->timeoutManager_->cancel(timeoutPtr_);
194   }
195 }
196 }
197 }