Skip to content

Rework proposal for threadshare

Progress:

  • Initial proposal
  • Second proposal (factorize state changes, API enhancements and some design flaws corrections)
  • Check the termination strategy for Queue
  • Extra testing (note: pipeline::test_premature_shutdown deadlocks sometimes)
  • Fix FIXMEs (some more enhancement and better use of some futures)
  • Update remaining elements.
  • Update tokio.
  • Rebase on master
  • Clippy pass
  • Use beta toolchain.
  • Finalize doc

Context

Disclaimer: there are parts of the threadshare model for which I still don't have a completely clear representation of the dynamics, so there might still be holes in this proposal.

While working on the tokio 0.2 migration I came up with the following mental-model for the ts-elements:

  • Data streams are processed in segments (like in a partition, not in the sense of gst::Segment).
  • A segment is shared between 2 ts-elements:
    • one of them (A) is processing a Stream of incoming data, using a push_item or push_buffer function
    • the other one (B) is pushing futures to a queue.
  • The push_{item,buffer} function in (A) takes care of draining the pending future queue feeded by (B).
  • (A) builds a sticky custom Event to communicate the IOContext and the future queue to be used by (B).

Note: elements with a src and a sink such as ts-proxy and ts-queue both act as (A) and (B) for 2 segments of a data stream.

Proposal

I tried to map the threadshare IOContext model to the executor model described in the async book.

  • The async book defines the term "task" for a top level future. In current proposal, I named the futures pushed by the elements acting as (B) Tasks and the pending future list TaskQueues. See below for a discussion about this choice.
  • 2 Elements sharing the same data stream segment use the same IOContext and the same TaskQueueId. A TaskContext struct gather them together. This allows sharing them at once and avoid storing and managing them separately in each element.
  • Since there is a certain separation of concerns between (A) and (B), I propose to separate the methods they use in 2 structs:
    • Elements acting as (A) use a TaskExecutor. The TaskExecutor runs a StreamProcessor on the incoming data stream and handles the errors regarding the stream or the subsequent processing. The TaskExecutor automatically drains the TaskQueue, saving the element from explicitly handling this in the push_{item,buffer} function. The TaskExecutor allows building a custom event referring to its TaskContext.
    • Element acting as (B) use a TaskSpawner. The TaskSpawner is built from a custom event referring to the TaskExecutor's TaskContext. (B) can then spawn a new task which will be processed by the StreamProcessor on the TaskExecutor.
  • The JitterBuffer drains its TaskQueue, so the behaviour is a little different.

Edit: TaskScheduler might be more accurate than TaskSpawner since the tasks are not immediately spawned and in order to stay closer to the terminology used in DataQueue.

Task vs Future

As mentioned previously, the async book uses the term "task" for a top level future. In the above model, what should be considered as a top-level future might be subject to debate:

  1. From a technical standpoint, the top-level futures are those invoked when calling TaskExecutor::run_stream_processor or TaskExecutor::timeout.
  2. From a functional standpoint, it could be argued that TaskExecutor::run_stream_processor runs an executor so that (B) is able to spawn futures, which would be high level-futures, i.e. tasks, under that perspective. TaskExecutor::timeout is a little different since it is not implicitely related to the TaskQueue.

If we decide that 1 is more accurate, I'll rename the TaskQueue, TaskSpawner, etc. as FutureQueue, FutureSpawner, etc.

Edited by François Laignel

Merge request reports

Loading