File indexing completed on 2024-12-15 05:02:06
0001 /* 0002 SPDX-FileCopyrightText: 2018 Ivan Čukić <ivan.cukic(at)kde.org> 0003 0004 SPDX-License-Identifier: LGPL-2.1-only OR LGPL-3.0-only OR LicenseRef-KDE-Accepted-LGPL 0005 */ 0006 0007 #ifndef VOY_ZMQ_SERVICE_H 0008 #define VOY_ZMQ_SERVICE_H 0009 0010 // STL 0011 #include <memory> 0012 #include <string> 0013 #include <iostream> 0014 0015 // Boost and ZMQ 0016 #include <boost/asio.hpp> 0017 #include <azmq/socket.hpp> 0018 0019 // Self 0020 #include "../utils.h" 0021 #include "../traits.h" 0022 #include "../dsl/node_tags.h" 0023 #include "../dsl/node_traits.h" 0024 #include "../engine/asio/service.h" 0025 0026 #include "../../utils/debug.h" 0027 0028 namespace voy::zmq { 0029 0030 using voy::utils::non_copyable; 0031 0032 using voy::dsl::continuator_base, 0033 voy::dsl::source_node_tag; 0034 0035 namespace policy { 0036 0037 struct spread { 0038 template <typename Socket> 0039 static void publisher_init(Socket& socket, const std::string& path) 0040 { 0041 debug::out() << "spread/publisher binding to " << path; 0042 socket.bind(path); 0043 } 0044 0045 template <typename Socket> 0046 static void subscriber_init(Socket& socket, const std::string& path) 0047 { 0048 debug::out() << "spread/subscriber connecting to " << path; 0049 socket.connect(path); 0050 socket.set_option(azmq::socket::subscribe("")); 0051 } 0052 }; 0053 0054 struct collect { 0055 template <typename Socket> 0056 static void publisher_init(Socket& socket, const std::string& path) 0057 { 0058 debug::out() << "collect/publisher connecting to " << path; 0059 socket.connect(path); 0060 } 0061 0062 template <typename Socket> 0063 static void subscriber_init(Socket& socket, const std::string& path) 0064 { 0065 debug::out() << "collect/publisher binding to " << path; 0066 socket.bind(path); 0067 socket.set_option(azmq::socket::subscribe("")); 0068 } 0069 }; 0070 } 0071 0072 0073 std::string ipc(const std::string& id) 0074 { 0075 static const char *userenv = getenv("USER"); 0076 std::string user = userenv ? userenv : "unknown-user"; 0077 return "ipc:///tmp/voy-zmq-" + std::string(user) + "-" + id; 0078 } 0079 0080 0081 template <typename Policy = policy::spread> 0082 class publisher: non_copyable { 0083 public: 0084 using node_category = sink_node_tag; 0085 0086 explicit publisher(std::string path) 0087 : m_path{std::move(path)} 0088 { 0089 } 0090 0091 template <typename Msg> 0092 void operator()(Msg&& value) const 0093 { 0094 // std::cerr << "Sending... " << value << std::endl; 0095 if (m_socket) { 0096 m_socket->async_send(voy_fwd(azmq::message(value)), 0097 [] (auto, auto) {}); 0098 } 0099 } 0100 0101 void init() 0102 { 0103 m_socket = std::make_unique<azmq::pub_socket>(engine::asio::service::instance()); 0104 0105 Policy::publisher_init(*m_socket, m_path); 0106 } 0107 0108 void notify_ended() const 0109 { 0110 m_socket.reset(); 0111 } 0112 0113 private: 0114 std::string m_path; 0115 mutable std::unique_ptr<azmq::pub_socket> m_socket; 0116 0117 }; 0118 0119 0120 template <typename Policy = policy::spread> 0121 class subscriber: non_copyable { 0122 public: 0123 using node_category = source_node_tag; 0124 0125 explicit subscriber(std::string path) 0126 : m_path{std::move(path)} 0127 { 0128 } 0129 0130 template <typename Cont> 0131 class node: public continuator_base<Cont>, non_copyable { 0132 using base = continuator_base<Cont>; 0133 0134 private: 0135 struct impl { 0136 impl(node* parent, const std::string& path) 0137 : q{parent} 0138 , socket(engine::asio::service::instance()) 0139 { 0140 std::cerr << "AZMQ: Connecting to " << path << " ...\n"; 0141 Policy::subscriber_init(socket, path); 0142 0143 read_next(); 0144 } 0145 0146 void read_next() 0147 { 0148 socket.async_receive([this] (const boost::system::error_code& error, 0149 azmq::message& msg, 0150 size_t bytes_transferred) { 0151 if (!error) { 0152 q->base::emit(msg.string()); 0153 // q->base::emit(msg); 0154 read_next(); 0155 } 0156 }); 0157 } 0158 0159 node* const q; 0160 azmq::sub_socket socket; 0161 boost::asio::streambuf data; 0162 }; 0163 0164 std::unique_ptr<impl> d; 0165 std::string m_path; 0166 0167 public: 0168 node(std::string path, Cont&& cont) 0169 : base{std::move(cont)} 0170 , m_path{std::move(path)} 0171 { 0172 } 0173 0174 void init() 0175 { 0176 d = std::make_unique<impl>(this, m_path); 0177 base::init(); 0178 } 0179 }; 0180 0181 template <typename Cont> 0182 auto with_continuation(Cont&& cont) && 0183 { 0184 return node<Cont>(std::move(m_path), voy_fwd(cont)); 0185 } 0186 0187 private: 0188 std::string m_path; 0189 0190 }; 0191 0192 } // namespace voy::zmq 0193 0194 #endif // include guard 0195