1 #include <websocketpp/config/asio_no_tls.hpp>
3 #include <websocketpp/server.hpp>
8 /*#include <boost/thread.hpp>
9 #include <boost/thread/mutex.hpp>
10 #include <boost/thread/condition_variable.hpp>*/
11 #include <websocketpp/common/thread.hpp>
13 typedef websocketpp::server<websocketpp::config::asio> server;
15 using websocketpp::connection_hdl;
16 using websocketpp::lib::placeholders::_1;
17 using websocketpp::lib::placeholders::_2;
18 using websocketpp::lib::bind;
20 using websocketpp::lib::thread;
21 using websocketpp::lib::mutex;
22 using websocketpp::lib::lock_guard;
23 using websocketpp::lib::unique_lock;
24 using websocketpp::lib::condition_variable;
/* on_open:    insert the connection_hdl into the channel
 * on_close:   remove the connection_hdl from the channel
 * on_message: queue the message to be sent to every connection in the channel
 */
38 action(action_type t, connection_hdl h) : type(t), hdl(h) {}
39 action(action_type t, connection_hdl h, server::message_ptr m)
40 : type(t), hdl(h), msg(m) {}
43 websocketpp::connection_hdl hdl;
44 server::message_ptr msg;
47 class broadcast_server {
50 // Initialize Asio Transport
53 // Register handler callbacks
54 m_server.set_open_handler(bind(&broadcast_server::on_open,this,::_1));
55 m_server.set_close_handler(bind(&broadcast_server::on_close,this,::_1));
56 m_server.set_message_handler(bind(&broadcast_server::on_message,this,::_1,::_2));
59 void run(uint16_t port) {
60 // listen on specified port
61 m_server.listen(port);
63 // Start the server accept loop
64 m_server.start_accept();
66 // Start the ASIO io_service run loop
69 } catch (const std::exception & e) {
70 std::cout << e.what() << std::endl;
74 void on_open(connection_hdl hdl) {
76 lock_guard<mutex> guard(m_action_lock);
77 //std::cout << "on_open" << std::endl;
78 m_actions.push(action(SUBSCRIBE,hdl));
80 m_action_cond.notify_one();
83 void on_close(connection_hdl hdl) {
85 lock_guard<mutex> guard(m_action_lock);
86 //std::cout << "on_close" << std::endl;
87 m_actions.push(action(UNSUBSCRIBE,hdl));
89 m_action_cond.notify_one();
92 void on_message(connection_hdl hdl, server::message_ptr msg) {
93 // queue message up for sending by processing thread
95 lock_guard<mutex> guard(m_action_lock);
96 //std::cout << "on_message" << std::endl;
97 m_actions.push(action(MESSAGE,hdl,msg));
99 m_action_cond.notify_one();
102 void process_messages() {
104 unique_lock<mutex> lock(m_action_lock);
106 while(m_actions.empty()) {
107 m_action_cond.wait(lock);
110 action a = m_actions.front();
115 if (a.type == SUBSCRIBE) {
116 lock_guard<mutex> guard(m_connection_lock);
117 m_connections.insert(a.hdl);
118 } else if (a.type == UNSUBSCRIBE) {
119 lock_guard<mutex> guard(m_connection_lock);
120 m_connections.erase(a.hdl);
121 } else if (a.type == MESSAGE) {
122 lock_guard<mutex> guard(m_connection_lock);
124 con_list::iterator it;
125 for (it = m_connections.begin(); it != m_connections.end(); ++it) {
126 m_server.send(*it,a.msg);
134 typedef std::set<connection_hdl,std::owner_less<connection_hdl> > con_list;
137 con_list m_connections;
138 std::queue<action> m_actions;
141 mutex m_connection_lock;
142 condition_variable m_action_cond;
147 broadcast_server server_instance;
149 // Start a thread to run the processing loop
150 thread t(bind(&broadcast_server::process_messages,&server_instance));
152 // Run the asio loop with the main thread
153 server_instance.run(9002);
157 } catch (websocketpp::exception const & e) {
158 std::cout << e.what() << std::endl;