File indexing completed on 2024-12-15 05:02:06

0001 /*
0002     SPDX-FileCopyrightText: 2018 Ivan Čukić <ivan.cukic(at)kde.org>
0003 
0004     SPDX-License-Identifier: LGPL-2.1-only OR LGPL-3.0-only OR LicenseRef-KDE-Accepted-LGPL
0005 */
0006 
0007 #ifndef VOY_ZMQ_SERVICE_H
0008 #define VOY_ZMQ_SERVICE_H
0009 
0010 // STL
0011 #include <memory>
0012 #include <string>
0013 #include <iostream>
0014 
0015 // Boost and ZMQ
0016 #include <boost/asio.hpp>
0017 #include <azmq/socket.hpp>
0018 
0019 // Self
0020 #include "../utils.h"
0021 #include "../traits.h"
0022 #include "../dsl/node_tags.h"
0023 #include "../dsl/node_traits.h"
0024 #include "../engine/asio/service.h"
0025 
0026 #include "../../utils/debug.h"
0027 
0028 namespace voy::zmq {
0029 
0030 using voy::utils::non_copyable;
0031 
0032 using voy::dsl::continuator_base,
0033       voy::dsl::source_node_tag;
0034 
0035 namespace policy {
0036 
0037     struct spread {
0038         template <typename Socket>
0039         static void publisher_init(Socket& socket, const std::string& path)
0040         {
0041             debug::out() << "spread/publisher binding to " << path;
0042             socket.bind(path);
0043         }
0044 
0045         template <typename Socket>
0046         static void subscriber_init(Socket& socket, const std::string& path)
0047         {
0048             debug::out() << "spread/subscriber connecting to " << path;
0049             socket.connect(path);
0050             socket.set_option(azmq::socket::subscribe(""));
0051         }
0052     };
0053 
0054     struct collect {
0055         template <typename Socket>
0056         static void publisher_init(Socket& socket, const std::string& path)
0057         {
0058             debug::out() << "collect/publisher connecting to " << path;
0059             socket.connect(path);
0060         }
0061 
0062         template <typename Socket>
0063         static void subscriber_init(Socket& socket, const std::string& path)
0064         {
0065             debug::out() << "collect/publisher binding to " << path;
0066             socket.bind(path);
0067             socket.set_option(azmq::socket::subscribe(""));
0068         }
0069     };
0070 }
0071 
0072 
0073 std::string ipc(const std::string& id)
0074 {
0075     static const char *userenv = getenv("USER");
0076     std::string user = userenv ? userenv : "unknown-user";
0077     return "ipc:///tmp/voy-zmq-" + std::string(user) + "-" + id;
0078 }
0079 
0080 
0081 template <typename Policy = policy::spread>
0082 class publisher: non_copyable {
0083 public:
0084     using node_category = sink_node_tag;
0085 
0086     explicit publisher(std::string path)
0087         : m_path{std::move(path)}
0088     {
0089     }
0090 
0091     template <typename Msg>
0092     void operator()(Msg&& value) const
0093     {
0094         // std::cerr << "Sending... " << value << std::endl;
0095         if (m_socket) {
0096             m_socket->async_send(voy_fwd(azmq::message(value)),
0097                     [] (auto, auto) {});
0098         }
0099     }
0100 
0101     void init()
0102     {
0103         m_socket = std::make_unique<azmq::pub_socket>(engine::asio::service::instance());
0104 
0105         Policy::publisher_init(*m_socket, m_path);
0106     }
0107 
0108     void notify_ended() const
0109     {
0110         m_socket.reset();
0111     }
0112 
0113 private:
0114     std::string m_path;
0115     mutable std::unique_ptr<azmq::pub_socket> m_socket;
0116 
0117 };
0118 
0119 
0120 template <typename Policy = policy::spread>
0121 class subscriber: non_copyable {
0122 public:
0123     using node_category = source_node_tag;
0124 
0125     explicit subscriber(std::string path)
0126         : m_path{std::move(path)}
0127     {
0128     }
0129 
0130     template <typename Cont>
0131     class node: public continuator_base<Cont>, non_copyable {
0132         using base = continuator_base<Cont>;
0133 
0134     private:
0135         struct impl {
0136             impl(node* parent, const std::string& path)
0137                 : q{parent}
0138                 , socket(engine::asio::service::instance())
0139             {
0140                 std::cerr << "AZMQ: Connecting to " << path << " ...\n";
0141                 Policy::subscriber_init(socket, path);
0142 
0143                 read_next();
0144             }
0145 
0146             void read_next()
0147             {
0148                 socket.async_receive([this] (const boost::system::error_code& error,
0149                                              azmq::message& msg,
0150                                              size_t bytes_transferred) {
0151                         if (!error) {
0152                             q->base::emit(msg.string());
0153                             // q->base::emit(msg);
0154                             read_next();
0155                         }
0156                     });
0157             }
0158 
0159             node* const q;
0160             azmq::sub_socket socket;
0161             boost::asio::streambuf data;
0162         };
0163 
0164         std::unique_ptr<impl> d;
0165         std::string m_path;
0166 
0167     public:
0168         node(std::string path, Cont&& cont)
0169             : base{std::move(cont)}
0170             , m_path{std::move(path)}
0171         {
0172         }
0173 
0174         void init()
0175         {
0176             d = std::make_unique<impl>(this, m_path);
0177             base::init();
0178         }
0179     };
0180 
0181     template <typename Cont>
0182     auto with_continuation(Cont&& cont) &&
0183     {
0184         return node<Cont>(std::move(m_path), voy_fwd(cont));
0185     }
0186 
0187 private:
0188     std::string m_path;
0189 
0190 };
0191 
0192 } // namespace voy::zmq
0193 
0194 #endif // include guard
0195