1 #ifndef GDAX_ORDERBOOK_HPP
2 #define GDAX_ORDERBOOK_HPP
10 #include <cds/container/skip_list_map_hp.h>
11 #include <cds/gc/hp.h>
14 #include <rapidjson/document.h>
16 #include <websocketpp/client.hpp>
17 #include <websocketpp/concurrency/none.hpp>
18 #include <websocketpp/config/asio_client.hpp>
21 * A copy of the GDAX order book for the currency pair product given during
22 * construction, exposed as two maps, one for bids and one for offers, each
23 * mapping price levels to order quantities, continually updated in real time
24 * via the `level2` channel of the WebSocket feed of the GDAX API.
26 * Spawns a separate thread to receive updates from the GDAX WebSocket Feed and
27 * process them into the maps.
29 * To ensure high performance, implemented using concurrent data structures
30 * from libcds. The price->quantity maps are instances of
31 * cds::container::SkipListMap, whose doc says it is lock-free.
35 // libcds requires paired Initialize/Terminate calls
// Ties the cds::Initialize()/cds::Terminate() pair to an object's lifetime:
// the constructor calls Initialize(), the destructor calls Terminate().
36 struct CDSInitializer {
37 CDSInitializer() { cds::Initialize(); }
38 ~CDSInitializer() { cds::Terminate(); }
41 // a libcds garbage collector is required for our map structures
42 cds::gc::HP m_cdsGarbageCollector;
46 * libcds requires each and every thread to first "attach" itself to the
47 * lib before using any data structures. This method is used internally,
48 * and is called during construction on behalf of the constructing thread,
49 * and should be called by any additional client threads that access the
50 * price->quantity maps.
52 static void ensureThreadAttached()
54 if (cds::threading::Manager::isThreadAttached() == false)
55 cds::threading::Manager::attachThread();
57 //std::ofstream myfile;
// Builds the order book for `product` (defaults to "BTC-USD"), attaches the
// constructing thread to libcds, and does not return until
// m_bookInitialized has been fulfilled.
59 GDAXOrderBook(std::string const& product = "BTC-USD")
60 : m_cdsGarbageCollector(67*2),
61 // per SkipListMap doc, 67 hazard pointers per instance
// NOTE(review): the factor of 2 presumably accounts for the two maps (bids
// and offers) — confirm.
65 &GDAXOrderBook::handleUpdates,
69 ensureThreadAttached();
// processSnapshot() is what calls set_value() on this promise, so this wait
// ends once a snapshot has been processed.
// NOTE(review): nothing visible here bounds the wait if no snapshot ever
// arrives — confirm that is acceptable.
70 m_bookInitialized.get_future().wait();
// Price levels are keyed as whole cents held in an unsigned int; the
// snapshot/update code multiplies the decimal price string by 100.
75 using Price = unsigned int; // cents
// Offers map: Price -> Size, a libcds skip-list map using the cds::gc::HP
// garbage collector.
77 using offers_map_t = cds::container::SkipListMap<cds::gc::HP, Price, Size>;
79 cds::container::SkipListMap<
83 // reverse map ordering so best (highest) bid is at begin()
84 typename cds::container::skip_list::make_traits<
85 cds::opt::less<std::greater<Price>>>::type>;
86 // *map_t::get(Price) returns an std::pair<Price, Size>*
90 ~GDAXOrderBook() { m_client.stop(); myfile.close(); }
93 struct websocketppConfig
94 : public websocketpp::config::asio_tls_client
96 typedef websocketpp::concurrency::none concurrency_type;
97 // we only have one thread using the WebSocket
99 using websocketclient_t = websocketpp::client<websocketppConfig>;
100 websocketclient_t m_client;
102 std::promise<void> m_bookInitialized; // to signal constructor to finish
104 std::future<void> m_threadTerminator; // for graceful thread destruction
107 * Initiates WebSocket connection, subscribes to order book updates for the
108 * given product, installs a message handler which will receive updates
109 * and process them into the maps, and starts the asio event loop.
// Worker routine behind the order book. In order, it: attaches the calling
// thread to libcds, configures websocketpp logging, installs the TLS-init,
// open and message handlers, replays lines from "example.txt" through the
// snapshot/update processing, and finally requests and connects a
// connection. websocketpp::exception is caught and reported on std::cerr.
//   product - product id inserted into the subscribe request
111 void handleUpdates(std::string const& product)
113 ensureThreadAttached();
// One document instance is reused for every message and replayed line.
115 rapidjson::Document json;
// Access log: everything off, then connect/disconnect events back on.
118 m_client.clear_access_channels(websocketpp::log::alevel::all);
119 m_client.set_access_channels(
120 websocketpp::log::alevel::connect |
121 websocketpp::log::alevel::disconnect);
// Error log: everything off, then info/warn/rerror/fatal back on.
123 m_client.clear_error_channels(websocketpp::log::elevel::all);
124 m_client.set_error_channels(
125 websocketpp::log::elevel::info |
126 websocketpp::log::elevel::warn |
127 websocketpp::log::elevel::rerror |
128 websocketpp::log::elevel::fatal);
130 m_client.init_asio();
// TLS init handler: creates a boost::asio SSL context using the tlsv1
// method and sets options that disable SSLv2 and SSLv3; a failure to set
// the options is reported on std::cerr.
134 m_client.set_tls_init_handler(
135 [](websocketpp::connection_hdl)
137 websocketpp::lib::shared_ptr<boost::asio::ssl::context>
138 context = websocketpp::lib::make_shared<
139 boost::asio::ssl::context>(
140 boost::asio::ssl::context::tlsv1);
143 context->set_options(
144 boost::asio::ssl::context::default_workarounds |
145 boost::asio::ssl::context::no_sslv2 |
146 boost::asio::ssl::context::no_sslv3 |
147 boost::asio::ssl::context::single_dh_use);
148 } catch (std::exception& e) {
149 std::cerr << "set_tls_init_handler() failed to set"
150 " context options: " << e.what() << std::endl;
// Open handler: sends a text frame subscribing to the `level2` channel for
// `product`; a send error is reported on std::cerr.
// NOTE(review): `product` is captured by reference, so the referenced
// string must outlive every invocation of this handler — confirm against
// how the caller passes it.
155 m_client.set_open_handler(
156 [this, &product](websocketpp::connection_hdl handle)
158 // subscribe to updates to product's order book
159 websocketpp::lib::error_code errorCode;
160 this->m_client.send(handle,
162 "\"type\": \"subscribe\","
163 "\"product_ids\": [" "\""+product+"\"" "],"
164 "\"channels\": [" "\"level2\"" "]"
165 "}", websocketpp::frame::opcode::text, errorCode);
167 std::cerr << "error sending subscription: " +
168 errorCode.message() << std::endl;
// Message handler: parses each payload into `json` and dispatches on its
// "type" string: "l2update" -> processUpdates(), "snapshot" ->
// processSnapshot(). Other types are ignored.
// NOTE(review): the result of Parse() is not checked and "type" is read
// unconditionally — confirm every incoming message is a JSON object with a
// string "type" member.
172 m_client.set_message_handler(
173 [this, &json] (websocketpp::connection_hdl,
174 websocketppConfig::message_type::ptr msg)
177 /*if(!myfile.is_open())
178 myfile.open("example.txt");
181 myfile <<msg->get_payload().c_str()<<std::endl;
183 else std::cout << "Unable to open file";*/
185 json.Parse(msg->get_payload().c_str());
186 const char *const type = json["type"].GetString();
187 if ( strcmp(type, "l2update") == 0 )
189 processUpdates(json, bids, offers);
191 else if ( strcmp(type, "snapshot") == 0 )
193 processSnapshot(json, bids, offers, m_bookInitialized);
197 websocketpp::lib::error_code errorCode;
// Replay: opens "example.txt" if `myfile` is not already open and feeds
// each line through the same l2update/snapshot dispatch as the message
// handler above.
// NOTE(review): processSnapshot() receives m_bookInitialized both here and
// in the message handler — confirm a snapshot can only be processed once.
200 if(!myfile.is_open())
201 myfile.open("example.txt");
203 while ( std::getline(myfile,line) )
205 json.Parse(line.c_str());
206 const char *const type = json["type"].GetString();
207 if ( strcmp(type, "l2update") == 0 )
209 processUpdates(json, bids, offers);
211 else if ( strcmp(type, "snapshot") == 0 )
213 processSnapshot(json, bids, offers, m_bookInitialized);
// NOTE(review): "ws://fsdf.sdfj" looks like a placeholder endpoint —
// confirm the intended feed URI.
218 m_client.get_connection("ws://fsdf.sdfj", errorCode);
220 std::cerr << "failed websocketclient_t::get_connection(): " <<
221 errorCode.message() << std::endl;
224 m_client.connect(connection);
228 } catch (websocketpp::exception const & e) {
229 std::cerr << "handleUpdates() failed: " << e.what() << std::endl;
237 * Simply delegates snapshot processing to a helper function (different
238 * template instantiations of the same function, one for each type of map
239 * (bid, offer)), and signals when the snapshot has been processed.
241 static void processSnapshot(
242 rapidjson::Document & json,
244 offers_map_t & offers,
245 std::promise<void> & finished)
247 processSnapshotHalf(json, "bids", bids);
248 processSnapshotHalf(json, "asks", offers);
249 finished.set_value();
253 * Helper to permit code re-use on either type of map (bids or offers).
254 * Traverses already-parsed json document and inserts initial-price
255 * snapshots for entire half (bids or offers) of the order book.
257 template<typename map_t>
258 static void processSnapshotHalf(
259 rapidjson::Document const& json,
260 const char *const bidsOrOffers,
263 for (auto j = 0 ; j < json[bidsOrOffers].Size() ; ++j)
265 Price price = std::stod(json[bidsOrOffers][j][0].GetString())*100;
266 Size size = std::stod(json[bidsOrOffers][j][1].GetString());
268 map.insert(price, size);
273 * Traverses already-parsed json document, and, assuming it's a "l2update"
274 * document, updates price->quantity maps based on the order book changes
275 * that have occurred.
277 static void processUpdates(
278 rapidjson::Document & json,
280 offers_map_t & offers)
282 for (auto i = 0 ; i < json["changes"].Size() ; ++i)
284 const char* buyOrSell = json["changes"][i][0].GetString(),
285 * price = json["changes"][i][1].GetString(),
286 * size = json["changes"][i][2].GetString();
288 if ( strcmp(buyOrSell, "buy") == 0 )
290 updateMap(price, size, bids);
294 updateMap(price, size, offers);
300 * Helper to permit code re-use on either type of map (bids or offers).
301 * Simply updates a single map entry with the specified price/size.
// Applies one price-level change to `map`.
//   price - decimal price string; converted to the map's key (whole cents)
//   size  - decimal quantity string; zero means the level is removed
//   map   - destination map; must provide key_type, value_type,
//           erase(key) and update(key, functor)
template<typename map_t>
static void updateMap(
    const char *const price,
    const char *const size,
    map_t & map)
{
    using key_type = typename map_t::key_type;
    using entry_type = typename map_t::value_type;

    // Keys are whole cents. Round to the nearest cent before converting: a
    // plain truncating conversion turns e.g. 0.29*100 (28.999999999999996)
    // into 28. The same key is used for both erase and update, so a level
    // inserted at a price is the one removed when its size drops to zero.
    key_type const cents =
        static_cast<key_type>(std::stod(price) * 100 + 0.5);

    // Parse the quantity once; it drives both the branch and the new value.
    double const quantity = std::stod(size);

    if (quantity == 0)
    {
        map.erase(cents);
    }
    else
    {
        map.update(
            cents,
            [quantity](bool, entry_type & entry)
            {
                entry.second = quantity;
            });
    }
}
323 #endif // GDAX_ORDERBOOK_HPP