/*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2014-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
void openConsumer() { ++consumers_; }
void closeInputProducer() {
- int64_t producers = producers_--;
+ size_t producers = producers_--;
CHECK(producers);
if (producers == 1) { // last producer
wakeConsumer_.notifyAll();
}
void closeOutputConsumer() {
- int64_t consumers = consumers_--;
+ size_t consumers = consumers_--;
CHECK(consumers);
if (consumers == 1) { // last consumer
wakeProducer_.notifyAll();
public:
explicit Sub(Sink sink) : sink_(sink) {}
- template <class Value,
- class Source,
- class Result =
- decltype(std::declval<Sink>().compose(std::declval<Source>())),
- class Just = Just<typename std::decay<Result>::type>>
+ template <
+ class Value,
+ class Source,
+ class Result =
+ decltype(std::declval<Sink>().compose(std::declval<Source>())),
+ class Just = SingleCopy<typename std::decay<Result>::type>>
Just compose(const GenImpl<Value, Source>& source) const {
return Just(source | sink_);
}
public:
Parallel(Ops ops, size_t threads) : ops_(std::move(ops)), threads_(threads) {}
- template <class Input,
- class Source,
- class InputDecayed = typename std::decay<Input>::type,
- class Composed =
- decltype(std::declval<Ops>().compose(Empty<InputDecayed&&>())),
- class Output = typename Composed::ValueType,
- class OutputDecayed = typename std::decay<Output>::type>
+ template <
+ class Input,
+ class Source,
+ class InputDecayed = typename std::decay<Input>::type,
+ class Composed =
+ decltype(std::declval<Ops>().compose(Empty<InputDecayed&&>())),
+ class Output = typename Composed::ValueType,
+ class OutputDecayed = typename std::decay<Output>::type>
class Generator : public GenImpl<OutputDecayed&&,
Generator<Input,
Source,
void work() {
puller_ | *ops_ | pusher_;
- };
+ }
public:
Executor(size_t threads, const Ops* ops)
ops_(ops) {
inQueue_.openProducer();
outQueue_.openConsumer();
- for (int t = 0; t < threads; ++t) {
+ for (size_t t = 0; t < threads; ++t) {
inQueue_.openConsumer();
outQueue_.openProducer();
workers_.emplace_back([this] {
Generator(Source source, Ops ops, size_t threads)
: source_(std::move(source)),
ops_(std::move(ops)),
- threads_(threads
- ?: std::max<size_t>(1, sysconf(_SC_NPROCESSORS_CONF))) {}
+ threads_(
+ threads
+ ? threads
+ : size_t(std::max<long>(1, sysconf(_SC_NPROCESSORS_CONF)))) {}
template <class Handler>
bool apply(Handler&& handler) const {
Range<Iterator> range_;
public:
- ChunkedRangeSource() {}
+ ChunkedRangeSource() = default;
ChunkedRangeSource(int chunkSize, Range<Iterator> range)
: chunkSize_(chunkSize), range_(std::move(range)) {}