File indexing completed on 2025-10-26 05:11:10

0001 /*
0002     SPDX-FileCopyrightText: 2018 Ivan Čukić <ivan.cukic(at)kde.org>
0003 
0004     SPDX-License-Identifier: LGPL-2.1-only OR LGPL-3.0-only OR LicenseRef-KDE-Accepted-LGPL
0005 */
0006 
0007 #ifndef VOY_OPERATIONS_MERGE_H
0008 #define VOY_OPERATIONS_MERGE_H
0009 
// STL
#include <array>
#include <cstddef>
#include <functional>
#include <memory>
#include <tuple>
#include <utility>

// Self
#include "../utils.h"
#include "../dsl.h"
0017 
0018 namespace voy {
0019 
0020 using voy::utils::non_copyable;
0021 
0022 using voy::dsl::source_node_tag;
0023 
0024 /**
0025  * merge is a class that combines multiple streams into one.
0026  *
0027  * Class design:
0028  *  - The class behaves like a source stream -- when the source streams
 *    are merged, the rest of the program will see the merged stream
0030  *    as a single source of values
0031  *  - Internally, it needs to connect to the source streams in order to
0032  *    be able to pass the values on which means that it creates a
0033  *    sink for each of the source streams and connects to them
0034  *
0035  */
0036 template <typename... Sources>
0037 class merge {
0038 public:
0039     using node_category = source_node_tag;
0040 
0041     explicit merge(Sources... sources)
0042         : m_sources{std::make_tuple(std::move(sources)...)}
0043     {
0044     }
0045 
0046     template <typename Cont>
0047     class node: non_copyable {
0048     private:
0049 
0050         // Connections take ownership of the connected objects.
0051         // We can not allow this to happen as the merged stream
0052         // connects to several streams at once. Instead, we
0053         // will give just a dummy reference for the connections
0054         // to take ownership of. Now, both the node and the
0055         // references can be moved to new instances, so this
0056         // is a bit more complicated than it seems.
0057 
0058         struct impl {
0059             struct internal_connection_base;
0060             std::array<std::unique_ptr<internal_connection_base>, sizeof...(Sources)> sources;
0061 
0062             mutable int sources_active;
0063             Cont continuation;
0064 
0065             // When connecting the sources, we want to type-erase the
0066             // connection and just keep a pointer to it to extend
0067             // its lifetime
0068             struct internal_connection_base { //_
0069             public:
0070                 virtual ~internal_connection_base() {}
0071             };
0072 
0073             template <typename T>
0074             struct internal_connection: internal_connection_base {
0075             public:
0076                 internal_connection(T connection)
0077                     : connection{std::move(connection)}
0078                 {
0079                     // utils::print_types<T>();
0080                 }
0081 
0082                 T connection;
0083             };
0084 
0085             struct ref {
0086                 using node_category = sink_node_tag;
0087 
0088                 ref(impl* _this)
0089                     : m_this{_this}
0090                 {
0091                 }
0092 
0093                 template <typename T>
0094                 void operator() (T&& value) const
0095                 {
0096                     m_this->operator()(voy_fwd(value));
0097                 }
0098 
0099                 void init()
0100                 {
0101                     // TODO: Should we react to init?
0102                 }
0103 
0104                 void notify_ended() const
0105                 {
0106                     m_this->notify_ended();
0107                 }
0108 
0109                 impl* m_this;
0110             };
0111 
0112             // Creates and instance of the type-erased connection
0113             template <typename Source>
0114             std::unique_ptr<internal_connection_base> create_connection(
0115                     size_t index,
0116                     Source&& source)
0117             {
0118                 return
0119                     std::make_unique<
0120                         internal_connection<
0121                             decltype(dsl::detail::connect_streams(
0122                                 std::move(source),
0123                                 ref(this)))
0124                         >
0125                     >
0126                         (
0127                             dsl::detail::connect_streams(
0128                                 std::move(source),
0129                                 ref(this))
0130                         )
0131                 ;
0132             }
0133 
0134             template <size_t... Idx>
0135             decltype(sources) create_connections(std::tuple<Sources...> sources,
0136                                                  std::index_sequence<Idx...>)
0137             {
0138                 return { create_connection(Idx, std::get<Idx>(std::move(sources)))... };
0139             } //^
0140 
0141             impl(std::tuple<Sources...> sources, Cont continuation)
0142                 : sources{create_connections(std::move(sources), std::index_sequence_for<Sources...>())}
0143                 , sources_active{sizeof...(Sources)}
0144                 , continuation{std::move(continuation)}
0145             {
0146             }
0147 
0148             void init()
0149             {
0150             }
0151 
0152             void notify_ended() const
0153             {
0154                 if (0 == --sources_active) {
0155                     continuation.notify_ended();
0156                 }
0157             }
0158 
0159             template <typename T>
0160             void operator() (T&& value)
0161             {
0162                 voy_fwd_invoke(continuation, value);
0163             }
0164 
0165             ~impl() = default;
0166             impl(const impl&) = delete;
0167             impl(impl&&) = delete;
0168             impl& operator=(impl) = delete;
0169         };
0170         std::unique_ptr<impl> d;
0171 
0172     public:
0173         node(std::tuple<Sources...> sources, Cont&& continuation)
0174             : d{std::make_unique<impl>(std::move(sources), std::move(continuation))}
0175         {
0176         }
0177 
0178         ~node() = default;
0179         node(node&& other) noexcept = default;
0180         node& operator=(node other) = delete;
0181 
0182         void init()
0183         {
0184             d->init();
0185         }
0186 
0187         template <typename T>
0188         void operator() (T&& value) const
0189         {
0190             voy_fwd_invoke(d->m_continuation, value);
0191         }
0192 
0193         void notify_ended() const
0194         {
0195             d->notify_ended();
0196         }
0197 
0198     };
0199 
0200     template <typename Cont>
0201     auto with_continuation(Cont&& cont) &&
0202     {
0203         return node<Cont>(std::move(m_sources), voy_fwd(cont));
0204     }
0205 
0206 private:
0207     std::tuple<Sources...> m_sources;
0208 };
0209 
0210 } // namespace voy
0211 
0212 #endif // include guard
0213