add clang flags so that libcds will be compiled with llvm pass
[c11concurrency-benchmarks.git] / gdax-orderbook-hpp / gdax-orderbook.hpp
1 #ifndef GDAX_ORDERBOOK_HPP
2 #define GDAX_ORDERBOOK_HPP
3
4 #include <cstring>
5 #include <future>
6 #include <iostream>
7 #include <string>
8 #include <fstream>
9
10 #include <cds/container/skip_list_map_hp.h>
11 #include <cds/gc/hp.h>
12 #include <cds/init.h>
13
14 #include <rapidjson/document.h>
15
16 #include <websocketpp/client.hpp>
17 #include <websocketpp/concurrency/none.hpp>
18 #include <websocketpp/config/asio_client.hpp>
19 #define FILENAME "example90.txt"
20
21 /**
22  * A copy of the GDAX order book for the currency pair product given during
23  * construction, exposed as two maps, one for bids and one for offers, each
24  * mapping price levels to order quantities, continually updated in real time
25  * via the `level2` channel of the Websocket feed of the GDAX API.
26  *
27  * Spawns a separate thread to receive updates from the GDAX WebSocket Feed and
28  * process them into the maps.
29  *
30  * To ensure high performance, implemented using concurrent data structures
31  * from libcds.  The price->quantity maps are instances of
32  * cds::container::SkipListMap, whose doc says it is lock-free.
33  */
34 class GDAXOrderBook {
35 private:
36     // libcds requires paired Initialize/Terminate calls
37     struct CDSInitializer {
38         CDSInitializer() { cds::Initialize(); }
39         ~CDSInitializer() { cds::Terminate(); }
40     } m_cdsInitializer;
41
42     // a libcds garbage collector is required for our map structures
43     cds::gc::HP m_cdsGarbageCollector;
44
45 public:
46     /**
47      * libcds requires each and every thread to first "attach" itself to the
48      * lib before using any data structures. This method is used internally,
49      * and is called during construction on behalf of the constructing thread,
50      * and should be called by any additional client threads that access the
51      * price->quanitity maps.
52      */
53     static void ensureThreadAttached()
54     {
55         if (cds::threading::Manager::isThreadAttached() == false)
56             cds::threading::Manager::attachThread();
57     }
58         //std::ofstream myfile;
59         std::ifstream myfile;
60     GDAXOrderBook(std::string const& product = "BTC-USD")
61         : m_cdsGarbageCollector(67*2),
62             // per SkipListMap doc, 67 hazard pointers per instance
63           m_threadTerminator(
64             std::async(
65                 std::launch::async,
66                 &GDAXOrderBook::handleUpdates,
67                 this,
68                 product))
69     {
70         ensureThreadAttached();
71         m_bookInitialized.get_future().wait();
72     }
73     
74    
75
76     using Price = unsigned int; // cents
77     using Size = double;
78     using offers_map_t = cds::container::SkipListMap<cds::gc::HP, Price, Size>;
79     using bids_map_t =
80         cds::container::SkipListMap<
81             cds::gc::HP,
82             Price,
83             Size,
84             // reverse map ordering so best (highest) bid is at begin()
85             typename cds::container::skip_list::make_traits<
86                 cds::opt::less<std::greater<Price>>>::type>;
87     // *map_t::get(Price) returns an std::pair<Price, Size>*
88     bids_map_t bids;
89     offers_map_t offers;
90
91     ~GDAXOrderBook() { m_client.stop(); myfile.close(); }
92
93 private:
94     struct websocketppConfig
95         : public websocketpp::config::asio_tls_client
96     {
97         typedef websocketpp::concurrency::none concurrency_type;
98             // we only have one thread using the WebSocket
99     };
100     using websocketclient_t = websocketpp::client<websocketppConfig>;
101     websocketclient_t m_client;
102
103     std::promise<void> m_bookInitialized; // to signal constructor to finish
104
105     std::future<void> m_threadTerminator; // for graceful thread destruction
106
107     /**
108      * Initiates WebSocket connection, subscribes to order book updates for the
109      * given product, installs a message handler which will receive updates
110      * and process them into the maps, and starts the asio event loop.
111      */
112     void handleUpdates(std::string const& product)
113     {
114         ensureThreadAttached();
115
116         rapidjson::Document json;
117
118         try {
119             m_client.clear_access_channels(websocketpp::log::alevel::all);
120             m_client.set_access_channels(
121                 websocketpp::log::alevel::connect |
122                 websocketpp::log::alevel::disconnect);
123
124             m_client.clear_error_channels(websocketpp::log::elevel::all);
125             m_client.set_error_channels(
126                 websocketpp::log::elevel::info |
127                 websocketpp::log::elevel::warn |
128                 websocketpp::log::elevel::rerror |
129                 websocketpp::log::elevel::fatal);
130
131             m_client.init_asio();
132
133             
134
135             m_client.set_tls_init_handler(
136                 [](websocketpp::connection_hdl)
137                 {
138                     websocketpp::lib::shared_ptr<boost::asio::ssl::context>
139                         context = websocketpp::lib::make_shared<
140                             boost::asio::ssl::context>(
141                             boost::asio::ssl::context::tlsv1);
142
143                     try {
144                         context->set_options(
145                             boost::asio::ssl::context::default_workarounds |
146                             boost::asio::ssl::context::no_sslv2 |
147                             boost::asio::ssl::context::no_sslv3 |
148                             boost::asio::ssl::context::single_dh_use);
149                     } catch (std::exception& e) {
150                         std::cerr << "set_tls_init_handler() failed to set"
151                             " context options: " << e.what() << std::endl;
152                     }
153                     return context;
154                 });
155
156             m_client.set_open_handler(
157                 [this, &product](websocketpp::connection_hdl handle)
158                 {
159                     // subscribe to updates to product's order book
160                     websocketpp::lib::error_code errorCode;
161                     this->m_client.send(handle,
162                         "{"
163                             "\"type\": \"subscribe\","
164                             "\"product_ids\": [" "\""+product+"\"" "],"
165                             "\"channels\": [" "\"level2\"" "]"
166                         "}", websocketpp::frame::opcode::text, errorCode);
167                     if (errorCode) {
168                         std::cerr << "error sending subscription: " +
169                             errorCode.message() << std::endl;
170                     }
171                 });
172
173             m_client.set_message_handler(
174                 [this, &json] (websocketpp::connection_hdl,
175                                websocketppConfig::message_type::ptr msg)
176                 {
177
178                 /*if(!myfile.is_open())
179                         myfile.open(FILENAME);
180                 if(myfile.is_open())
181                   {
182                     myfile <<msg->get_payload().c_str()<<std::endl;
183                   }
184                   else std::cout << "Unable to open file";*/
185
186                         json.Parse(msg->get_payload().c_str());
187                     const char *const type = json["type"].GetString();
188                     if ( strcmp(type, "l2update") == 0 )
189                     {
190                         processUpdates(json, bids, offers);
191                     }
192                     else if ( strcmp(type, "snapshot") == 0 )
193                     {
194                         processSnapshot(json, bids, offers, m_bookInitialized);
195                     }
196                 });
197
198             websocketpp::lib::error_code errorCode;
199
200
201             if(!myfile.is_open())
202                 myfile.open(FILENAME);
203             std::string line;
204             while ( std::getline(myfile,line) )
205             {
206                     json.Parse(line.c_str());
207                     const char *const type = json["type"].GetString();
208                     if ( strcmp(type, "l2update") == 0 )
209                     {
210                         processUpdates(json, bids, offers);
211                     }
212                     else if ( strcmp(type, "snapshot") == 0 )
213                     {
214                         processSnapshot(json, bids, offers, m_bookInitialized);
215                     }
216             }
217
218          /*   auto connection =
219                 m_client.get_connection("wss://ws-feed.gdax.com", errorCode);
220             if (errorCode) {
221                 std::cerr << "failed websocketclient_t::get_connection(): " <<
222                     errorCode.message() << std::endl;
223             }
224         
225             m_client.connect(connection);
226
227             m_client.run();*/
228             
229         } catch (websocketpp::exception const & e) {
230             std::cerr << "handleUpdates() failed: " << e.what() << std::endl;
231           
232                    
233                   
234         }
235     }
236
237     /**
238      * Simply delegates snapshot processing to a helper function (different
239      * template instantiations of the same function, one for each type of map
240      * (bid, offer)), and signals when the snapshot has been processed.
241      */
242     static void processSnapshot(
243         rapidjson::Document & json,
244         bids_map_t & bids,
245         offers_map_t & offers,
246         std::promise<void> & finished)
247     {
248         processSnapshotHalf(json, "bids", bids);
249         processSnapshotHalf(json, "asks", offers);
250         finished.set_value();
251     }
252
253     /**
254      * Helper to permit code re-use on either type of map (bids or offers).
255      * Traverses already-parsed json document and inserts initial-price
256      * snapshots for entire half (bids or offers) of the order book.
257      */
258     template<typename map_t>
259     static void processSnapshotHalf(
260         rapidjson::Document const& json,
261         const char *const bidsOrOffers,
262         map_t & map)
263     {
264         for (auto j = 0 ; j < json[bidsOrOffers].Size() ; ++j)
265         {
266             Price price = std::stod(json[bidsOrOffers][j][0].GetString())*100;
267             Size   size = std::stod(json[bidsOrOffers][j][1].GetString());
268
269             map.insert(price, size);
270         }
271     }
272
273     /**
274      * Traverses already-parsed json document, and, assuming it's a "l2update"
275      * document, updates price->quantity maps based on the order book changes
276      * that have occurred.
277      */
278     static void processUpdates(
279         rapidjson::Document & json,
280         bids_map_t & bids,
281         offers_map_t & offers)
282     {
283         for (auto i = 0 ; i < json["changes"].Size() ; ++i)
284         {
285             const char* buyOrSell = json["changes"][i][0].GetString(),
286                       * price     = json["changes"][i][1].GetString(),
287                       * size      = json["changes"][i][2].GetString();
288
289             if ( strcmp(buyOrSell, "buy") == 0 )
290             {
291                 updateMap(price, size, bids);
292             }
293             else
294             {
295                 updateMap(price, size, offers);
296             }
297         }
298     }
299
300     /**
301      * Helper to permit code re-use on either type of map (bids or offers).
302      * Simply updates a single map entry with the specified price/size.
303      */
304     template<typename map_t>
305     static void updateMap(
306         const char *const price,
307         const char *const size,
308         map_t & map)
309     {
310         if (std::stod(size) == 0) { map.erase(std::stod(price)); }
311         else
312         {
313             map.update(
314                 std::stod(price)*100,
315                 [size](bool & bNew,
316                        std::pair<const Price, Size> & pair)
317                 {
318                     pair.second = std::stod(size);
319                 });
320         }
321     }
322 };
323
324 #endif // GDAX_ORDERBOOK_HPP