470 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			470 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.

For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
 | 
						|
#include "media/streaming/media_streaming_file.h"
 | 
						|
 | 
						|
#include "media/streaming/media_streaming_loader.h"
 | 
						|
#include "media/streaming/media_streaming_file_delegate.h"
 | 
						|
#include "ffmpeg/ffmpeg_utility.h"
 | 
						|
 | 
						|
namespace Media {
 | 
						|
namespace Streaming {
 | 
						|
namespace {
 | 
						|
 | 
						|
// Largest byte count a single File::Context::read() call will serve (8 MiB);
// bigger requests are logged and rejected with -1.
constexpr auto kMaxSingleReadAmount = 8 * 1024 * 1024;
 | 
						|
// Per-stream packet queue capacity: reserved up front in start(), and once a
// queue reaches it readNextPacket() calls processQueuedPackets().
constexpr auto kMaxQueuedPackets = 1024;
 | 
						|
 | 
						|
// Tells whether the container-level duration should be ignored for the given
// stream: true for a VP9 stream whose demuxer name list (comma-separated)
// contains "webm". initStream() reports such a duration as unknown instead
// of reading format->duration.
//
// The codec id is taken from AVStream::codecpar, the same field initStream()
// reads the sample rate from; AVStream::codec is deprecated in FFmpeg.
[[nodiscard]] bool UnreliableFormatDuration(
		not_null<AVFormatContext*> format,
		not_null<AVStream*> stream) {
	return stream->codecpar
		&& (stream->codecpar->codec_id == AV_CODEC_ID_VP9)
		&& format->iformat
		&& format->iformat->name
		&& QString::fromLatin1(
			format->iformat->name
		).split(QChar(',')).contains(u"webm");
}
 | 
						|
 | 
						|
} // namespace
 | 
						|
 | 
						|
// Remembers the delegate and the reader and caches the reader's total size,
// which read() and seek() use as the upper bound for offsets.
File::Context::Context(
	not_null<FileDelegate*> delegate,
	not_null<Reader*> reader)
: _delegate(delegate)
, _reader(reader)
, _size(reader->size()) {
}
 | 
						|
 | 
						|
// Out-of-line defaulted destructor.
File::Context::~Context() = default;
 | 
						|
 | 
						|
int File::Context::Read(void *opaque, uint8_t *buffer, int bufferSize) {
 | 
						|
	return static_cast<Context*>(opaque)->read(
 | 
						|
		bytes::make_span(buffer, bufferSize));
 | 
						|
}
 | 
						|
 | 
						|
int64_t File::Context::Seek(void *opaque, int64_t offset, int whence) {
 | 
						|
	return static_cast<Context*>(opaque)->seek(offset, whence);
 | 
						|
}
 | 
						|
 | 
						|
// Fills `buffer` with data starting at the current offset and advances the
// offset by the number of bytes read.
//
// Returns the number of bytes read, AVERROR_EOF when the offset is already
// at the end of the data, or -1 when the context is failed / interrupted,
// when the request exceeds kMaxSingleReadAmount, or when the reader reports
// a streaming error while waiting.
int File::Context::read(bytes::span buffer) {
	Assert(_size >= _offset);

	const auto amount = std::min(std::size_t(_size - _offset), buffer.size());

	if (unroll()) {
		return -1;
	} else if (amount > kMaxSingleReadAmount) {
		LOG(("Streaming Error: Read callback asked for too much data: %1"
			).arg(amount));
		return -1;
	} else if (!amount) {
		// The AVIO read callback must never return 0: the end of the data
		// has to be reported with a proper AVERROR code.
		return AVERROR_EOF;
	}

	buffer = buffer.subspan(0, amount);
	while (true) {
		const auto result = _reader->fill(_offset, buffer, &_semaphore);
		if (result == Reader::FillState::Success) {
			break;
		} else if (result == Reader::FillState::WaitingRemote) {
			// Ideally this would use SleepPolicy::Allowed: if we wait for
			// the remote data often, the queues may never reach
			// kMaxQueuedPackets, so processQueuedPackets(SleepPolicy::Allowed)
			// is never called and we never sleep when enough packets were
			// already read.
			//
			// But right now passing SleepPolicy::Allowed here freezes,
			// because of two _semaphore.acquire() calls one after another.
			processQueuedPackets(SleepPolicy::Disallowed);
			_delegate->fileWaitingForData();
		}
		// Wait until we are woken up, then re-check the state before
		// trying to fill the buffer again.
		_semaphore.acquire();
		if (_interrupted) {
			return -1;
		} else if (const auto error = _reader->streamingError()) {
			fail(*error);
			return -1;
		}
	}

	sendFullInCache();

	_offset += amount;
	// `amount` is at most kMaxSingleReadAmount here, so it fits in an int.
	return static_cast<int>(amount);
}
 | 
						|
 | 
						|
int64_t File::Context::seek(int64_t offset, int whence) {
 | 
						|
	const auto checkedSeek = [&](int64_t offset) {
 | 
						|
		if (_failed || offset < 0 || offset > _size) {
 | 
						|
			return -1;
 | 
						|
		}
 | 
						|
		return (_offset = offset);
 | 
						|
	};
 | 
						|
	switch (whence) {
 | 
						|
	case SEEK_SET: return checkedSeek(offset);
 | 
						|
	case SEEK_CUR: return checkedSeek(_offset + offset);
 | 
						|
	case SEEK_END: return checkedSeek(_size + offset);
 | 
						|
	case AVSEEK_SIZE: return _size;
 | 
						|
	}
 | 
						|
	return -1;
 | 
						|
}
 | 
						|
 | 
						|
// Logs an FFmpeg error for `method`, unless the context is already failed
// or interrupted.
void File::Context::logError(QLatin1String method) {
	if (unroll()) {
		return;
	}
	FFmpeg::LogError(method);
}
 | 
						|
 | 
						|
// Logs an FFmpeg error for `method` together with the `error` value, unless
// the context is already failed or interrupted.
void File::Context::logError(
		QLatin1String method,
		FFmpeg::AvErrorWrap error) {
	if (unroll()) {
		return;
	}
	FFmpeg::LogError(method, error);
}
 | 
						|
 | 
						|
// Logs an FFmpeg error for `method` and fails the context, unless it is
// already failed or interrupted. The reported error is InvalidData when
// _format is set and OpenFailed otherwise.
void File::Context::logFatal(QLatin1String method) {
	if (unroll()) {
		return;
	}
	FFmpeg::LogError(method);
	const auto reason = _format ? Error::InvalidData : Error::OpenFailed;
	fail(reason);
}
 | 
						|
 | 
						|
// Logs an FFmpeg error for `method` together with the `error` value and
// fails the context, unless it is already failed or interrupted. The
// reported error is InvalidData when _format is set and OpenFailed otherwise.
void File::Context::logFatal(
		QLatin1String method,
		FFmpeg::AvErrorWrap error) {
	if (unroll()) {
		return;
	}
	FFmpeg::LogError(method, error);
	const auto reason = _format ? Error::InvalidData : Error::OpenFailed;
	fail(reason);
}
 | 
						|
 | 
						|
// Finds the best stream of the given media type in `format` and fills
// a Stream description for it (index, codec, frame, time base, duration and
// the type-specific fields).
//
// The returned Stream has no codec when the stream can't be used: nothing
// was found, it is an attached picture, the audio sample rate is zero, the
// codec or frame could not be created, or the duration is not positive or
// exceeds kDurationMax.
Stream File::Context::initStream(
		not_null<AVFormatContext*> format,
		AVMediaType type) {
	auto result = Stream();
	result.index = av_find_best_stream(format, type, -1, -1, nullptr, 0);
	if (result.index < 0) {
		return result;
	}

	const auto info = format->streams[result.index];
	switch (type) {
	case AVMEDIA_TYPE_VIDEO:
		if (info->disposition & AV_DISPOSITION_ATTACHED_PIC) {
			// Cover streams are ignored.
			return Stream();
		}
		result.rotation = FFmpeg::ReadRotationFromMetadata(info);
		result.aspect = FFmpeg::ValidateAspectRatio(
			info->sample_aspect_ratio);
		break;
	case AVMEDIA_TYPE_AUDIO:
		result.frequency = info->codecpar->sample_rate;
		if (!result.frequency) {
			return result;
		}
		break;
	default:
		break;
	}

	result.codec = FFmpeg::MakeCodecPointer(info);
	if (!result.codec) {
		return result;
	}

	result.frame = FFmpeg::MakeFramePointer();
	if (!result.frame) {
		result.codec = nullptr;
		return result;
	}

	result.timeBase = info->time_base;

	// Prefer the stream's own duration, fall back to the container one
	// unless it is considered unreliable for this stream.
	if (info->duration != AV_NOPTS_VALUE) {
		result.duration = FFmpeg::PtsToTime(info->duration, result.timeBase);
	} else if (UnreliableFormatDuration(format, info)) {
		result.duration = kTimeUnknown;
	} else {
		result.duration = FFmpeg::PtsToTime(
			format->duration,
			FFmpeg::kUniversalTimeBase);
	}

	if (result.duration == kTimeUnknown) {
		result.duration = kDurationUnavailable;
		return result;
	}
	if (result.duration <= 0) {
		result.codec = nullptr;
		return result;
	}
	++result.duration;
	if (result.duration > kDurationMax) {
		result.duration = 0;
		result.codec = nullptr;
	}
	return result;
}
 | 
						|
 | 
						|
// Seeks `format` to `position` using the index and time base of `stream`.
//
// Does nothing for a zero position or for a stream with unavailable
// duration (seeking in files with unknown duration is not supported).
// A failed seek is reported through logFatal().
void File::Context::seekToPosition(
		not_null<AVFormatContext*> format,
		const Stream &stream,
		crl::time position) {
	if (!position || stream.duration == kDurationUnavailable) {
		return;
	}

	// Only a backward search is used: a non-backward one reads the whole
	// file if the position is after the last keyframe inside the index.
	const auto clamped = std::clamp(
		position,
		crl::time(0),
		stream.duration - 1);
	auto error = FFmpeg::AvErrorWrap(av_seek_frame(
		format,
		stream.index,
		FFmpeg::TimeToPts(clamped, stream.timeBase),
		AVSEEK_FLAG_BACKWARD));
	if (error) {
		logFatal(qstr("av_seek_frame"), error);
	}
}
 | 
						|
 | 
						|
std::variant<FFmpeg::Packet, FFmpeg::AvErrorWrap> File::Context::readPacket() {
 | 
						|
	auto error = FFmpeg::AvErrorWrap();
 | 
						|
 | 
						|
	auto result = FFmpeg::Packet();
 | 
						|
	error = av_read_frame(_format.get(), &result.fields());
 | 
						|
	if (unroll()) {
 | 
						|
		return FFmpeg::AvErrorWrap();
 | 
						|
	} else if (!error) {
 | 
						|
		return result;
 | 
						|
	} else if (error.code() != AVERROR_EOF) {
 | 
						|
		logFatal(qstr("av_read_frame"), error);
 | 
						|
	}
 | 
						|
	return error;
 | 
						|
}
 | 
						|
 | 
						|
// Opens the format through the Read / Seek callbacks, initializes the video
// and audio streams, seeks to `position` and hands the streams over to the
// delegate. _format is stored only after the delegate accepted the file.
//
// Returns early, without storing _format, as soon as the context becomes
// failed or interrupted.
void File::Context::start(crl::time position) {
	if (unroll()) {
		return;
	}

	auto format = FFmpeg::MakeFormatPointer(
		static_cast<void *>(this),
		&Context::Read,
		nullptr,
		&Context::Seek);
	if (!format) {
		fail(Error::OpenFailed);
		return;
	}

	auto error = FFmpeg::AvErrorWrap(
		avformat_find_stream_info(format.get(), nullptr));
	if (error) {
		logFatal(qstr("avformat_find_stream_info"), error);
		return;
	}

	auto video = initStream(format.get(), AVMEDIA_TYPE_VIDEO);
	if (unroll()) {
		return;
	}

	auto audio = initStream(format.get(), AVMEDIA_TYPE_AUDIO);
	if (unroll()) {
		return;
	}

	_reader->headerDone();
	if (_reader->isRemoteLoader()) {
		sendFullInCache(true);
	}

	// Seek by the video stream when there is one, by the audio otherwise.
	if (video.codec) {
		seekToPosition(format.get(), video, position);
	} else if (audio.codec) {
		seekToPosition(format.get(), audio, position);
	}
	if (unroll()) {
		return;
	}

	// Only streams registered here get their packets queued
	// in readNextPacket().
	if (video.codec) {
		_queuedPackets[video.index].reserve(kMaxQueuedPackets);
	}
	if (audio.codec) {
		_queuedPackets[audio.index].reserve(kMaxQueuedPackets);
	}

	const auto header = _reader->headerSize();
	if (!_delegate->fileReady(header, std::move(video), std::move(audio))) {
		fail(Error::OpenFailed);
		return;
	}
	_format = std::move(format);
}
 | 
						|
 | 
						|
void File::Context::sendFullInCache(bool force) {
 | 
						|
	const auto started = _fullInCache.has_value();
 | 
						|
	if (force || started) {
 | 
						|
		const auto nowFullInCache = _reader->fullInCache();
 | 
						|
		if (!started || *_fullInCache != nowFullInCache) {
 | 
						|
			_fullInCache = nowFullInCache;
 | 
						|
			_delegate->fileFullInCache(nowFullInCache);
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
void File::Context::readNextPacket() {
 | 
						|
	auto result = readPacket();
 | 
						|
	if (unroll()) {
 | 
						|
		return;
 | 
						|
	} else if (const auto packet = std::get_if<FFmpeg::Packet>(&result)) {
 | 
						|
		const auto index = packet->fields().stream_index;
 | 
						|
		const auto i = _queuedPackets.find(index);
 | 
						|
		if (i == end(_queuedPackets)) {
 | 
						|
			return;
 | 
						|
		}
 | 
						|
		i->second.push_back(std::move(*packet));
 | 
						|
		if (i->second.size() == kMaxQueuedPackets) {
 | 
						|
			processQueuedPackets(SleepPolicy::Allowed);
 | 
						|
		}
 | 
						|
		Assert(i->second.size() < kMaxQueuedPackets);
 | 
						|
	} else {
 | 
						|
		// Still trying to read by drain.
 | 
						|
		Assert(v::is<FFmpeg::AvErrorWrap>(result));
 | 
						|
		Assert(v::get<FFmpeg::AvErrorWrap>(result).code() == AVERROR_EOF);
 | 
						|
		processQueuedPackets(SleepPolicy::Allowed);
 | 
						|
		if (!finished()) {
 | 
						|
			handleEndOfFile();
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
void File::Context::handleEndOfFile() {
 | 
						|
	_delegate->fileProcessEndOfFile();
 | 
						|
	if (_delegate->fileReadMore()) {
 | 
						|
		_readTillEnd = false;
 | 
						|
		auto error = FFmpeg::AvErrorWrap(av_seek_frame(
 | 
						|
			_format.get(),
 | 
						|
			-1, // stream_index
 | 
						|
			0, // timestamp
 | 
						|
			AVSEEK_FLAG_BACKWARD));
 | 
						|
		if (error) {
 | 
						|
			logFatal(qstr("av_seek_frame"));
 | 
						|
		}
 | 
						|
 | 
						|
		// If we loaded a file till the end then we think it is fully cached,
 | 
						|
		// assume we finished loading and don't want to keep all other
 | 
						|
		// download tasks throttled because of an active streaming.
 | 
						|
		_reader->tryRemoveLoaderAsync();
 | 
						|
	} else {
 | 
						|
		_readTillEnd = true;
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Hands the queued packets over to the delegate. If the delegate's result
// is false and `policy` allows it, sleeps on the semaphore until the context
// is failed / interrupted or the delegate asks to read more.
void File::Context::processQueuedPackets(SleepPolicy policy) {
	const auto more = _delegate->fileProcessPackets(_queuedPackets);
	if (more || policy != SleepPolicy::Allowed) {
		return;
	}
	while (true) {
		_reader->startSleep(&_semaphore);
		_semaphore.acquire();
		_reader->stopSleep();
		if (unroll() || _delegate->fileReadMore()) {
			break;
		}
	}
}
 | 
						|
 | 
						|
// Marks the context as interrupted and releases the semaphore, so a wait
// in _semaphore.acquire() can return and observe the flag.
void File::Context::interrupt() {
	_interrupted = true;
	_semaphore.release();
}
 | 
						|
 | 
						|
// Releases the semaphore without changing any state.
void File::Context::wake() {
	_semaphore.release();
}
 | 
						|
 | 
						|
bool File::Context::interrupted() const {
 | 
						|
	return _interrupted;
 | 
						|
}
 | 
						|
 | 
						|
bool File::Context::failed() const {
 | 
						|
	return _failed;
 | 
						|
}
 | 
						|
 | 
						|
bool File::Context::unroll() const {
 | 
						|
	return failed() || interrupted();
 | 
						|
}
 | 
						|
 | 
						|
// Marks the context as failed and reports `error` to the delegate.
void File::Context::fail(Error error) {
	_failed = true;
	_delegate->fileError(error);
}
 | 
						|
 | 
						|
bool File::Context::finished() const {
 | 
						|
	return unroll() || _readTillEnd;
 | 
						|
}
 | 
						|
 | 
						|
// Forwards to the reader's stopStreamingAsync().
void File::Context::stopStreamingAsync() {
	// Once loading is finished, other download tasks should not stay
	// throttled because of an active streaming.
	_reader->stopStreamingAsync();
}
 | 
						|
 | 
						|
// Takes shared ownership of the reader that supplies the file data.
File::File(std::shared_ptr<Reader> reader)
: _reader(std::move(reader)) {
}
 | 
						|
 | 
						|
// Stops the previous run (if any), creates a fresh context and starts
// a thread that opens the file at `position` and reads packets until
// the context is finished.
void File::start(not_null<FileDelegate*> delegate, crl::time position) {
	stop(true);

	_reader->startStreaming();
	_context.emplace(delegate, _reader.get());

	// The thread works with the context through a raw pointer; stop() joins
	// the thread before resetting _context.
	_thread = std::thread([position, context = &*_context] {
		crl::toggle_fp_exceptions(true);
		context->start(position);
		while (!context->finished()) {
			context->readNextPacket();
		}
		if (!context->interrupted()) {
			context->stopStreamingAsync();
		}
	});
}
 | 
						|
 | 
						|
// Wakes the current context. Requires a context to exist.
void File::wake() {
	Expects(_context.has_value());

	_context->wake();
}
 | 
						|
 | 
						|
void File::stop(bool stillActive) {
 | 
						|
	if (_thread.joinable()) {
 | 
						|
		_context->interrupt();
 | 
						|
		_thread.join();
 | 
						|
	}
 | 
						|
	_reader->stopStreaming(stillActive);
 | 
						|
	_context.reset();
 | 
						|
}
 | 
						|
 | 
						|
bool File::isRemoteLoader() const {
 | 
						|
	return _reader->isRemoteLoader();
 | 
						|
}
 | 
						|
 | 
						|
// Forwards `priority` to the reader's setLoaderPriority().
void File::setLoaderPriority(int priority) {
	_reader->setLoaderPriority(priority);
}
 | 
						|
 | 
						|
// Stops the reading thread (if any) before the members are destroyed.
File::~File() {
	stop();
}
 | 
						|
 | 
						|
} // namespace Streaming
 | 
						|
} // namespace Media
 |