361 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			361 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
 | 
						|
This file is part of Telegram Desktop,
 | 
						|
the official desktop version of Telegram messaging app, see https://telegram.org
 | 
						|
 | 
						|
Telegram Desktop is free software: you can redistribute it and/or modify
 | 
						|
it under the terms of the GNU General Public License as published by
 | 
						|
the Free Software Foundation, either version 3 of the License, or
 | 
						|
(at your option) any later version.
 | 
						|
 | 
						|
It is distributed in the hope that it will be useful,
 | 
						|
but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | 
						|
GNU General Public License for more details.
 | 
						|
 | 
						|
In addition, as a special exception, the copyright holders give permission
 | 
						|
to link the code of portions of this program with the OpenSSL library.
 | 
						|
 | 
						|
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
 | 
						|
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
 | 
						|
*/
 | 
						|
#pragma once
 | 
						|
 | 
						|
#include "base/optional.h"
 | 
						|
#include <rpl/map.h>
 | 
						|
#include <rpl/producer.h>
 | 
						|
#include <rpl/details/type_list.h>
 | 
						|
#include <rpl/details/callable.h>
 | 
						|
#include <rpl/mappers.h>
 | 
						|
#include <rpl/complete.h>
 | 
						|
 | 
						|
namespace rpl {
 | 
						|
namespace details {
 | 
						|
 | 
						|
template <typename ...Values>
 | 
						|
struct combine_state {
 | 
						|
	combine_state() : accumulated(std::tuple<base::optional<Values>...>()) {
 | 
						|
	}
 | 
						|
	base::optional<std::tuple<base::optional<Values>...>> accumulated;
 | 
						|
	base::optional<std::tuple<Values...>> latest;
 | 
						|
	int invalid = sizeof...(Values);
 | 
						|
	int working = sizeof...(Values);
 | 
						|
};
 | 
						|
 | 
						|
template <typename ...Values, std::size_t ...I>
 | 
						|
inline std::tuple<Values...> combine_make_first(
 | 
						|
		std::tuple<base::optional<Values>...> &&accumulated,
 | 
						|
		std::index_sequence<I...>) {
 | 
						|
	return std::make_tuple(std::move(*std::get<I>(accumulated))...);
 | 
						|
}
 | 
						|
 | 
						|
template <size_t Index, typename consumer_type, typename ...Values>
 | 
						|
class combine_subscribe_one {
 | 
						|
public:
 | 
						|
	combine_subscribe_one(
 | 
						|
		const consumer_type &consumer,
 | 
						|
		combine_state<Values...> *state)
 | 
						|
	: _consumer(consumer)
 | 
						|
	, _state(state) {
 | 
						|
	}
 | 
						|
 | 
						|
	template <typename Value, typename Error, typename Generator>
 | 
						|
	void subscribe(producer<Value, Error, Generator> &&producer) {
 | 
						|
		_consumer.add_lifetime(std::move(producer).start(
 | 
						|
			[consumer = _consumer, state = _state](Value &&value) {
 | 
						|
			if (!state->accumulated) {
 | 
						|
				std::get<Index>(*state->latest) = std::move(value);
 | 
						|
				consumer.put_next_copy(*state->latest);
 | 
						|
			} else {
 | 
						|
				auto &accumulated = std::get<Index>(
 | 
						|
					*state->accumulated);
 | 
						|
				if (accumulated) {
 | 
						|
					accumulated = std::move(value);
 | 
						|
				} else {
 | 
						|
					accumulated = std::move(value);
 | 
						|
					if (!--state->invalid) {
 | 
						|
						constexpr auto kArity = sizeof...(Values);
 | 
						|
						state->latest = combine_make_first(
 | 
						|
							std::move(*state->accumulated),
 | 
						|
							std::make_index_sequence<kArity>());
 | 
						|
						state->accumulated = base::none;
 | 
						|
						consumer.put_next_copy(*state->latest);
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}, [consumer = _consumer](auto &&error) {
 | 
						|
			consumer.put_error_forward(
 | 
						|
				std::forward<decltype(error)>(error));
 | 
						|
		}, [consumer = _consumer, state = _state] {
 | 
						|
			if (!--state->working) {
 | 
						|
				consumer.put_done();
 | 
						|
			}
 | 
						|
		}));
 | 
						|
	}
 | 
						|
 | 
						|
private:
 | 
						|
	const consumer_type &_consumer;
 | 
						|
	combine_state<Values...> *_state = nullptr;
 | 
						|
 | 
						|
};
 | 
						|
 | 
						|
template <
 | 
						|
	typename consumer_type,
 | 
						|
	typename ...Values,
 | 
						|
	typename ...Errors,
 | 
						|
	typename ...Generators,
 | 
						|
	std::size_t ...I>
 | 
						|
inline void combine_subscribe(
 | 
						|
		const consumer_type &consumer,
 | 
						|
		combine_state<Values...> *state,
 | 
						|
		std::index_sequence<I...>,
 | 
						|
		std::tuple<producer<Values, Errors, Generators>...> &&saved) {
 | 
						|
	auto consume = { (
 | 
						|
		combine_subscribe_one<I, consumer_type, Values...>(
 | 
						|
			consumer,
 | 
						|
			state
 | 
						|
		).subscribe(std::get<I>(std::move(saved))), 0)... };
 | 
						|
	(void)consume;
 | 
						|
}
 | 
						|
 | 
						|
template <typename ...Producers>
 | 
						|
class combine_implementation_helper;
 | 
						|
 | 
						|
template <typename ...Producers>
 | 
						|
combine_implementation_helper<std::decay_t<Producers>...>
 | 
						|
make_combine_implementation_helper(Producers &&...producers) {
 | 
						|
	return combine_implementation_helper<std::decay_t<Producers>...>(
 | 
						|
		std::forward<Producers>(producers)...);
 | 
						|
}
 | 
						|
 | 
						|
template <
 | 
						|
	typename ...Values,
 | 
						|
	typename ...Errors,
 | 
						|
	typename ...Generators>
 | 
						|
class combine_implementation_helper<producer<Values, Errors, Generators>...> {
 | 
						|
public:
 | 
						|
	using CombinedValue = std::tuple<Values...>;
 | 
						|
	using CombinedError = normalized_variant_t<Errors...>;
 | 
						|
 | 
						|
	combine_implementation_helper(
 | 
						|
		producer<Values, Errors, Generators> &&...producers)
 | 
						|
	: _saved(std::make_tuple(std::move(producers)...)) {
 | 
						|
	}
 | 
						|
 | 
						|
	template <typename Handlers>
 | 
						|
	lifetime operator()(const consumer<CombinedValue, CombinedError, Handlers> &consumer) {
 | 
						|
		auto state = consumer.template make_state<
 | 
						|
			combine_state<Values...>>();
 | 
						|
		constexpr auto kArity = sizeof...(Values);
 | 
						|
		combine_subscribe(
 | 
						|
			consumer,
 | 
						|
			state,
 | 
						|
			std::make_index_sequence<kArity>(),
 | 
						|
			std::move(_saved));
 | 
						|
 | 
						|
		return lifetime();
 | 
						|
	}
 | 
						|
 | 
						|
private:
 | 
						|
	std::tuple<producer<Values, Errors, Generators>...> _saved;
 | 
						|
 | 
						|
};
 | 
						|
 | 
						|
template <
 | 
						|
	typename ...Values,
 | 
						|
	typename ...Errors,
 | 
						|
	typename ...Generators>
 | 
						|
inline auto combine_implementation(
 | 
						|
		producer<Values, Errors, Generators> &&...producers) {
 | 
						|
	using CombinedValue = std::tuple<Values...>;
 | 
						|
	using CombinedError = normalized_variant_t<Errors...>;
 | 
						|
 | 
						|
	return make_producer<CombinedValue, CombinedError>(
 | 
						|
		make_combine_implementation_helper(std::move(producers)...));
 | 
						|
}
 | 
						|
 | 
						|
template <typename ...Args>
 | 
						|
struct combine_just_producers : std::false_type {
 | 
						|
};
 | 
						|
 | 
						|
template <typename ...Args>
 | 
						|
constexpr bool combine_just_producers_v
 | 
						|
	= combine_just_producers<Args...>::value;
 | 
						|
 | 
						|
template <
 | 
						|
	typename ...Values,
 | 
						|
	typename ...Errors,
 | 
						|
	typename ...Generators>
 | 
						|
struct combine_just_producers<
 | 
						|
		producer<Values, Errors, Generators>...>
 | 
						|
	: std::true_type {
 | 
						|
};
 | 
						|
 | 
						|
template <typename ArgsList>
 | 
						|
struct combine_just_producers_list
 | 
						|
	: type_list::extract_to_t<ArgsList, combine_just_producers> {
 | 
						|
};
 | 
						|
 | 
						|
template <typename ...Args>
 | 
						|
struct combine_result_type;
 | 
						|
 | 
						|
template <typename ...Args>
 | 
						|
using combine_result_type_t
 | 
						|
	= typename combine_result_type<Args...>::type;
 | 
						|
 | 
						|
template <
 | 
						|
	typename ...Values,
 | 
						|
	typename ...Errors,
 | 
						|
	typename ...Generators>
 | 
						|
struct combine_result_type<producer<Values, Errors, Generators>...> {
 | 
						|
	using type = std::tuple<Values...>;
 | 
						|
};
 | 
						|
 | 
						|
template <typename ArgsList>
 | 
						|
struct combine_result_type_list
 | 
						|
	: type_list::extract_to_t<ArgsList, combine_result_type> {
 | 
						|
};
 | 
						|
 | 
						|
template <typename ArgsList>
 | 
						|
using combine_result_type_list_t
 | 
						|
	= typename combine_result_type_list<ArgsList>::type;
 | 
						|
 | 
						|
template <typename ArgsList>
 | 
						|
using combine_producers_no_mapper_t
 | 
						|
	= type_list::chop_last_t<ArgsList>;
 | 
						|
 | 
						|
template <typename ArgsList>
 | 
						|
constexpr bool combine_is_good_mapper(std::true_type) {
 | 
						|
	return is_callable_v<
 | 
						|
		type_list::last_t<ArgsList>,
 | 
						|
		combine_result_type_list_t<
 | 
						|
			combine_producers_no_mapper_t<ArgsList>
 | 
						|
		>>;
 | 
						|
}
 | 
						|
 | 
						|
template <typename ArgsList>
 | 
						|
constexpr bool combine_is_good_mapper(std::false_type) {
 | 
						|
	return false;
 | 
						|
}
 | 
						|
 | 
						|
template <typename ArgsList>
 | 
						|
struct combine_producers_with_mapper_list : std::bool_constant<
 | 
						|
	combine_is_good_mapper<ArgsList>(
 | 
						|
		combine_just_producers_list<
 | 
						|
			combine_producers_no_mapper_t<ArgsList>
 | 
						|
		>())> {
 | 
						|
};
 | 
						|
 | 
						|
template <typename ...Args>
 | 
						|
struct combine_producers_with_mapper
 | 
						|
	: combine_producers_with_mapper_list<type_list::list<Args...>> {
 | 
						|
};
 | 
						|
 | 
						|
template <typename ...Args>
 | 
						|
constexpr bool combine_producers_with_mapper_v
 | 
						|
	 = combine_producers_with_mapper<Args...>::value;
 | 
						|
 | 
						|
template <typename ...Producers, std::size_t ...I>
 | 
						|
inline decltype(auto) combine_call(
 | 
						|
		std::index_sequence<I...>,
 | 
						|
		Producers &&...producers) {
 | 
						|
	return combine_implementation(
 | 
						|
		argument_mapper<I>::call(std::move(producers)...)...);
 | 
						|
}
 | 
						|
 | 
						|
} // namespace details
 | 
						|
 | 
						|
template <
 | 
						|
	typename ...Args,
 | 
						|
	typename = std::enable_if_t<
 | 
						|
		details::combine_just_producers_v<Args...>
 | 
						|
	|| details::combine_producers_with_mapper_v<Args...>>>
 | 
						|
inline decltype(auto) combine(Args &&...args) {
 | 
						|
	if constexpr (details::combine_just_producers_v<Args...>) {
 | 
						|
		return details::combine_implementation(std::move(args)...);
 | 
						|
	} else if constexpr (details::combine_producers_with_mapper_v<Args...>) {
 | 
						|
		constexpr auto kProducersCount = sizeof...(Args) - 1;
 | 
						|
		return details::combine_call(
 | 
						|
			std::make_index_sequence<kProducersCount>(),
 | 
						|
			std::forward<Args>(args)...)
 | 
						|
				| map(details::argument_mapper<kProducersCount>::call(
 | 
						|
					std::forward<Args>(args)...));
 | 
						|
	} else {
 | 
						|
		static_assert(false_(args...), "Bad combine() call.");
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
namespace details {
 | 
						|
 | 
						|
template <typename Value>
 | 
						|
struct combine_vector_state {
 | 
						|
	std::vector<base::optional<Value>> accumulated;
 | 
						|
	std::vector<Value> latest;
 | 
						|
	int invalid = 0;
 | 
						|
	int working = 0;
 | 
						|
};
 | 
						|
 | 
						|
} // namespace details
 | 
						|
 | 
						|
template <typename Value, typename Error, typename Generator>
 | 
						|
inline auto combine(
 | 
						|
		std::vector<producer<Value, Error, Generator>> &&producers) {
 | 
						|
	using state_type = details::combine_vector_state<Value>;
 | 
						|
	return make_producer<std::vector<Value>, Error>([
 | 
						|
		producers = std::move(producers)
 | 
						|
	](const auto &consumer) mutable {
 | 
						|
		auto count = producers.size();
 | 
						|
		auto state = consumer.template make_state<state_type>();
 | 
						|
		state->accumulated.resize(count);
 | 
						|
		state->invalid = count;
 | 
						|
		state->working = count;
 | 
						|
		for (auto index = 0; index != count; ++index) {
 | 
						|
			auto &producer = producers[index];
 | 
						|
			consumer.add_lifetime(std::move(producer).start(
 | 
						|
			[consumer, state, index](Value &&value) {
 | 
						|
				if (state->accumulated.empty()) {
 | 
						|
					state->latest[index] = std::move(value);
 | 
						|
					consumer.put_next_copy(state->latest);
 | 
						|
				} else if (state->accumulated[index]) {
 | 
						|
					state->accumulated[index] = std::move(value);
 | 
						|
				} else {
 | 
						|
					state->accumulated[index] = std::move(value);
 | 
						|
					if (!--state->invalid) {
 | 
						|
						state->latest.reserve(
 | 
						|
							state->accumulated.size());
 | 
						|
						for (auto &&value : state->accumulated) {
 | 
						|
							state->latest.push_back(
 | 
						|
								std::move(*value));
 | 
						|
						}
 | 
						|
						details::take(state->accumulated);
 | 
						|
						consumer.put_next_copy(state->latest);
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}, [consumer](auto &&error) {
 | 
						|
				consumer.put_error_forward(
 | 
						|
					std::forward<decltype(error)>(error));
 | 
						|
			}, [consumer, state] {
 | 
						|
				if (!--state->working) {
 | 
						|
					consumer.put_done();
 | 
						|
				}
 | 
						|
			}));
 | 
						|
		}
 | 
						|
		if (!count) {
 | 
						|
			consumer.put_done();
 | 
						|
		}
 | 
						|
		return lifetime();
 | 
						|
	});
 | 
						|
}
 | 
						|
 | 
						|
template <
 | 
						|
	typename Value,
 | 
						|
	typename Error,
 | 
						|
	typename Generator,
 | 
						|
	typename Mapper>
 | 
						|
inline auto combine(
 | 
						|
		std::vector<producer<Value, Error, Generator>> &&producers,
 | 
						|
		Mapper &&mapper) {
 | 
						|
	return combine(std::move(producers))
 | 
						|
		| map(std::forward<Mapper>(mapper));
 | 
						|
}
 | 
						|
 | 
						|
} // namespace rpl
 |