Adding support for in-place use of ProducerConsumerQueue.
[folly.git] / folly / test / ProducerConsumerQueueTest.cpp
1 /*
2  * Copyright 2012 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/ProducerConsumerQueue.h"
18
19 #include <gtest/gtest.h>
20 #include <vector>
21 #include <atomic>
22 #include <chrono>
23 #include <memory>
24 #include <thread>
25 #include <glog/logging.h>
26
27 //////////////////////////////////////////////////////////////////////
28
29 namespace {
30
31 template<class T> struct TestTraits {
32   T limit() const { return 1 << 24; }
33   T generate() const { return rand() % 26; }
34 };
35
36 template<> struct TestTraits<std::string> {
37   int limit() const { return 1 << 22; }
38   std::string generate() const { return std::string(12, ' '); }
39 };
40
41 template<class QueueType, size_t Size, bool Pop = false>
42 struct PerfTest {
43   typedef typename QueueType::value_type T;
44
45   explicit PerfTest() : queue_(Size), done_(false) {}
46
47   void operator()() {
48     using namespace std::chrono;
49     auto const startTime = system_clock::now();
50
51     std::thread producer([this] { this->producer(); });
52     std::thread consumer([this] { this->consumer(); });
53
54     producer.join();
55     done_ = true;
56     consumer.join();
57
58     auto duration = duration_cast<milliseconds>(
59       system_clock::now() - startTime);
60     LOG(INFO) << "     done: " << duration.count() << "ms";
61   }
62
63   void producer() {
64     for (int i = 0; i < traits_.limit(); ++i) {
65       while (!queue_.write(traits_.generate())) {
66       }
67     }
68   }
69
70   void consumer() {
71     /*static*/ if (Pop) {
72       while (!done_) {
73         if (queue_.frontPtr()) {
74           queue_.popFront();
75         }
76       }
77     } else {
78       while (!done_) {
79         T data;
80         queue_.read(data);
81       }
82     }
83   }
84
85   QueueType queue_;
86   std::atomic<bool> done_;
87   TestTraits<T> traits_;
88 };
89
90 template<class TestType> void doTest(const char* name) {
91   LOG(INFO) << "  testing: " << name;
92   std::unique_ptr<TestType> const t(new TestType());
93   (*t)();
94 }
95
96 template<class T, bool Pop = false>
97 void perfTestType(const char* type) {
98   const size_t size = 0xfffe;
99
100   LOG(INFO) << "Type: " << type;
101   doTest<PerfTest<folly::ProducerConsumerQueue<T>,size,Pop> >(
102     "ProducerConsumerQueue");
103 }
104
105 template<class QueueType, size_t Size, bool Pop>
106 struct CorrectnessTest {
107   typedef typename QueueType::value_type T;
108
109   explicit CorrectnessTest()
110     : queue_(Size)
111     , done_(false)
112   {
113     const size_t testSize = traits_.limit();
114     testData_.reserve(testSize);
115     for (int i = 0; i < testSize; ++i) {
116       testData_.push_back(traits_.generate());
117     }
118   }
119
120   void operator()() {
121     std::thread producer([this] { this->producer(); });
122     std::thread consumer([this] { this->consumer(); });
123
124     producer.join();
125     done_ = true;
126     consumer.join();
127   }
128
129   void producer() {
130     for (auto& data : testData_) {
131       while (!queue_.write(data)) {
132       }
133     }
134   }
135
136   void consumer() {
137     if (Pop) {
138       consumerPop();
139     } else {
140       consumerRead();
141     }
142   }
143
144   void consumerPop() {
145     for (auto expect : testData_) {
146     again:
147       T* data;
148       if (!(data = queue_.frontPtr())) {
149         if (done_) {
150           // Try one more read; unless there's a bug in the queue class
151           // there should still be more data sitting in the queue even
152           // though the producer thread exited.
153           if (!(data = queue_.frontPtr())) {
154             EXPECT_TRUE(0 && "Finished too early ...");
155             return;
156           }
157         } else {
158           goto again;
159         }
160       } else {
161         queue_.popFront();
162       }
163
164       EXPECT_EQ(*data, expect);
165     }
166   }
167
168   void consumerRead() {
169     for (auto expect : testData_) {
170     again:
171       T data;
172       if (!queue_.read(data)) {
173         if (done_) {
174           // Try one more read; unless there's a bug in the queue class
175           // there should still be more data sitting in the queue even
176           // though the producer thread exited.
177           if (!queue_.read(data)) {
178             EXPECT_TRUE(0 && "Finished too early ...");
179             return;
180           }
181         } else {
182           goto again;
183         }
184       }
185       EXPECT_EQ(data, expect);
186     }
187   }
188
189   std::vector<T> testData_;
190   QueueType queue_;
191   TestTraits<T> traits_;
192   std::atomic<bool> done_;
193 };
194
195 template<class T, bool Pop = false>
196 void correctnessTestType(const std::string& type) {
197   LOG(INFO) << "Type: " << type;
198   doTest<CorrectnessTest<folly::ProducerConsumerQueue<T>,0xfffe,Pop> >(
199     "ProducerConsumerQueue");
200 }
201
202 struct DtorChecker {
203   static int numInstances;
204   DtorChecker() { ++numInstances; }
205   DtorChecker(const DtorChecker& o) { ++numInstances; }
206   ~DtorChecker() { --numInstances; }
207 };
208
209 int DtorChecker::numInstances = 0;
210
211 }
212
213 //////////////////////////////////////////////////////////////////////
214
215 TEST(PCQ, QueueCorrectness) {
216   correctnessTestType<std::string,true>("string (front+pop)");
217   correctnessTestType<std::string>("string");
218   correctnessTestType<int>("int");
219   correctnessTestType<unsigned long long>("unsigned long long");
220 }
221
222 TEST(PCQ, PerfTest) {
223   perfTestType<std::string,true>("string (front+pop)");
224   perfTestType<std::string>("string");
225   perfTestType<int>("int");
226   perfTestType<unsigned long long>("unsigned long long");
227 }
228
229 TEST(PCQ, Destructor) {
230   // Test that orphaned elements in a ProducerConsumerQueue are
231   // destroyed.
232   {
233     folly::ProducerConsumerQueue<DtorChecker> queue(1024);
234     for (int i = 0; i < 10; ++i) {
235       EXPECT_TRUE(queue.write(DtorChecker()));
236     }
237
238     EXPECT_EQ(DtorChecker::numInstances, 10);
239
240     {
241       DtorChecker ignore;
242       EXPECT_TRUE(queue.read(ignore));
243       EXPECT_TRUE(queue.read(ignore));
244     }
245
246     EXPECT_EQ(DtorChecker::numInstances, 8);
247   }
248
249   EXPECT_EQ(DtorChecker::numInstances, 0);
250
251   // Test the same thing in the case that the queue write pointer has
252   // wrapped, but the read one hasn't.
253   {
254     folly::ProducerConsumerQueue<DtorChecker> queue(4);
255     for (int i = 0; i < 3; ++i) {
256       EXPECT_TRUE(queue.write(DtorChecker()));
257     }
258     EXPECT_EQ(DtorChecker::numInstances, 3);
259     {
260       DtorChecker ignore;
261       EXPECT_TRUE(queue.read(ignore));
262     }
263     EXPECT_EQ(DtorChecker::numInstances, 2);
264     EXPECT_TRUE(queue.write(DtorChecker()));
265     EXPECT_EQ(DtorChecker::numInstances, 3);
266   }
267   EXPECT_EQ(DtorChecker::numInstances, 0);
268 }