X-Git-Url: http://plrg.eecs.uci.edu/git/?p=c11concurrency-benchmarks.git;a=blobdiff_plain;f=gdax-orderbook-hpp%2Fdemo%2Fdependencies%2Frapidjson-1.1.0%2Fexample%2Fparsebyparts%2Fparsebyparts.cpp;fp=gdax-orderbook-hpp%2Fdemo%2Fdependencies%2Frapidjson-1.1.0%2Fexample%2Fparsebyparts%2Fparsebyparts.cpp;h=57eed005dea431056634782c9f8c61a8b8a008fe;hp=0000000000000000000000000000000000000000;hb=4223430d9168223029c7639149025c79e69b4f37;hpb=7ea7751a31c0388bf888052517be181a2989b113;ds=sidebyside diff --git a/gdax-orderbook-hpp/demo/dependencies/rapidjson-1.1.0/example/parsebyparts/parsebyparts.cpp b/gdax-orderbook-hpp/demo/dependencies/rapidjson-1.1.0/example/parsebyparts/parsebyparts.cpp new file mode 100644 index 0000000..57eed00 --- /dev/null +++ b/gdax-orderbook-hpp/demo/dependencies/rapidjson-1.1.0/example/parsebyparts/parsebyparts.cpp @@ -0,0 +1,173 @@ +// Example of parsing JSON to document by parts. + +// Using C++11 threads +// Temporarily disable for clang (older version) due to incompatibility with libstdc++ +#if (__cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1700)) && !defined(__clang__) + +#include "rapidjson/document.h" +#include "rapidjson/error/en.h" +#include "rapidjson/writer.h" +#include "rapidjson/ostreamwrapper.h" +#include +#include +#include +#include + +using namespace rapidjson; + +template +class AsyncDocumentParser { +public: + AsyncDocumentParser(Document& d) + : stream_(*this) + , d_(d) + , parseThread_(&AsyncDocumentParser::Parse, this) + , mutex_() + , notEmpty_() + , finish_() + , completed_() + {} + + ~AsyncDocumentParser() { + if (!parseThread_.joinable()) + return; + + { + std::unique_lock lock(mutex_); + + // Wait until the buffer is read up (or parsing is completed) + while (!stream_.Empty() && !completed_) + finish_.wait(lock); + + // Automatically append '\0' as the terminator in the stream. + static const char terminator[] = ""; + stream_.src_ = terminator; + stream_.end_ = terminator + 1; + notEmpty_.notify_one(); // unblock the AsyncStringStream + } + + parseThread_.join(); + } + + void ParsePart(const char* buffer, size_t length) { + std::unique_lock lock(mutex_); + + // Wait until the buffer is read up (or parsing is completed) + while (!stream_.Empty() && !completed_) + finish_.wait(lock); + + // Stop further parsing if the parsing process is completed. + if (completed_) + return; + + // Set the buffer to stream and unblock the AsyncStringStream + stream_.src_ = buffer; + stream_.end_ = buffer + length; + notEmpty_.notify_one(); + } + +private: + void Parse() { + d_.ParseStream(stream_); + + // The stream may not be fully read, notify finish anyway to unblock ParsePart() + std::unique_lock lock(mutex_); + completed_ = true; // Parsing process is completed + finish_.notify_one(); // Unblock ParsePart() or destructor if they are waiting. + } + + struct AsyncStringStream { + typedef char Ch; + + AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {} + + char Peek() const { + std::unique_lock lock(parser_.mutex_); + + // If nothing in stream, block to wait. + while (Empty()) + parser_.notEmpty_.wait(lock); + + return *src_; + } + + char Take() { + std::unique_lock lock(parser_.mutex_); + + // If nothing in stream, block to wait. + while (Empty()) + parser_.notEmpty_.wait(lock); + + count_++; + char c = *src_++; + + // If all stream is read up, notify that the stream is finish. + if (Empty()) + parser_.finish_.notify_one(); + + return c; + } + + size_t Tell() const { return count_; } + + // Not implemented + char* PutBegin() { return 0; } + void Put(char) {} + void Flush() {} + size_t PutEnd(char*) { return 0; } + + bool Empty() const { return src_ == end_; } + + AsyncDocumentParser& parser_; + const char* src_; //!< Current read position. + const char* end_; //!< End of buffer + size_t count_; //!< Number of characters taken so far. + }; + + AsyncStringStream stream_; + Document& d_; + std::thread parseThread_; + std::mutex mutex_; + std::condition_variable notEmpty_; + std::condition_variable finish_; + bool completed_; +}; + +int main() { + Document d; + + { + AsyncDocumentParser<> parser(d); + + const char json1[] = " { \"hello\" : \"world\", \"t\" : tr"; + //const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // Fot test parsing error + const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14"; + const char json3[] = "16, \"a\":[1, 2, 3, 4] } "; + + parser.ParsePart(json1, sizeof(json1) - 1); + parser.ParsePart(json2, sizeof(json2) - 1); + parser.ParsePart(json3, sizeof(json3) - 1); + } + + if (d.HasParseError()) { + std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl; + return EXIT_FAILURE; + } + + // Stringify the JSON to cout + OStreamWrapper os(std::cout); + Writer writer(os); + d.Accept(writer); + std::cout << std::endl; + + return EXIT_SUCCESS; +} + +#else // Not supporting C++11 + +#include +int main() { + std::cout << "This example requires C++11 compiler" << std::endl; +} + +#endif