Move folly/LifoSem.h
[folly.git] / folly / executors / ManualExecutor.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <cstdio>
20 #include <memory>
21 #include <mutex>
22 #include <queue>
23
24 #include <folly/executors/DrivableExecutor.h>
25 #include <folly/executors/ScheduledExecutor.h>
26 #include <folly/synchronization/LifoSem.h>
27
28 namespace folly {
29   /// A ManualExecutor only does work when you turn the crank, by calling
30   /// run() or indirectly with makeProgress() or waitFor().
31   ///
32   /// The clock for a manual executor starts at 0 and advances only when you
33   /// ask it to. i.e. time is also under manual control.
34   ///
35   /// NB No attempt has been made to make anything other than add and schedule
36   /// threadsafe.
37   class ManualExecutor : public DrivableExecutor,
38                          public ScheduledExecutor {
39    public:
40     void add(Func) override;
41
42     /// Do work. Returns the number of functions that were executed (maybe 0).
43     /// Non-blocking, in the sense that we don't wait for work (we can't
44     /// control whether one of the functions blocks).
45     /// This is stable, it will not chase an ever-increasing tail of work.
46     /// This also means, there may be more work available to perform at the
47     /// moment that this returns.
48     size_t run();
49
50     // Do work until there is no more work to do.
51     // Returns the number of functions that were executed (maybe 0).
52     // Unlike run, this method is not stable. It will chase an infinite tail of
53     // work so should be used with care.
54     // There will be no work available to perform at the moment that this
55     // returns.
56     size_t drain();
57
58     /// Wait for work to do.
59     void wait();
60
61     /// Wait for work to do, and do it.
62     void makeProgress() {
63       wait();
64       run();
65     }
66
67     /// Implements DrivableExecutor
68     void drive() override {
69       makeProgress();
70     }
71
72     /// makeProgress until this Future is ready.
73     template <class F> void waitFor(F const& f) {
74       // TODO(5427828)
75 #if 0
76       while (!f.isReady())
77         makeProgress();
78 #else
79       while (!f.isReady()) {
80         run();
81       }
82 #endif
83
84     }
85
86     void scheduleAt(Func&& f, TimePoint const& t) override {
87       std::lock_guard<std::mutex> lock(lock_);
88       scheduledFuncs_.emplace(t, std::move(f));
89       sem_.post();
90     }
91
92     /// Advance the clock. The clock never advances on its own.
93     /// Advancing the clock causes some work to be done, if work is available
94     /// to do (perhaps newly available because of the advanced clock).
95     /// If dur is <= 0 this is a noop.
96     void advance(Duration const& dur) {
97       advanceTo(now_ + dur);
98     }
99
100     /// Advance the clock to this absolute time. If t is <= now(),
101     /// this is a noop.
102     void advanceTo(TimePoint const& t);
103
104     TimePoint now() override { return now_; }
105
106     /// Flush the function queue. Destroys all stored functions without
107     /// executing them. Returns number of removed functions.
108     std::size_t clear() {
109       std::queue<Func> funcs;
110       std::priority_queue<ScheduledFunc> scheduled_funcs;
111
112       {
113         std::lock_guard<std::mutex> lock(lock_);
114         funcs_.swap(funcs);
115         scheduledFuncs_.swap(scheduled_funcs);
116       }
117
118       return funcs.size() + scheduled_funcs.size();
119     }
120
121    private:
122     std::mutex lock_;
123     std::queue<Func> funcs_;
124     LifoSem sem_;
125
126     // helper class to enable ordering of scheduled events in the priority
127     // queue
128     struct ScheduledFunc {
129       TimePoint time;
130       size_t ordinal;
131       Func mutable func;
132
133       ScheduledFunc(TimePoint const& t, Func&& f)
134         : time(t), func(std::move(f))
135       {
136         static size_t seq = 0;
137         ordinal = seq++;
138       }
139
140       bool operator<(ScheduledFunc const& b) const {
141         // Earlier-scheduled things must be *higher* priority
142         // in the max-based std::priority_queue
143         if (time == b.time) {
144           return ordinal > b.ordinal;
145         }
146         return time > b.time;
147       }
148
149       Func&& moveOutFunc() const {
150         return std::move(func);
151       }
152     };
153     std::priority_queue<ScheduledFunc> scheduledFuncs_;
154     TimePoint now_ = TimePoint::min();
155   };
156
157   } // namespace folly