| /* |
| * Copyright (C) 2016, 2017 Metrological Group B.V. |
| * Copyright (C) 2016, 2017 Igalia S.L |
| * |
| * This library is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Library General Public |
| * License as published by the Free Software Foundation; either |
| * version 2 of the License, or (at your option) any later version. |
| * |
| * This library is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Library General Public License for more details. |
| * |
| * You should have received a copy of the GNU Library General Public License |
| * aint with this library; see the file COPYING.LIB. If not, write to |
| * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, |
| * Boston, MA 02110-1301, USA. |
| */ |
| |
| #include "config.h" |
| #include "AppendPipeline.h" |
| |
| #if ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE) |
| |
| #include "AudioTrackPrivateGStreamer.h" |
| #include "GStreamerCommon.h" |
| #include "GStreamerEMEUtilities.h" |
| #include "GStreamerMediaDescription.h" |
| #include "GStreamerRegistryScannerMSE.h" |
| #include "MediaSampleGStreamer.h" |
| #include "InbandTextTrackPrivateGStreamer.h" |
| #include "MediaDescription.h" |
| #include "SourceBufferPrivateGStreamer.h" |
| #include "VideoTrackPrivateGStreamer.h" |
| #include <functional> |
| #include <gst/app/gstappsink.h> |
| #include <gst/app/gstappsrc.h> |
| #include <gst/gst.h> |
| #include <gst/pbutils/pbutils.h> |
| #include <gst/video/video.h> |
| #include <wtf/Condition.h> |
| #include <wtf/glib/GLibUtilities.h> |
| #include <wtf/glib/RunLoopSourcePriority.h> |
| #include <wtf/text/StringConcatenateNumbers.h> |
| |
| GST_DEBUG_CATEGORY_EXTERN(webkit_mse_debug); |
| #define GST_CAT_DEFAULT webkit_mse_debug |
| |
| namespace WebCore { |
| |
| GType AppendPipeline::s_endOfAppendMetaType = 0; |
| const GstMetaInfo* AppendPipeline::s_webKitEndOfAppendMetaInfo = nullptr; |
| std::once_flag AppendPipeline::s_staticInitializationFlag; |
| |
| struct EndOfAppendMeta { |
| GstMeta base; |
| static gboolean init(GstMeta*, void*, GstBuffer*) { return TRUE; } |
| static gboolean transform(GstBuffer*, GstMeta*, GstBuffer*, GQuark, void*) { g_return_val_if_reached(FALSE); } |
| static void free(GstMeta*, GstBuffer*) { } |
| }; |
| |
| void AppendPipeline::staticInitialization() |
| { |
| ASSERT(isMainThread()); |
| |
| const char* tags[] = { nullptr }; |
| s_endOfAppendMetaType = gst_meta_api_type_register("WebKitEndOfAppendMetaAPI", tags); |
| s_webKitEndOfAppendMetaInfo = gst_meta_register(s_endOfAppendMetaType, "WebKitEndOfAppendMeta", sizeof(EndOfAppendMeta), EndOfAppendMeta::init, EndOfAppendMeta::free, EndOfAppendMeta::transform); |
| } |
| |
| #if !LOG_DISABLED |
| static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*); |
| #endif |
| |
| #if ENABLE(ENCRYPTED_MEDIA) |
| static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*); |
| #endif |
| |
| static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo*, gpointer); |
| |
| static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo*, void*); |
| |
| // Wrapper for gst_element_set_state() that emits a critical if the state change fails or is not synchronous. |
| static void assertedElementSetState(GstElement* element, GstState desiredState) |
| { |
| GstState oldState; |
| gst_element_get_state(element, &oldState, nullptr, 0); |
| |
| GstStateChangeReturn result = gst_element_set_state(element, desiredState); |
| |
| GstState newState; |
| gst_element_get_state(element, &newState, nullptr, 0); |
| |
| if (desiredState != newState || result != GST_STATE_CHANGE_SUCCESS) { |
| GST_ERROR("AppendPipeline state change failed (returned %d): %" GST_PTR_FORMAT " %d -> %d (expected %d)", |
| static_cast<int>(result), element, static_cast<int>(oldState), static_cast<int>(newState), static_cast<int>(desiredState)); |
| ASSERT_NOT_REACHED(); |
| } |
| } |
| |
| AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceClient, Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate) |
| : m_mediaSourceClient(mediaSourceClient.get()) |
| , m_sourceBufferPrivate(sourceBufferPrivate.get()) |
| , m_playerPrivate(&playerPrivate) |
| , m_id(0) |
| , m_wasBusAlreadyNotifiedOfAvailableSamples(false) |
| , m_streamType(Unknown) |
| { |
| ASSERT(isMainThread()); |
| std::call_once(s_staticInitializationFlag, AppendPipeline::staticInitialization); |
| |
| GST_TRACE("Creating AppendPipeline (%p)", this); |
| |
| // FIXME: give a name to the pipeline, maybe related with the track it's managing. |
| // The track name is still unknown at this time, though. |
| static size_t appendPipelineCount = 0; |
| String pipelineName = makeString("append-pipeline-", |
| m_sourceBufferPrivate->type().containerType().replace("/", "-"), '-', appendPipelineCount++); |
| m_pipeline = gst_pipeline_new(pipelineName.utf8().data()); |
| |
| m_bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get()))); |
| gst_bus_add_signal_watch_full(m_bus.get(), RunLoopSourcePriority::RunLoopDispatcher); |
| gst_bus_enable_sync_message_emission(m_bus.get()); |
| |
| g_signal_connect(m_bus.get(), "sync-message::error", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) { |
| appendPipeline->handleErrorSyncMessage(message); |
| }), this); |
| g_signal_connect(m_bus.get(), "sync-message::need-context", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) { |
| appendPipeline->handleNeedContextSyncMessage(message); |
| }), this); |
| g_signal_connect(m_bus.get(), "message::state-changed", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) { |
| appendPipeline->handleStateChangeMessage(message); |
| }), this); |
| |
| // We assign the created instances here instead of adoptRef() because gst_bin_add_many() |
| // below will already take the initial reference and we need an additional one for us. |
| m_appsrc = gst_element_factory_make("appsrc", nullptr); |
| |
| GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src")); |
| gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, [](GstPad*, GstPadProbeInfo* padProbeInfo, void* userData) { |
| return static_cast<AppendPipeline*>(userData)->appsrcEndOfAppendCheckerProbe(padProbeInfo); |
| }, this, nullptr); |
| |
| const String& type = m_sourceBufferPrivate->type().containerType(); |
| GST_DEBUG("SourceBuffer containerType: %s", type.utf8().data()); |
| if (type.endsWith("mp4") || type.endsWith("aac")) |
| m_demux = gst_element_factory_make("qtdemux", nullptr); |
| else if (type.endsWith("webm")) |
| m_demux = gst_element_factory_make("matroskademux", nullptr); |
| else |
| ASSERT_NOT_REACHED(); |
| |
| m_appsink = gst_element_factory_make("appsink", nullptr); |
| |
| gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), TRUE); |
| gst_base_sink_set_sync(GST_BASE_SINK(m_appsink.get()), FALSE); |
| gst_base_sink_set_async_enabled(GST_BASE_SINK(m_appsink.get()), FALSE); // No prerolls, no async state changes. |
| gst_base_sink_set_drop_out_of_segment(GST_BASE_SINK(m_appsink.get()), FALSE); |
| gst_base_sink_set_last_sample_enabled(GST_BASE_SINK(m_appsink.get()), FALSE); |
| |
| GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink")); |
| g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(+[](GObject*, GParamSpec*, AppendPipeline* appendPipeline) { |
| if (isMainThread()) { |
| // When changing the pipeline state down to READY the demuxer is unlinked and this triggers a caps notification |
| // because the appsink loses its previously negotiated caps. We are not interested in these unnegotiated caps. |
| #ifndef NDEBUG |
| GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(appendPipeline->m_appsink.get(), "sink")); |
| GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get())); |
| ASSERT(!caps); |
| #endif |
| return; |
| } |
| |
| // The streaming thread has just received a new caps and is about to let samples using the |
| // new caps flow. Let's block it until the main thread has consumed the samples with the old |
| // caps and has processed the caps change. |
| appendPipeline->m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([appendPipeline]() { |
| appendPipeline->appsinkCapsChanged(); |
| return AbortableTaskQueue::Void(); |
| }); |
| }), this); |
| |
| #if !LOG_DISABLED |
| GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink")); |
| m_demuxerDataEnteringPadProbeInformation.appendPipeline = this; |
| m_demuxerDataEnteringPadProbeInformation.description = "demuxer data entering"; |
| m_demuxerDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(demuxerPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_demuxerDataEnteringPadProbeInformation, nullptr); |
| m_appsinkDataEnteringPadProbeInformation.appendPipeline = this; |
| m_appsinkDataEnteringPadProbeInformation.description = "appsink data entering"; |
| m_appsinkDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_appsinkDataEnteringPadProbeInformation, nullptr); |
| #endif |
| |
| #if ENABLE(ENCRYPTED_MEDIA) |
| m_appsinkPadEventProbeInformation.appendPipeline = this; |
| m_appsinkPadEventProbeInformation.description = "appsink event probe"; |
| m_appsinkPadEventProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsinkPadEventProbe), &m_appsinkPadEventProbeInformation, nullptr); |
| #endif |
| |
| // These signals won't be connected outside of the lifetime of "this". |
| g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) { |
| appendPipeline->connectDemuxerSrcPadToAppsinkFromStreamingThread(demuxerSrcPad); |
| }), this); |
| g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) { |
| appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad); |
| }), this); |
| g_signal_connect(m_demux.get(), "no-more-pads", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) { |
| ASSERT(!isMainThread()); |
| GST_DEBUG("Posting no-more-pads task to main thread"); |
| appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() { |
| appendPipeline->didReceiveInitializationSegment(); |
| }); |
| }), this); |
| g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(+[](GstElement* appsink, AppendPipeline* appendPipeline) -> GstFlowReturn { |
| appendPipeline->handleAppsinkNewSampleFromStreamingThread(appsink); |
| return GST_FLOW_OK; |
| }), this); |
| g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) { |
| if (appendPipeline->m_errorReceived) |
| return; |
| |
| GST_ERROR("AppendPipeline's appsink received EOS. This is usually caused by an invalid initialization segment."); |
| appendPipeline->handleErrorConditionFromStreamingThread(); |
| }), this); |
| |
| // Add_many will take ownership of a reference. That's why we used an assignment before. |
| gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr); |
| gst_element_link(m_appsrc.get(), m_demux.get()); |
| |
| assertedElementSetState(m_pipeline.get(), GST_STATE_PLAYING); |
| } |
| |
| AppendPipeline::~AppendPipeline() |
| { |
| GST_DEBUG_OBJECT(m_pipeline.get(), "Destructing AppendPipeline (%p)", this); |
| ASSERT(isMainThread()); |
| |
| // Forget all pending tasks and unblock the streaming thread if it was blocked. |
| m_taskQueue.startAborting(); |
| |
| // Disconnect all synchronous event handlers and probes susceptible of firing from the main thread |
| // when changing the pipeline state. |
| |
| if (m_pipeline) { |
| ASSERT(m_bus); |
| g_signal_handlers_disconnect_by_data(m_bus.get(), this); |
| gst_bus_disable_sync_message_emission(m_bus.get()); |
| gst_bus_remove_signal_watch(m_bus.get()); |
| } |
| |
| if (m_appsrc) |
| g_signal_handlers_disconnect_by_data(m_appsrc.get(), this); |
| |
| if (m_demux) { |
| #if !LOG_DISABLED |
| GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink")); |
| gst_pad_remove_probe(demuxerPad.get(), m_demuxerDataEnteringPadProbeInformation.probeId); |
| #endif |
| |
| g_signal_handlers_disconnect_by_data(m_demux.get(), this); |
| } |
| |
| if (m_appsink) { |
| GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink")); |
| g_signal_handlers_disconnect_by_data(appsinkPad.get(), this); |
| g_signal_handlers_disconnect_by_data(m_appsink.get(), this); |
| |
| #if !LOG_DISABLED |
| gst_pad_remove_probe(appsinkPad.get(), m_appsinkDataEnteringPadProbeInformation.probeId); |
| #endif |
| |
| #if ENABLE(ENCRYPTED_MEDIA) |
| gst_pad_remove_probe(appsinkPad.get(), m_appsinkPadEventProbeInformation.probeId); |
| #endif |
| } |
| |
| // We can tear down the pipeline safely now. |
| if (m_pipeline) |
| gst_element_set_state(m_pipeline.get(), GST_STATE_NULL); |
| } |
| |
| void AppendPipeline::handleErrorConditionFromStreamingThread() |
| { |
| ASSERT(!isMainThread()); |
| // Notify the main thread that the append has a decode error. |
| auto response = m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([this]() { |
| m_errorReceived = true; |
| // appendParsingFailed() will cause resetParserState() to be called. |
| m_sourceBufferPrivate->appendParsingFailed(); |
| return AbortableTaskQueue::Void(); |
| }); |
| // The streaming thread has now been unblocked because we are aborting in the main thread. |
| ASSERT(!response); |
| } |
| |
| void AppendPipeline::handleErrorSyncMessage(GstMessage* message) |
| { |
| ASSERT(!isMainThread()); |
| GST_WARNING_OBJECT(m_pipeline.get(), "Demuxing error: %" GST_PTR_FORMAT, message); |
| handleErrorConditionFromStreamingThread(); |
| } |
| |
| GstPadProbeReturn AppendPipeline::appsrcEndOfAppendCheckerProbe(GstPadProbeInfo* padProbeInfo) |
| { |
| ASSERT(!isMainThread()); |
| m_streamingThread = &WTF::Thread::current(); |
| |
| GstBuffer* buffer = GST_BUFFER(padProbeInfo->data); |
| ASSERT(GST_IS_BUFFER(buffer)); |
| |
| GST_TRACE_OBJECT(m_pipeline.get(), "Buffer entered appsrcEndOfAppendCheckerProbe: %" GST_PTR_FORMAT, buffer); |
| |
| EndOfAppendMeta* endOfAppendMeta = reinterpret_cast<EndOfAppendMeta*>(gst_buffer_get_meta(buffer, s_endOfAppendMetaType)); |
| if (!endOfAppendMeta) { |
| // Normal buffer, nothing to do. |
| return GST_PAD_PROBE_OK; |
| } |
| |
| GST_TRACE_OBJECT(m_pipeline.get(), "Posting end-of-append task to the main thread"); |
| m_taskQueue.enqueueTask([this]() { |
| handleEndOfAppend(); |
| }); |
| return GST_PAD_PROBE_DROP; |
| } |
| |
| void AppendPipeline::handleNeedContextSyncMessage(GstMessage* message) |
| { |
| const gchar* contextType = nullptr; |
| gst_message_parse_context_type(message, &contextType); |
| GST_TRACE("context type: %s", contextType); |
| |
| // MediaPlayerPrivateGStreamerBase will take care of setting up encryption. |
| m_playerPrivate->handleSyncMessage(message); |
| } |
| |
| void AppendPipeline::handleStateChangeMessage(GstMessage* message) |
| { |
| ASSERT(isMainThread()); |
| |
| if (GST_MESSAGE_SRC(message) == reinterpret_cast<GstObject*>(m_pipeline.get())) { |
| GstState currentState, newState; |
| gst_message_parse_state_changed(message, ¤tState, &newState, nullptr); |
| CString sourceBufferType = String(m_sourceBufferPrivate->type().raw()) |
| .replace("/", "_").replace(" ", "_") |
| .replace("\"", "").replace("\'", "").utf8(); |
| CString dotFileName = makeString("webkit-append-", |
| sourceBufferType.data(), '-', |
| gst_element_state_get_name(currentState), '_', |
| gst_element_state_get_name(newState)).utf8(); |
| GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.data()); |
| } |
| } |
| |
| gint AppendPipeline::id() |
| { |
| ASSERT(isMainThread()); |
| |
| if (m_id) |
| return m_id; |
| |
| static gint s_totalAudio = 0; |
| static gint s_totalVideo = 0; |
| static gint s_totalText = 0; |
| |
| switch (m_streamType) { |
| case Audio: |
| m_id = ++s_totalAudio; |
| break; |
| case Video: |
| m_id = ++s_totalVideo; |
| break; |
| case Text: |
| m_id = ++s_totalText; |
| break; |
| case Unknown: |
| case Invalid: |
| GST_ERROR("Trying to get id for a pipeline of Unknown/Invalid type"); |
| ASSERT_NOT_REACHED(); |
| break; |
| } |
| |
| GST_DEBUG("streamType=%d, id=%d", static_cast<int>(m_streamType), m_id); |
| |
| return m_id; |
| } |
| |
| void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps) |
| { |
| ASSERT(isMainThread()); |
| |
| m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps); |
| m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Unknown; |
| |
| const char* originalMediaType = capsMediaType(m_demuxerSrcPadCaps.get()); |
| auto& gstRegistryScanner = GStreamerRegistryScannerMSE::singleton(); |
| if (!gstRegistryScanner.isCodecSupported(originalMediaType)) { |
| m_presentationSize = WebCore::FloatSize(); |
| m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid; |
| } else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_VIDEO_CAPS_TYPE_PREFIX)) { |
| Optional<FloatSize> size = getVideoResolutionFromCaps(m_demuxerSrcPadCaps.get()); |
| if (size.hasValue()) |
| m_presentationSize = size.value(); |
| else |
| m_presentationSize = WebCore::FloatSize(); |
| |
| m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video; |
| } else { |
| m_presentationSize = WebCore::FloatSize(); |
| if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_AUDIO_CAPS_TYPE_PREFIX)) |
| m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio; |
| else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_TEXT_CAPS_TYPE_PREFIX)) |
| m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text; |
| } |
| } |
| |
| void AppendPipeline::appsinkCapsChanged() |
| { |
| ASSERT(isMainThread()); |
| |
| // Consume any pending samples with the previous caps. |
| consumeAppsinkAvailableSamples(); |
| |
| GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink")); |
| GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get())); |
| |
| if (!caps) |
| return; |
| |
| if (doCapsHaveType(caps.get(), GST_VIDEO_CAPS_TYPE_PREFIX)) { |
| Optional<FloatSize> size = getVideoResolutionFromCaps(caps.get()); |
| if (size.hasValue()) |
| m_presentationSize = size.value(); |
| } |
| |
| // This means that we're right after a new track has appeared. Otherwise, it's a caps change inside the same track. |
| bool previousCapsWereNull = !m_appsinkCaps; |
| |
| if (m_appsinkCaps != caps) { |
| m_appsinkCaps = WTFMove(caps); |
| m_playerPrivate->trackDetected(this, m_track, previousCapsWereNull); |
| } |
| } |
| |
| void AppendPipeline::handleEndOfAppend() |
| { |
| ASSERT(isMainThread()); |
| consumeAppsinkAvailableSamples(); |
| GST_TRACE_OBJECT(m_pipeline.get(), "Notifying SourceBufferPrivate the append is complete"); |
| sourceBufferPrivate()->didReceiveAllPendingSamples(); |
| } |
| |
| void AppendPipeline::appsinkNewSample(GRefPtr<GstSample>&& sample) |
| { |
| ASSERT(isMainThread()); |
| |
| if (UNLIKELY(!gst_sample_get_buffer(sample.get()))) { |
| GST_WARNING("Received sample without buffer from appsink."); |
| return; |
| } |
| |
| auto mediaSample = WebCore::MediaSampleGStreamer::create(WTFMove(sample), m_presentationSize, trackId()); |
| |
| GST_TRACE("append: trackId=%s PTS=%s DTS=%s DUR=%s presentationSize=%.0fx%.0f", |
| mediaSample->trackID().string().utf8().data(), |
| mediaSample->presentationTime().toString().utf8().data(), |
| mediaSample->decodeTime().toString().utf8().data(), |
| mediaSample->duration().toString().utf8().data(), |
| mediaSample->presentationSize().width(), mediaSample->presentationSize().height()); |
| |
| // If we're beyond the duration, ignore this sample. |
| MediaTime duration = m_mediaSourceClient->duration(); |
| if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) { |
| GST_DEBUG_OBJECT(m_pipeline.get(), "Detected sample (%s) beyond the duration (%s), discarding", mediaSample->presentationTime().toString().utf8().data(), duration.toString().utf8().data()); |
| return; |
| } |
| |
| // Add a gap sample if a gap is detected before the first sample. |
| if (mediaSample->decodeTime() == MediaTime::zeroTime() && mediaSample->presentationTime() > MediaTime::zeroTime() && mediaSample->presentationTime() <= MediaTime(1, 10)) { |
| GST_DEBUG("Adding gap offset"); |
| mediaSample->applyPtsOffset(MediaTime::zeroTime()); |
| } |
| |
| m_sourceBufferPrivate->didReceiveSample(mediaSample.get()); |
| } |
| |
| void AppendPipeline::didReceiveInitializationSegment() |
| { |
| ASSERT(isMainThread()); |
| |
| WebCore::SourceBufferPrivateClient::InitializationSegment initializationSegment; |
| |
| GST_DEBUG("Notifying SourceBuffer for track %s", (m_track) ? m_track->id().string().utf8().data() : nullptr); |
| initializationSegment.duration = m_mediaSourceClient->duration(); |
| |
| switch (m_streamType) { |
| case Audio: { |
| WebCore::SourceBufferPrivateClient::InitializationSegment::AudioTrackInformation info; |
| info.track = static_cast<AudioTrackPrivateGStreamer*>(m_track.get()); |
| info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get()); |
| initializationSegment.audioTracks.append(info); |
| break; |
| } |
| case Video: { |
| WebCore::SourceBufferPrivateClient::InitializationSegment::VideoTrackInformation info; |
| info.track = static_cast<VideoTrackPrivateGStreamer*>(m_track.get()); |
| info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get()); |
| initializationSegment.videoTracks.append(info); |
| break; |
| } |
| default: |
| GST_ERROR("Unsupported stream type or codec"); |
| break; |
| } |
| |
| m_sourceBufferPrivate->didReceiveInitializationSegment(initializationSegment); |
| } |
| |
| AtomString AppendPipeline::trackId() |
| { |
| ASSERT(isMainThread()); |
| |
| if (!m_track) |
| return AtomString(); |
| |
| return m_track->id(); |
| } |
| |
| void AppendPipeline::consumeAppsinkAvailableSamples() |
| { |
| ASSERT(isMainThread()); |
| |
| GRefPtr<GstSample> sample; |
| int batchedSampleCount = 0; |
| // In some cases each frame increases the duration of the movie. |
| // Batch duration changes so that if we pick 100 of such samples we don't have to run 100 times |
| // layout for the video controls, but only once. |
| m_playerPrivate->blockDurationChanges(); |
| while ((sample = adoptGRef(gst_app_sink_try_pull_sample(GST_APP_SINK(m_appsink.get()), 0)))) { |
| appsinkNewSample(WTFMove(sample)); |
| batchedSampleCount++; |
| } |
| m_playerPrivate->unblockDurationChanges(); |
| |
| GST_TRACE_OBJECT(m_pipeline.get(), "batchedSampleCount = %d", batchedSampleCount); |
| } |
| |
| void AppendPipeline::resetParserState() |
| { |
| ASSERT(isMainThread()); |
| GST_DEBUG_OBJECT(m_pipeline.get(), "Handling resetParserState() in AppendPipeline by resetting the pipeline"); |
| |
| // FIXME: Implement a flush event-based resetParserState() implementation would allow the initialization segment to |
| // survive, in accordance with the spec. |
| |
| // This function restores the GStreamer pipeline to the same state it was when the AppendPipeline constructor |
| // finished. All previously enqueued data is lost and the demuxer is reset, losing all pads and track data. |
| |
| // Unlock the streaming thread. |
| m_taskQueue.startAborting(); |
| |
| // Reset the state of all elements in the pipeline. |
| assertedElementSetState(m_pipeline.get(), GST_STATE_READY); |
| |
| // The parser is tear down automatically when the demuxer is reset (see disconnectDemuxerSrcPadFromAppsinkFromAnyThread()). |
| ASSERT(!m_parser); |
| |
| // Set the pipeline to PLAYING so that it can be used again. |
| assertedElementSetState(m_pipeline.get(), GST_STATE_PLAYING); |
| |
| // All processing related to the previous append has been aborted and the pipeline is idle. |
| // We can listen again to new requests coming from the streaming thread. |
| m_taskQueue.finishAborting(); |
| |
| #if (!(LOG_DISABLED || defined(GST_DISABLE_GST_DEBUG))) |
| { |
| static unsigned i = 0; |
| // This is here for debugging purposes. It does not make sense to have it as class member. |
| WTF::String dotFileName = makeString("reset-pipeline-", ++i); |
| gst_debug_bin_to_dot_file(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.utf8().data()); |
| } |
| #endif |
| } |
| |
| void AppendPipeline::pushNewBuffer(GRefPtr<GstBuffer>&& buffer) |
| { |
| GST_TRACE_OBJECT(m_pipeline.get(), "pushing data buffer %" GST_PTR_FORMAT, buffer.get()); |
| GstFlowReturn pushDataBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), buffer.leakRef()); |
| // Pushing buffers to appsrc can only fail if the appsrc is flushing, in EOS or stopped. Neither of these should |
| // be true at this point. |
| if (pushDataBufferRet != GST_FLOW_OK) { |
| GST_ERROR_OBJECT(m_pipeline.get(), "Failed to push data buffer into appsrc."); |
| ASSERT_NOT_REACHED(); |
| } |
| |
| // Push an additional empty buffer that marks the end of the append. |
| // This buffer is detected and consumed by appsrcEndOfAppendCheckerProbe(), which uses it to signal the successful |
| // completion of the append. |
| // |
| // This works based on how push mode scheduling works in GStreamer. Note there is a single streaming thread for the |
| // AppendPipeline, and within a stream (the portion of a pipeline covered by the same streaming thread, in this case |
| // the whole pipeline) a buffer is guaranteed not to be processed by downstream until processing of the previous |
| // buffer has completed. |
| |
| GstBuffer* endOfAppendBuffer = gst_buffer_new(); |
| gst_buffer_add_meta(endOfAppendBuffer, s_webKitEndOfAppendMetaInfo, nullptr); |
| |
| GST_TRACE_OBJECT(m_pipeline.get(), "pushing end-of-append buffer %" GST_PTR_FORMAT, endOfAppendBuffer); |
| GstFlowReturn pushEndOfAppendBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), endOfAppendBuffer); |
| if (pushEndOfAppendBufferRet != GST_FLOW_OK) { |
| GST_ERROR_OBJECT(m_pipeline.get(), "Failed to push end-of-append buffer into appsrc."); |
| ASSERT_NOT_REACHED(); |
| } |
| } |
| |
| void AppendPipeline::handleAppsinkNewSampleFromStreamingThread(GstElement*) |
| { |
| ASSERT(!isMainThread()); |
| if (&WTF::Thread::current() != m_streamingThread) { |
| // m_streamingThreadId has been initialized in appsrcEndOfAppendCheckerProbe(). |
| // For a buffer to reach the appsink, a buffer must have passed through appsrcEndOfAppendCheckerProbe() first. |
| // This error will only raise if someone modifies the pipeline to include more than one streaming thread or |
| // removes the appsrcEndOfAppendCheckerProbe(). Either way, the end-of-append detection would be broken. |
| // AppendPipeline should have only one streaming thread. Otherwise we can't detect reliably when an appends has |
| // been demuxed completely.; |
| GST_ERROR_OBJECT(m_pipeline.get(), "Appsink received a sample in a different thread than appsrcEndOfAppendCheckerProbe run."); |
| ASSERT_NOT_REACHED(); |
| } |
| |
| if (!m_wasBusAlreadyNotifiedOfAvailableSamples.test_and_set()) { |
| GST_TRACE("Posting appsink-new-sample task to the main thread"); |
| m_taskQueue.enqueueTask([this]() { |
| m_wasBusAlreadyNotifiedOfAvailableSamples.clear(); |
| consumeAppsinkAvailableSamples(); |
| }); |
| } |
| } |
| |
| static GRefPtr<GstElement> |
| createOptionalParserForFormat(GstPad* demuxerSrcPad) |
| { |
| GRefPtr<GstCaps> padCaps = adoptGRef(gst_pad_get_current_caps(demuxerSrcPad)); |
| GstStructure* structure = gst_caps_get_structure(padCaps.get(), 0); |
| const char* mediaType = gst_structure_get_name(structure); |
| |
| GUniquePtr<char> demuxerPadName(gst_pad_get_name(demuxerSrcPad)); |
| GUniquePtr<char> parserName(g_strdup_printf("%s_parser", demuxerPadName.get())); |
| |
| if (!g_strcmp0(mediaType, "audio/x-opus")) { |
| GstElement* opusparse = gst_element_factory_make("opusparse", parserName.get()); |
| ASSERT(opusparse); |
| g_return_val_if_fail(opusparse, nullptr); |
| return GRefPtr<GstElement>(opusparse); |
| } |
| if (!g_strcmp0(mediaType, "video/x-h264")) { |
| GstElement* h264parse = gst_element_factory_make("h264parse", parserName.get()); |
| ASSERT(h264parse); |
| g_return_val_if_fail(h264parse, nullptr); |
| return GRefPtr<GstElement>(h264parse); |
| } |
| |
| return nullptr; |
| } |
| |
| void AppendPipeline::connectDemuxerSrcPadToAppsinkFromStreamingThread(GstPad* demuxerSrcPad) |
| { |
| ASSERT(!isMainThread()); |
| |
| GST_DEBUG("connecting to appsink"); |
| |
| if (m_demux->numsrcpads > 1) { |
| GST_WARNING("Only one stream per SourceBuffer is allowed! Ignoring stream %d by adding a black hole probe.", m_demux->numsrcpads); |
| gulong probeId = gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineDemuxerBlackHolePadProbe), nullptr, nullptr); |
| g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", GULONG_TO_POINTER(probeId)); |
| return; |
| } |
| |
| GRefPtr<GstPad> appsinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink")); |
| |
| // Only one stream per demuxer is supported. |
| ASSERT(!gst_pad_is_linked(appsinkSinkPad.get())); |
| |
| gint64 timeLength = 0; |
| if (gst_element_query_duration(m_demux.get(), GST_FORMAT_TIME, &timeLength) |
| && static_cast<guint64>(timeLength) != GST_CLOCK_TIME_NONE) |
| m_initialDuration = MediaTime(GST_TIME_AS_USECONDS(timeLength), G_USEC_PER_SEC); |
| else |
| m_initialDuration = MediaTime::positiveInfiniteTime(); |
| |
| GST_DEBUG("Requesting demuxer-connect-to-appsink to main thread"); |
| auto response = m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([this, demuxerSrcPad]() { |
| connectDemuxerSrcPadToAppsink(demuxerSrcPad); |
| return AbortableTaskQueue::Void(); |
| }); |
| if (!response) { |
| // The AppendPipeline has been destroyed or aborted before we received a response. |
| return; |
| } |
| |
| // Must be done in the thread we were called from (usually streaming thread). |
| bool isData = (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Audio) |
| || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Video) |
| || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Text); |
| |
| if (isData) { |
| GRefPtr<GstObject> parent = adoptGRef(gst_element_get_parent(m_appsink.get())); |
| if (!parent) |
| gst_bin_add(GST_BIN(m_pipeline.get()), m_appsink.get()); |
| |
| // Current head of the pipeline being built. |
| GRefPtr<GstPad> currentSrcPad = demuxerSrcPad; |
| |
| // Some audio files unhelpfully omit the duration of frames in the container. We need to parse |
| // the contained audio streams in order to know the duration of the frames. |
| // This is known to be an issue with YouTube WebM files containing Opus audio as of YTTV2018. |
| m_parser = createOptionalParserForFormat(currentSrcPad.get()); |
| if (m_parser) { |
| gst_bin_add(GST_BIN(m_pipeline.get()), m_parser.get()); |
| gst_element_sync_state_with_parent(m_parser.get()); |
| |
| GRefPtr<GstPad> parserSinkPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "sink")); |
| GRefPtr<GstPad> parserSrcPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "src")); |
| |
| gst_pad_link(currentSrcPad.get(), parserSinkPad.get()); |
| currentSrcPad = parserSrcPad; |
| } |
| |
| gst_pad_link(currentSrcPad.get(), appsinkSinkPad.get()); |
| |
| gst_element_sync_state_with_parent(m_appsink.get()); |
| |
| GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "webkit-after-link"); |
| } |
| } |
| |
| void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad) |
| { |
| ASSERT(isMainThread()); |
| GST_DEBUG("Connecting to appsink"); |
| |
| const String& type = m_sourceBufferPrivate->type().containerType(); |
| if (type.endsWith("webm")) |
| gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, matroskademuxForceSegmentStartToEqualZero, nullptr, nullptr); |
| |
| GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink")); |
| |
| // Only one stream per demuxer is supported. |
| ASSERT(!gst_pad_is_linked(sinkSinkPad.get())); |
| |
| GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad))); |
| |
| #ifndef GST_DISABLE_GST_DEBUG |
| { |
| GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get())); |
| GST_DEBUG("%s", strcaps.get()); |
| } |
| #endif |
| |
| if (m_mediaSourceClient->duration().isInvalid() && m_initialDuration > MediaTime::zeroTime()) |
| m_mediaSourceClient->durationChanged(m_initialDuration); |
| |
| parseDemuxerSrcPadCaps(gst_caps_ref(caps.get())); |
| |
| switch (m_streamType) { |
| case WebCore::MediaSourceStreamTypeGStreamer::Audio: |
| m_track = WebCore::AudioTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get()); |
| break; |
| case WebCore::MediaSourceStreamTypeGStreamer::Video: |
| m_track = WebCore::VideoTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get()); |
| break; |
| case WebCore::MediaSourceStreamTypeGStreamer::Text: |
| m_track = WebCore::InbandTextTrackPrivateGStreamer::create(id(), sinkSinkPad.get()); |
| break; |
| case WebCore::MediaSourceStreamTypeGStreamer::Invalid: |
| GST_WARNING_OBJECT(m_pipeline.get(), "Unsupported track codec: %" GST_PTR_FORMAT, caps.get()); |
| // 3.5.7 Initialization Segment Received |
| // 5.1. If the initialization segment contains tracks with codecs the user agent does not support, then run the |
| // append error algorithm and abort these steps. |
| |
| // appendParsingFailed() will immediately cause a resetParserState() which will stop demuxing, then the |
| // AppendPipeline will be destroyed. |
| m_sourceBufferPrivate->appendParsingFailed(); |
| return; |
| default: |
| GST_WARNING_OBJECT(m_pipeline.get(), "Pad has unknown track type, ignoring: %" GST_PTR_FORMAT, caps.get()); |
| break; |
| } |
| |
| m_appsinkCaps = WTFMove(caps); |
| m_playerPrivate->trackDetected(this, m_track, true); |
| } |
| |
| void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad*) |
| { |
| // Note: This function can be called either from the streaming thread (e.g. if a strange initialization segment with |
| // incompatible tracks is appended and the srcpad disconnected) or -- more usually -- from the main thread, when |
| // a state change is made to bring the demuxer down. (State change operations run in the main thread.) |
| GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-before"); |
| |
| GST_DEBUG("Disconnecting appsink"); |
| |
| if (m_parser) { |
| assertedElementSetState(m_parser.get(), GST_STATE_NULL); |
| gst_bin_remove(GST_BIN(m_pipeline.get()), m_parser.get()); |
| m_parser = nullptr; |
| } |
| |
| GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-after"); |
| } |
| |
| #if !LOG_DISABLED |
| static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation) |
| { |
| ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER); |
| GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info); |
| GST_TRACE("%s: buffer of size %" G_GSIZE_FORMAT " going thru", padProbeInformation->description, gst_buffer_get_size(buffer)); |
| return GST_PAD_PROBE_OK; |
| } |
| #endif |
| |
| #if ENABLE(ENCRYPTED_MEDIA) |
| static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation *padProbeInformation) |
| { |
| ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM); |
| GstEvent* event = gst_pad_probe_info_get_event(info); |
| GST_DEBUG("Handling event %s on append pipeline appsinkPad", GST_EVENT_TYPE_NAME(event)); |
| WebCore::AppendPipeline* appendPipeline = padProbeInformation->appendPipeline; |
| |
| switch (GST_EVENT_TYPE(event)) { |
| case GST_EVENT_PROTECTION: |
| if (appendPipeline && appendPipeline->playerPrivate()) |
| appendPipeline->playerPrivate()->handleProtectionEvent(event); |
| return GST_PAD_PROBE_DROP; |
| default: |
| break; |
| } |
| |
| return GST_PAD_PROBE_OK; |
| } |
| #endif |
| |
| static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo* info, gpointer) |
| { |
| ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER); |
| GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info); |
| GST_TRACE("buffer of size %" G_GSIZE_FORMAT " ignored", gst_buffer_get_size(buffer)); |
| return GST_PAD_PROBE_DROP; |
| } |
| |
| static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo* info, void*) |
| { |
| // matroskademux sets GstSegment.start to the PTS of the first frame. |
| // |
| // This way in the unlikely case a user made a .mkv or .webm file where a certain portion of the movie is skipped |
| // (e.g. by concatenating a MSE initialization segment with any MSE media segment other than the first) and opened |
| // it with a regular player, playback would start immediately. GstSegment.duration is not modified in any case. |
| // |
| // Leaving the usefulness of that feature aside, the fact that it uses GstSegment.start is problematic for MSE. |
| // In MSE is not unusual to process unordered MSE media segments. In this case, a frame may have |
| // PTS <<< GstSegment.start and be discarded by downstream. This happens for instance in elements derived from |
| // audiobasefilter, such as opusparse. |
| // |
| // This probe remedies the problem by setting GstSegment.start to 0 in all cases, not only when the PTS of the first |
| // frame is zero. |
| |
| ASSERT(info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM); |
| GstEvent* event = static_cast<GstEvent*>(info->data); |
| if (event->type == GST_EVENT_SEGMENT) { |
| GstSegment segment; |
| gst_event_copy_segment(event, &segment); |
| |
| segment.start = 0; |
| |
| GRefPtr<GstEvent> newEvent = adoptGRef(gst_event_new_segment(&segment)); |
| gst_event_replace(reinterpret_cast<GstEvent**>(&info->data), newEvent.get()); |
| } |
| return GST_PAD_PROBE_OK; |
| } |
| |
| } // namespace WebCore. |
| |
| #endif // USE(GSTREAMER) |