200 lines
		
	
	
	
		
			5 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			200 lines
		
	
	
	
		
			5 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
| This file is part of Telegram Desktop,
 | |
| the official desktop application for the Telegram messaging service.
 | |
| 
 | |
| For license and copyright information please follow this link:
 | |
| https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
 | |
| */
 | |
| #include "mtproto/mtproto_concurrent_sender.h"
 | |
| 
 | |
| #include "mtproto/mtp_instance.h"
 | |
| #include "mtproto/mtproto_response.h"
 | |
| #include "mtproto/facade.h"
 | |
| 
 | |
| namespace MTP {
 | |
| 
 | |
| class ConcurrentSender::HandlerMaker final {
 | |
| public:
 | |
| 	static DoneHandler MakeDone(
 | |
| 		not_null<ConcurrentSender*> sender,
 | |
| 		Fn<void(FnMut<void()>)> runner);
 | |
| 	static FailHandler MakeFail(
 | |
| 		not_null<ConcurrentSender*> sender,
 | |
| 		Fn<void(FnMut<void()>)> runner,
 | |
| 		FailSkipPolicy skipPolicy);
 | |
| };
 | |
| 
 | |
| DoneHandler ConcurrentSender::HandlerMaker::MakeDone(
 | |
| 		not_null<ConcurrentSender*> sender,
 | |
| 		Fn<void(FnMut<void()>)> runner) {
 | |
| 	return [
 | |
| 		weak = base::make_weak(sender),
 | |
| 		runner = std::move(runner)
 | |
| 	](const Response &response) mutable {
 | |
| 		runner([=]() mutable {
 | |
| 			if (const auto strong = weak.get()) {
 | |
| 				strong->senderRequestDone(
 | |
| 					response.requestId,
 | |
| 					bytes::make_span(response.reply));
 | |
| 			}
 | |
| 		});
 | |
| 		return true;
 | |
| 	};
 | |
| }
 | |
| 
 | |
| FailHandler ConcurrentSender::HandlerMaker::MakeFail(
 | |
| 		not_null<ConcurrentSender*> sender,
 | |
| 		Fn<void(FnMut<void()>)> runner,
 | |
| 		FailSkipPolicy skipPolicy) {
 | |
| 	return [
 | |
| 		weak = base::make_weak(sender),
 | |
| 		runner = std::move(runner),
 | |
| 		skipPolicy
 | |
| 	](const Error &error, const Response &response) mutable {
 | |
| 		if (skipPolicy == FailSkipPolicy::Simple) {
 | |
| 			if (IsDefaultHandledError(error)) {
 | |
| 				return false;
 | |
| 			}
 | |
| 		} else if (skipPolicy == FailSkipPolicy::HandleFlood) {
 | |
| 			if (IsDefaultHandledError(error) && !IsFloodError(error)) {
 | |
| 				return false;
 | |
| 			}
 | |
| 		}
 | |
| 		runner([=, requestId = response.requestId]() mutable {
 | |
| 			if (const auto strong = weak.get()) {
 | |
| 				strong->senderRequestFail(requestId, error);
 | |
| 			}
 | |
| 		});
 | |
| 		return true;
 | |
| 	};
 | |
| }
 | |
| 
 | |
| template <typename Method>
 | |
| auto ConcurrentSender::with_instance(Method &&method)
 | |
| -> std::enable_if_t<is_callable_v<Method, not_null<Instance*>>> {
 | |
| 	crl::on_main([
 | |
| 		weak = _weak,
 | |
| 		method = std::forward<Method>(method)
 | |
| 	]() mutable {
 | |
| 		if (const auto instance = weak.data()) {
 | |
| 			std::move(method)(instance);
 | |
| 		}
 | |
| 	});
 | |
| }
 | |
| 
 | |
| ConcurrentSender::RequestBuilder::RequestBuilder(
 | |
| 	not_null<ConcurrentSender*> sender,
 | |
| 	details::SerializedRequest &&serialized) noexcept
 | |
| : _sender(sender)
 | |
| , _serialized(std::move(serialized)) {
 | |
| }
 | |
| 
 | |
| void ConcurrentSender::RequestBuilder::setToDC(ShiftedDcId dcId) noexcept {
 | |
| 	_dcId = dcId;
 | |
| }
 | |
| 
 | |
| void ConcurrentSender::RequestBuilder::setCanWait(crl::time ms) noexcept {
 | |
| 	_canWait = ms;
 | |
| }
 | |
| 
 | |
| void ConcurrentSender::RequestBuilder::setFailSkipPolicy(
 | |
| 		FailSkipPolicy policy) noexcept {
 | |
| 	_failSkipPolicy = policy;
 | |
| }
 | |
| 
 | |
| void ConcurrentSender::RequestBuilder::setAfter(
 | |
| 		mtpRequestId requestId) noexcept {
 | |
| 	_afterRequestId = requestId;
 | |
| }
 | |
| 
 | |
| mtpRequestId ConcurrentSender::RequestBuilder::send() {
 | |
| 	const auto requestId = details::GetNextRequestId();
 | |
| 	const auto dcId = _dcId;
 | |
| 	const auto msCanWait = _canWait;
 | |
| 	const auto afterRequestId = _afterRequestId;
 | |
| 
 | |
| 	_sender->senderRequestRegister(requestId, std::move(_handlers));
 | |
| 	_sender->with_instance([
 | |
| 		=,
 | |
| 		request = std::move(_serialized),
 | |
| 		done = HandlerMaker::MakeDone(_sender, _sender->_runner),
 | |
| 		fail = HandlerMaker::MakeFail(
 | |
| 			_sender,
 | |
| 			_sender->_runner,
 | |
| 			_failSkipPolicy)
 | |
| 	](not_null<Instance*> instance) mutable {
 | |
| 		instance->sendSerialized(
 | |
| 			requestId,
 | |
| 			std::move(request),
 | |
| 			ResponseHandler{ std::move(done), std::move(fail) },
 | |
| 			dcId,
 | |
| 			msCanWait,
 | |
| 			afterRequestId);
 | |
| 	});
 | |
| 
 | |
| 	return requestId;
 | |
| }
 | |
| 
 | |
| ConcurrentSender::ConcurrentSender(
 | |
| 	QPointer<Instance> weak,
 | |
| 	Fn<void(FnMut<void()>)> runner)
 | |
| : _weak(weak)
 | |
| , _runner(runner) {
 | |
| }
 | |
| 
 | |
| ConcurrentSender::~ConcurrentSender() {
 | |
| 	senderRequestCancelAll();
 | |
| }
 | |
| 
 | |
| void ConcurrentSender::senderRequestRegister(
 | |
| 		mtpRequestId requestId,
 | |
| 		Handlers &&handlers) {
 | |
| 	_requests.emplace(requestId, std::move(handlers));
 | |
| }
 | |
| 
 | |
| void ConcurrentSender::senderRequestDone(
 | |
| 		mtpRequestId requestId,
 | |
| 		bytes::const_span result) {
 | |
| 	if (auto handlers = _requests.take(requestId)) {
 | |
| 		if (!handlers->done(requestId, result)) {
 | |
| 			handlers->fail(
 | |
| 				requestId,
 | |
| 				Error::Local(
 | |
| 					"RESPONSE_PARSE_FAILED",
 | |
| 					"ConcurrentSender::senderRequestDone"));
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| void ConcurrentSender::senderRequestFail(
 | |
| 		mtpRequestId requestId,
 | |
| 		const Error &error) {
 | |
| 	if (auto handlers = _requests.take(requestId)) {
 | |
| 		handlers->fail(requestId, error);
 | |
| 	}
 | |
| }
 | |
| 
 | |
| void ConcurrentSender::senderRequestCancel(mtpRequestId requestId) {
 | |
| 	senderRequestDetach(requestId);
 | |
| 	with_instance([=](not_null<Instance*> instance) {
 | |
| 		instance->cancel(requestId);
 | |
| 	});
 | |
| }
 | |
| 
 | |
| void ConcurrentSender::senderRequestCancelAll() {
 | |
| 	auto list = std::vector<mtpRequestId>(_requests.size());
 | |
| 	for (const auto &pair : base::take(_requests)) {
 | |
| 		list.push_back(pair.first);
 | |
| 	}
 | |
| 	with_instance([list = std::move(list)](not_null<Instance*> instance) {
 | |
| 		for (const auto requestId : list) {
 | |
| 			instance->cancel(requestId);
 | |
| 		}
 | |
| 	});
 | |
| }
 | |
| 
 | |
| void ConcurrentSender::senderRequestDetach(mtpRequestId requestId) {
 | |
| 	_requests.erase(requestId);
 | |
| }
 | |
| 
 | |
| } // namespace MTP
 | 
