a37b92ff1fd553623d51faca96754d82a5d76e28
[folly.git] / folly / futures / ManualExecutor.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18 #include <folly/LifoSem.h>
19 #include <folly/futures/DrivableExecutor.h>
20 #include <folly/futures/ScheduledExecutor.h>
21 #include <memory>
22 #include <mutex>
23 #include <queue>
24 #include <cstdio>
25
26 namespace folly {
27   /// A ManualExecutor only does work when you turn the crank, by calling
28   /// run() or indirectly with makeProgress() or waitFor().
29   ///
30   /// The clock for a manual executor starts at 0 and advances only when you
31   /// ask it to. i.e. time is also under manual control.
32   ///
33   /// NB No attempt has been made to make anything other than add and schedule
34   /// threadsafe.
35   class ManualExecutor : public DrivableExecutor,
36                          public ScheduledExecutor {
37    public:
38     void add(Func) override;
39
40     /// Do work. Returns the number of functions that were executed (maybe 0).
41     /// Non-blocking, in the sense that we don't wait for work (we can't
42     /// control whether one of the functions blocks).
43     /// This is stable, it will not chase an ever-increasing tail of work.
44     /// This also means, there may be more work available to perform at the
45     /// moment that this returns.
46     size_t run();
47
48     /// Wait for work to do.
49     void wait();
50
51     /// Wait for work to do, and do it.
52     void makeProgress() {
53       wait();
54       run();
55     }
56
57     /// Implements DrivableExecutor
58     void drive() override {
59       makeProgress();
60     }
61
62     /// makeProgress until this Future is ready.
63     template <class F> void waitFor(F const& f) {
64       // TODO(5427828)
65 #if 0
66       while (!f.isReady())
67         makeProgress();
68 #else
69       while (!f.isReady()) {
70         run();
71       }
72 #endif
73
74     }
75
76     virtual void scheduleAt(Func&& f, TimePoint const& t) override {
77       std::lock_guard<std::mutex> lock(lock_);
78       scheduledFuncs_.emplace(t, std::move(f));
79       sem_.post();
80     }
81
82     /// Advance the clock. The clock never advances on its own.
83     /// Advancing the clock causes some work to be done, if work is available
84     /// to do (perhaps newly available because of the advanced clock).
85     /// If dur is <= 0 this is a noop.
86     void advance(Duration const& dur) {
87       advanceTo(now_ + dur);
88     }
89
90     /// Advance the clock to this absolute time. If t is <= now(),
91     /// this is a noop.
92     void advanceTo(TimePoint const& t);
93
94     TimePoint now() override { return now_; }
95
96     /// Flush the function queue. Destroys all stored functions without
97     /// executing them. Returns number of removed functions.
98     std::size_t clear() {
99       std::queue<Func> funcs;
100       std::priority_queue<ScheduledFunc> scheduled_funcs;
101
102       {
103         std::lock_guard<std::mutex> lock(lock_);
104         funcs_.swap(funcs);
105         scheduledFuncs_.swap(scheduled_funcs);
106       }
107
108       return funcs.size() + scheduled_funcs.size();
109     }
110
111    private:
112     std::mutex lock_;
113     std::queue<Func> funcs_;
114     LifoSem sem_;
115
116     // helper class to enable ordering of scheduled events in the priority
117     // queue
118     struct ScheduledFunc {
119       TimePoint time;
120       size_t ordinal;
121       Func mutable func;
122
123       ScheduledFunc(TimePoint const& t, Func&& f)
124         : time(t), func(std::move(f))
125       {
126         static size_t seq = 0;
127         ordinal = seq++;
128       }
129
130       bool operator<(ScheduledFunc const& b) const {
131         if (time == b.time)
132           return ordinal < b.ordinal;
133         return time < b.time;
134       }
135
136       Func&& moveOutFunc() const {
137         return std::move(func);
138       }
139     };
140     std::priority_queue<ScheduledFunc> scheduledFuncs_;
141     TimePoint now_ = TimePoint::min();
142   };
143
144 }