fixed adding file problem
[c11concurrency-benchmarks.git] / gdax-orderbook-hpp / gdax-orderbook.hpp
1 #ifndef GDAX_ORDERBOOK_HPP
2 #define GDAX_ORDERBOOK_HPP
3
4 #include <cstring>
5 #include <future>
6 #include <iostream>
7 #include <string>
8
9 #include <cds/container/skip_list_map_hp.h>
10 #include <cds/gc/hp.h>
11 #include <cds/init.h>
12
13 #include <rapidjson/document.h>
14
15 #include <websocketpp/client.hpp>
16 #include <websocketpp/concurrency/none.hpp>
17 #include <websocketpp/config/asio_client.hpp>
18
19 /**
20  * A copy of the GDAX order book for the currency pair product given during
21  * construction, exposed as two maps, one for bids and one for offers, each
22  * mapping price levels to order quantities, continually updated in real time
23  * via the `level2` channel of the Websocket feed of the GDAX API.
24  *
25  * Spawns a separate thread to receive updates from the GDAX WebSocket Feed and
26  * process them into the maps.
27  *
28  * To ensure high performance, implemented using concurrent data structures
29  * from libcds.  The price->quantity maps are instances of
30  * cds::container::SkipListMap, whose doc says it is lock-free.
31  */
32 class GDAXOrderBook {
33 private:
34     // libcds requires paired Initialize/Terminate calls
35     struct CDSInitializer {
36         CDSInitializer() { cds::Initialize(); }
37         ~CDSInitializer() { cds::Terminate(); }
38     } m_cdsInitializer;
39
40     // a libcds garbage collector is required for our map structures
41     cds::gc::HP m_cdsGarbageCollector;
42
43 public:
44     /**
45      * libcds requires each and every thread to first "attach" itself to the
46      * lib before using any data structures. This method is used internally,
47      * and is called during construction on behalf of the constructing thread,
48      * and should be called by any additional client threads that access the
49      * price->quanitity maps.
50      */
51     static void ensureThreadAttached()
52     {
53         if (cds::threading::Manager::isThreadAttached() == false)
54             cds::threading::Manager::attachThread();
55     }
56
57     GDAXOrderBook(std::string const& product = "BTC-USD")
58         : m_cdsGarbageCollector(67*2),
59             // per SkipListMap doc, 67 hazard pointers per instance
60           m_threadTerminator(
61             std::async(
62                 std::launch::async,
63                 &GDAXOrderBook::handleUpdates,
64                 this,
65                 product))
66     {
67         ensureThreadAttached();
68         m_bookInitialized.get_future().wait();
69     }
70
71     using Price = unsigned int; // cents
72     using Size = double;
73     using offers_map_t = cds::container::SkipListMap<cds::gc::HP, Price, Size>;
74     using bids_map_t =
75         cds::container::SkipListMap<
76             cds::gc::HP,
77             Price,
78             Size,
79             // reverse map ordering so best (highest) bid is at begin()
80             typename cds::container::skip_list::make_traits<
81                 cds::opt::less<std::greater<Price>>>::type>;
82     // *map_t::get(Price) returns an std::pair<Price, Size>*
83     bids_map_t bids;
84     offers_map_t offers;
85
86     ~GDAXOrderBook() { m_client.stop(); }
87
88 private:
89     struct websocketppConfig
90         : public websocketpp::config::asio_tls_client
91     {
92         typedef websocketpp::concurrency::none concurrency_type;
93             // we only have one thread using the WebSocket
94     };
95     using websocketclient_t = websocketpp::client<websocketppConfig>;
96     websocketclient_t m_client;
97
98     std::promise<void> m_bookInitialized; // to signal constructor to finish
99
100     std::future<void> m_threadTerminator; // for graceful thread destruction
101
102     /**
103      * Initiates WebSocket connection, subscribes to order book updates for the
104      * given product, installs a message handler which will receive updates
105      * and process them into the maps, and starts the asio event loop.
106      */
107     void handleUpdates(std::string const& product)
108     {
109         ensureThreadAttached();
110
111         rapidjson::Document json;
112
113         try {
114             m_client.clear_access_channels(websocketpp::log::alevel::all);
115             m_client.set_access_channels(
116                 websocketpp::log::alevel::connect |
117                 websocketpp::log::alevel::disconnect);
118
119             m_client.clear_error_channels(websocketpp::log::elevel::all);
120             m_client.set_error_channels(
121                 websocketpp::log::elevel::info |
122                 websocketpp::log::elevel::warn |
123                 websocketpp::log::elevel::rerror |
124                 websocketpp::log::elevel::fatal);
125
126             m_client.init_asio();
127
128             m_client.set_tls_init_handler(
129                 [](websocketpp::connection_hdl)
130                 {
131                     websocketpp::lib::shared_ptr<boost::asio::ssl::context>
132                         context = websocketpp::lib::make_shared<
133                             boost::asio::ssl::context>(
134                             boost::asio::ssl::context::tlsv1);
135
136                     try {
137                         context->set_options(
138                             boost::asio::ssl::context::default_workarounds |
139                             boost::asio::ssl::context::no_sslv2 |
140                             boost::asio::ssl::context::no_sslv3 |
141                             boost::asio::ssl::context::single_dh_use);
142                     } catch (std::exception& e) {
143                         std::cerr << "set_tls_init_handler() failed to set"
144                             " context options: " << e.what() << std::endl;
145                     }
146                     return context;
147                 });
148
149             m_client.set_open_handler(
150                 [this, &product](websocketpp::connection_hdl handle)
151                 {
152                     // subscribe to updates to product's order book
153                     websocketpp::lib::error_code errorCode;
154                     this->m_client.send(handle,
155                         "{"
156                             "\"type\": \"subscribe\","
157                             "\"product_ids\": [" "\""+product+"\"" "],"
158                             "\"channels\": [" "\"level2\"" "]"
159                         "}", websocketpp::frame::opcode::text, errorCode);
160                     if (errorCode) {
161                         std::cerr << "error sending subscription: " +
162                             errorCode.message() << std::endl;
163                     }
164                 });
165
166             m_client.set_message_handler(
167                 [this, &json] (websocketpp::connection_hdl,
168                                websocketppConfig::message_type::ptr msg)
169                 {
170                     json.Parse(msg->get_payload().c_str());
171                     const char *const type = json["type"].GetString();
172                     if ( strcmp(type, "l2update") == 0 )
173                     {
174                         processUpdates(json, bids, offers);
175                     }
176                     else if ( strcmp(type, "snapshot") == 0 )
177                     {
178                         processSnapshot(json, bids, offers, m_bookInitialized);
179                     }
180                 });
181
182             websocketpp::lib::error_code errorCode;
183             auto connection =
184                 m_client.get_connection("wss://ws-feed.gdax.com", errorCode);
185             if (errorCode) {
186                 std::cerr << "failed websocketclient_t::get_connection(): " <<
187                     errorCode.message() << std::endl;
188             }
189
190             m_client.connect(connection);
191
192             m_client.run();
193         } catch (websocketpp::exception const & e) {
194             std::cerr << "handleUpdates() failed: " << e.what() << std::endl;
195         }
196     }
197
198     /**
199      * Simply delegates snapshot processing to a helper function (different
200      * template instantiations of the same function, one for each type of map
201      * (bid, offer)), and signals when the snapshot has been processed.
202      */
203     static void processSnapshot(
204         rapidjson::Document & json,
205         bids_map_t & bids,
206         offers_map_t & offers,
207         std::promise<void> & finished)
208     {
209         processSnapshotHalf(json, "bids", bids);
210         processSnapshotHalf(json, "asks", offers);
211         finished.set_value();
212     }
213
214     /**
215      * Helper to permit code re-use on either type of map (bids or offers).
216      * Traverses already-parsed json document and inserts initial-price
217      * snapshots for entire half (bids or offers) of the order book.
218      */
219     template<typename map_t>
220     static void processSnapshotHalf(
221         rapidjson::Document const& json,
222         const char *const bidsOrOffers,
223         map_t & map)
224     {
225         for (auto j = 0 ; j < json[bidsOrOffers].Size() ; ++j)
226         {
227             Price price = std::stod(json[bidsOrOffers][j][0].GetString())*100;
228             Size   size = std::stod(json[bidsOrOffers][j][1].GetString());
229
230             map.insert(price, size);
231         }
232     }
233
234     /**
235      * Traverses already-parsed json document, and, assuming it's a "l2update"
236      * document, updates price->quantity maps based on the order book changes
237      * that have occurred.
238      */
239     static void processUpdates(
240         rapidjson::Document & json,
241         bids_map_t & bids,
242         offers_map_t & offers)
243     {
244         for (auto i = 0 ; i < json["changes"].Size() ; ++i)
245         {
246             const char* buyOrSell = json["changes"][i][0].GetString(),
247                       * price     = json["changes"][i][1].GetString(),
248                       * size      = json["changes"][i][2].GetString();
249
250             if ( strcmp(buyOrSell, "buy") == 0 )
251             {
252                 updateMap(price, size, bids);
253             }
254             else
255             {
256                 updateMap(price, size, offers);
257             }
258         }
259     }
260
261     /**
262      * Helper to permit code re-use on either type of map (bids or offers).
263      * Simply updates a single map entry with the specified price/size.
264      */
265     template<typename map_t>
266     static void updateMap(
267         const char *const price,
268         const char *const size,
269         map_t & map)
270     {
271         if (std::stod(size) == 0) { map.erase(std::stod(price)); }
272         else
273         {
274             map.update(
275                 std::stod(price)*100,
276                 [size](bool & bNew,
277                        std::pair<const Price, Size> & pair)
278                 {
279                     pair.second = std::stod(size);
280                 });
281         }
282     }
283 };
284
285 #endif // GDAX_ORDERBOOK_HPP