447 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			447 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
 | 
						|
This file is part of Telegram Desktop,
 | 
						|
the official desktop application for the Telegram messaging service.
 | 
						|
 | 
						|
For license and copyright information please follow this link:
 | 
						|
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
 | 
						|
*/
 | 
						|
#include "data/data_search_controller.h"
 | 
						|
 | 
						|
#include "main/main_session.h"
 | 
						|
#include "data/data_session.h"
 | 
						|
#include "data/data_messages.h"
 | 
						|
#include "data/data_channel.h"
 | 
						|
#include "data/data_histories.h"
 | 
						|
#include "history/history.h"
 | 
						|
#include "history/history_item.h"
 | 
						|
#include "apiwrap.h"
 | 
						|
 | 
						|
namespace Api {
 | 
						|
namespace {
 | 
						|
 | 
						|
constexpr auto kSharedMediaLimit = 100;
 | 
						|
constexpr auto kFirstSharedMediaLimit = 0;
 | 
						|
constexpr auto kDefaultSearchTimeoutMs = crl::time(200);
 | 
						|
 | 
						|
} // namespace
 | 
						|
 | 
						|
std::optional<SearchRequest> PrepareSearchRequest(
 | 
						|
		not_null<PeerData*> peer,
 | 
						|
		MsgId topicRootId,
 | 
						|
		Storage::SharedMediaType type,
 | 
						|
		const QString &query,
 | 
						|
		MsgId messageId,
 | 
						|
		Data::LoadDirection direction) {
 | 
						|
	const auto filter = [&] {
 | 
						|
		using Type = Storage::SharedMediaType;
 | 
						|
		switch (type) {
 | 
						|
		case Type::Photo:
 | 
						|
			return MTP_inputMessagesFilterPhotos();
 | 
						|
		case Type::Video:
 | 
						|
			return MTP_inputMessagesFilterVideo();
 | 
						|
		case Type::PhotoVideo:
 | 
						|
			return MTP_inputMessagesFilterPhotoVideo();
 | 
						|
		case Type::MusicFile:
 | 
						|
			return MTP_inputMessagesFilterMusic();
 | 
						|
		case Type::File:
 | 
						|
			return MTP_inputMessagesFilterDocument();
 | 
						|
		case Type::VoiceFile:
 | 
						|
			return MTP_inputMessagesFilterVoice();
 | 
						|
		case Type::RoundVoiceFile:
 | 
						|
			return MTP_inputMessagesFilterRoundVoice();
 | 
						|
		case Type::RoundFile:
 | 
						|
			return MTP_inputMessagesFilterRoundVideo();
 | 
						|
		case Type::GIF:
 | 
						|
			return MTP_inputMessagesFilterGif();
 | 
						|
		case Type::Link:
 | 
						|
			return MTP_inputMessagesFilterUrl();
 | 
						|
		case Type::ChatPhoto:
 | 
						|
			return MTP_inputMessagesFilterChatPhotos();
 | 
						|
		case Type::Pinned:
 | 
						|
			return MTP_inputMessagesFilterPinned();
 | 
						|
		}
 | 
						|
		return MTP_inputMessagesFilterEmpty();
 | 
						|
	}();
 | 
						|
	if (query.isEmpty() && filter.type() == mtpc_inputMessagesFilterEmpty) {
 | 
						|
		return std::nullopt;
 | 
						|
	}
 | 
						|
 | 
						|
	const auto minId = 0;
 | 
						|
	const auto maxId = 0;
 | 
						|
	const auto limit = messageId ? kSharedMediaLimit : kFirstSharedMediaLimit;
 | 
						|
	const auto offsetId = [&] {
 | 
						|
		switch (direction) {
 | 
						|
		case Data::LoadDirection::Before:
 | 
						|
		case Data::LoadDirection::Around: return messageId;
 | 
						|
		case Data::LoadDirection::After: return messageId + 1;
 | 
						|
		}
 | 
						|
		Unexpected("Direction in PrepareSearchRequest");
 | 
						|
	}();
 | 
						|
	const auto addOffset = [&] {
 | 
						|
		switch (direction) {
 | 
						|
		case Data::LoadDirection::Before: return 0;
 | 
						|
		case Data::LoadDirection::Around: return -limit / 2;
 | 
						|
		case Data::LoadDirection::After: return -limit;
 | 
						|
		}
 | 
						|
		Unexpected("Direction in PrepareSearchRequest");
 | 
						|
	}();
 | 
						|
	const auto hash = uint64(0);
 | 
						|
 | 
						|
	const auto mtpOffsetId = int(std::clamp(
 | 
						|
		offsetId.bare,
 | 
						|
		int64(0),
 | 
						|
		int64(0x3FFFFFFF)));
 | 
						|
	using Flag = MTPmessages_Search::Flag;
 | 
						|
	return MTPmessages_Search(
 | 
						|
		MTP_flags(topicRootId ? Flag::f_top_msg_id : Flag(0)),
 | 
						|
		peer->input,
 | 
						|
		MTP_string(query),
 | 
						|
		MTP_inputPeerEmpty(),
 | 
						|
		MTP_int(topicRootId),
 | 
						|
		filter,
 | 
						|
		MTP_int(0), // min_date
 | 
						|
		MTP_int(0), // max_date
 | 
						|
		MTP_int(mtpOffsetId),
 | 
						|
		MTP_int(addOffset),
 | 
						|
		MTP_int(limit),
 | 
						|
		MTP_int(maxId),
 | 
						|
		MTP_int(minId),
 | 
						|
		MTP_long(hash));
 | 
						|
}
 | 
						|
 | 
						|
SearchResult ParseSearchResult(
 | 
						|
		not_null<PeerData*> peer,
 | 
						|
		Storage::SharedMediaType type,
 | 
						|
		MsgId messageId,
 | 
						|
		Data::LoadDirection direction,
 | 
						|
		const SearchRequestResult &data) {
 | 
						|
	auto result = SearchResult();
 | 
						|
	result.noSkipRange = MsgRange{ messageId, messageId };
 | 
						|
 | 
						|
	auto messages = [&] {
 | 
						|
		switch (data.type()) {
 | 
						|
		case mtpc_messages_messages: {
 | 
						|
			auto &d = data.c_messages_messages();
 | 
						|
			peer->owner().processUsers(d.vusers());
 | 
						|
			peer->owner().processChats(d.vchats());
 | 
						|
			result.fullCount = d.vmessages().v.size();
 | 
						|
			return &d.vmessages().v;
 | 
						|
		} break;
 | 
						|
 | 
						|
		case mtpc_messages_messagesSlice: {
 | 
						|
			auto &d = data.c_messages_messagesSlice();
 | 
						|
			peer->owner().processUsers(d.vusers());
 | 
						|
			peer->owner().processChats(d.vchats());
 | 
						|
			result.fullCount = d.vcount().v;
 | 
						|
			return &d.vmessages().v;
 | 
						|
		} break;
 | 
						|
 | 
						|
		case mtpc_messages_channelMessages: {
 | 
						|
			const auto &d = data.c_messages_channelMessages();
 | 
						|
			if (const auto channel = peer->asChannel()) {
 | 
						|
				channel->ptsReceived(d.vpts().v);
 | 
						|
				channel->processTopics(d.vtopics());
 | 
						|
			} else {
 | 
						|
				LOG(("API Error: received messages.channelMessages when "
 | 
						|
					"no channel was passed! (ParseSearchResult)"));
 | 
						|
			}
 | 
						|
			peer->owner().processUsers(d.vusers());
 | 
						|
			peer->owner().processChats(d.vchats());
 | 
						|
			result.fullCount = d.vcount().v;
 | 
						|
			return &d.vmessages().v;
 | 
						|
		} break;
 | 
						|
 | 
						|
		case mtpc_messages_messagesNotModified: {
 | 
						|
			LOG(("API Error: received messages.messagesNotModified! "
 | 
						|
				"(ParseSearchResult)"));
 | 
						|
			return (const QVector<MTPMessage>*)nullptr;
 | 
						|
		} break;
 | 
						|
		}
 | 
						|
		Unexpected("messages.Messages type in ParseSearchResult()");
 | 
						|
	}();
 | 
						|
 | 
						|
	if (!messages) {
 | 
						|
		return result;
 | 
						|
	}
 | 
						|
 | 
						|
	const auto addType = NewMessageType::Existing;
 | 
						|
	result.messageIds.reserve(messages->size());
 | 
						|
	for (const auto &message : *messages) {
 | 
						|
		const auto item = peer->owner().addNewMessage(
 | 
						|
			message,
 | 
						|
			MessageFlags(),
 | 
						|
			addType);
 | 
						|
		if (item) {
 | 
						|
			const auto itemId = item->id;
 | 
						|
			if ((type == Storage::SharedMediaType::kCount)
 | 
						|
				|| item->sharedMediaTypes().test(type)) {
 | 
						|
				result.messageIds.push_back(itemId);
 | 
						|
			}
 | 
						|
			accumulate_min(result.noSkipRange.from, itemId);
 | 
						|
			accumulate_max(result.noSkipRange.till, itemId);
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if (messageId && result.messageIds.empty()) {
 | 
						|
		result.noSkipRange = [&]() -> MsgRange {
 | 
						|
			switch (direction) {
 | 
						|
			case Data::LoadDirection::Before: // All old loaded.
 | 
						|
				return { 0, result.noSkipRange.till };
 | 
						|
			case Data::LoadDirection::Around: // All loaded.
 | 
						|
				return { 0, ServerMaxMsgId };
 | 
						|
			case Data::LoadDirection::After: // All new loaded.
 | 
						|
				return { result.noSkipRange.from, ServerMaxMsgId };
 | 
						|
			}
 | 
						|
			Unexpected("Direction in ParseSearchResult");
 | 
						|
		}();
 | 
						|
	}
 | 
						|
	return result;
 | 
						|
}
 | 
						|
 | 
						|
SearchController::CacheEntry::CacheEntry(
 | 
						|
	not_null<Main::Session*> session,
 | 
						|
	const Query &query)
 | 
						|
: peerData(session->data().peer(query.peerId))
 | 
						|
, migratedData(query.migratedPeerId
 | 
						|
	? base::make_optional(Data(session->data().peer(query.migratedPeerId)))
 | 
						|
	: std::nullopt) {
 | 
						|
}
 | 
						|
 | 
						|
SearchController::SearchController(not_null<Main::Session*> session)
 | 
						|
: _session(session) {
 | 
						|
}
 | 
						|
 | 
						|
bool SearchController::hasInCache(const Query &query) const {
 | 
						|
	return query.query.isEmpty() || _cache.contains(query);
 | 
						|
}
 | 
						|
 | 
						|
void SearchController::setQuery(const Query &query) {
 | 
						|
	if (query.query.isEmpty()) {
 | 
						|
		_cache.clear();
 | 
						|
		_current = _cache.end();
 | 
						|
	} else {
 | 
						|
		_current = _cache.find(query);
 | 
						|
	}
 | 
						|
	if (_current == _cache.end()) {
 | 
						|
		_current = _cache.emplace(
 | 
						|
			query,
 | 
						|
			std::make_unique<CacheEntry>(_session, query)).first;
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
rpl::producer<SparseIdsMergedSlice> SearchController::idsSlice(
 | 
						|
		SparseIdsMergedSlice::UniversalMsgId aroundId,
 | 
						|
		int limitBefore,
 | 
						|
		int limitAfter) {
 | 
						|
	Expects(_current != _cache.cend());
 | 
						|
 | 
						|
	auto query = (const Query&)_current->first;
 | 
						|
	auto createSimpleViewer = [=](
 | 
						|
			PeerId peerId,
 | 
						|
			MsgId topicRootId,
 | 
						|
			SparseIdsSlice::Key simpleKey,
 | 
						|
			int limitBefore,
 | 
						|
			int limitAfter) {
 | 
						|
		return simpleIdsSlice(
 | 
						|
			peerId,
 | 
						|
			topicRootId,
 | 
						|
			simpleKey,
 | 
						|
			query,
 | 
						|
			limitBefore,
 | 
						|
			limitAfter);
 | 
						|
	};
 | 
						|
	return SparseIdsMergedSlice::CreateViewer(
 | 
						|
		SparseIdsMergedSlice::Key(
 | 
						|
			query.peerId,
 | 
						|
			query.topicRootId,
 | 
						|
			query.migratedPeerId,
 | 
						|
			aroundId),
 | 
						|
		limitBefore,
 | 
						|
		limitAfter,
 | 
						|
		std::move(createSimpleViewer));
 | 
						|
}
 | 
						|
 | 
						|
rpl::producer<SparseIdsSlice> SearchController::simpleIdsSlice(
 | 
						|
		PeerId peerId,
 | 
						|
		MsgId topicRootId,
 | 
						|
		MsgId aroundId,
 | 
						|
		const Query &query,
 | 
						|
		int limitBefore,
 | 
						|
		int limitAfter) {
 | 
						|
	Expects(peerId != 0);
 | 
						|
	Expects(IsServerMsgId(aroundId) || (aroundId == 0));
 | 
						|
	Expects((aroundId != 0)
 | 
						|
		|| (limitBefore == 0 && limitAfter == 0));
 | 
						|
	Expects((query.peerId == peerId && query.topicRootId == topicRootId)
 | 
						|
		|| (query.migratedPeerId == peerId && MsgId(0) == topicRootId));
 | 
						|
 | 
						|
	auto it = _cache.find(query);
 | 
						|
	if (it == _cache.end()) {
 | 
						|
		return [=](auto) { return rpl::lifetime(); };
 | 
						|
	}
 | 
						|
 | 
						|
	auto listData = (peerId == query.peerId)
 | 
						|
		? &it->second->peerData
 | 
						|
		: &*it->second->migratedData;
 | 
						|
	return [=](auto consumer) {
 | 
						|
		auto lifetime = rpl::lifetime();
 | 
						|
		auto builder = lifetime.make_state<SparseIdsSliceBuilder>(
 | 
						|
			aroundId,
 | 
						|
			limitBefore,
 | 
						|
			limitAfter);
 | 
						|
		builder->insufficientAround(
 | 
						|
		) | rpl::start_with_next([=](
 | 
						|
				const SparseIdsSliceBuilder::AroundData &data) {
 | 
						|
			requestMore(data, query, listData);
 | 
						|
		}, lifetime);
 | 
						|
 | 
						|
		auto pushNextSnapshot = [=] {
 | 
						|
			consumer.put_next(builder->snapshot());
 | 
						|
		};
 | 
						|
 | 
						|
		listData->list.sliceUpdated(
 | 
						|
		) | rpl::filter([=](const SliceUpdate &update) {
 | 
						|
			return builder->applyUpdate(update);
 | 
						|
		}) | rpl::start_with_next(pushNextSnapshot, lifetime);
 | 
						|
 | 
						|
		_session->data().itemRemoved(
 | 
						|
		) | rpl::filter([=](not_null<const HistoryItem*> item) {
 | 
						|
			return (item->history()->peer->id == peerId)
 | 
						|
				&& (!topicRootId || item->topicRootId() == topicRootId);
 | 
						|
		}) | rpl::filter([=](not_null<const HistoryItem*> item) {
 | 
						|
			return builder->removeOne(item->id);
 | 
						|
		}) | rpl::start_with_next(pushNextSnapshot, lifetime);
 | 
						|
 | 
						|
		_session->data().historyCleared(
 | 
						|
		) | rpl::filter([=](not_null<const History*> history) {
 | 
						|
			return (history->peer->id == peerId);
 | 
						|
		}) | rpl::filter([=] {
 | 
						|
			return builder->removeAll();
 | 
						|
		}) | rpl::start_with_next(pushNextSnapshot, lifetime);
 | 
						|
 | 
						|
		using Result = Storage::SparseIdsListResult;
 | 
						|
		listData->list.query(Storage::SparseIdsListQuery(
 | 
						|
			aroundId,
 | 
						|
			limitBefore,
 | 
						|
			limitAfter
 | 
						|
		)) | rpl::filter([=](const Result &result) {
 | 
						|
			return builder->applyInitial(result);
 | 
						|
		}) | rpl::start_with_next_done(
 | 
						|
			pushNextSnapshot,
 | 
						|
			[=] { builder->checkInsufficient(); },
 | 
						|
			lifetime);
 | 
						|
 | 
						|
		return lifetime;
 | 
						|
	};
 | 
						|
}
 | 
						|
 | 
						|
auto SearchController::saveState() -> SavedState {
 | 
						|
	auto result = SavedState();
 | 
						|
	if (_current != _cache.end()) {
 | 
						|
		result.query = _current->first;
 | 
						|
		result.peerList = std::move(_current->second->peerData.list);
 | 
						|
		if (auto &migrated = _current->second->migratedData) {
 | 
						|
			result.migratedList = std::move(migrated->list);
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return result;
 | 
						|
}
 | 
						|
 | 
						|
void SearchController::restoreState(SavedState &&state) {
 | 
						|
	if (!state.query.peerId) {
 | 
						|
		return;
 | 
						|
	}
 | 
						|
 | 
						|
	auto it = _cache.find(state.query);
 | 
						|
	if (it == _cache.end()) {
 | 
						|
		it = _cache.emplace(
 | 
						|
			state.query,
 | 
						|
			std::make_unique<CacheEntry>(_session, state.query)).first;
 | 
						|
	}
 | 
						|
	auto replace = Data(it->second->peerData.peer);
 | 
						|
	replace.list = std::move(state.peerList);
 | 
						|
	it->second->peerData = std::move(replace);
 | 
						|
	if (auto &migrated = state.migratedList) {
 | 
						|
		Assert(it->second->migratedData.has_value());
 | 
						|
		auto replace = Data(it->second->migratedData->peer);
 | 
						|
		replace.list = std::move(*migrated);
 | 
						|
		it->second->migratedData = std::move(replace);
 | 
						|
	}
 | 
						|
	_current = it;
 | 
						|
}
 | 
						|
 | 
						|
void SearchController::requestMore(
 | 
						|
		const SparseIdsSliceBuilder::AroundData &key,
 | 
						|
		const Query &query,
 | 
						|
		Data *listData) {
 | 
						|
	if (listData->requests.contains(key)) {
 | 
						|
		return;
 | 
						|
	}
 | 
						|
	auto prepared = PrepareSearchRequest(
 | 
						|
		listData->peer,
 | 
						|
		query.topicRootId,
 | 
						|
		query.type,
 | 
						|
		query.query,
 | 
						|
		key.aroundId,
 | 
						|
		key.direction);
 | 
						|
	if (!prepared) {
 | 
						|
		return;
 | 
						|
	}
 | 
						|
	auto &histories = _session->data().histories();
 | 
						|
	const auto type = ::Data::Histories::RequestType::History;
 | 
						|
	const auto history = _session->data().history(listData->peer);
 | 
						|
	auto requestId = histories.sendRequest(history, type, [=](Fn<void()> finish) {
 | 
						|
		return _session->api().request(
 | 
						|
			std::move(*prepared)
 | 
						|
		).done([=](const SearchRequestResult &result) {
 | 
						|
			listData->requests.remove(key);
 | 
						|
			auto parsed = ParseSearchResult(
 | 
						|
				listData->peer,
 | 
						|
				query.type,
 | 
						|
				key.aroundId,
 | 
						|
				key.direction,
 | 
						|
				result);
 | 
						|
			listData->list.addSlice(
 | 
						|
				std::move(parsed.messageIds),
 | 
						|
				parsed.noSkipRange,
 | 
						|
				parsed.fullCount);
 | 
						|
			finish();
 | 
						|
		}).fail([=] {
 | 
						|
			finish();
 | 
						|
		}).send();
 | 
						|
	});
 | 
						|
	listData->requests.emplace(key, [=] {
 | 
						|
		_session->data().histories().cancelRequest(requestId);
 | 
						|
	});
 | 
						|
}
 | 
						|
 | 
						|
DelayedSearchController::DelayedSearchController(
 | 
						|
	not_null<Main::Session*> session)
 | 
						|
: _controller(session) {
 | 
						|
	_timer.setCallback([this] { setQueryFast(_nextQuery); });
 | 
						|
}
 | 
						|
 | 
						|
void DelayedSearchController::setQuery(const Query &query) {
 | 
						|
	setQuery(query, kDefaultSearchTimeoutMs);
 | 
						|
}
 | 
						|
 | 
						|
void DelayedSearchController::setQuery(
 | 
						|
		const Query &query,
 | 
						|
		crl::time delay) {
 | 
						|
	if (currentQuery() == query) {
 | 
						|
		_timer.cancel();
 | 
						|
		return;
 | 
						|
	}
 | 
						|
	if (_controller.hasInCache(query)) {
 | 
						|
		setQueryFast(query);
 | 
						|
	} else {
 | 
						|
		_nextQuery = query;
 | 
						|
		_timer.callOnce(delay);
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
void DelayedSearchController::setQueryFast(const Query &query) {
 | 
						|
	_controller.setQuery(query);
 | 
						|
	_currentQueryChanges.fire_copy(query.query);
 | 
						|
}
 | 
						|
 | 
						|
} // namespace Api
 |