470 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			470 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
| This file is part of Telegram Desktop,
 | |
| the official desktop application for the Telegram messaging service.
 | |
| 
 | |
| For license and copyright information please follow this link:
 | |
| https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
 | |
| */
 | |
| #include "mtproto/dedicated_file_loader.h"
 | |
| 
 | |
| #include "mtproto/facade.h"
 | |
| #include "main/main_account.h" // Account::sessionChanges.
 | |
| #include "main/main_session.h" // Session::account.
 | |
| #include "core/application.h"
 | |
| #include "base/call_delayed.h"
 | |
| 
 | |
| namespace MTP {
 | |
| namespace {
 | |
| 
 | |
| std::optional<MTPInputChannel> ExtractChannel(
 | |
| 		const MTPcontacts_ResolvedPeer &result) {
 | |
| 	const auto &data = result.c_contacts_resolvedPeer();
 | |
| 	if (const auto peer = peerFromMTP(data.vpeer())) {
 | |
| 		for (const auto &chat : data.vchats().v) {
 | |
| 			if (chat.type() == mtpc_channel) {
 | |
| 				const auto &channel = chat.c_channel();
 | |
| 				if (peer == peerFromChannel(channel.vid())) {
 | |
| 					return MTP_inputChannel(
 | |
| 						channel.vid(),
 | |
| 						MTP_long(channel.vaccess_hash().value_or_empty()));
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return std::nullopt;
 | |
| }
 | |
| 
 | |
| std::optional<DedicatedLoader::File> ParseFile(
 | |
| 		const MTPmessages_Messages &result) {
 | |
| 	const auto message = GetMessagesElement(result);
 | |
| 	if (!message || message->type() != mtpc_message) {
 | |
| 		LOG(("Update Error: MTP file message not found."));
 | |
| 		return std::nullopt;
 | |
| 	}
 | |
| 	const auto &data = message->c_message();
 | |
| 	const auto media = data.vmedia();
 | |
| 	if (!media || media->type() != mtpc_messageMediaDocument) {
 | |
| 		LOG(("Update Error: MTP file media not found."));
 | |
| 		return std::nullopt;
 | |
| 	}
 | |
| 	const auto &inner = media->c_messageMediaDocument();
 | |
| 	const auto document = inner.vdocument();
 | |
| 	if (!document || document->type() != mtpc_document) {
 | |
| 		LOG(("Update Error: MTP file not found."));
 | |
| 		return std::nullopt;
 | |
| 	}
 | |
| 	const auto &fields = document->c_document();
 | |
| 	const auto name = [&] {
 | |
| 		for (const auto &attribute : fields.vattributes().v) {
 | |
| 			if (attribute.type() == mtpc_documentAttributeFilename) {
 | |
| 				const auto &data = attribute.c_documentAttributeFilename();
 | |
| 				return qs(data.vfile_name());
 | |
| 			}
 | |
| 		}
 | |
| 		return QString();
 | |
| 	}();
 | |
| 	if (name.isEmpty()) {
 | |
| 		LOG(("Update Error: MTP file name not found."));
 | |
| 		return std::nullopt;
 | |
| 	}
 | |
| 	const auto size = int64(fields.vsize().v);
 | |
| 	if (size <= 0) {
 | |
| 		LOG(("Update Error: MTP file size is invalid."));
 | |
| 		return std::nullopt;
 | |
| 	}
 | |
| 	const auto location = MTP_inputDocumentFileLocation(
 | |
| 		fields.vid(),
 | |
| 		fields.vaccess_hash(),
 | |
| 		fields.vfile_reference(),
 | |
| 		MTP_string());
 | |
| 	return DedicatedLoader::File{ name, size, fields.vdc_id().v, location };
 | |
| }
 | |
| 
 | |
| } // namespace
 | |
| 
 | |
| WeakInstance::WeakInstance(base::weak_ptr<Main::Session> session)
 | |
| : _session(session)
 | |
| , _instance(_session ? &_session->account().mtp() : nullptr) {
 | |
| 	if (!valid()) {
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	connect(_instance, &QObject::destroyed, this, [=] {
 | |
| 		_instance = nullptr;
 | |
| 		_session = nullptr;
 | |
| 		die();
 | |
| 	});
 | |
| 	_session->account().sessionChanges(
 | |
| 	) | rpl::filter([](Main::Session *session) {
 | |
| 		return !session;
 | |
| 	}) | rpl::start_with_next([=] {
 | |
| 		die();
 | |
| 	}, _lifetime);
 | |
| }
 | |
| 
 | |
| base::weak_ptr<Main::Session> WeakInstance::session() const {
 | |
| 	return _session;
 | |
| }
 | |
| 
 | |
| bool WeakInstance::valid() const {
 | |
| 	return (_session != nullptr);
 | |
| }
 | |
| 
 | |
| Instance *WeakInstance::instance() const {
 | |
| 	return _instance;
 | |
| }
 | |
| 
 | |
| void WeakInstance::die() {
 | |
| 	for (const auto &[requestId, fail] : base::take(_requests)) {
 | |
| 		if (_instance) {
 | |
| 			_instance->cancel(requestId);
 | |
| 		}
 | |
| 		fail(Error::Local(
 | |
| 			"UNAVAILABLE",
 | |
| 			"MTP instance is not available."));
 | |
| 	}
 | |
| }
 | |
| 
 | |
| bool WeakInstance::removeRequest(mtpRequestId requestId) {
 | |
| 	if (const auto i = _requests.find(requestId); i != end(_requests)) {
 | |
| 		_requests.erase(i);
 | |
| 		return true;
 | |
| 	}
 | |
| 	return false;
 | |
| }
 | |
| 
 | |
| void WeakInstance::reportUnavailable(
 | |
| 		Fn<void(const Error &error)> callback) {
 | |
| 	InvokeQueued(this, [=] {
 | |
| 		callback(Error::Local(
 | |
| 			"UNAVAILABLE",
 | |
| 			"MTP instance is not available."));
 | |
| 	});
 | |
| }
 | |
| 
 | |
| WeakInstance::~WeakInstance() {
 | |
| 	if (_instance) {
 | |
| 		for (const auto &[requestId, fail] : base::take(_requests)) {
 | |
| 			_instance->cancel(requestId);
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| AbstractDedicatedLoader::AbstractDedicatedLoader(
 | |
| 	const QString &filepath,
 | |
| 	int chunkSize)
 | |
| : _filepath(filepath)
 | |
| , _chunkSize(chunkSize) {
 | |
| }
 | |
| 
 | |
| void AbstractDedicatedLoader::start() {
 | |
| 	if (!validateOutput()
 | |
| 		|| (!_output.isOpen() && !_output.open(QIODevice::Append))) {
 | |
| 		QFile(_filepath).remove();
 | |
| 		threadSafeFailed();
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	LOG(("Update Info: Starting loading '%1' from %2 offset."
 | |
| 		).arg(_filepath
 | |
| 		).arg(alreadySize()));
 | |
| 	startLoading();
 | |
| }
 | |
| 
 | |
| int64 AbstractDedicatedLoader::alreadySize() const {
 | |
| 	QMutexLocker lock(&_sizesMutex);
 | |
| 	return _alreadySize;
 | |
| }
 | |
| 
 | |
| int64 AbstractDedicatedLoader::totalSize() const {
 | |
| 	QMutexLocker lock(&_sizesMutex);
 | |
| 	return _totalSize;
 | |
| }
 | |
| 
 | |
| rpl::producer<QString> AbstractDedicatedLoader::ready() const {
 | |
| 	return _ready.events();
 | |
| }
 | |
| 
 | |
| auto AbstractDedicatedLoader::progress() const -> rpl::producer<Progress> {
 | |
| 	return _progress.events();
 | |
| }
 | |
| 
 | |
| rpl::producer<> AbstractDedicatedLoader::failed() const {
 | |
| 	return _failed.events();
 | |
| }
 | |
| 
 | |
| void AbstractDedicatedLoader::wipeFolder() {
 | |
| 	QFileInfo info(_filepath);
 | |
| 	const auto dir = info.dir();
 | |
| 	const auto all = dir.entryInfoList(QDir::Files);
 | |
| 	for (auto i = all.begin(), e = all.end(); i != e; ++i) {
 | |
| 		if (i->absoluteFilePath() != info.absoluteFilePath()) {
 | |
| 			QFile::remove(i->absoluteFilePath());
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| bool AbstractDedicatedLoader::validateOutput() {
 | |
| 	if (_filepath.isEmpty()) {
 | |
| 		return false;
 | |
| 	}
 | |
| 
 | |
| 	QFileInfo info(_filepath);
 | |
| 	const auto dir = info.dir();
 | |
| 	if (!dir.exists()) {
 | |
| 		dir.mkdir(dir.absolutePath());
 | |
| 	}
 | |
| 	_output.setFileName(_filepath);
 | |
| 
 | |
| 	if (!info.exists()) {
 | |
| 		return true;
 | |
| 	}
 | |
| 	const auto fullSize = info.size();
 | |
| 	if (fullSize < _chunkSize || fullSize > kMaxFileSize) {
 | |
| 		return _output.remove();
 | |
| 	}
 | |
| 	const auto goodSize = int64((fullSize % _chunkSize)
 | |
| 		? (fullSize - (fullSize % _chunkSize))
 | |
| 		: fullSize);
 | |
| 	if (_output.resize(goodSize)) {
 | |
| 		_alreadySize = goodSize;
 | |
| 		return true;
 | |
| 	}
 | |
| 	return false;
 | |
| }
 | |
| 
 | |
| void AbstractDedicatedLoader::threadSafeProgress(Progress progress) {
 | |
| 	crl::on_main(this, [=] {
 | |
| 		_progress.fire_copy(progress);
 | |
| 	});
 | |
| }
 | |
| 
 | |
| void AbstractDedicatedLoader::threadSafeReady() {
 | |
| 	crl::on_main(this, [=] {
 | |
| 		_ready.fire_copy(_filepath);
 | |
| 	});
 | |
| }
 | |
| 
 | |
| void AbstractDedicatedLoader::threadSafeFailed() {
 | |
| 	crl::on_main(this, [=] {
 | |
| 		_failed.fire({});
 | |
| 	});
 | |
| }
 | |
| 
 | |
| void AbstractDedicatedLoader::writeChunk(bytes::const_span data, int totalSize) {
 | |
| 	const auto size = data.size();
 | |
| 	if (size > 0) {
 | |
| 		const auto written = _output.write(QByteArray::fromRawData(
 | |
| 			reinterpret_cast<const char*>(data.data()),
 | |
| 			size));
 | |
| 		if (written != size) {
 | |
| 			threadSafeFailed();
 | |
| 			return;
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	const auto progress = [&] {
 | |
| 		QMutexLocker lock(&_sizesMutex);
 | |
| 		if (!_totalSize) {
 | |
| 			_totalSize = totalSize;
 | |
| 		}
 | |
| 		_alreadySize += size;
 | |
| 		return Progress { _alreadySize, _totalSize };
 | |
| 	}();
 | |
| 
 | |
| 	if (progress.size > 0 && progress.already >= progress.size) {
 | |
| 		_output.close();
 | |
| 		threadSafeReady();
 | |
| 	} else {
 | |
| 		threadSafeProgress(progress);
 | |
| 	}
 | |
| }
 | |
| 
 | |
| rpl::lifetime &AbstractDedicatedLoader::lifetime() {
 | |
| 	return _lifetime;
 | |
| }
 | |
| 
 | |
| DedicatedLoader::DedicatedLoader(
 | |
| 	base::weak_ptr<Main::Session> session,
 | |
| 	const QString &folder,
 | |
| 	const File &file)
 | |
| : AbstractDedicatedLoader(folder + '/' + file.name, kChunkSize)
 | |
| , _size(file.size)
 | |
| , _dcId(file.dcId)
 | |
| , _location(file.location)
 | |
| , _mtp(session) {
 | |
| 	Expects(_size > 0);
 | |
| }
 | |
| 
 | |
| void DedicatedLoader::startLoading() {
 | |
| 	if (!_mtp.valid()) {
 | |
| 		LOG(("Update Error: MTP is unavailable."));
 | |
| 		threadSafeFailed();
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	LOG(("Update Info: Loading using MTP from '%1'.").arg(_dcId));
 | |
| 	_offset = alreadySize();
 | |
| 	writeChunk({}, _size);
 | |
| 	sendRequest();
 | |
| }
 | |
| 
 | |
| void DedicatedLoader::sendRequest() {
 | |
| 	if (_requests.size() >= kRequestsCount || _offset >= _size) {
 | |
| 		return;
 | |
| 	}
 | |
| 	const auto offset = _offset;
 | |
| 	_requests.push_back({ offset });
 | |
| 	_mtp.send(
 | |
| 		MTPupload_GetFile(
 | |
| 			MTP_flags(0),
 | |
| 			_location,
 | |
| 			MTP_long(offset),
 | |
| 			MTP_int(kChunkSize)),
 | |
| 		[=](const MTPupload_File &result) { gotPart(offset, result); },
 | |
| 		failHandler(),
 | |
| 		MTP::updaterDcId(_dcId));
 | |
| 	_offset += kChunkSize;
 | |
| 
 | |
| 	if (_requests.size() < kRequestsCount) {
 | |
| 		base::call_delayed(kNextRequestDelay, this, [=] { sendRequest(); });
 | |
| 	}
 | |
| }
 | |
| 
 | |
| void DedicatedLoader::gotPart(int offset, const MTPupload_File &result) {
 | |
| 	Expects(!_requests.empty());
 | |
| 
 | |
| 	if (result.type() == mtpc_upload_fileCdnRedirect) {
 | |
| 		LOG(("Update Error: MTP does not support cdn right now."));
 | |
| 		threadSafeFailed();
 | |
| 		return;
 | |
| 	}
 | |
| 	const auto &data = result.c_upload_file();
 | |
| 	if (data.vbytes().v.isEmpty()) {
 | |
| 		LOG(("Update Error: MTP empty part received."));
 | |
| 		threadSafeFailed();
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	const auto i = ranges::find(
 | |
| 		_requests,
 | |
| 		offset,
 | |
| 		[](const Request &request) { return request.offset; });
 | |
| 	Assert(i != end(_requests));
 | |
| 
 | |
| 	i->bytes = data.vbytes().v;
 | |
| 	while (!_requests.empty() && !_requests.front().bytes.isEmpty()) {
 | |
| 		writeChunk(bytes::make_span(_requests.front().bytes), _size);
 | |
| 		_requests.pop_front();
 | |
| 	}
 | |
| 	sendRequest();
 | |
| }
 | |
| 
 | |
| Fn<void(const Error &)> DedicatedLoader::failHandler() {
 | |
| 	return [=](const Error &error) {
 | |
| 		LOG(("Update Error: MTP load failed with '%1'"
 | |
| 			).arg(QString::number(error.code()) + ':' + error.type()));
 | |
| 		threadSafeFailed();
 | |
| 	};
 | |
| }
 | |
| 
 | |
| void ResolveChannel(
 | |
| 		not_null<MTP::WeakInstance*> mtp,
 | |
| 		const QString &username,
 | |
| 		Fn<void(const MTPInputChannel &channel)> done,
 | |
| 		Fn<void()> fail) {
 | |
| 	const auto failed = [&] {
 | |
| 		LOG(("Dedicated MTP Error: Channel '%1' resolve failed."
 | |
| 			).arg(username));
 | |
| 		fail();
 | |
| 	};
 | |
| 	const auto session = mtp->session();
 | |
| 	if (!mtp->valid()) {
 | |
| 		failed();
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	struct ResolveResult {
 | |
| 		base::weak_ptr<Main::Session> session;
 | |
| 		MTPInputChannel channel;
 | |
| 	};
 | |
| 	static std::map<QString, ResolveResult> ResolveCache;
 | |
| 
 | |
| 	const auto i = ResolveCache.find(username);
 | |
| 	if (i != end(ResolveCache)) {
 | |
| 		if (i->second.session.get() == session.get()) {
 | |
| 			done(i->second.channel);
 | |
| 			return;
 | |
| 		}
 | |
| 		ResolveCache.erase(i);
 | |
| 	}
 | |
| 
 | |
| 	const auto doneHandler = [=](const MTPcontacts_ResolvedPeer &result) {
 | |
| 		Expects(result.type() == mtpc_contacts_resolvedPeer);
 | |
| 
 | |
| 		if (const auto channel = ExtractChannel(result)) {
 | |
| 			ResolveCache.emplace(
 | |
| 				username,
 | |
| 				ResolveResult { session, *channel });
 | |
| 			done(*channel);
 | |
| 		} else {
 | |
| 			failed();
 | |
| 		}
 | |
| 	};
 | |
| 	const auto failHandler = [=](const Error &error) {
 | |
| 		LOG(("Dedicated MTP Error: Resolve failed with '%1'"
 | |
| 			).arg(QString::number(error.code()) + ':' + error.type()));
 | |
| 		fail();
 | |
| 	};
 | |
| 	mtp->send(
 | |
| 		MTPcontacts_ResolveUsername(MTP_string(username)),
 | |
| 		doneHandler,
 | |
| 		failHandler);
 | |
| }
 | |
| 
 | |
| std::optional<MTPMessage> GetMessagesElement(
 | |
| 		const MTPmessages_Messages &list) {
 | |
| 	return list.match([&](const MTPDmessages_messagesNotModified &) {
 | |
| 		return std::optional<MTPMessage>(std::nullopt);
 | |
| 	}, [&](const auto &data) {
 | |
| 		return data.vmessages().v.isEmpty()
 | |
| 			? std::nullopt
 | |
| 			: std::make_optional(data.vmessages().v[0]);
 | |
| 	});
 | |
| }
 | |
| 
 | |
| void StartDedicatedLoader(
 | |
| 		not_null<MTP::WeakInstance*> mtp,
 | |
| 		const DedicatedLoader::Location &location,
 | |
| 		const QString &folder,
 | |
| 		Fn<void(std::unique_ptr<DedicatedLoader>)> ready) {
 | |
| 	const auto doneHandler = [=](const MTPmessages_Messages &result) {
 | |
| 		const auto file = ParseFile(result);
 | |
| 		ready(file
 | |
| 			? std::make_unique<MTP::DedicatedLoader>(
 | |
| 				mtp->session(),
 | |
| 				folder,
 | |
| 				*file)
 | |
| 			: nullptr);
 | |
| 	};
 | |
| 	const auto failHandler = [=](const Error &error) {
 | |
| 		LOG(("Update Error: MTP check failed with '%1'"
 | |
| 			).arg(QString::number(error.code()) + ':' + error.type()));
 | |
| 		ready(nullptr);
 | |
| 	};
 | |
| 
 | |
| 	const auto &[username, postId] = location;
 | |
| 	ResolveChannel(mtp, username, [=, postId = postId](
 | |
| 			const MTPInputChannel &channel) {
 | |
| 		mtp->send(
 | |
| 			MTPchannels_GetMessages(
 | |
| 				channel,
 | |
| 				MTP_vector<MTPInputMessage>(
 | |
| 					1,
 | |
| 					MTP_inputMessageID(MTP_int(postId)))),
 | |
| 			doneHandler,
 | |
| 			failHandler);
 | |
| 	}, [=] { ready(nullptr); });
 | |
| }
 | |
| 
 | |
| } // namespace MTP
 | 
