Adds writer test case for RCU
[folly.git] / folly / experimental / flat_combining / FlatCombining.h
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/Function.h>
20 #include <folly/IndexedMemPool.h>
21 #include <folly/Portability.h>
22 #include <folly/concurrency/CacheLocality.h>
23 #include <folly/synchronization/SaturatingSemaphore.h>
24
25 #include <atomic>
26 #include <cassert>
27 #include <mutex>
28 #include <thread>
29
30 namespace folly {
31
32 /// Flat combining (FC) was introduced in the SPAA 2010 paper Flat
33 /// Combining and the Synchronization-Parallelism Tradeoff, by Danny
34 /// Hendler, Itai Incze, Nir Shavit, and Moran Tzafrir.
35 /// http://mcg.cs.tau.ac.il/projects/projects/flat-combining
36 ///
37 /// FC is an alternative to coarse-grained locking for making
38 /// sequential data structures thread-safe while minimizing the
39 /// synchronization overheads and cache coherence traffic associated
40 /// with locking.
41 ///
42 /// Under FC, when a thread finds the lock contended, it can
43 /// request (using a request record) that the lock holder execute its
44 /// operation on the shared data structure. There can be a designated
45 /// combiner thread or any thread can act as the combiner when it
46 /// holds the lock.
47 ///
48 /// Potential advantages of FC include:
49 /// - Reduced cache coherence traffic
50 /// - Reduced synchronization overheads, as the overheads of releasing
51 ///   and acquiring the lock are eliminated from the critical path of
52 ///   operating on the data structure.
53 /// - Opportunities for smart combining, where executing multiple
54 ///   operations together may take less time than executing the
55 ///   operations separately, e.g., K delete_min operations on a
56 ///   priority queue may be combined to take O(K + log N) time instead
57 ///   of O(K * log N).
58 ///
59 /// This implementation of flat combining supports:
60
61 /// - A simple interface that requires minimal extra code by the
62 ///   user. To use this interface efficiently the user-provided
63 ///   functions must be copyable to folly::Function without dynamic
64 ///   allocation. If this is impossible or inconvenient, the user is
65 ///   encouraged to use the custom interface described below.
66 /// - A custom interface that supports custom combining and custom
67 ///   request structure, either for the sake of smart combining or for
68 ///   efficiently supporting operations that are not be copyable to
69 ///   folly::Function without dynamic allocation.
70 /// - Both synchronous and asynchronous operations.
71 /// - Request records with and without thread-caching.
72 /// - Combining with and without a dedicated combiner thread.
73 ///
74 /// This implementation differs from the algorithm in the SPAA 2010 paper:
75 /// - It does not require thread caching of request records
76 /// - It supports a dedicated combiner
77 /// - It supports asynchronous operations
78 ///
79 /// The generic FC class template supports generic data structures and
80 /// utilities with arbitrary operations. The template supports static
81 /// polymorphism for the combining function to enable custom smart
82 /// combining.
83 ///
84 /// A simple example of using the FC template:
85 ///   class ConcurrentFoo : public FlatCombining<ConcurrentFoo> {
86 ///     Foo foo_; // sequential data structure
87 ///    public:
88 ///     T bar(V& v) { // thread-safe execution of foo_.bar(v)
89 ///       T result;
90 ///       // Note: fn must be copyable to folly::Function without dynamic
91 ///       // allocation. Otherwise, it is recommended to use the custom
92 ///       // interface and manage the function arguments and results
93 ///       // explicitly in a custom request structure.
94 ///       auto fn = [&] { result = foo_.bar(v); };
95 ///       this->requestFC(fn);
96 ///       return result;
97 ///     }
98 ///   };
99 ///
100 /// See test/FlatCombiningExamples.h for more examples. See the
101 /// comments for requestFC() below for a list of simple and custom
102 /// variants of that function.
103
104 template <
105     typename T, // concurrent data structure using FC interface
106     typename Mutex = std::mutex,
107     template <typename> class Atom = std::atomic,
108     typename Req = /* default dummy type */ bool>
109 class FlatCombining {
110   using SavedFn = folly::Function<void()>;
111
112  public:
113   /// Combining request record.
114   class Rec {
115     alignas(hardware_destructive_interference_size)
116         folly::SaturatingSemaphore<false, Atom> valid_;
117     folly::SaturatingSemaphore<false, Atom> done_;
118     folly::SaturatingSemaphore<false, Atom> disconnected_;
119     size_t index_;
120     size_t next_;
121     uint64_t last_;
122     Req req_;
123     SavedFn fn_;
124
125    public:
126     Rec() {
127       setDone();
128       setDisconnected();
129     }
130
131     void setValid() {
132       valid_.post();
133     }
134
135     void clearValid() {
136       valid_.reset();
137     }
138
139     bool isValid() const {
140       return valid_.ready();
141     }
142
143     void setDone() {
144       done_.post();
145     }
146
147     void clearDone() {
148       done_.reset();
149     }
150
151     bool isDone() const {
152       return done_.ready();
153     }
154
155     void awaitDone() {
156       done_.wait();
157     }
158
159     void setDisconnected() {
160       disconnected_.post();
161     }
162
163     void clearDisconnected() {
164       disconnected_.reset();
165     }
166
167     bool isDisconnected() const {
168       return disconnected_.ready();
169     }
170
171     void setIndex(const size_t index) {
172       index_ = index;
173     }
174
175     size_t getIndex() const {
176       return index_;
177     }
178
179     void setNext(const size_t next) {
180       next_ = next;
181     }
182
183     size_t getNext() const {
184       return next_;
185     }
186
187     void setLast(const uint64_t pass) {
188       last_ = pass;
189     }
190
191     uint64_t getLast() const {
192       return last_;
193     }
194
195     Req& getReq() {
196       return req_;
197     }
198
199     template <typename Func>
200     void setFn(Func&& fn) {
201       static_assert(
202           std::is_nothrow_constructible<
203               folly::Function<void()>,
204               _t<std::decay<Func>>>::value,
205           "Try using a smaller function object that can fit in folly::Function "
206           "without allocation, or use the custom interface of requestFC() to "
207           "manage the requested function's arguments and results explicitly "
208           "in a custom request structure without allocation.");
209       fn_ = std::forward<Func>(fn);
210       assert(fn_);
211     }
212
213     void clearFn() {
214       fn_ = {};
215       assert(!fn_);
216     }
217
218     SavedFn& getFn() {
219       return fn_;
220     }
221
222     void complete() {
223       clearValid();
224       assert(!isDone());
225       setDone();
226     }
227   };
228
229   using Pool = folly::
230       IndexedMemPool<Rec, 32, 4, Atom, IndexedMemPoolTraitsLazyRecycle<Rec>>;
231
232  public:
233   /// The constructor takes three optional arguments:
234   /// - Optional dedicated combiner thread (default true)
235   /// - Number of records (if 0, then kDefaultNumRecs)
236   /// - A hint for the max. number of combined operations per
237   ///   combining session that is checked at the beginning of each pass
238   ///   on the request records (if 0, then kDefaultMaxops)
239   explicit FlatCombining(
240       const bool dedicated = true,
241       const uint32_t numRecs = 0, // number of combining records
242       const uint32_t maxOps = 0 // hint of max ops per combining session
243       )
244       : numRecs_(numRecs == 0 ? kDefaultNumRecs : numRecs),
245         maxOps_(maxOps == 0 ? kDefaultMaxOps : maxOps),
246         recs_(NULL_INDEX),
247         dedicated_(dedicated),
248         recsPool_(numRecs_) {
249     if (dedicated_) {
250       // dedicated combiner thread
251       combiner_ = std::thread([this] { dedicatedCombining(); });
252     }
253   }
254
255   /// Destructor: If there is a dedicated combiner, the destructor
256   /// flags it to shutdown. Otherwise, the destructor waits for all
257   /// pending asynchronous requests to be completed.
258   ~FlatCombining() {
259     if (dedicated_) {
260       shutdown();
261       combiner_.join();
262     } else {
263       drainAll();
264     }
265   }
266
267   // Wait for all pending operations to complete. Useful primarily
268   // when there are asynchronous operations without a dedicated
269   // combiner.
270   void drainAll() {
271     for (size_t i = getRecsHead(); i != NULL_INDEX; i = nextIndex(i)) {
272       Rec& rec = recsPool_[i];
273       awaitDone(rec);
274     }
275   }
276
277   // Give the caller exclusive access.
278   void acquireExclusive() {
279     m_.lock();
280   }
281
282   // Try to give the caller exclusive access. Returns true iff successful.
283   bool tryExclusive() {
284     return m_.try_lock();
285   }
286
287   // Release exclusive access. The caller must have exclusive access.
288   void releaseExclusive() {
289     m_.unlock();
290   }
291
292   // Give the lock holder ownership of the mutex and exclusive access.
293   // No need for explicit release.
294   template <typename LockHolder>
295   void holdLock(LockHolder& l) {
296     l = LockHolder(m_);
297   }
298
299   // Give the caller's lock holder ownership of the mutex but without
300   // exclusive access. The caller can later use the lock holder to try
301   // to acquire exclusive access.
302   template <typename LockHolder>
303   void holdLock(LockHolder& l, std::defer_lock_t) {
304     l = LockHolder(m_, std::defer_lock);
305   }
306
307   // Execute an operation without combining
308   template <typename OpFunc>
309   void requestNoFC(OpFunc& opFn) {
310     std::lock_guard<Mutex> guard(m_);
311     opFn();
312   }
313
314   // This function first tries to execute the operation without
315   // combining. If unuccessful, it allocates a combining record if
316   // needed. If there are no available records, it waits for exclusive
317   // access and executes the operation. If a record is available and
318   // ready for use, it fills the record and indicates that the request
319   // is valid for combining. If the request is synchronous (by default
320   // or necessity), it waits for the operation to be completed by a
321   // combiner and optionally extracts the result, if any.
322   //
323   // This function can be called in several forms:
324   //   Simple forms that do not require the user to define a Req structure
325   //   or to override any request processing member functions:
326   //     requestFC(opFn)
327   //     requestFC(opFn, rec) // provides its own pre-allocated record
328   //     requestFC(opFn, rec, syncop) // asynchronous if syncop == false
329   //   Custom forms that require the user to define a Req structure and to
330   //   override some request processing member functions:
331   //     requestFC(opFn, fillFn)
332   //     requestFC(opFn, fillFn, rec)
333   //     requestFC(opFn, fillFn, rec, syncop)
334   //     requestFC(opFn, fillFn, resFn)
335   //     requestFC(opFn, fillFn, resFn, rec)
336   template <typename OpFunc>
337   void requestFC(OpFunc&& opFn, Rec* rec = nullptr, bool syncop = true) {
338     auto dummy = [](Req&) {};
339     requestOp(
340         std::forward<OpFunc>(opFn),
341         dummy /* fillFn */,
342         dummy /* resFn */,
343         rec,
344         syncop,
345         false /* simple */);
346   }
347   template <typename OpFunc, typename FillFunc>
348   void requestFC(
349       OpFunc&& opFn,
350       const FillFunc& fillFn,
351       Rec* rec = nullptr,
352       bool syncop = true) {
353     auto dummy = [](Req&) {};
354     requestOp(
355         std::forward<OpFunc>(opFn),
356         fillFn,
357         dummy /* resFn */,
358         rec,
359         syncop,
360         true /* custom */);
361   }
362   template <typename OpFunc, typename FillFunc, typename ResFn>
363   void requestFC(
364       OpFunc&& opFn,
365       const FillFunc& fillFn,
366       const ResFn& resFn,
367       Rec* rec = nullptr) {
368     // must wait for result to execute resFn -- so it must be synchronous
369     requestOp(
370         std::forward<OpFunc>(opFn),
371         fillFn,
372         resFn,
373         rec,
374         true /* sync */,
375         true /* custom*/);
376   }
377
378   // Allocate a record.
379   Rec* allocRec() {
380     auto idx = recsPool_.allocIndex();
381     if (idx == NULL_INDEX) {
382       return nullptr;
383     }
384     Rec& rec = recsPool_[idx];
385     rec.setIndex(idx);
386     return &rec;
387   }
388
389   // Free a record
390   void freeRec(Rec* rec) {
391     if (rec == nullptr) {
392       return;
393     }
394     auto idx = rec->getIndex();
395     recsPool_.recycleIndex(idx);
396   }
397
398   // Returns the number of uncombined operations so far.
399   uint64_t getNumUncombined() const {
400     return uncombined_;
401   }
402
403   // Returns the number of combined operations so far.
404   uint64_t getNumCombined() const {
405     return combined_;
406   }
407
408   // Returns the number of combining passes so far.
409   uint64_t getNumPasses() const {
410     return passes_;
411   }
412
413   // Returns the number of combining sessions so far.
414   uint64_t getNumSessions() const {
415     return sessions_;
416   }
417
418  protected:
419   const size_t NULL_INDEX = 0;
420   const uint32_t kDefaultMaxOps = 100;
421   const uint64_t kDefaultNumRecs = 64;
422   const uint64_t kIdleThreshold = 10;
423
424   alignas(hardware_destructive_interference_size) Mutex m_;
425
426   alignas(hardware_destructive_interference_size)
427       folly::SaturatingSemaphore<true, Atom> pending_;
428   Atom<bool> shutdown_{false};
429
430   alignas(hardware_destructive_interference_size) uint32_t numRecs_;
431   uint32_t maxOps_;
432   Atom<size_t> recs_;
433   bool dedicated_;
434   std::thread combiner_;
435   Pool recsPool_;
436
437   alignas(hardware_destructive_interference_size) uint64_t uncombined_ = 0;
438   uint64_t combined_ = 0;
439   uint64_t passes_ = 0;
440   uint64_t sessions_ = 0;
441
442   template <typename OpFunc, typename FillFunc, typename ResFn>
443   void requestOp(
444       OpFunc&& opFn,
445       const FillFunc& fillFn,
446       const ResFn& resFn,
447       Rec* rec,
448       bool syncop,
449       const bool custom) {
450     std::unique_lock<Mutex> l(this->m_, std::defer_lock);
451     if (l.try_lock()) {
452       // No contention
453       ++uncombined_;
454       tryCombining();
455       opFn();
456       return;
457     }
458
459     // Try FC
460     bool tc = (rec != nullptr);
461     if (!tc) {
462       // if an async op doesn't have a thread-cached record then turn
463       // it into a synchronous op.
464       syncop = true;
465       rec = allocRec();
466     }
467     if (rec == nullptr) {
468       // Can't use FC - Must acquire lock
469       l.lock();
470       ++uncombined_;
471       tryCombining();
472       opFn();
473       return;
474     }
475
476     // Use FC
477     // Wait if record is in use
478     awaitDone(*rec);
479     rec->clearDone();
480     // Fill record
481     if (custom) {
482       // Fill the request (custom)
483       Req& req = rec->getReq();
484       fillFn(req);
485       rec->clearFn();
486     } else {
487       rec->setFn(std::forward<OpFunc>(opFn));
488     }
489     // Indicate that record is valid
490     assert(!rec->isValid());
491     rec->setValid();
492     // end of combining critical path
493     setPending();
494     // store-load order setValid before isDisconnected
495     std::atomic_thread_fence(std::memory_order_seq_cst);
496     if (rec->isDisconnected()) {
497       rec->clearDisconnected();
498       pushRec(rec->getIndex());
499       setPending();
500     }
501     // If synchronous wait for the request to be completed
502     if (syncop) {
503       awaitDone(*rec);
504       if (custom) {
505         Req& req = rec->getReq();
506         resFn(req); // Extract the result (custom)
507       }
508       if (!tc) {
509         freeRec(rec); // Free the temporary record.
510       }
511     }
512   }
513
514   void pushRec(size_t idx) {
515     Rec& rec = recsPool_[idx];
516     while (true) {
517       auto head = recs_.load(std::memory_order_acquire);
518       rec.setNext(head); // there shouldn't be a data race here
519       if (recs_.compare_exchange_weak(head, idx)) {
520         return;
521       }
522     }
523   }
524
525   size_t getRecsHead() {
526     return recs_.load(std::memory_order_acquire);
527   }
528
529   size_t nextIndex(size_t idx) {
530     return recsPool_[idx].getNext();
531   }
532
533   void clearPending() {
534     pending_.reset();
535   }
536
537   void setPending() {
538     pending_.post();
539   }
540
541   bool isPending() const {
542     return pending_.ready();
543   }
544
545   void awaitPending() {
546     pending_.wait();
547   }
548
549   uint64_t combiningSession() {
550     uint64_t combined = 0;
551     do {
552       uint64_t count = static_cast<T*>(this)->combiningPass();
553       if (count == 0) {
554         break;
555       }
556       combined += count;
557       ++this->passes_;
558     } while (combined < this->maxOps_);
559     return combined;
560   }
561
562   void tryCombining() {
563     if (!dedicated_) {
564       while (isPending()) {
565         clearPending();
566         ++sessions_;
567         combined_ += combiningSession();
568       }
569     }
570   }
571
572   void dedicatedCombining() {
573     while (true) {
574       awaitPending();
575       clearPending();
576       if (shutdown_.load()) {
577         break;
578       }
579       while (true) {
580         uint64_t count;
581         ++sessions_;
582         {
583           std::lock_guard<Mutex> guard(m_);
584           count = combiningSession();
585           combined_ += count;
586         }
587         if (count < maxOps_) {
588           break;
589         }
590       }
591     }
592   }
593
594   void awaitDone(Rec& rec) {
595     if (dedicated_) {
596       rec.awaitDone();
597     } else {
598       awaitDoneTryLock(rec);
599     }
600   }
601
602   /// Waits for the request to be done and occasionally tries to
603   /// acquire the lock and to do combining. Used only in the absence
604   /// of a dedicated combiner.
605   void awaitDoneTryLock(Rec& rec) {
606     assert(!dedicated_);
607     int count = 0;
608     while (!rec.isDone()) {
609       if (count == 0) {
610         std::unique_lock<Mutex> l(m_, std::defer_lock);
611         if (l.try_lock()) {
612           setPending();
613           tryCombining();
614         }
615       } else {
616         folly::asm_volatile_pause();
617         if (++count == 1000) {
618           count = 0;
619         }
620       }
621     }
622   }
623
624   void shutdown() {
625     shutdown_.store(true);
626     setPending();
627   }
628
629   /// The following member functions may be overridden for customization
630
631   void combinedOp(Req&) {
632     throw std::runtime_error(
633         "FlatCombining::combinedOp(Req&) must be overridden in the derived"
634         " class if called.");
635   }
636
637   void processReq(Rec& rec) {
638     SavedFn& opFn = rec.getFn();
639     if (opFn) {
640       // simple interface
641       opFn();
642     } else {
643       // custom interface
644       Req& req = rec.getReq();
645       static_cast<T*>(this)->combinedOp(req); // defined in derived class
646     }
647     rec.setLast(passes_);
648     rec.complete();
649   }
650
651   uint64_t combiningPass() {
652     uint64_t count = 0;
653     auto idx = getRecsHead();
654     Rec* prev = nullptr;
655     while (idx != NULL_INDEX) {
656       Rec& rec = recsPool_[idx];
657       auto next = rec.getNext();
658       bool valid = rec.isValid();
659       if (!valid && (passes_ - rec.getLast() > kIdleThreshold) &&
660           (prev != nullptr)) {
661         // Disconnect
662         prev->setNext(next);
663         rec.setDisconnected();
664         // store-load order setDisconnected before isValid
665         std::atomic_thread_fence(std::memory_order_seq_cst);
666         valid = rec.isValid();
667       } else {
668         prev = &rec;
669       }
670       if (valid) {
671         processReq(rec);
672         ++count;
673       }
674       idx = next;
675     }
676     return count;
677   }
678 };
679
680 } // namespace folly