35410a51b37892aa67b093e3ddff38b637fe1e1b
[c11concurrency-benchmarks.git] / gdax-orderbook-hpp / gdax-orderbook.hpp
1 #ifndef GDAX_ORDERBOOK_HPP
2 #define GDAX_ORDERBOOK_HPP
3
#include <cmath>
#include <cstring>
#include <fstream>
#include <functional>
#include <future>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>

#include <cds/container/skip_list_map_hp.h>
#include <cds/gc/hp.h>
#include <cds/init.h>

#include <rapidjson/document.h>

#include <websocketpp/client.hpp>
#include <websocketpp/concurrency/none.hpp>
#include <websocketpp/config/asio_client.hpp>
19
20 /**
21  * A copy of the GDAX order book for the currency pair product given during
22  * construction, exposed as two maps, one for bids and one for offers, each
23  * mapping price levels to order quantities, continually updated in real time
24  * via the `level2` channel of the Websocket feed of the GDAX API.
25  *
26  * Spawns a separate thread to receive updates from the GDAX WebSocket Feed and
27  * process them into the maps.
28  *
29  * To ensure high performance, implemented using concurrent data structures
30  * from libcds.  The price->quantity maps are instances of
31  * cds::container::SkipListMap, whose doc says it is lock-free.
32  */
33 class GDAXOrderBook {
34 private:
35     // libcds requires paired Initialize/Terminate calls
36     struct CDSInitializer {
37         CDSInitializer() { cds::Initialize(); }
38         ~CDSInitializer() { cds::Terminate(); }
39     } m_cdsInitializer;
40
41     // a libcds garbage collector is required for our map structures
42     cds::gc::HP m_cdsGarbageCollector;
43
44 public:
45     /**
46      * libcds requires each and every thread to first "attach" itself to the
47      * lib before using any data structures. This method is used internally,
48      * and is called during construction on behalf of the constructing thread,
49      * and should be called by any additional client threads that access the
50      * price->quanitity maps.
51      */
52     static void ensureThreadAttached()
53     {
54         if (cds::threading::Manager::isThreadAttached() == false)
55             cds::threading::Manager::attachThread();
56     }
57         //std::ofstream myfile;
58         std::ifstream myfile;
59     GDAXOrderBook(std::string const& product = "BTC-USD")
60         : m_cdsGarbageCollector(67*2),
61             // per SkipListMap doc, 67 hazard pointers per instance
62           m_threadTerminator(
63             std::async(
64                 std::launch::async,
65                 &GDAXOrderBook::handleUpdates,
66                 this,
67                 product))
68     {
69         ensureThreadAttached();
70         m_bookInitialized.get_future().wait();
71     }
72     
73    
74
75     using Price = unsigned int; // cents
76     using Size = double;
77     using offers_map_t = cds::container::SkipListMap<cds::gc::HP, Price, Size>;
78     using bids_map_t =
79         cds::container::SkipListMap<
80             cds::gc::HP,
81             Price,
82             Size,
83             // reverse map ordering so best (highest) bid is at begin()
84             typename cds::container::skip_list::make_traits<
85                 cds::opt::less<std::greater<Price>>>::type>;
86     // *map_t::get(Price) returns an std::pair<Price, Size>*
87     bids_map_t bids;
88     offers_map_t offers;
89
90     ~GDAXOrderBook() { m_client.stop(); myfile.close(); }
91
92 private:
93     struct websocketppConfig
94         : public websocketpp::config::asio_tls_client
95     {
96         typedef websocketpp::concurrency::none concurrency_type;
97             // we only have one thread using the WebSocket
98     };
99     using websocketclient_t = websocketpp::client<websocketppConfig>;
100     websocketclient_t m_client;
101
102     std::promise<void> m_bookInitialized; // to signal constructor to finish
103
104     std::future<void> m_threadTerminator; // for graceful thread destruction
105
106     /**
107      * Initiates WebSocket connection, subscribes to order book updates for the
108      * given product, installs a message handler which will receive updates
109      * and process them into the maps, and starts the asio event loop.
110      */
111     void handleUpdates(std::string const& product)
112     {
113         ensureThreadAttached();
114
115         rapidjson::Document json;
116
117         try {
118             m_client.clear_access_channels(websocketpp::log::alevel::all);
119             m_client.set_access_channels(
120                 websocketpp::log::alevel::connect |
121                 websocketpp::log::alevel::disconnect);
122
123             m_client.clear_error_channels(websocketpp::log::elevel::all);
124             m_client.set_error_channels(
125                 websocketpp::log::elevel::info |
126                 websocketpp::log::elevel::warn |
127                 websocketpp::log::elevel::rerror |
128                 websocketpp::log::elevel::fatal);
129
130             m_client.init_asio();
131
132             
133
134             m_client.set_tls_init_handler(
135                 [](websocketpp::connection_hdl)
136                 {
137                     websocketpp::lib::shared_ptr<boost::asio::ssl::context>
138                         context = websocketpp::lib::make_shared<
139                             boost::asio::ssl::context>(
140                             boost::asio::ssl::context::tlsv1);
141
142                     try {
143                         context->set_options(
144                             boost::asio::ssl::context::default_workarounds |
145                             boost::asio::ssl::context::no_sslv2 |
146                             boost::asio::ssl::context::no_sslv3 |
147                             boost::asio::ssl::context::single_dh_use);
148                     } catch (std::exception& e) {
149                         std::cerr << "set_tls_init_handler() failed to set"
150                             " context options: " << e.what() << std::endl;
151                     }
152                     return context;
153                 });
154
155             m_client.set_open_handler(
156                 [this, &product](websocketpp::connection_hdl handle)
157                 {
158                     // subscribe to updates to product's order book
159                     websocketpp::lib::error_code errorCode;
160                     this->m_client.send(handle,
161                         "{"
162                             "\"type\": \"subscribe\","
163                             "\"product_ids\": [" "\""+product+"\"" "],"
164                             "\"channels\": [" "\"level2\"" "]"
165                         "}", websocketpp::frame::opcode::text, errorCode);
166                     if (errorCode) {
167                         std::cerr << "error sending subscription: " +
168                             errorCode.message() << std::endl;
169                     }
170                 });
171
172             m_client.set_message_handler(
173                 [this, &json] (websocketpp::connection_hdl,
174                                websocketppConfig::message_type::ptr msg)
175                 {
176
177                 /*if(!myfile.is_open())
178                         myfile.open("example.txt");
179                 if(myfile.is_open())
180                   {
181                     myfile <<msg->get_payload().c_str()<<std::endl;
182                   }
183                   else std::cout << "Unable to open file";*/
184
185                         json.Parse(msg->get_payload().c_str());
186                     const char *const type = json["type"].GetString();
187                     if ( strcmp(type, "l2update") == 0 )
188                     {
189                         processUpdates(json, bids, offers);
190                     }
191                     else if ( strcmp(type, "snapshot") == 0 )
192                     {
193                         processSnapshot(json, bids, offers, m_bookInitialized);
194                     }
195                 });
196
197             websocketpp::lib::error_code errorCode;
198
199
200             if(!myfile.is_open())
201                 myfile.open("example.txt");
202             std::string line;
203             while ( std::getline(myfile,line) )
204             {
205                     json.Parse(line.c_str());
206                     const char *const type = json["type"].GetString();
207                     if ( strcmp(type, "l2update") == 0 )
208                     {
209                         processUpdates(json, bids, offers);
210                     }
211                     else if ( strcmp(type, "snapshot") == 0 )
212                     {
213                         processSnapshot(json, bids, offers, m_bookInitialized);
214                     }
215             }
216
217          /*   auto connection =
218                 m_client.get_connection("ws://fsdf.sdfj", errorCode);
219             if (errorCode) {
220                 std::cerr << "failed websocketclient_t::get_connection(): " <<
221                     errorCode.message() << std::endl;
222             }
223         
224             m_client.connect(connection);
225
226             m_client.run();*/
227             
228         } catch (websocketpp::exception const & e) {
229             std::cerr << "handleUpdates() failed: " << e.what() << std::endl;
230           
231                    
232                   
233         }
234     }
235
236     /**
237      * Simply delegates snapshot processing to a helper function (different
238      * template instantiations of the same function, one for each type of map
239      * (bid, offer)), and signals when the snapshot has been processed.
240      */
241     static void processSnapshot(
242         rapidjson::Document & json,
243         bids_map_t & bids,
244         offers_map_t & offers,
245         std::promise<void> & finished)
246     {
247         processSnapshotHalf(json, "bids", bids);
248         processSnapshotHalf(json, "asks", offers);
249         finished.set_value();
250     }
251
252     /**
253      * Helper to permit code re-use on either type of map (bids or offers).
254      * Traverses already-parsed json document and inserts initial-price
255      * snapshots for entire half (bids or offers) of the order book.
256      */
257     template<typename map_t>
258     static void processSnapshotHalf(
259         rapidjson::Document const& json,
260         const char *const bidsOrOffers,
261         map_t & map)
262     {
263         for (auto j = 0 ; j < json[bidsOrOffers].Size() ; ++j)
264         {
265             Price price = std::stod(json[bidsOrOffers][j][0].GetString())*100;
266             Size   size = std::stod(json[bidsOrOffers][j][1].GetString());
267
268             map.insert(price, size);
269         }
270     }
271
272     /**
273      * Traverses already-parsed json document, and, assuming it's a "l2update"
274      * document, updates price->quantity maps based on the order book changes
275      * that have occurred.
276      */
277     static void processUpdates(
278         rapidjson::Document & json,
279         bids_map_t & bids,
280         offers_map_t & offers)
281     {
282         for (auto i = 0 ; i < json["changes"].Size() ; ++i)
283         {
284             const char* buyOrSell = json["changes"][i][0].GetString(),
285                       * price     = json["changes"][i][1].GetString(),
286                       * size      = json["changes"][i][2].GetString();
287
288             if ( strcmp(buyOrSell, "buy") == 0 )
289             {
290                 updateMap(price, size, bids);
291             }
292             else
293             {
294                 updateMap(price, size, offers);
295             }
296         }
297     }
298
299     /**
300      * Helper to permit code re-use on either type of map (bids or offers).
301      * Simply updates a single map entry with the specified price/size.
302      */
303     template<typename map_t>
304     static void updateMap(
305         const char *const price,
306         const char *const size,
307         map_t & map)
308     {
309         if (std::stod(size) == 0) { map.erase(std::stod(price)); }
310         else
311         {
312             map.update(
313                 std::stod(price)*100,
314                 [size](bool & bNew,
315                        std::pair<const Price, Size> & pair)
316                 {
317                     pair.second = std::stod(size);
318                 });
319         }
320     }
321 };
322
323 #endif // GDAX_ORDERBOOK_HPP