bd685cff9eb1ae10624585ac228af276e362b098
[folly.git] / folly / fibers / Baton.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "Baton.h"
17
18 #include <chrono>
19
20 #include <folly/detail/MemoryIdler.h>
21 #include <folly/fibers/FiberManagerInternal.h>
22 #include <folly/portability/Asm.h>
23
24 namespace folly {
25 namespace fibers {
26
27 void Baton::wait() {
28   wait([]() {});
29 }
30
31 void Baton::wait(TimeoutHandler& timeoutHandler) {
32   auto timeoutFunc = [this, &timeoutHandler] {
33     if (!try_wait()) {
34       postHelper(TIMEOUT);
35     }
36     timeoutHandler.timeoutPtr_ = 0;
37   };
38   timeoutHandler.timeoutFunc_ = std::ref(timeoutFunc);
39   timeoutHandler.fiberManager_ = FiberManager::getFiberManagerUnsafe();
40   wait();
41   timeoutHandler.cancelTimeout();
42 }
43
44 bool Baton::timed_wait(TimeoutController::Duration timeout) {
45   return timed_wait(timeout, []() {});
46 }
47
48 void Baton::waitThread() {
49   if (spinWaitForEarlyPost()) {
50     assert(waitingFiber_.load(std::memory_order_acquire) == POSTED);
51     return;
52   }
53
54   auto fiber = waitingFiber_.load();
55
56   if (LIKELY(
57           fiber == NO_WAITER &&
58           waitingFiber_.compare_exchange_strong(fiber, THREAD_WAITING))) {
59     do {
60       folly::detail::MemoryIdler::futexWait(futex_.futex, THREAD_WAITING);
61       fiber = waitingFiber_.load(std::memory_order_acquire);
62     } while (fiber == THREAD_WAITING);
63   }
64
65   if (LIKELY(fiber == POSTED)) {
66     return;
67   }
68
69   // Handle errors
70   if (fiber == TIMEOUT) {
71     throw std::logic_error("Thread baton can't have timeout status");
72   }
73   if (fiber == THREAD_WAITING) {
74     throw std::logic_error("Other thread is already waiting on this baton");
75   }
76   throw std::logic_error("Other fiber is already waiting on this baton");
77 }
78
79 bool Baton::spinWaitForEarlyPost() {
80   static_assert(
81       PreBlockAttempts > 0,
82       "isn't this assert clearer than an uninitialized variable warning?");
83   for (int i = 0; i < PreBlockAttempts; ++i) {
84     if (try_wait()) {
85       // hooray!
86       return true;
87     }
88     // The pause instruction is the polite way to spin, but it doesn't
89     // actually affect correctness to omit it if we don't have it.
90     // Pausing donates the full capabilities of the current core to
91     // its other hyperthreads for a dozen cycles or so
92     asm_volatile_pause();
93   }
94
95   return false;
96 }
97
98 bool Baton::timedWaitThread(TimeoutController::Duration timeout) {
99   if (spinWaitForEarlyPost()) {
100     assert(waitingFiber_.load(std::memory_order_acquire) == POSTED);
101     return true;
102   }
103
104   auto fiber = waitingFiber_.load();
105
106   if (LIKELY(
107           fiber == NO_WAITER &&
108           waitingFiber_.compare_exchange_strong(fiber, THREAD_WAITING))) {
109     auto deadline = TimeoutController::Clock::now() + timeout;
110     do {
111       const auto wait_rv =
112           futex_.futex.futexWaitUntil(THREAD_WAITING, deadline);
113       if (wait_rv == folly::detail::FutexResult::TIMEDOUT) {
114         return false;
115       }
116       fiber = waitingFiber_.load(std::memory_order_relaxed);
117     } while (fiber == THREAD_WAITING);
118   }
119
120   if (LIKELY(fiber == POSTED)) {
121     return true;
122   }
123
124   // Handle errors
125   if (fiber == TIMEOUT) {
126     throw std::logic_error("Thread baton can't have timeout status");
127   }
128   if (fiber == THREAD_WAITING) {
129     throw std::logic_error("Other thread is already waiting on this baton");
130   }
131   throw std::logic_error("Other fiber is already waiting on this baton");
132 }
133
134 void Baton::post() {
135   postHelper(POSTED);
136 }
137
138 void Baton::postHelper(intptr_t new_value) {
139   auto fiber = waitingFiber_.load();
140
141   do {
142     if (fiber == THREAD_WAITING) {
143       assert(new_value == POSTED);
144
145       return postThread();
146     }
147
148     if (fiber == POSTED || fiber == TIMEOUT) {
149       return;
150     }
151   } while (!waitingFiber_.compare_exchange_weak(fiber, new_value));
152
153   if (fiber != NO_WAITER) {
154     reinterpret_cast<Fiber*>(fiber)->setData(0);
155   }
156 }
157
158 bool Baton::try_wait() {
159   auto state = waitingFiber_.load();
160   return state == POSTED;
161 }
162
163 void Baton::postThread() {
164   auto expected = THREAD_WAITING;
165
166   if (!waitingFiber_.compare_exchange_strong(expected, POSTED)) {
167     return;
168   }
169
170   futex_.futex.futexWake(1);
171 }
172
173 void Baton::reset() {
174   waitingFiber_.store(NO_WAITER, std::memory_order_relaxed);
175   ;
176 }
177
178 void Baton::TimeoutHandler::scheduleTimeout(
179     TimeoutController::Duration timeout) {
180   assert(fiberManager_ != nullptr);
181   assert(timeoutFunc_ != nullptr);
182   assert(timeoutPtr_ == 0);
183
184   if (timeout.count() > 0) {
185     timeoutPtr_ =
186         fiberManager_->timeoutManager_->registerTimeout(timeoutFunc_, timeout);
187   }
188 }
189
190 void Baton::TimeoutHandler::cancelTimeout() {
191   if (timeoutPtr_) {
192     fiberManager_->timeoutManager_->cancel(timeoutPtr_);
193   }
194 }
195 }
196 }