Draft: fallbacksrc: Add multi-stream support with the stream API
Closes #383
Marking as draft for now because of a few missing features and an upstream blocker.
This MR aims to bring support for handling more than 2 (1 video + 1 audio) streams to fallbacksrc
.
fallbacksrc
now fully supports the stream selection API and lets the app/user select which outputs are needed, by posting its own StreamCollection and respecting choices made via the select-streams
message.
Internally fallbacksrc
will still always provide the 2 default streams, and if used like the old version, it will still only expose pads according to enable_video/enable_audio
properties.
However, as soon as the source posts its StreamCollection, fallbacksrc
will create its own streams to match and will let the user/application know it can select more outputs in that case. Once selected, an output is guaranteed to stay present - if the source disappears, it'll switch to the fallback one.
There are a few things missing/blocking for now, WIP:
-
A decodebin3 race condition which is blocking us from selecting source streams correctly - Requires gstreamer!7594 (merged)
- Probably also good to include gstreamer!7598 (merged)
-
A mapping system to allow user to specify which fallback streams should be picked once a given source stream disappears -
CustomSource
needs modifications to work in this new scenario -
Seqnums usage needs to be more consistent (should match between various events at the beginning, after a seek etc.) -
Streams need group IDs correctly set
Co-authored-by: @slomo
Example application of some sort
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use gst::glib;
use gst::prelude::*;
fn main() {
gst::init().unwrap();
let pipeline = gst::Pipeline::default();
let src = gst::ElementFactory::make("fallbacksrc")
.property("uri", "http://127.0.0.1:8000/output-single.mp4")
.property("fallback-uri", "http://127.0.0.1:8000/fallback-single.mp4")
.build()
.unwrap();
pipeline.add(&src).unwrap();
let streams = Arc::new(Mutex::new(HashMap::new()));
src.connect_pad_added({
let streams = streams.clone();
move |src, pad| {
let pipeline = src.parent().and_downcast::<gst::Pipeline>().unwrap();
let stream = pad.stream().unwrap();
println!("Got pad for stream {:#?}", stream.debug());
let stream_id = stream.stream_id().unwrap();
if stream.stream_type().contains(gst::StreamType::AUDIO) {
let audioconvert = gst::ElementFactory::make("audioconvert")
.name(format!("audioconvert-{stream_id}"))
.build()
.unwrap();
let autoaudiosink = gst::ElementFactory::make("autoaudiosink")
.name(format!("autoaudiosink-{stream_id}"))
.build()
.unwrap();
pipeline.add(&audioconvert).unwrap();
pipeline.add(&autoaudiosink).unwrap();
audioconvert.link(&autoaudiosink).unwrap();
autoaudiosink.sync_state_with_parent().unwrap();
audioconvert.sync_state_with_parent().unwrap();
pad.link(&audioconvert.static_pad("sink").unwrap()).unwrap();
} else {
let videoconvert = gst::ElementFactory::make("videoconvert")
.name(format!("videoconvert-{stream_id}"))
.build()
.unwrap();
let autovideosink = gst::ElementFactory::make("autovideosink")
.name(format!("autovideosink-{stream_id}"))
.build()
.unwrap();
pipeline.add(&videoconvert).unwrap();
pipeline.add(&autovideosink).unwrap();
videoconvert.link(&autovideosink).unwrap();
autovideosink.sync_state_with_parent().unwrap();
videoconvert.sync_state_with_parent().unwrap();
pad.link(&videoconvert.static_pad("sink").unwrap()).unwrap();
}
streams.lock().unwrap().insert(pad.clone(), stream);
}
});
src.connect_pad_removed({
let streams = streams.clone();
move |src, pad| {
let pipeline = src.parent().and_downcast::<gst::Pipeline>().unwrap();
let Some(stream) = streams.lock().unwrap().get(pad).cloned() else {
return;
};
println!("Lost pad for stream {:#?}", stream.debug());
let stream_id = stream.stream_id().unwrap();
let videoconvert = pipeline
.by_name(&format!("videoconvert-{stream_id}"))
.unwrap();
let autovideosink = pipeline
.by_name(&format!("autovideosink-{stream_id}"))
.unwrap();
pipeline.remove(&autovideosink).unwrap();
pipeline.remove(&videoconvert).unwrap();
autovideosink.set_state(gst::State::Null).unwrap();
videoconvert.set_state(gst::State::Null).unwrap();
}
});
let (bus_sender, bus_receiver) = async_channel::unbounded();
let bus = pipeline.bus().unwrap();
bus.set_sync_handler({
let src = src.clone();
move |_bus, msg| {
use gst::MessageView;
match msg.view() {
MessageView::StreamCollection(msg) => {
let collection = msg.stream_collection();
println!("Received stream-collection: {:#?}", collection.debug());
let mut selected_streams = Vec::new();
for stream in &collection {
let caps = stream.caps().unwrap();
let stream_id = stream.stream_id().unwrap();
let s = caps.structure(0).unwrap();
if s.name() == "video/x-raw" {
println!("Selecting stream {stream_id}");
selected_streams.push(stream_id);
}
}
src.send_event(
gst::event::SelectStreams::builder(
&selected_streams
.iter()
.map(|s| s.as_str())
.collect::<Vec<_>>(),
)
.build(),
);
}
MessageView::StreamsSelected(msg) => {
let streams = msg.streams();
for stream in &streams {
let stream_id = stream.stream_id().unwrap();
println!("Selected {stream_id}");
}
}
_ => (),
}
bus_sender.send_blocking(msg.clone()).unwrap();
gst::BusSyncReply::Drop
}
});
pipeline.set_state(gst::State::Playing).unwrap();
glib::MainContext::default().block_on(async {
while let Ok(msg) = bus_receiver.recv().await {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
println!("EOS");
break;
}
MessageView::Error(msg) => {
println!(
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
msg.error(),
msg.debug()
);
break;
}
_ => (),
};
}
});
pipeline.set_state(gst::State::Null).unwrap();
}