Add missing include for flock()
[folly.git] / folly / wangle / rx / test / RxTest.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/wangle/rx/Observer.h>
18 #include <folly/wangle/rx/Subject.h>
19 #include <gtest/gtest.h>
20
21 using namespace folly::wangle;
22
23 static std::unique_ptr<Observer<int>> incrementer(int& counter) {
24   return Observer<int>::create([&] (int x) {
25     counter++;
26   });
27 }
28
29 TEST(RxTest, Observe) {
30   Subject<int> subject;
31   auto count = 0;
32   subject.observe(incrementer(count));
33   subject.onNext(1);
34   EXPECT_EQ(1, count);
35 }
36
37 TEST(RxTest, ObserveInline) {
38   Subject<int> subject;
39   auto count = 0;
40   auto o = incrementer(count).release();
41   subject.observe(o);
42   subject.onNext(1);
43   EXPECT_EQ(1, count);
44   delete o;
45 }
46
47 TEST(RxTest, Subscription) {
48   Subject<int> subject;
49   auto count = 0;
50   {
51     auto s = subject.subscribe(incrementer(count));
52     subject.onNext(1);
53   }
54   // The subscription has gone out of scope so no one should get this.
55   subject.onNext(2);
56   EXPECT_EQ(1, count);
57 }
58
59 TEST(RxTest, SubscriptionMove) {
60   Subject<int> subject;
61   auto count = 0;
62   auto s = subject.subscribe(incrementer(count));
63   auto s2 = subject.subscribe(incrementer(count));
64   s2 = std::move(s);
65   subject.onNext(1);
66   Subscription<int> s3(std::move(s2));
67   subject.onNext(2);
68   EXPECT_EQ(2, count);
69 }
70
71 TEST(RxTest, SubscriptionOutlivesSubject) {
72   Subscription<int> s;
73   {
74     Subject<int> subject;
75     s = subject.subscribe(Observer<int>::create([](int){}));
76   }
77   // Don't explode when s is destroyed
78 }
79
80 TEST(RxTest, SubscribeDuringCallback) {
81   // A subscriber who was subscribed in the course of a callback should get
82   // subsequent updates but not the current update.
83   Subject<int> subject;
84   int outerCount = 0, innerCount = 0;
85   Subscription<int> s1, s2;
86   s1 = subject.subscribe(Observer<int>::create([&] (int x) {
87     outerCount++;
88     s2 = subject.subscribe(incrementer(innerCount));
89   }));
90   subject.onNext(42);
91   subject.onNext(0xDEADBEEF);
92   EXPECT_EQ(2, outerCount);
93   EXPECT_EQ(1, innerCount);
94 }
95
96 TEST(RxTest, ObserveDuringCallback) {
97   Subject<int> subject;
98   int outerCount = 0, innerCount = 0;
99   subject.observe(Observer<int>::create([&] (int x) {
100     outerCount++;
101     subject.observe(incrementer(innerCount));
102   }));
103   subject.onNext(42);
104   subject.onNext(0xDEADBEEF);
105   EXPECT_EQ(2, outerCount);
106   EXPECT_EQ(1, innerCount);
107 }
108
109 TEST(RxTest, ObserveInlineDuringCallback) {
110   Subject<int> subject;
111   int outerCount = 0, innerCount = 0;
112   auto innerO = incrementer(innerCount).release();
113   auto outerO = Observer<int>::create([&] (int x) {
114     outerCount++;
115     subject.observe(innerO);
116   }).release();
117   subject.observe(outerO);
118   subject.onNext(42);
119   subject.onNext(0xDEADBEEF);
120   EXPECT_EQ(2, outerCount);
121   EXPECT_EQ(1, innerCount);
122   delete innerO;
123   delete outerO;
124 }
125
126 TEST(RxTest, UnsubscribeDuringCallback) {
127   // A subscriber who was unsubscribed in the course of a callback should get
128   // the current update but not subsequent ones
129   Subject<int> subject;
130   int count1 = 0, count2 = 0;
131   auto s1 = subject.subscribe(incrementer(count1));
132   auto s2 = subject.subscribe(Observer<int>::create([&] (int x) {
133     count2++;
134     s1.~Subscription();
135   }));
136   subject.onNext(1);
137   subject.onNext(2);
138   EXPECT_EQ(1, count1);
139   EXPECT_EQ(2, count2);
140 }
141
142 TEST(RxTest, SubscribeUnsubscribeDuringCallback) {
143   // A subscriber who was subscribed and unsubscribed in the course of a
144   // callback should not get any updates
145   Subject<int> subject;
146   int outerCount = 0, innerCount = 0;
147   auto s2 = subject.subscribe(Observer<int>::create([&] (int x) {
148     outerCount++;
149     auto s2 = subject.subscribe(incrementer(innerCount));
150   }));
151   subject.onNext(1);
152   subject.onNext(2);
153   EXPECT_EQ(2, outerCount);
154   EXPECT_EQ(0, innerCount);
155 }
156
157 // Move only type
158 typedef std::unique_ptr<int> MO;
159 static MO makeMO() { return folly::make_unique<int>(1); }
160 template <typename T>
161 static ObserverPtr<T> makeMOObserver() {
162   return Observer<T>::create([](const T& mo) {
163     EXPECT_EQ(1, *mo);
164   });
165 }
166
167 TEST(RxTest, MoveOnlyRvalue) {
168   Subject<MO> subject;
169   auto s1 = subject.subscribe(makeMOObserver<MO>());
170   auto s2 = subject.subscribe(makeMOObserver<MO>());
171   auto mo = makeMO();
172   // Can't bind lvalues to rvalue references
173   // subject.onNext(mo);
174   subject.onNext(std::move(mo));
175   subject.onNext(makeMO());
176 }
177
178 // Copy only type
179 struct CO {
180   CO() = default;
181   CO(const CO&) = default;
182   CO(CO&&) = delete;
183 };
184
185 template <typename T>
186 static ObserverPtr<T> makeCOObserver() {
187   return Observer<T>::create([](const T& mo) {});
188 }
189
190 TEST(RxTest, CopyOnly) {
191   Subject<CO> subject;
192   auto s1 = subject.subscribe(makeCOObserver<CO>());
193   CO co;
194   subject.onNext(co);
195 }