146f1748bd02af13789117d4c2a689f58f2ed72b
[folly.git] / folly / experimental / flat_combining / FlatCombining.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/Baton.h>
20 #include <folly/Function.h>
21 #include <folly/IndexedMemPool.h>
22 #include <folly/Portability.h>
23 #include <folly/detail/CacheLocality.h>
24
25 #include <atomic>
26 #include <cassert>
27 #include <mutex>
28
29 namespace folly {
30
31 /// Flat combining (FC) was introduced in the SPAA 2010 paper Flat
32 /// Combining and the Synchronization-Parallelism Tradeoff, by Danny
33 /// Hendler, Itai Incze, Nir Shavit, and Moran Tzafrir.
34 /// http://mcg.cs.tau.ac.il/projects/projects/flat-combining
35 ///
36 /// FC is an alternative to coarse-grained locking for making
37 /// sequential data structures thread-safe while minimizing the
38 /// synchroniation overheads and cache coherence traffic associated
39 /// with locking.
40 ///
41 /// Under FC, when a thread finds the lock contended, it can
42 /// request (using a request record) that the lock holder execute its
43 /// operation on the shared data structure. There can be a designated
44 /// combiner thread or any thread can act as the combiner when it
45 /// holds the lock.
46 ///
47 /// Potential advantages of FC include:
48 /// - Reduced cache coherence traffic
49 /// - Reduced synchronization overheads, as the overheads of releasing
50 ///   and acquiring the lock are eliminated from the critical path of
51 ///   operating on the data structure.
52 /// - Opportunities for smart combining, where executing multiple
53 ///   operations together may take less time than executng the
54 ///   operations separately, e.g., K delete_min operations on a
55 ///   priority queue may be combined to take O(K + log N) time instead
56 ///   of O(K * log N).
57 ///
58 /// This implementation of flat combining supports:
59
60 /// - A simple interface that requires minimal extra code by the
61 ///   user. To use this interface efficiently the user-provided
62 ///   functions must be copyable to folly::Functio without dynamic
63 ///   allocation. If this is impossible or inconvenient, the user is
64 ///   encouraged to use the custom interface described below.
65 /// - A custom interface that supports custom combinining and custom
66 ///   request structure, either for the sake of smart combining or for
67 ///   efficiently supporting operations that are not be copyable to
68 ///   folly::Function without synamic allocation.
69 /// - Both synchronous and asynchronous operations.
70 /// - Request records with and without thread-caching.
71 /// - Combining with and without a dedicated combiner thread.
72 ///
73 /// This implementation differs from the algorithm in the SPAA 2010 paper:
74 /// - It does not require thread caching of request records
75 /// - It supports a dedicated combiner
76 /// - It supports asynchronous operations
77 ///
78 /// The generic FC class template supports generic data structures and
79 /// utilities with arbitrary operations. The template supports static
80 /// polymorphism for the combining function to enable custom smart
81 /// combining.
82 ///
83 /// A simple example of using the FC template:
84 ///   class ConcurrentFoo : public FlatCombining<ConcurrentFoo> {
85 ///     Foo foo_; // sequential data structure
86 ///    public:
87 ///     T bar(V& v) { // thread-safe execution of foo_.bar(v)
88 ///       T result;
89 ///       // Note: fn must be copyable to folly::Function without dynamic
90 ///       // allocation. Otherwise, it is recommended to use the custom
91 ///       // interface and manage the function arguments and results
92 ///       // explicitly in a custom request structure.
93 ///       auto fn = [&] { result = foo_.bar(v); };
94 ///       this->requestFC(fn);
95 ///       return result;
96 ///     }
97 ///   };
98 ///
99 /// See test/FlatCombiningExamples.h for more examples. See the
100 /// comments for requestFC() below for a list of simple and custom
101 /// variants of that function.
102
103 template <
104     typename T, // concurrent data structure using FC interface
105     typename Mutex = std::mutex,
106     template <typename> class Atom = std::atomic,
107     typename Req = /* default dummy type */ bool>
108 class FlatCombining {
109   using SavedFn = folly::Function<void()>;
110
111  public:
112   /// Combining request record.
113   class Rec {
114     FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
115     folly::Baton<Atom, true, false> valid_;
116     folly::Baton<Atom, true, false> done_;
117     folly::Baton<Atom, true, false> disconnected_;
118     size_t index_;
119     size_t next_;
120     uint64_t last_;
121     Req req_;
122     SavedFn fn_;
123
124    public:
125     Rec() {
126       setDone();
127       setDisconnected();
128     }
129
130     void setValid() {
131       valid_.post();
132     }
133
134     void clearValid() {
135       valid_.reset();
136     }
137
138     bool isValid() const {
139       return valid_.try_wait();
140     }
141
142     void setDone() {
143       done_.post();
144     }
145
146     void clearDone() {
147       done_.reset();
148     }
149
150     bool isDone() const {
151       return done_.try_wait();
152     }
153
154     void awaitDone() {
155       done_.wait();
156     }
157
158     void setDisconnected() {
159       disconnected_.post();
160     }
161
162     void clearDisconnected() {
163       disconnected_.reset();
164     }
165
166     bool isDisconnected() const {
167       return disconnected_.try_wait();
168     }
169
170     void setIndex(const size_t index) {
171       index_ = index;
172     }
173
174     size_t getIndex() const {
175       return index_;
176     }
177
178     void setNext(const size_t next) {
179       next_ = next;
180     }
181
182     size_t getNext() const {
183       return next_;
184     }
185
186     void setLast(const uint64_t pass) {
187       last_ = pass;
188     }
189
190     uint64_t getLast() const {
191       return last_;
192     }
193
194     Req& getReq() {
195       return req_;
196     }
197
198     template <typename Func>
199     void setFn(Func&& fn) {
200       static_assert(
201           std::is_nothrow_constructible<
202               folly::Function<void()>,
203               _t<std::decay<Func>>>::value,
204           "Try using a smaller function object that can fit in folly::Function "
205           "without allocation, or use the custom interface of requestFC() to "
206           "manage the requested function's arguments and results explicitly "
207           "in a custom request structure without allocation.");
208       fn_ = std::forward<Func>(fn);
209       assert(fn_);
210     }
211
212     void clearFn() {
213       fn_ = {};
214       assert(!fn_);
215     }
216
217     SavedFn& getFn() {
218       return fn_;
219     }
220
221     void complete() {
222       clearValid();
223       assert(!isDone());
224       setDone();
225     }
226   };
227
228   using Pool = folly::
229       IndexedMemPool<Rec, 32, 4, Atom, IndexedMemPoolTraitsLazyRecycle<Rec>>;
230
231  public:
232   /// The constructor takes three optional arguments:
233   /// - Optional dedicated combiner thread (default true)
234   /// - Number of records (if 0, then kDefaultNumRecs)
235   /// - A hint for the max. number of combined operations per
236   ///   combining session that is checked at the beginning of each pass
237   ///   on the request records (if 0, then kDefaultMaxops)
238   explicit FlatCombining(
239       const bool dedicated = true,
240       const uint32_t numRecs = 0, // number of combining records
241       const uint32_t maxOps = 0 // hint of max ops per combining session
242       )
243       : numRecs_(numRecs == 0 ? kDefaultNumRecs : numRecs),
244         maxOps_(maxOps == 0 ? kDefaultMaxOps : maxOps),
245         recs_(NULL_INDEX),
246         dedicated_(dedicated),
247         recsPool_(numRecs_) {
248     if (dedicated_) {
249       // dedicated combiner thread
250       combiner_ = std::thread([this] { dedicatedCombining(); });
251     }
252   }
253
254   /// Destructor: If there is a dedicated combiner, the destructor
255   /// flags it to shutdown. Otherwise, the destructor waits for all
256   /// pending asynchronous requests to be completed.
257   ~FlatCombining() {
258     if (dedicated_) {
259       shutdown();
260       combiner_.join();
261     } else {
262       drainAll();
263     }
264   }
265
266   // Wait for all pending operations to complete. Useful primarily
267   // when there are asynchronous operations without a dedicated
268   // combiner.
269   void drainAll() {
270     for (size_t i = getRecsHead(); i != NULL_INDEX; i = nextIndex(i)) {
271       Rec& rec = recsPool_[i];
272       awaitDone(rec);
273     }
274   }
275
276   // Give the caller exclusive access.
277   void acquireExclusive() {
278     m_.lock();
279   }
280
281   // Try to give the caller exclusive access. Returns true iff successful.
282   bool tryExclusive() {
283     return m_.try_lock();
284   }
285
286   // Release exclusive access. The caller must have exclusive access.
287   void releaseExclusive() {
288     m_.unlock();
289   }
290
291   // Give the lock holder ownership of the mutex and exclusive access.
292   // No need for explicit release.
293   template <typename LockHolder>
294   void holdLock(LockHolder& l) {
295     l = LockHolder(m_);
296   }
297
298   // Give the caller's lock holder ownership of the mutex but without
299   // exclusive access. The caller can later use the lock holder to try
300   // to acquire exclusive access.
301   template <typename LockHolder>
302   void holdLock(LockHolder& l, std::defer_lock_t) {
303     l = LockHolder(m_, std::defer_lock);
304   }
305
306   // Execute an operation without combining
307   template <typename OpFunc>
308   void requestNoFC(OpFunc& opFn) {
309     std::lock_guard<Mutex> guard(m_);
310     opFn();
311   }
312
313   // This function first tries to execute the operation without
314   // combining. If unuccessful, it allocates a combining record if
315   // needed. If there are no available records, it waits for exclusive
316   // access and executes the operation. If a record is available and
317   // ready for use, it fills the record and indicates that the request
318   // is valid for combining. If the request is synchronous (by default
319   // or necessity), it waits for the operation to be completed by a
320   // combiner and optionally extracts the result, if any.
321   //
322   // This function can be called in several forms:
323   //   Simple forms that do not require the user to define a Req structure
324   //   or to override any request processing member functions:
325   //     requestFC(opFn)
326   //     requestFC(opFn, rec) // provides its own pre-allocated record
327   //     requestFC(opFn, rec, syncop) // asynchronous if syncop == false
328   //   Custom forms that require the user to define a Req structure and to
329   //   override some request processing member functions:
330   //     requestFC(opFn, fillFn)
331   //     requestFC(opFn, fillFn, rec)
332   //     requestFC(opFn, fillFn, rec, syncop)
333   //     requestFC(opFn, fillFn, resFn)
334   //     requestFC(opFn, fillFn, resFn, rec)
335   template <typename OpFunc>
336   void requestFC(OpFunc&& opFn, Rec* rec = nullptr, bool syncop = true) {
337     auto dummy = [](Req&) {};
338     requestOp(
339         std::forward<OpFunc>(opFn),
340         dummy /* fillFn */,
341         dummy /* resFn */,
342         rec,
343         syncop,
344         false /* simple */);
345   }
346   template <typename OpFunc, typename FillFunc>
347   void requestFC(
348       OpFunc&& opFn,
349       const FillFunc& fillFn,
350       Rec* rec = nullptr,
351       bool syncop = true) {
352     auto dummy = [](Req&) {};
353     requestOp(
354         std::forward<OpFunc>(opFn),
355         fillFn,
356         dummy /* resFn */,
357         rec,
358         syncop,
359         true /* custom */);
360   }
361   template <typename OpFunc, typename FillFunc, typename ResFn>
362   void requestFC(
363       OpFunc&& opFn,
364       const FillFunc& fillFn,
365       const ResFn& resFn,
366       Rec* rec = nullptr) {
367     // must wait for result to execute resFn -- so it must be synchronous
368     requestOp(
369         std::forward<OpFunc>(opFn),
370         fillFn,
371         resFn,
372         rec,
373         true /* sync */,
374         true /* custom*/);
375   }
376
377   // Allocate a record.
378   Rec* allocRec() {
379     auto idx = recsPool_.allocIndex();
380     if (idx == NULL_INDEX) {
381       return nullptr;
382     }
383     Rec& rec = recsPool_[idx];
384     rec.setIndex(idx);
385     return &rec;
386   }
387
388   // Free a record
389   void freeRec(Rec* rec) {
390     if (rec == nullptr) {
391       return;
392     }
393     auto idx = rec->getIndex();
394     recsPool_.recycleIndex(idx);
395   }
396
397   // Returns the number of uncombined operations so far.
398   uint64_t getNumUncombined() const {
399     return uncombined_;
400   }
401
402   // Returns the number of combined operations so far.
403   uint64_t getNumCombined() const {
404     return combined_;
405   }
406
407   // Returns the number of combining passes so far.
408   uint64_t getNumPasses() const {
409     return passes_;
410   }
411
412   // Returns the number of combining sessions so far.
413   uint64_t getNumSessions() const {
414     return sessions_;
415   }
416
417  protected:
418   const size_t NULL_INDEX = 0;
419   const uint32_t kDefaultMaxOps = 100;
420   const uint64_t kDefaultNumRecs = 64;
421   const uint64_t kIdleThreshold = 10;
422
423   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
424   Mutex m_;
425
426   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
427   folly::Baton<Atom, false, true> pending_;
428   Atom<bool> shutdown_{false};
429
430   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
431   uint32_t numRecs_;
432   uint32_t maxOps_;
433   Atom<size_t> recs_;
434   bool dedicated_;
435   std::thread combiner_;
436   Pool recsPool_;
437
438   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
439   uint64_t uncombined_ = 0;
440   uint64_t combined_ = 0;
441   uint64_t passes_ = 0;
442   uint64_t sessions_ = 0;
443
444   template <typename OpFunc, typename FillFunc, typename ResFn>
445   void requestOp(
446       OpFunc&& opFn,
447       const FillFunc& fillFn,
448       const ResFn& resFn,
449       Rec* rec,
450       bool syncop,
451       const bool custom) {
452     std::unique_lock<Mutex> l(this->m_, std::defer_lock);
453     if (l.try_lock()) {
454       // No contention
455       ++uncombined_;
456       tryCombining();
457       opFn();
458       return;
459     }
460
461     // Try FC
462     bool tc = (rec != nullptr);
463     if (!tc) {
464       // if an async op doesn't have a thread-cached record then turn
465       // it into a synchronous op.
466       syncop = true;
467       rec = allocRec();
468     }
469     if (rec == nullptr) {
470       // Can't use FC - Must acquire lock
471       l.lock();
472       ++uncombined_;
473       tryCombining();
474       opFn();
475       return;
476     }
477
478     // Use FC
479     // Wait if record is in use
480     awaitDone(*rec);
481     rec->clearDone();
482     // Fill record
483     if (custom) {
484       // Fill the request (custom)
485       Req& req = rec->getReq();
486       fillFn(req);
487       rec->clearFn();
488     } else {
489       rec->setFn(std::forward<OpFunc>(opFn));
490     }
491     // Indicate that record is valid
492     assert(!rec->isValid());
493     rec->setValid();
494     // end of combining critical path
495     setPending();
496     // store-load order setValid before isDisconnected
497     std::atomic_thread_fence(std::memory_order_seq_cst);
498     if (rec->isDisconnected()) {
499       rec->clearDisconnected();
500       pushRec(rec->getIndex());
501       setPending();
502     }
503     // If synchronous wait for the request to be completed
504     if (syncop) {
505       awaitDone(*rec);
506       if (custom) {
507         Req& req = rec->getReq();
508         resFn(req); // Extract the result (custom)
509       }
510       if (!tc) {
511         freeRec(rec); // Free the temporary record.
512       }
513     }
514   }
515
516   void pushRec(size_t idx) {
517     Rec& rec = recsPool_[idx];
518     while (true) {
519       auto head = recs_.load(std::memory_order_acquire);
520       rec.setNext(head); // there shouldn't be a data race here
521       if (recs_.compare_exchange_weak(head, idx)) {
522         return;
523       }
524     }
525   }
526
527   size_t getRecsHead() {
528     return recs_.load(std::memory_order_acquire);
529   }
530
531   size_t nextIndex(size_t idx) {
532     return recsPool_[idx].getNext();
533   }
534
535   void clearPending() {
536     pending_.reset();
537   }
538
539   void setPending() {
540     pending_.post();
541   }
542
543   bool isPending() const {
544     return pending_.try_wait();
545   }
546
547   void awaitPending() {
548     pending_.wait();
549   }
550
551   uint64_t combiningSession() {
552     uint64_t combined = 0;
553     do {
554       uint64_t count = static_cast<T*>(this)->combiningPass();
555       if (count == 0) {
556         break;
557       }
558       combined += count;
559       ++this->passes_;
560     } while (combined < this->maxOps_);
561     return combined;
562   }
563
564   void tryCombining() {
565     if (!dedicated_) {
566       while (isPending()) {
567         clearPending();
568         ++sessions_;
569         combined_ += combiningSession();
570       }
571     }
572   }
573
574   void dedicatedCombining() {
575     while (true) {
576       awaitPending();
577       clearPending();
578       if (shutdown_.load()) {
579         break;
580       }
581       while (true) {
582         uint64_t count;
583         ++sessions_;
584         {
585           std::lock_guard<Mutex> guard(m_);
586           count = combiningSession();
587           combined_ += count;
588         }
589         if (count < maxOps_) {
590           break;
591         }
592       }
593     }
594   }
595
596   void awaitDone(Rec& rec) {
597     if (dedicated_) {
598       rec.awaitDone();
599     } else {
600       awaitDoneTryLock(rec);
601     }
602   }
603
604   /// Waits for the request to be done and occasionally tries to
605   /// acquire the lock and to do combining. Used only in the absence
606   /// of a dedicated combiner.
607   void awaitDoneTryLock(Rec& rec) {
608     assert(!dedicated_);
609     int count = 0;
610     while (!rec.isDone()) {
611       if (count == 0) {
612         std::unique_lock<Mutex> l(m_, std::defer_lock);
613         if (l.try_lock()) {
614           setPending();
615           tryCombining();
616         }
617       } else {
618         folly::asm_volatile_pause();
619         if (++count == 1000) {
620           count = 0;
621         }
622       }
623     }
624   }
625
626   void shutdown() {
627     shutdown_.store(true);
628     setPending();
629   }
630
631   /// The following member functions may be overridden for customization
632
633   void combinedOp(Req&) {
634     throw std::runtime_error(
635         "FlatCombining::combinedOp(Req&) must be overridden in the derived"
636         " class if called.");
637   }
638
639   void processReq(Rec& rec) {
640     SavedFn& opFn = rec.getFn();
641     if (opFn) {
642       // simple interface
643       opFn();
644     } else {
645       // custom interface
646       Req& req = rec.getReq();
647       static_cast<T*>(this)->combinedOp(req); // defined in derived class
648     }
649     rec.setLast(passes_);
650     rec.complete();
651   }
652
653   uint64_t combiningPass() {
654     uint64_t count = 0;
655     auto idx = getRecsHead();
656     Rec* prev = nullptr;
657     while (idx != NULL_INDEX) {
658       Rec& rec = recsPool_[idx];
659       auto next = rec.getNext();
660       bool valid = rec.isValid();
661       if (!valid && (passes_ - rec.getLast() > kIdleThreshold) &&
662           (prev != nullptr)) {
663         // Disconnect
664         prev->setNext(next);
665         rec.setDisconnected();
666         // store-load order setDisconnected before isValid
667         std::atomic_thread_fence(std::memory_order_seq_cst);
668         valid = rec.isValid();
669       } else {
670         prev = &rec;
671       }
672       if (valid) {
673         processReq(rec);
674         ++count;
675       }
676       idx = next;
677     }
678     return count;
679   }
680 };
681
682 } // namespace folly {