add missing include to ThreadId.h
[folly.git] / folly / test / MPMCPipelineTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/MPMCPipeline.h>
18
19 #include <thread>
20 #include <vector>
21
22 #include <glog/logging.h>
23
24 #include <folly/Conv.h>
25 #include <folly/portability/GTest.h>
26
27 namespace folly { namespace test {
28
29 TEST(MPMCPipeline, Trivial) {
30   MPMCPipeline<int, std::string> a(2, 2);
31   EXPECT_EQ(0, a.sizeGuess());
32   a.blockingWrite(42);
33   EXPECT_EQ(1, a.sizeGuess());
34
35   int val;
36   auto ticket = a.blockingReadStage<0>(val);
37   EXPECT_EQ(42, val);
38   EXPECT_EQ(1, a.sizeGuess());
39
40   a.blockingWriteStage<0>(ticket, "hello world");
41   EXPECT_EQ(1, a.sizeGuess());
42
43   std::string s;
44
45   a.blockingRead(s);
46   EXPECT_EQ("hello world", s);
47   EXPECT_EQ(0, a.sizeGuess());
48 }
49
50 TEST(MPMCPipeline, TrivialAmplification) {
51   MPMCPipeline<int, MPMCPipelineStage<std::string, 2>> a(2, 2);
52   EXPECT_EQ(0, a.sizeGuess());
53   a.blockingWrite(42);
54   EXPECT_EQ(2, a.sizeGuess());
55
56   int val;
57   auto ticket = a.blockingReadStage<0>(val);
58   EXPECT_EQ(42, val);
59   EXPECT_EQ(2, a.sizeGuess());
60
61   a.blockingWriteStage<0>(ticket, "hello world");
62   EXPECT_EQ(2, a.sizeGuess());
63   a.blockingWriteStage<0>(ticket, "goodbye");
64   EXPECT_EQ(2, a.sizeGuess());
65
66   std::string s;
67
68   a.blockingRead(s);
69   EXPECT_EQ("hello world", s);
70   EXPECT_EQ(1, a.sizeGuess());
71
72   a.blockingRead(s);
73   EXPECT_EQ("goodbye", s);
74   EXPECT_EQ(0, a.sizeGuess());
75 }
76
77 TEST(MPMCPipeline, MultiThreaded) {
78   constexpr size_t numThreadsPerStage = 6;
79   MPMCPipeline<int, std::string, std::string> a(5, 5, 5);
80
81   std::vector<std::thread> threads;
82   threads.reserve(numThreadsPerStage * 2 + 1);
83   for (size_t i = 0; i < numThreadsPerStage; ++i) {
84     threads.emplace_back([&a] {
85       for (;;) {
86         int val;
87         auto ticket = a.blockingReadStage<0>(val);
88         if (val == -1) {  // stop
89           // We still need to propagate
90           a.blockingWriteStage<0>(ticket, "");
91           break;
92         }
93         a.blockingWriteStage<0>(
94             ticket, folly::to<std::string>(val, " hello"));
95       }
96     });
97   }
98
99   for (size_t i = 0; i < numThreadsPerStage; ++i) {
100     threads.emplace_back([&a] {
101       for (;;) {
102         std::string val;
103         auto ticket = a.blockingReadStage<1>(val);
104         if (val.empty()) {  // stop
105           // We still need to propagate
106           a.blockingWriteStage<1>(ticket, "");
107           break;
108         }
109         a.blockingWriteStage<1>(
110             ticket, folly::to<std::string>(val, " world"));
111       }
112     });
113   }
114
115   std::vector<std::string> results;
116   threads.emplace_back([&a, &results] () {
117     for (;;) {
118       std::string val;
119       a.blockingRead(val);
120       if (val.empty()) {
121         break;
122       }
123       results.push_back(val);
124     }
125   });
126
127   constexpr size_t numValues = 1000;
128   for (size_t i = 0; i < numValues; ++i) {
129     a.blockingWrite(i);
130   }
131   for (size_t i = 0; i < numThreadsPerStage; ++i) {
132     a.blockingWrite(-1);
133   }
134
135   for (auto& t : threads) {
136     t.join();
137   }
138
139   // The consumer thread dequeued the first empty string, there should be
140   // numThreadsPerStage - 1 left.
141   EXPECT_EQ(numThreadsPerStage - 1, a.sizeGuess());
142   for (size_t i = 0; i < numThreadsPerStage - 1; ++i) {
143     std::string val;
144     a.blockingRead(val);
145     EXPECT_TRUE(val.empty());
146   }
147   {
148     std::string tmp;
149     EXPECT_FALSE(a.read(tmp));
150   }
151   EXPECT_EQ(0, a.sizeGuess());
152
153   EXPECT_EQ(numValues, results.size());
154   for (size_t i = 0; i < results.size(); ++i) {
155     EXPECT_EQ(folly::to<std::string>(i, " hello world"), results[i]);
156   }
157 }
158
159 }}  // namespaces
160
161 int main(int argc, char *argv[]) {
162   testing::InitGoogleTest(&argc, argv);
163   gflags::ParseCommandLineFlags(&argc, &argv, true);
164   return RUN_ALL_TESTS();
165 }