9c033503b3bb941d2a0f31f321864f2a1c231723
[folly.git] / folly / experimental / flat_combining / FlatCombining.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/Baton.h>
20 #include <folly/Function.h>
21 #include <folly/IndexedMemPool.h>
22 #include <folly/Portability.h>
23 #include <folly/detail/CacheLocality.h>
24
25 #include <atomic>
26 #include <cassert>
27 #include <mutex>
28
29 namespace folly {
30
31 /// Flat combining (FC) was introduced in the SPAA 2010 paper Flat
32 /// Combining and the Synchronization-Parallelism Tradeoff, by Danny
33 /// Hendler, Itai Incze, Nir Shavit, and Moran Tzafrir.
34 /// http://mcg.cs.tau.ac.il/projects/projects/flat-combining
35 ///
36 /// FC is an alternative to coarse-grained locking for making
37 /// sequential data structures thread-safe while minimizing the
38 /// synchroniation overheads and cache coherence traffic associated
39 /// with locking.
40 ///
41 /// Under FC, when a thread finds the lock contended, it can
42 /// request (using a request record) that the lock holder execute its
43 /// operation on the shared data structure. There can be a designated
44 /// combiner thread or any thread can act as the combiner when it
45 /// holds the lock.
46 ///
47 /// Potential advantages of FC include:
48 /// - Reduced cache coherence traffic
49 /// - Reduced synchronization overheads, as the overheads of releasing
50 ///   and acquiring the lock are eliminated from the critical path of
51 ///   operating on the data structure.
52 /// - Opportunities for smart combining, where executing multiple
53 ///   operations together may take less time than executng the
54 ///   operations separately, e.g., K delete_min operations on a
55 ///   priority queue may be combined to take O(K + log N) time instead
56 ///   of O(K * log N).
57 ///
58 /// This implementation of flat combining supports:
59
60 /// - A simple interface that requires minimal extra code by the
61 ///   user. To use this interface efficiently the user-provided
62 ///   functions must be copyable to folly::Functio without dynamic
63 ///   allocation. If this is impossible or inconvenient, the user is
64 ///   encouraged to use the custom interface described below.
65 /// - A custom interface that supports custom combinining and custom
66 ///   request structure, either for the sake of smart combining or for
67 ///   efficiently supporting operations that are not be copyable to
68 ///   folly::Function without synamic allocation.
69 /// - Both synchronous and asynchronous operations.
70 /// - Request records with and without thread-caching.
71 /// - Combining with and without a dedicated combiner thread.
72 ///
73 /// This implementation differs from the algorithm in the SPAA 2010 paper:
74 /// - It does not require thread caching of request records
75 /// - It supports a dedicated combiner
76 /// - It supports asynchronous operations
77 ///
78 /// The generic FC class template supports generic data structures and
79 /// utilities with arbitrary operations. The template supports static
80 /// polymorphism for the combining function to enable custom smart
81 /// combining.
82 ///
83 /// A simple example of using the FC template:
84 ///   class ConcurrentFoo : public FlatCombining<ConcurrentFoo> {
85 ///     Foo foo_; // sequential data structure
86 ///    public:
87 ///     T bar(V v) { // thread-safe execution of foo_.bar(v)
88 ///       T result;
89 ///       // Note: fn must be copyable to folly::Function without dynamic
90 ///       // allocation. Otherwise, it is recommended to use the custom
91 ///       // interface and manage the function arguments and results
92 ///       // explicitly in a custom request structure.
93 ///       auto fn = [&] { result = foo_.bar(v); };
94 ///       this->requestFC(fn);
95 ///       return result;
96 ///     }
97 ///   };
98 ///
99 /// See test/FlatCombiningExamples.h for more examples. See the
100 /// comments for requestFC() below for a list of simple and custom
101 /// variants of that function.
102
103 template <
104     typename T, // concurrent data structure using FC interface
105     typename Mutex = std::mutex,
106     template <typename> class Atom = std::atomic,
107     typename Req = /* default dummy type */ bool>
108 class FlatCombining {
109   using SavedFn = folly::Function<void()>;
110
111  public:
112   /// Combining request record.
113   class Rec {
114     FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
115     folly::Baton<Atom, true, false> valid_;
116     folly::Baton<Atom, true, false> done_;
117     folly::Baton<Atom, true, false> disconnected_;
118     size_t index_;
119     size_t next_;
120     uint64_t last_;
121     Req req_;
122     SavedFn fn_;
123
124    public:
125     Rec() {
126       setDone();
127       setDisconnected();
128     }
129
130     void setValid() {
131       valid_.post();
132     }
133
134     void clearValid() {
135       valid_.reset();
136     }
137
138     bool isValid() const {
139       return valid_.try_wait();
140     }
141
142     void setDone() {
143       done_.post();
144     }
145
146     void clearDone() {
147       done_.reset();
148     }
149
150     bool isDone() const {
151       return done_.try_wait();
152     }
153
154     void awaitDone() {
155       done_.wait();
156     }
157
158     void setDisconnected() {
159       disconnected_.post();
160     }
161
162     void clearDisconnected() {
163       disconnected_.reset();
164     }
165
166     bool isDisconnected() const {
167       return disconnected_.try_wait();
168     }
169
170     void setIndex(const size_t index) {
171       index_ = index;
172     }
173
174     size_t getIndex() const {
175       return index_;
176     }
177
178     void setNext(const size_t next) {
179       next_ = next;
180     }
181
182     size_t getNext() const {
183       return next_;
184     }
185
186     void setLast(const uint64_t pass) {
187       last_ = pass;
188     }
189
190     uint64_t getLast() const {
191       return last_;
192     }
193
194     Req& getReq() {
195       return req_;
196     }
197
198     template <typename Func>
199     void setFn(Func&& fn) {
200       fn_ = std::forward<Func>(fn);
201       assert(fn_);
202       // If the following assertion is triggered, the user should
203       // either change the provided function, i.e., fn, to fit in
204       // folly::Function without allocation or use the custom
205       // interface to request combining for fn and manage its
206       // arguments (and results, if any) explicitly in a custom
207       // request structure.
208       assert(!fn_.hasAllocatedMemory());
209     }
210
211     void clearFn() {
212       fn_ = {};
213       assert(!fn_);
214     }
215
216     SavedFn& getFn() {
217       return fn_;
218     }
219
220     void complete() {
221       clearValid();
222       assert(!isDone());
223       setDone();
224     }
225   };
226
227   using Pool = folly::IndexedMemPool<Rec, 32, 4, Atom, false, false>;
228
229  public:
230   /// The constructor takes three optional arguments:
231   /// - Optional dedicated combiner thread (default true)
232   /// - Number of records (if 0, then kDefaultNumRecs)
233   /// - A hint for the max. number of combined operations per
234   ///   combining session that is checked at the beginning of each pass
235   ///   on the request records (if 0, then kDefaultMaxops)
236   explicit FlatCombining(
237       const bool dedicated = true,
238       uint32_t numRecs = 0, // number of combining records
239       const uint32_t maxOps = 0 // hint of max ops per combining session
240       )
241       : numRecs_(numRecs == 0 ? kDefaultNumRecs : numRecs),
242         maxOps_(maxOps == 0 ? kDefaultMaxOps : maxOps),
243         recs_(NULL_INDEX),
244         dedicated_(dedicated),
245         recsPool_(numRecs_) {
246     if (dedicated_) {
247       // dedicated combiner thread
248       combiner_ = std::thread([this] { dedicatedCombining(); });
249     }
250   }
251
252   /// Destructor: If there is a dedicated combiner, the destructor
253   /// flags it to shutdown. Otherwise, the destructor waits for all
254   /// pending asynchronous requests to be completed.
255   ~FlatCombining() {
256     if (dedicated_) {
257       shutdown();
258       combiner_.join();
259     } else {
260       drainAll();
261     }
262   }
263
264   // Wait for all pending operations to complete. Useful primarily
265   // when there are asynchronous operations without a dedicated
266   // combiner.
267   void drainAll() {
268     for (size_t i = getRecsHead(); i != NULL_INDEX; i = nextIndex(i)) {
269       Rec& rec = recsPool_[i];
270       awaitDone(rec);
271     }
272   }
273
274   // Give the caller exclusive access.
275   void acquireExclusive() {
276     m_.lock();
277   }
278
279   // Try to give the caller exclusive access. Returns true iff successful.
280   bool tryExclusive() {
281     return m_.try_lock();
282   }
283
284   // Release exclusive access. The caller must have exclusive access.
285   void releaseExclusive() {
286     m_.unlock();
287   }
288
289   // Execute an operation without combining
290   template <typename OpFunc>
291   void requestNoFC(OpFunc& opFn) {
292     std::lock_guard<Mutex> guard(m_);
293     opFn();
294   }
295
296   // This function first tries to execute the operation without
297   // combining. If unuccessful, it allocates a combining record if
298   // needed. If there are no available records, it waits for exclusive
299   // access and executes the operation. If a record is available and
300   // ready for use, it fills the record and indicates that the request
301   // is valid for combining. If the request is synchronous (by default
302   // or necessity), it waits for the operation to be completed by a
303   // combiner and optionally extracts the result, if any.
304   //
305   // This function can be called in several forms:
306   //   Simple forms that do not require the user to define a Req structure
307   //   or to override any request processing member functions:
308   //     requestFC(opFn)
309   //     requestFC(opFn, rec) // provides its own pre-allocated record
310   //     requestFC(opFn, rec, syncop) // asynchronous if syncop == false
311   //   Custom forms that require the user to define a Req structure and to
312   //   override some request processing member functions:
313   //     requestFC(opFn, fillFn)
314   //     requestFC(opFn, fillFn, rec)
315   //     requestFC(opFn, fillFn, rec, syncop)
316   //     requestFC(opFn, fillFn, resFn)
317   //     requestFC(opFn, fillFn, resFn, rec)
318   template <typename OpFunc>
319   void requestFC(OpFunc&& opFn, Rec* rec = nullptr, bool syncop = true) {
320     auto dummy = [](Req&) {};
321     requestOp(
322         std::forward<OpFunc>(opFn),
323         dummy /* fillFn */,
324         dummy /* resFn */,
325         rec,
326         syncop,
327         false /* simple */);
328   }
329   template <typename OpFunc, typename FillFunc>
330   void requestFC(
331       OpFunc&& opFn,
332       const FillFunc& fillFn,
333       Rec* rec = nullptr,
334       bool syncop = true) {
335     auto dummy = [](Req&) {};
336     requestOp(
337         std::forward<OpFunc>(opFn),
338         fillFn,
339         dummy /* resFn */,
340         rec,
341         syncop,
342         true /* custom */);
343   }
344   template <typename OpFunc, typename FillFunc, typename ResFn>
345   void requestFC(
346       OpFunc&& opFn,
347       const FillFunc& fillFn,
348       const ResFn& resFn,
349       Rec* rec = nullptr) {
350     // must wait for result to execute resFn -- so it must be synchronous
351     requestOp(
352         std::forward<OpFunc>(opFn),
353         fillFn,
354         resFn,
355         rec,
356         true /* sync */,
357         true /* custom*/);
358   }
359
360   // Allocate a record.
361   Rec* allocRec() {
362     auto idx = recsPool_.allocIndex();
363     if (idx == NULL_INDEX) {
364       outOfSpaceCount_.fetch_add(1);
365       return nullptr;
366     }
367     Rec& rec = recsPool_[idx];
368     rec.setIndex(idx);
369     return &rec;
370   }
371
372   // Free a record
373   void freeRec(Rec* rec) {
374     if (rec == nullptr) {
375       return;
376     }
377     auto idx = rec->getIndex();
378     recsPool_.recycleIndex(idx);
379   }
380
381   // Returns a count of the number of combined operations so far.
382   uint64_t getCombinedOpCount() {
383     std::lock_guard<Mutex> guard(m_);
384     return combined_;
385   }
386
387   // Returns a count of the number of combining passes so far.
388   uint64_t getCombiningPasses() {
389     std::lock_guard<Mutex> guard(m_);
390     return passes_;
391   }
392
393   uint64_t getOutOfSpaceCount() {
394     return outOfSpaceCount_.load();
395   }
396
397  protected:
398   const size_t NULL_INDEX = 0;
399   const uint32_t kDefaultMaxOps = 100;
400   const uint64_t kDefaultNumRecs = 64;
401   const uint64_t kIdleThreshold = 10;
402
403   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
404   Mutex m_;
405
406   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
407   folly::Baton<Atom, false, true> pending_;
408   Atom<bool> shutdown_{false};
409
410   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
411   uint32_t numRecs_;
412   uint32_t maxOps_;
413   Atom<size_t> recs_;
414   bool dedicated_;
415   std::thread combiner_;
416   Pool recsPool_;
417
418   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
419   uint64_t combined_ = 0;
420   uint64_t passes_ = 0;
421   uint64_t sessions_ = 0;
422   Atom<uint64_t> outOfSpaceCount_{0};
423
424   template <typename OpFunc, typename FillFunc, typename ResFn>
425   void requestOp(
426       OpFunc&& opFn,
427       const FillFunc& fillFn,
428       const ResFn& resFn,
429       Rec* rec,
430       bool syncop,
431       const bool custom) {
432     std::unique_lock<Mutex> l(this->m_, std::defer_lock);
433     if (l.try_lock()) {
434       // No contention
435       tryCombining();
436       opFn();
437       return;
438     }
439
440     // Try FC
441     bool tc = (rec != nullptr);
442     if (!tc) {
443       // if an async op doesn't have a thread-cached record then turn
444       // it into a synchronous op.
445       syncop = true;
446       rec = allocRec();
447     }
448     if (rec == nullptr) {
449       // Can't use FC - Must acquire lock
450       l.lock();
451       opFn();
452       return;
453     }
454
455     // Use FC
456     // Wait if record is in use
457     awaitDone(*rec);
458     rec->clearDone();
459     // Fill record
460     if (custom) {
461       // Fill the request (custom)
462       Req& req = rec->getReq();
463       fillFn(req);
464       rec->clearFn();
465     } else {
466       rec->setFn(std::forward<OpFunc>(opFn));
467     }
468     // Indicate that record is valid
469     assert(!rec->isValid());
470     rec->setValid();
471     // end of combining critical path
472     setPending();
473     // store-load order setValid before isDisconnected
474     std::atomic_thread_fence(std::memory_order_seq_cst);
475     if (rec->isDisconnected()) {
476       rec->clearDisconnected();
477       pushRec(rec->getIndex());
478       setPending();
479     }
480     // If synchronous wait for the request to be completed
481     if (syncop) {
482       awaitDone(*rec);
483       if (custom) {
484         Req& req = rec->getReq();
485         resFn(req); // Extract the result (custom)
486       }
487       if (!tc) {
488         freeRec(rec); // Free the temporary record.
489       }
490     }
491   }
492
493   void pushRec(size_t idx) {
494     Rec& rec = recsPool_[idx];
495     while (true) {
496       auto head = recs_.load(std::memory_order_acquire);
497       rec.setNext(head); // there shouldn't be a data race here
498       if (recs_.compare_exchange_weak(head, idx)) {
499         return;
500       }
501     }
502   }
503
504   size_t getRecsHead() {
505     return recs_.load(std::memory_order_acquire);
506   }
507
508   size_t nextIndex(size_t idx) {
509     return recsPool_[idx].getNext();
510   }
511
512   void clearPending() {
513     pending_.reset();
514   }
515
516   void setPending() {
517     pending_.post();
518   }
519
520   bool isPending() const {
521     return pending_.try_wait();
522   }
523
524   void awaitPending() {
525     pending_.wait();
526   }
527
528   uint64_t combiningSession() {
529     uint64_t combined = 0;
530     do {
531       uint64_t count = static_cast<T*>(this)->combiningPass();
532       if (count == 0) {
533         break;
534       }
535       combined += count;
536       ++this->passes_;
537     } while (combined < this->maxOps_);
538     return combined;
539   }
540
541   void tryCombining() {
542     if (!dedicated_) {
543       while (isPending()) {
544         clearPending();
545         combined_ += combiningSession();
546       }
547     }
548   }
549
550   void dedicatedCombining() {
551     while (true) {
552       awaitPending();
553       clearPending();
554       if (shutdown_.load()) {
555         break;
556       }
557       while (true) {
558         uint64_t count;
559         ++sessions_;
560         {
561           std::lock_guard<Mutex> guard(m_);
562           count = combiningSession();
563           combined_ += count;
564         }
565         if (count < maxOps_) {
566           break;
567         }
568       }
569     }
570   }
571
572   void awaitDone(Rec& rec) {
573     if (dedicated_) {
574       rec.awaitDone();
575     } else {
576       awaitDoneTryLock(rec);
577     }
578   }
579
580   /// Waits for the request to be done and occasionally tries to
581   /// acquire the lock and to do combining. Used only in the absence
582   /// of a dedicated combiner.
583   void awaitDoneTryLock(Rec& rec) {
584     assert(!dedicated_);
585     int count = 0;
586     while (!rec.isDone()) {
587       if (count == 0) {
588         std::unique_lock<Mutex> l(m_, std::defer_lock);
589         if (l.try_lock()) {
590           setPending();
591           tryCombining();
592         }
593       } else {
594         folly::asm_volatile_pause();
595         if (++count == 1000) {
596           count = 0;
597         }
598       }
599     }
600   }
601
602   void shutdown() {
603     shutdown_.store(true);
604     setPending();
605   }
606
607   /// The following member functions may be overridden for customization
608
609   void combinedOp(Req&) {
610     throw std::runtime_error(
611         "FlatCombining::combinedOp(Req&) must be overridden in the derived"
612         " class if called.");
613   }
614
615   void processReq(Rec& rec) {
616     SavedFn& opFn = rec.getFn();
617     if (opFn) {
618       // simple interface
619       opFn();
620     } else {
621       // custom interface
622       Req& req = rec.getReq();
623       static_cast<T*>(this)->combinedOp(req); // defined in derived class
624     }
625     rec.setLast(passes_);
626     rec.complete();
627   }
628
629   uint64_t combiningPass() {
630     uint64_t count = 0;
631     auto idx = getRecsHead();
632     Rec* prev = nullptr;
633     while (idx != NULL_INDEX) {
634       Rec& rec = recsPool_[idx];
635       auto next = rec.getNext();
636       bool valid = rec.isValid();
637       if (!valid && (passes_ - rec.getLast() > kIdleThreshold) &&
638           (prev != nullptr)) {
639         // Disconnect
640         prev->setNext(next);
641         rec.setDisconnected();
642         // store-load order setDisconnected before isValid
643         std::atomic_thread_fence(std::memory_order_seq_cst);
644         valid = rec.isValid();
645       } else {
646         prev = &rec;
647       }
648       if (valid) {
649         processReq(rec);
650         ++count;
651       }
652       idx = next;
653     }
654     return count;
655   }
656 };
657
658 } // namespace folly {