Flat combining: Add lock holder with deferred option. Minor fixes.
[folly.git] / folly / experimental / flat_combining / FlatCombining.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/Baton.h>
20 #include <folly/Function.h>
21 #include <folly/IndexedMemPool.h>
22 #include <folly/Portability.h>
23 #include <folly/detail/CacheLocality.h>
24
25 #include <atomic>
26 #include <cassert>
27 #include <mutex>
28
29 namespace folly {
30
31 /// Flat combining (FC) was introduced in the SPAA 2010 paper Flat
32 /// Combining and the Synchronization-Parallelism Tradeoff, by Danny
33 /// Hendler, Itai Incze, Nir Shavit, and Moran Tzafrir.
34 /// http://mcg.cs.tau.ac.il/projects/projects/flat-combining
35 ///
36 /// FC is an alternative to coarse-grained locking for making
37 /// sequential data structures thread-safe while minimizing the
38 /// synchroniation overheads and cache coherence traffic associated
39 /// with locking.
40 ///
41 /// Under FC, when a thread finds the lock contended, it can
42 /// request (using a request record) that the lock holder execute its
43 /// operation on the shared data structure. There can be a designated
44 /// combiner thread or any thread can act as the combiner when it
45 /// holds the lock.
46 ///
47 /// Potential advantages of FC include:
48 /// - Reduced cache coherence traffic
49 /// - Reduced synchronization overheads, as the overheads of releasing
50 ///   and acquiring the lock are eliminated from the critical path of
51 ///   operating on the data structure.
52 /// - Opportunities for smart combining, where executing multiple
53 ///   operations together may take less time than executng the
54 ///   operations separately, e.g., K delete_min operations on a
55 ///   priority queue may be combined to take O(K + log N) time instead
56 ///   of O(K * log N).
57 ///
58 /// This implementation of flat combining supports:
59
60 /// - A simple interface that requires minimal extra code by the
61 ///   user. To use this interface efficiently the user-provided
62 ///   functions must be copyable to folly::Functio without dynamic
63 ///   allocation. If this is impossible or inconvenient, the user is
64 ///   encouraged to use the custom interface described below.
65 /// - A custom interface that supports custom combinining and custom
66 ///   request structure, either for the sake of smart combining or for
67 ///   efficiently supporting operations that are not be copyable to
68 ///   folly::Function without synamic allocation.
69 /// - Both synchronous and asynchronous operations.
70 /// - Request records with and without thread-caching.
71 /// - Combining with and without a dedicated combiner thread.
72 ///
73 /// This implementation differs from the algorithm in the SPAA 2010 paper:
74 /// - It does not require thread caching of request records
75 /// - It supports a dedicated combiner
76 /// - It supports asynchronous operations
77 ///
78 /// The generic FC class template supports generic data structures and
79 /// utilities with arbitrary operations. The template supports static
80 /// polymorphism for the combining function to enable custom smart
81 /// combining.
82 ///
83 /// A simple example of using the FC template:
84 ///   class ConcurrentFoo : public FlatCombining<ConcurrentFoo> {
85 ///     Foo foo_; // sequential data structure
86 ///    public:
87 ///     T bar(V& v) { // thread-safe execution of foo_.bar(v)
88 ///       T result;
89 ///       // Note: fn must be copyable to folly::Function without dynamic
90 ///       // allocation. Otherwise, it is recommended to use the custom
91 ///       // interface and manage the function arguments and results
92 ///       // explicitly in a custom request structure.
93 ///       auto fn = [&] { result = foo_.bar(v); };
94 ///       this->requestFC(fn);
95 ///       return result;
96 ///     }
97 ///   };
98 ///
99 /// See test/FlatCombiningExamples.h for more examples. See the
100 /// comments for requestFC() below for a list of simple and custom
101 /// variants of that function.
102
103 template <
104     typename T, // concurrent data structure using FC interface
105     typename Mutex = std::mutex,
106     template <typename> class Atom = std::atomic,
107     typename Req = /* default dummy type */ bool>
108 class FlatCombining {
109   using SavedFn = folly::Function<void()>;
110
111  public:
112   /// Combining request record.
113   class Rec {
114     FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
115     folly::Baton<Atom, true, false> valid_;
116     folly::Baton<Atom, true, false> done_;
117     folly::Baton<Atom, true, false> disconnected_;
118     size_t index_;
119     size_t next_;
120     uint64_t last_;
121     Req req_;
122     SavedFn fn_;
123
124    public:
125     Rec() {
126       setDone();
127       setDisconnected();
128     }
129
130     void setValid() {
131       valid_.post();
132     }
133
134     void clearValid() {
135       valid_.reset();
136     }
137
138     bool isValid() const {
139       return valid_.try_wait();
140     }
141
142     void setDone() {
143       done_.post();
144     }
145
146     void clearDone() {
147       done_.reset();
148     }
149
150     bool isDone() const {
151       return done_.try_wait();
152     }
153
154     void awaitDone() {
155       done_.wait();
156     }
157
158     void setDisconnected() {
159       disconnected_.post();
160     }
161
162     void clearDisconnected() {
163       disconnected_.reset();
164     }
165
166     bool isDisconnected() const {
167       return disconnected_.try_wait();
168     }
169
170     void setIndex(const size_t index) {
171       index_ = index;
172     }
173
174     size_t getIndex() const {
175       return index_;
176     }
177
178     void setNext(const size_t next) {
179       next_ = next;
180     }
181
182     size_t getNext() const {
183       return next_;
184     }
185
186     void setLast(const uint64_t pass) {
187       last_ = pass;
188     }
189
190     uint64_t getLast() const {
191       return last_;
192     }
193
194     Req& getReq() {
195       return req_;
196     }
197
198     template <typename Func>
199     void setFn(Func&& fn) {
200       static_assert(
201           std::is_nothrow_constructible<
202               folly::Function<void()>,
203               _t<std::decay<Func>>>::value,
204           "Try using a smaller function object that can fit in folly::Function "
205           "without allocation, or use the custom interface of requestFC() to "
206           "manage the requested function's arguments and results explicitly "
207           "in a custom request structure without allocation.");
208       fn_ = std::forward<Func>(fn);
209       assert(fn_);
210     }
211
212     void clearFn() {
213       fn_ = {};
214       assert(!fn_);
215     }
216
217     SavedFn& getFn() {
218       return fn_;
219     }
220
221     void complete() {
222       clearValid();
223       assert(!isDone());
224       setDone();
225     }
226   };
227
228   using Pool = folly::IndexedMemPool<Rec, 32, 4, Atom, false, false>;
229
230  public:
231   /// The constructor takes three optional arguments:
232   /// - Optional dedicated combiner thread (default true)
233   /// - Number of records (if 0, then kDefaultNumRecs)
234   /// - A hint for the max. number of combined operations per
235   ///   combining session that is checked at the beginning of each pass
236   ///   on the request records (if 0, then kDefaultMaxops)
237   explicit FlatCombining(
238       const bool dedicated = true,
239       const uint32_t numRecs = 0, // number of combining records
240       const uint32_t maxOps = 0 // hint of max ops per combining session
241       )
242       : numRecs_(numRecs == 0 ? kDefaultNumRecs : numRecs),
243         maxOps_(maxOps == 0 ? kDefaultMaxOps : maxOps),
244         recs_(NULL_INDEX),
245         dedicated_(dedicated),
246         recsPool_(numRecs_) {
247     if (dedicated_) {
248       // dedicated combiner thread
249       combiner_ = std::thread([this] { dedicatedCombining(); });
250     }
251   }
252
253   /// Destructor: If there is a dedicated combiner, the destructor
254   /// flags it to shutdown. Otherwise, the destructor waits for all
255   /// pending asynchronous requests to be completed.
256   ~FlatCombining() {
257     if (dedicated_) {
258       shutdown();
259       combiner_.join();
260     } else {
261       drainAll();
262     }
263   }
264
265   // Wait for all pending operations to complete. Useful primarily
266   // when there are asynchronous operations without a dedicated
267   // combiner.
268   void drainAll() {
269     for (size_t i = getRecsHead(); i != NULL_INDEX; i = nextIndex(i)) {
270       Rec& rec = recsPool_[i];
271       awaitDone(rec);
272     }
273   }
274
275   // Give the caller exclusive access.
276   void acquireExclusive() {
277     m_.lock();
278   }
279
280   // Try to give the caller exclusive access. Returns true iff successful.
281   bool tryExclusive() {
282     return m_.try_lock();
283   }
284
285   // Release exclusive access. The caller must have exclusive access.
286   void releaseExclusive() {
287     m_.unlock();
288   }
289
290   // Give the lock holder ownership of the mutex and exclusive access.
291   // No need for explicit release.
292   template <typename LockHolder>
293   void holdLock(LockHolder& l) {
294     l = LockHolder(m_);
295   }
296
297   // Give the caller's lock holder ownership of the mutex but without
298   // exclusive access. The caller can later use the lock holder to try
299   // to acquire exclusive access.
300   template <typename LockHolder>
301   void holdLock(LockHolder& l, std::defer_lock_t) {
302     l = LockHolder(m_, std::defer_lock);
303   }
304
305   // Execute an operation without combining
306   template <typename OpFunc>
307   void requestNoFC(OpFunc& opFn) {
308     std::lock_guard<Mutex> guard(m_);
309     opFn();
310   }
311
312   // This function first tries to execute the operation without
313   // combining. If unuccessful, it allocates a combining record if
314   // needed. If there are no available records, it waits for exclusive
315   // access and executes the operation. If a record is available and
316   // ready for use, it fills the record and indicates that the request
317   // is valid for combining. If the request is synchronous (by default
318   // or necessity), it waits for the operation to be completed by a
319   // combiner and optionally extracts the result, if any.
320   //
321   // This function can be called in several forms:
322   //   Simple forms that do not require the user to define a Req structure
323   //   or to override any request processing member functions:
324   //     requestFC(opFn)
325   //     requestFC(opFn, rec) // provides its own pre-allocated record
326   //     requestFC(opFn, rec, syncop) // asynchronous if syncop == false
327   //   Custom forms that require the user to define a Req structure and to
328   //   override some request processing member functions:
329   //     requestFC(opFn, fillFn)
330   //     requestFC(opFn, fillFn, rec)
331   //     requestFC(opFn, fillFn, rec, syncop)
332   //     requestFC(opFn, fillFn, resFn)
333   //     requestFC(opFn, fillFn, resFn, rec)
334   template <typename OpFunc>
335   void requestFC(OpFunc&& opFn, Rec* rec = nullptr, bool syncop = true) {
336     auto dummy = [](Req&) {};
337     requestOp(
338         std::forward<OpFunc>(opFn),
339         dummy /* fillFn */,
340         dummy /* resFn */,
341         rec,
342         syncop,
343         false /* simple */);
344   }
345   template <typename OpFunc, typename FillFunc>
346   void requestFC(
347       OpFunc&& opFn,
348       const FillFunc& fillFn,
349       Rec* rec = nullptr,
350       bool syncop = true) {
351     auto dummy = [](Req&) {};
352     requestOp(
353         std::forward<OpFunc>(opFn),
354         fillFn,
355         dummy /* resFn */,
356         rec,
357         syncop,
358         true /* custom */);
359   }
360   template <typename OpFunc, typename FillFunc, typename ResFn>
361   void requestFC(
362       OpFunc&& opFn,
363       const FillFunc& fillFn,
364       const ResFn& resFn,
365       Rec* rec = nullptr) {
366     // must wait for result to execute resFn -- so it must be synchronous
367     requestOp(
368         std::forward<OpFunc>(opFn),
369         fillFn,
370         resFn,
371         rec,
372         true /* sync */,
373         true /* custom*/);
374   }
375
376   // Allocate a record.
377   Rec* allocRec() {
378     auto idx = recsPool_.allocIndex();
379     if (idx == NULL_INDEX) {
380       return nullptr;
381     }
382     Rec& rec = recsPool_[idx];
383     rec.setIndex(idx);
384     return &rec;
385   }
386
387   // Free a record
388   void freeRec(Rec* rec) {
389     if (rec == nullptr) {
390       return;
391     }
392     auto idx = rec->getIndex();
393     recsPool_.recycleIndex(idx);
394   }
395
396   // Returns the number of uncombined operations so far.
397   uint64_t getNumUncombined() const {
398     return uncombined_;
399   }
400
401   // Returns the number of combined operations so far.
402   uint64_t getNumCombined() const {
403     return combined_;
404   }
405
406   // Returns the number of combining passes so far.
407   uint64_t getNumPasses() const {
408     return passes_;
409   }
410
411   // Returns the number of combining sessions so far.
412   uint64_t getNumSessions() const {
413     return sessions_;
414   }
415
416  protected:
417   const size_t NULL_INDEX = 0;
418   const uint32_t kDefaultMaxOps = 100;
419   const uint64_t kDefaultNumRecs = 64;
420   const uint64_t kIdleThreshold = 10;
421
422   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
423   Mutex m_;
424
425   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
426   folly::Baton<Atom, false, true> pending_;
427   Atom<bool> shutdown_{false};
428
429   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
430   uint32_t numRecs_;
431   uint32_t maxOps_;
432   Atom<size_t> recs_;
433   bool dedicated_;
434   std::thread combiner_;
435   Pool recsPool_;
436
437   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
438   uint64_t uncombined_ = 0;
439   uint64_t combined_ = 0;
440   uint64_t passes_ = 0;
441   uint64_t sessions_ = 0;
442
443   template <typename OpFunc, typename FillFunc, typename ResFn>
444   void requestOp(
445       OpFunc&& opFn,
446       const FillFunc& fillFn,
447       const ResFn& resFn,
448       Rec* rec,
449       bool syncop,
450       const bool custom) {
451     std::unique_lock<Mutex> l(this->m_, std::defer_lock);
452     if (l.try_lock()) {
453       // No contention
454       ++uncombined_;
455       tryCombining();
456       opFn();
457       return;
458     }
459
460     // Try FC
461     bool tc = (rec != nullptr);
462     if (!tc) {
463       // if an async op doesn't have a thread-cached record then turn
464       // it into a synchronous op.
465       syncop = true;
466       rec = allocRec();
467     }
468     if (rec == nullptr) {
469       // Can't use FC - Must acquire lock
470       l.lock();
471       ++uncombined_;
472       tryCombining();
473       opFn();
474       return;
475     }
476
477     // Use FC
478     // Wait if record is in use
479     awaitDone(*rec);
480     rec->clearDone();
481     // Fill record
482     if (custom) {
483       // Fill the request (custom)
484       Req& req = rec->getReq();
485       fillFn(req);
486       rec->clearFn();
487     } else {
488       rec->setFn(std::forward<OpFunc>(opFn));
489     }
490     // Indicate that record is valid
491     assert(!rec->isValid());
492     rec->setValid();
493     // end of combining critical path
494     setPending();
495     // store-load order setValid before isDisconnected
496     std::atomic_thread_fence(std::memory_order_seq_cst);
497     if (rec->isDisconnected()) {
498       rec->clearDisconnected();
499       pushRec(rec->getIndex());
500       setPending();
501     }
502     // If synchronous wait for the request to be completed
503     if (syncop) {
504       awaitDone(*rec);
505       if (custom) {
506         Req& req = rec->getReq();
507         resFn(req); // Extract the result (custom)
508       }
509       if (!tc) {
510         freeRec(rec); // Free the temporary record.
511       }
512     }
513   }
514
515   void pushRec(size_t idx) {
516     Rec& rec = recsPool_[idx];
517     while (true) {
518       auto head = recs_.load(std::memory_order_acquire);
519       rec.setNext(head); // there shouldn't be a data race here
520       if (recs_.compare_exchange_weak(head, idx)) {
521         return;
522       }
523     }
524   }
525
526   size_t getRecsHead() {
527     return recs_.load(std::memory_order_acquire);
528   }
529
530   size_t nextIndex(size_t idx) {
531     return recsPool_[idx].getNext();
532   }
533
534   void clearPending() {
535     pending_.reset();
536   }
537
538   void setPending() {
539     pending_.post();
540   }
541
542   bool isPending() const {
543     return pending_.try_wait();
544   }
545
546   void awaitPending() {
547     pending_.wait();
548   }
549
550   uint64_t combiningSession() {
551     uint64_t combined = 0;
552     do {
553       uint64_t count = static_cast<T*>(this)->combiningPass();
554       if (count == 0) {
555         break;
556       }
557       combined += count;
558       ++this->passes_;
559     } while (combined < this->maxOps_);
560     return combined;
561   }
562
563   void tryCombining() {
564     if (!dedicated_) {
565       while (isPending()) {
566         clearPending();
567         ++sessions_;
568         combined_ += combiningSession();
569       }
570     }
571   }
572
573   void dedicatedCombining() {
574     while (true) {
575       awaitPending();
576       clearPending();
577       if (shutdown_.load()) {
578         break;
579       }
580       while (true) {
581         uint64_t count;
582         ++sessions_;
583         {
584           std::lock_guard<Mutex> guard(m_);
585           count = combiningSession();
586           combined_ += count;
587         }
588         if (count < maxOps_) {
589           break;
590         }
591       }
592     }
593   }
594
595   void awaitDone(Rec& rec) {
596     if (dedicated_) {
597       rec.awaitDone();
598     } else {
599       awaitDoneTryLock(rec);
600     }
601   }
602
603   /// Waits for the request to be done and occasionally tries to
604   /// acquire the lock and to do combining. Used only in the absence
605   /// of a dedicated combiner.
606   void awaitDoneTryLock(Rec& rec) {
607     assert(!dedicated_);
608     int count = 0;
609     while (!rec.isDone()) {
610       if (count == 0) {
611         std::unique_lock<Mutex> l(m_, std::defer_lock);
612         if (l.try_lock()) {
613           setPending();
614           tryCombining();
615         }
616       } else {
617         folly::asm_volatile_pause();
618         if (++count == 1000) {
619           count = 0;
620         }
621       }
622     }
623   }
624
625   void shutdown() {
626     shutdown_.store(true);
627     setPending();
628   }
629
630   /// The following member functions may be overridden for customization
631
632   void combinedOp(Req&) {
633     throw std::runtime_error(
634         "FlatCombining::combinedOp(Req&) must be overridden in the derived"
635         " class if called.");
636   }
637
638   void processReq(Rec& rec) {
639     SavedFn& opFn = rec.getFn();
640     if (opFn) {
641       // simple interface
642       opFn();
643     } else {
644       // custom interface
645       Req& req = rec.getReq();
646       static_cast<T*>(this)->combinedOp(req); // defined in derived class
647     }
648     rec.setLast(passes_);
649     rec.complete();
650   }
651
652   uint64_t combiningPass() {
653     uint64_t count = 0;
654     auto idx = getRecsHead();
655     Rec* prev = nullptr;
656     while (idx != NULL_INDEX) {
657       Rec& rec = recsPool_[idx];
658       auto next = rec.getNext();
659       bool valid = rec.isValid();
660       if (!valid && (passes_ - rec.getLast() > kIdleThreshold) &&
661           (prev != nullptr)) {
662         // Disconnect
663         prev->setNext(next);
664         rec.setDisconnected();
665         // store-load order setDisconnected before isValid
666         std::atomic_thread_fence(std::memory_order_seq_cst);
667         valid = rec.isValid();
668       } else {
669         prev = &rec;
670       }
671       if (valid) {
672         processReq(rec);
673         ++count;
674       }
675       idx = next;
676     }
677     return count;
678   }
679 };
680
681 } // namespace folly {