File indexing completed on 2024-06-16 05:06:54
0001 /* 0002 SPDX-FileCopyrightText: 2018 Ivan Čukić <ivan.cukic(at)kde.org> 0003 0004 SPDX-License-Identifier: LGPL-2.1-only OR LGPL-3.0-only OR LicenseRef-KDE-Accepted-LGPL 0005 */ 0006 0007 #ifndef VOY_OPERATIONS_MERGE_H 0008 #define VOY_OPERATIONS_MERGE_H 0009 0010 // STL 0011 #include <functional> 0012 #include <tuple> 0013 0014 // Self 0015 #include "../utils.h" 0016 #include "../dsl.h" 0017 0018 namespace voy { 0019 0020 using voy::utils::non_copyable; 0021 0022 using voy::dsl::source_node_tag; 0023 0024 /** 0025 * merge is a class that combines multiple streams into one. 0026 * 0027 * Class design: 0028 * - The class behaves like a source stream -- when the source streams 0029 * are mergged, the rest of the program will see the merged stream 0030 * as a single source of values 0031 * - Internally, it needs to connect to the source streams in order to 0032 * be able to pass the values on which means that it creates a 0033 * sink for each of the source streams and connects to them 0034 * 0035 */ 0036 template <typename... Sources> 0037 class merge { 0038 public: 0039 using node_category = source_node_tag; 0040 0041 explicit merge(Sources... sources) 0042 : m_sources{std::make_tuple(std::move(sources)...)} 0043 { 0044 } 0045 0046 template <typename Cont> 0047 class node: non_copyable { 0048 private: 0049 0050 // Connections take ownership of the connected objects. 0051 // We can not allow this to happen as the merged stream 0052 // connects to several streams at once. Instead, we 0053 // will give just a dummy reference for the connections 0054 // to take ownership of. Now, both the node and the 0055 // references can be moved to new instances, so this 0056 // is a bit more complicated than it seems. 0057 0058 struct impl { 0059 struct internal_connection_base; 0060 std::array<std::unique_ptr<internal_connection_base>, sizeof...(Sources)> sources; 0061 0062 mutable int sources_active; 0063 Cont continuation; 0064 0065 // When connecting the sources, we want to type-erase the 0066 // connection and just keep a pointer to it to extend 0067 // its lifetime 0068 struct internal_connection_base { //_ 0069 public: 0070 virtual ~internal_connection_base() {} 0071 }; 0072 0073 template <typename T> 0074 struct internal_connection: internal_connection_base { 0075 public: 0076 internal_connection(T connection) 0077 : connection{std::move(connection)} 0078 { 0079 // utils::print_types<T>(); 0080 } 0081 0082 T connection; 0083 }; 0084 0085 struct ref { 0086 using node_category = sink_node_tag; 0087 0088 ref(impl* _this) 0089 : m_this{_this} 0090 { 0091 } 0092 0093 template <typename T> 0094 void operator() (T&& value) const 0095 { 0096 m_this->operator()(voy_fwd(value)); 0097 } 0098 0099 void init() 0100 { 0101 // TODO: Should we react to init? 0102 } 0103 0104 void notify_ended() const 0105 { 0106 m_this->notify_ended(); 0107 } 0108 0109 impl* m_this; 0110 }; 0111 0112 // Creates and instance of the type-erased connection 0113 template <typename Source> 0114 std::unique_ptr<internal_connection_base> create_connection( 0115 size_t index, 0116 Source&& source) 0117 { 0118 return 0119 std::make_unique< 0120 internal_connection< 0121 decltype(dsl::detail::connect_streams( 0122 std::move(source), 0123 ref(this))) 0124 > 0125 > 0126 ( 0127 dsl::detail::connect_streams( 0128 std::move(source), 0129 ref(this)) 0130 ) 0131 ; 0132 } 0133 0134 template <size_t... Idx> 0135 decltype(sources) create_connections(std::tuple<Sources...> sources, 0136 std::index_sequence<Idx...>) 0137 { 0138 return { create_connection(Idx, std::get<Idx>(std::move(sources)))... }; 0139 } //^ 0140 0141 impl(std::tuple<Sources...> sources, Cont continuation) 0142 : sources{create_connections(std::move(sources), std::index_sequence_for<Sources...>())} 0143 , sources_active{sizeof...(Sources)} 0144 , continuation{std::move(continuation)} 0145 { 0146 } 0147 0148 void init() 0149 { 0150 } 0151 0152 void notify_ended() const 0153 { 0154 if (0 == --sources_active) { 0155 continuation.notify_ended(); 0156 } 0157 } 0158 0159 template <typename T> 0160 void operator() (T&& value) 0161 { 0162 voy_fwd_invoke(continuation, value); 0163 } 0164 0165 ~impl() = default; 0166 impl(const impl&) = delete; 0167 impl(impl&&) = delete; 0168 impl& operator=(impl) = delete; 0169 }; 0170 std::unique_ptr<impl> d; 0171 0172 public: 0173 node(std::tuple<Sources...> sources, Cont&& continuation) 0174 : d{std::make_unique<impl>(std::move(sources), std::move(continuation))} 0175 { 0176 } 0177 0178 ~node() = default; 0179 node(node&& other) noexcept = default; 0180 node& operator=(node other) = delete; 0181 0182 void init() 0183 { 0184 d->init(); 0185 } 0186 0187 template <typename T> 0188 void operator() (T&& value) const 0189 { 0190 voy_fwd_invoke(d->m_continuation, value); 0191 } 0192 0193 void notify_ended() const 0194 { 0195 d->notify_ended(); 0196 } 0197 0198 }; 0199 0200 template <typename Cont> 0201 auto with_continuation(Cont&& cont) && 0202 { 0203 return node<Cont>(std::move(m_sources), voy_fwd(cont)); 0204 } 0205 0206 private: 0207 std::tuple<Sources...> m_sources; 0208 }; 0209 0210 } // namespace voy 0211 0212 #endif // include guard 0213