diff --git a/core/src/androidTest/java/io/github/thibaultbee/streampack/core/streamer/state/AudioOnlyStreamerStateTest.kt b/core/src/androidTest/java/io/github/thibaultbee/streampack/core/streamer/state/AudioOnlyStreamerStateTest.kt index 66b593cd8..86577e10b 100644 --- a/core/src/androidTest/java/io/github/thibaultbee/streampack/core/streamer/state/AudioOnlyStreamerStateTest.kt +++ b/core/src/androidTest/java/io/github/thibaultbee/streampack/core/streamer/state/AudioOnlyStreamerStateTest.kt @@ -55,14 +55,14 @@ class AudioOnlyStreamerStateTest(private val descriptor: MediaDescriptor) { // Single method calls @Test - fun configureAudioOnlyTest() { + fun configureAudioOnlyTest() = runTest { streamer.setAudioConfig( ConfigurationUtils.dummyValidAudioConfig() ) } @Test - fun configureVideoOnlyTest() { + fun configureVideoOnlyTest() = runTest { try { streamer.setVideoConfig( ConfigurationUtils.dummyValidVideoConfig() @@ -73,7 +73,7 @@ class AudioOnlyStreamerStateTest(private val descriptor: MediaDescriptor) { } @Test - fun configureTest() { + fun configureTest() = runTest { try { streamer.setConfig( ConfigurationUtils.dummyValidAudioConfig(), @@ -85,7 +85,7 @@ class AudioOnlyStreamerStateTest(private val descriptor: MediaDescriptor) { } @Test - fun configureErrorTest() { + fun configureErrorTest() = runTest { try { streamer.setAudioConfig( ConfigurationUtils.dummyInvalidAudioConfig() @@ -105,7 +105,7 @@ class AudioOnlyStreamerStateTest(private val descriptor: MediaDescriptor) { } @Test - fun configureReleaseTest() { + fun configureReleaseTest() = runTest { streamer.setAudioConfig( ConfigurationUtils.dummyValidAudioConfig() ) @@ -151,7 +151,7 @@ class AudioOnlyStreamerStateTest(private val descriptor: MediaDescriptor) { } @Test - fun multipleConfigureTest() { + fun multipleConfigureTest() = runTest { (0..10).forEach { _ -> streamer.setAudioConfig( ConfigurationUtils.dummyValidAudioConfig() diff --git a/core/src/androidTest/java/io/github/thibaultbee/streampack/core/streamer/state/StreamerStateTest.kt b/core/src/androidTest/java/io/github/thibaultbee/streampack/core/streamer/state/StreamerStateTest.kt index cb6cd24ab..c7e8d9024 100644 --- a/core/src/androidTest/java/io/github/thibaultbee/streampack/core/streamer/state/StreamerStateTest.kt +++ b/core/src/androidTest/java/io/github/thibaultbee/streampack/core/streamer/state/StreamerStateTest.kt @@ -48,21 +48,21 @@ abstract class StreamerStateTest( // Single method calls @Test - open fun configureAudioOnlyTest() { + open fun configureAudioOnlyTest() = runTest { streamer.setAudioConfig( ConfigurationUtils.dummyValidAudioConfig() ) } @Test - open fun configureVideoOnlyTest() { + open fun configureVideoOnlyTest() = runTest { streamer.setVideoConfig( ConfigurationUtils.dummyValidVideoConfig() ) } @Test - open fun configureTest() { + open fun configureTest() = runTest { streamer.setConfig( ConfigurationUtils.dummyValidAudioConfig(), ConfigurationUtils.dummyValidVideoConfig() @@ -70,7 +70,7 @@ abstract class StreamerStateTest( } @Test - open fun configureErrorTest() { + open fun configureErrorTest() = runTest { try { streamer.setConfig( ConfigurationUtils.dummyInvalidAudioConfig(), @@ -117,7 +117,7 @@ abstract class StreamerStateTest( } @Test - open fun configureReleaseTest() { + open fun configureReleaseTest() = runTest { streamer.setConfig( ConfigurationUtils.dummyValidAudioConfig(), ConfigurationUtils.dummyValidVideoConfig() @@ -156,7 +156,7 @@ abstract class StreamerStateTest( // Stress test @Test - open fun multipleConfigureTest() { + open fun multipleConfigureTest() = runTest { (0..10).forEach { _ -> streamer.setConfig( ConfigurationUtils.dummyValidAudioConfig(), diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/SurfaceProcessor.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/SurfaceProcessor.kt index dc3331f7d..c35e027e1 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/SurfaceProcessor.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/SurfaceProcessor.kt @@ -178,8 +178,10 @@ class SurfaceProcessor( surfaceOutputs.forEach { try { - it.updateTransformMatrix(surfaceOutputMatrix, textureMatrix) - renderer.render(surfaceTexture.timestamp, surfaceOutputMatrix, it.surface) + if (it.isStreaming()) { + it.updateTransformMatrix(surfaceOutputMatrix, textureMatrix) + renderer.render(surfaceTexture.timestamp, surfaceOutputMatrix, it.surface) + } } catch (e: Exception) { Logger.e(TAG, "Error while rendering frame", e) } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/AbstractSurfaceOutput.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/AbstractSurfaceOutput.kt index 930348a4f..a0d457cb3 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/AbstractSurfaceOutput.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/AbstractSurfaceOutput.kt @@ -24,7 +24,8 @@ import io.github.thibaultbee.streampack.core.elements.processing.video.utils.GLU open class AbstractSurfaceOutput( override val surface: Surface, - final override val resolution: Size + final override val resolution: Size, + override val isStreaming: () -> Boolean ) : ISurfaceOutput { protected val lock = Any() protected var isClosed = false @@ -51,6 +52,7 @@ interface ISurfaceOutput { val surface: Surface val cropRect: Rect val resolution: Size + val isStreaming: () -> Boolean fun updateTransformMatrix(output: FloatArray, input: FloatArray) diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/SurfaceOutput.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/SurfaceOutput.kt index f2ad4fa0c..1394f9efb 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/SurfaceOutput.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/SurfaceOutput.kt @@ -31,9 +31,10 @@ import io.github.thibaultbee.streampack.core.elements.utils.extensions.rotate class SurfaceOutput( surface: Surface, resolution: Size, + isStreaming: () -> Boolean, private val transformationInfo: TransformationInfo ) : - AbstractSurfaceOutput(surface, resolution) { + AbstractSurfaceOutput(surface, resolution, isStreaming) { private val infoProvider: ISourceInfoProvider get() = transformationInfo.infoProvider diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/StateFlowExtensions.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/StateFlowExtensions.kt new file mode 100644 index 000000000..8320a7eea --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/StateFlowExtensions.kt @@ -0,0 +1,14 @@ +package io.github.thibaultbee.streampack.core.elements.utils.extensions + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.filterNotNull +import kotlinx.coroutines.flow.runningFold + +data class History(val previous: T?, val current: T) + +fun StateFlow.runningHistory(): Flow> = + runningFold( + initial = null as (History?), + operation = { accumulator, new -> History(accumulator?.current, new) } + ).filterNotNull() diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/StreamerPipeline.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/StreamerPipeline.kt new file mode 100644 index 000000000..f638dd37d --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/StreamerPipeline.kt @@ -0,0 +1,555 @@ +/* + * Copyright (C) 2025 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.core.pipelines + +import android.content.Context +import android.util.Size +import android.view.Surface +import io.github.thibaultbee.streampack.core.elements.data.Frame +import io.github.thibaultbee.streampack.core.elements.encoders.IEncoderInternal +import io.github.thibaultbee.streampack.core.elements.processing.video.ISurfaceProcessorInternal +import io.github.thibaultbee.streampack.core.elements.processing.video.SurfaceProcessor +import io.github.thibaultbee.streampack.core.elements.processing.video.outputs.AbstractSurfaceOutput +import io.github.thibaultbee.streampack.core.elements.processing.video.outputs.SurfaceOutput +import io.github.thibaultbee.streampack.core.elements.processing.video.source.ISourceInfoProvider +import io.github.thibaultbee.streampack.core.elements.sources.audio.AudioSourceConfig +import io.github.thibaultbee.streampack.core.elements.sources.audio.IAudioSource +import io.github.thibaultbee.streampack.core.elements.sources.audio.IAudioSourceInternal +import io.github.thibaultbee.streampack.core.elements.sources.video.ISurfaceSource +import io.github.thibaultbee.streampack.core.elements.sources.video.IVideoFrameSource +import io.github.thibaultbee.streampack.core.elements.sources.video.IVideoSource +import io.github.thibaultbee.streampack.core.elements.sources.video.IVideoSourceInternal +import io.github.thibaultbee.streampack.core.elements.sources.video.VideoSourceConfig +import io.github.thibaultbee.streampack.core.elements.sources.video.camera.CameraSource +import io.github.thibaultbee.streampack.core.elements.utils.RotationValue +import io.github.thibaultbee.streampack.core.elements.utils.extensions.displayRotation +import io.github.thibaultbee.streampack.core.elements.utils.extensions.runningHistory +import io.github.thibaultbee.streampack.core.elements.utils.extensions.sourceConfig +import io.github.thibaultbee.streampack.core.logger.Logger +import io.github.thibaultbee.streampack.core.pipelines.outputs.IAudioPipelineOutputInternal +import io.github.thibaultbee.streampack.core.pipelines.outputs.IPipelineOutput +import io.github.thibaultbee.streampack.core.pipelines.outputs.IVideoPipelineOutputInternal +import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IAudioEncodingPipelineOutput +import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IEncodingPipelineOutputInternal +import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IVideoEncodingPipelineOutput +import io.github.thibaultbee.streampack.core.pipelines.utils.SourceConfigUtils +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.filterNotNull +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import java.nio.ByteBuffer +import java.util.concurrent.Executors + +/** + * Base class of all streamers. + * + * @param context the application context + * @param videoSourceInternal the video source implementation + * @param audioSourceInternal the audio source implementation + */ +open class StreamerPipeline( + protected val context: Context, + protected val audioSourceInternal: IAudioSourceInternal?, + protected val videoSourceInternal: IVideoSourceInternal?, +) { + private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + + private var surfaceProcessor: ISurfaceProcessorInternal? = null + + private val _throwable = MutableStateFlow(null) + val throwable: StateFlow = _throwable + + private val sourceInfoProvider = videoSourceInternal?.infoProvider + + // SOURCES + /** + * The audio source. + * It allows advanced audio settings. + */ + val audioSource: IAudioSource? + get() = audioSourceInternal + + /** + * The video source. + * It allows advanced video settings. + */ + val videoSource: IVideoSource? + get() = videoSourceInternal + + private val _isStreaming = MutableStateFlow(false) + val isStreaming: StateFlow = _isStreaming + + /** + * Whether the streamer has audio. + */ + val hasAudio = audioSourceInternal != null + + /** + * Whether the streamer has video. + */ + val hasVideo = videoSourceInternal != null + + // OUTPUTS + private val outputs = mutableMapOf() + + /** + * Sets the target rotation of all outputs.s + */ + var targetRotation: Int = context.displayRotation + set(@RotationValue value) { + outputs.keys.forEach { + (it as? IVideoPipelineOutputInternal)?.targetRotation = value + } + field = value + } + + + private var _audioSourceConfig: AudioSourceConfig? = null + private var audioSourceConfig: AudioSourceConfig + get() = requireNotNull(_audioSourceConfig) { "Audio source config is not set" } + set(value) { + require(hasAudio) { "Do not need to set audio as it is a video only streamer" } + require(!isStreaming.value) { "Can't change audio source configuration while streaming" } + + if (_audioSourceConfig == value) { + Logger.i(TAG, "Audio source configuration is the same, skipping configuration") + return + } + _audioSourceConfig = value + applyAudioSourceConfig(value) + } + + private fun applyAudioSourceConfig(audioConfig: AudioSourceConfig) { + try { + audioSourceInternal?.configure(audioConfig) + } catch (t: Throwable) { + throw t + } + } + + private var _videoSourceConfig: VideoSourceConfig? = null + private var videoSourceConfig: VideoSourceConfig + get() = requireNotNull(_videoSourceConfig) { "Video source config is not set" } + set(value) { + require(hasVideo) { "Do not need to set video as it is a audio only streamer" } + require(!isStreaming.value) { "Can't change video source configuration while streaming" } + + if (_videoSourceConfig == value) { + Logger.i(TAG, "Video source configuration is the same, skipping configuration") + return + } + + val previousVideoConfig = _videoSourceConfig + _videoSourceConfig = value + applyVideoSourceConfig(previousVideoConfig, value) + } + + private fun applyVideoSourceConfig( + previousVideoConfig: VideoSourceConfig?, videoConfig: VideoSourceConfig + ) { + try { + videoSourceInternal?.configure(videoConfig) + if (previousVideoConfig?.resolution != videoConfig.resolution) { + if (videoSourceInternal is ISurfaceSource) { + surfaceProcessor = buildOrUpdateSurfaceProcessor(videoConfig) + } + } + } catch (t: Throwable) { + throw t + } + } + + private fun buildSurfaceOutput( + videoOutput: IVideoPipelineOutputInternal + ): AbstractSurfaceOutput { + val surfaceWithSize = requireNotNull(videoOutput.surface.value) { + "Output $videoOutput has no surface" + } + + return buildSurfaceOutput( + surfaceWithSize.surface, + surfaceWithSize.resolution, + videoOutput.isStreaming::value, + sourceInfoProvider!! + ) + } + + /** + * Creates a surface output for the given surface. + * + * Use it for additional processing. + * + * @param surface the encoder surface + * @param resolution the resolution of the surface + * @param infoProvider the source info provider for internal processing + */ + private fun buildSurfaceOutput( + surface: Surface, + resolution: Size, + isStreaming: () -> Boolean, + infoProvider: ISourceInfoProvider + ): AbstractSurfaceOutput { + return SurfaceOutput( + surface, resolution, isStreaming, SurfaceOutput.TransformationInfo( + targetRotation, isMirroringRequired(), infoProvider + ) + ) + } + + /** + * Whether the output surface needs to be mirrored. + */ + protected open fun isMirroringRequired(): Boolean { + if (videoSourceInternal is CameraSource) { + return videoSourceInternal.infoProvider.isFrontFacing + } + return false + } + + /** + * Updates the transformation of the surface output. + * To be called when the source info provider or [isMirroringRequired] is updated. + */ + fun resetSurfaceProcessorOutputSurface() { + outputs.keys.filterIsInstance() + .filter { it.surface.value != null }.forEach { + resetSurfaceProcessorOutputSurface(it) + } + } + + /** + * Updates the transformation of the surface output. + */ + private fun resetSurfaceProcessorOutputSurface( + videoOutput: IVideoPipelineOutputInternal + ) { + Logger.i(TAG, "Updating transformation") + videoOutput.surface.value?.let { + surfaceProcessor?.removeOutputSurface(it.surface) + } + + surfaceProcessor?.addOutputSurface(buildSurfaceOutput(videoOutput)) + } + + private fun releaseSurfaceProcessor() { + val videoSource = videoSourceInternal + if (videoSource is ISurfaceSource) { + videoSource.outputSurface?.let { + surfaceProcessor?.removeInputSurface(it) + } + } + surfaceProcessor?.removeAllOutputSurfaces() + surfaceProcessor?.release() + } + + private fun buildOrUpdateSurfaceProcessor( + videoSourceConfig: VideoSourceConfig + ): ISurfaceProcessorInternal { + val videoSource = videoSourceInternal + if (videoSource !is ISurfaceSource) { + throw IllegalStateException("Video source must have an output surface") + } + + val surfaceProcessor = surfaceProcessor as SurfaceProcessor? + if (surfaceProcessor?.dynamicRangeProfile == videoSourceConfig.dynamicRangeProfile) { + val outputSurface = requireNotNull(videoSource.outputSurface) { + "Video source must have an output surface" + } + surfaceProcessor.updateInputSurface( + outputSurface, videoSource.infoProvider.getSurfaceSize( + videoSourceConfig.resolution, targetRotation + ) + ) + return surfaceProcessor + } + + + val newSurfaceProcessor = when { + surfaceProcessor == null -> SurfaceProcessor(videoSourceConfig.dynamicRangeProfile) + surfaceProcessor.dynamicRangeProfile != videoSourceConfig.dynamicRangeProfile -> { + releaseSurfaceProcessor() + SurfaceProcessor(videoSourceConfig.dynamicRangeProfile) + } + + else -> throw IllegalStateException("Surface processor unexpected state") + } + + // Adds surface processor input + videoSource.outputSurface = newSurfaceProcessor.createInputSurface( + videoSource.infoProvider.getSurfaceSize( + videoSourceConfig.resolution, targetRotation + ) + ) + + // Adds surface processor output + outputs.keys.filterIsInstance() + .filter { it.surface.value != null }.forEach { + newSurfaceProcessor.addOutputSurface(buildSurfaceOutput(it)) + } + + return newSurfaceProcessor + } + + /** + * Adds an output. + */ + fun addOutput(output: IPipelineOutput) { + require(output is IVideoPipelineOutputInternal || output is IAudioPipelineOutputInternal) { + "Output must be an audio or video output" + } + if (outputs.contains(output)) { + Logger.w(TAG, "Output $output already added") + return + } + + val scope = CoroutineScope(dispatcher) + scope.launch { + output.isStreaming.collect { isStreaming -> + if (isStreaming) { + if (!this@StreamerPipeline.isStreaming.value) { + startStream() + } + } else { + // Call [stopStream] if all outputs are stopped + if (outputs.keys.all { !it.isStreaming.value }) { + stopStreamInternal() + _isStreaming.emit(false) + } + } + } + } + if (output is IVideoPipelineOutputInternal) { + addVideoOutputIfNeeded(output, scope) + } + if (output is IAudioPipelineOutputInternal) { + addAudioOutputIfNeeded(output) + } + if (output is IAudioEncodingPipelineOutput) { + scope.launch { + output.audioCodecConfigFlow.filterNotNull().collect { audioCodecConfig -> + if (hasAudio != output.hasAudio) { + Logger.w( + TAG, + "Output $output has audio: ${output.hasAudio} but streamer has audio: $hasAudio" + ) + } + //TODO: Check if audio codec config is compatible with audio source config of all outputs + audioSourceConfig = audioCodecConfig.sourceConfig + } + } + } + if (output is IVideoEncodingPipelineOutput) { + scope.launch { + output.videoCodecConfigFlow.filterNotNull().collect { _ -> + if (hasVideo != output.hasVideo) { + Logger.w( + TAG, + "Output $output has video: ${output.hasVideo} but streamer has video: $hasVideo" + ) + } + videoSourceConfig = buildVideoSourceConfig() + } + } + } + + outputs[output] = scope + } + + private fun buildVideoSourceConfig(): VideoSourceConfig { + val videoCodecConfigs = + outputs.keys.filterIsInstance().mapNotNull { + (it as? IEncodingPipelineOutputInternal)?.videoCodecConfigFlow?.value + } + return SourceConfigUtils.buildVideoSourceConfig(videoCodecConfigs) + } + + private fun addVideoOutputIfNeeded( + output: IVideoPipelineOutputInternal, scope: CoroutineScope + ) { + if (hasVideo) { + if (videoSourceInternal is ISurfaceSource) { + output.videoSourceTimestampOffset = videoSourceInternal.timestampOffset + scope.launch { + output.surface.runningHistory().collect { (previousSurface, newSurface) -> + Logger.i(TAG, "Surface changed") + if (previousSurface?.surface == newSurface?.surface) { + return@collect + } + + previousSurface?.let { + Logger.i(TAG, "Removing previous surface") + surfaceProcessor?.removeOutputSurface(it.surface) + } + newSurface?.let { + Logger.i(TAG, "Adding new surface") + surfaceProcessor?.addOutputSurface( + buildSurfaceOutput( + it.surface, + it.resolution, + output.isStreaming::value, + sourceInfoProvider!! + ) + ) + } + } + } + } else if (videoSourceInternal is IVideoFrameSource) { + require(outputs.keys.filterIsInstance().size == 1) { + "Only one output is allowed for frame source" + } + output.videoSourceTimestampOffset = 0L + output.videoFrameRequestedListener = object : + IEncoderInternal.IByteBufferInput.OnFrameRequestedListener { + override fun onFrameRequested(buffer: ByteBuffer): Frame { + return videoSourceInternal.getVideoFrame(buffer) + } + } + } else { + output.videoFrameRequestedListener = null + Logger.w(TAG, "Output $output has video but streamer has no video") + } + } + } + + private fun addAudioOutputIfNeeded(output: IAudioPipelineOutputInternal) { + if (hasAudio) { + output.audioFrameRequestedListener = object : + IEncoderInternal.IByteBufferInput.OnFrameRequestedListener { + override fun onFrameRequested(buffer: ByteBuffer): Frame { + return audioSourceInternal!!.getAudioFrame(buffer) + } + } + } else { + output.audioFrameRequestedListener = null + Logger.w(TAG, "Output $output has audio but streamer has no audio") + } + } + + /** + * Removes an output. + * + * It will stop the stream. + */ + private suspend fun removeOutputImpl(output: IPipelineOutput) { + output.stopStream() + + // Clean streamer output + if (output is IVideoPipelineOutputInternal) { + output.surface.value?.let { + surfaceProcessor?.removeOutputSurface(it.surface) + } + output.videoFrameRequestedListener = null + } + if (output is IAudioPipelineOutputInternal) { + output.audioFrameRequestedListener = null + } + + outputs[output]?.cancel() + } + + /** + * Removes an output. + * + * It will stop the stream. + */ + suspend fun removeOutput(output: IPipelineOutput) { + if (!outputs.contains(output)) { + Logger.w(TAG, "Output $output not found") + return + } + + removeOutputImpl(output) + + outputs.remove(output) + } + + /** + * Starts audio/video source. + * + * @see [stopStream] + */ + private suspend fun startStream() { + Logger.e(TAG, "startStream") + // Sources + audioSourceInternal?.startStream() + videoSourceInternal?.startStream() + + _isStreaming.emit(true) + } + + /** + * Stops all streams. + * + * Internally, it resets audio and video recorders and encoders to get them ready for another + * [startStream]. It explains why preview could be restarted. + */ + suspend fun stopStream() { + // Sources + stopStreamInternal() + + // Outputs + outputs.keys.forEach { + try { + it.stopStream() + } catch (t: Throwable) { + Logger.w(TAG, "stopStream: Can't stop output $it: ${t.message}") + } + } + + _isStreaming.emit(false) + } + + private suspend fun stopStreamInternal() { + // Sources + audioSourceInternal?.stopStream() + videoSourceInternal?.stopStream() + } + + + /** + * Releases all resources including audio and video sources, processors and outputs. + */ + fun release() { + // Sources + audioSourceInternal?.release() + releaseSurfaceProcessor() + val videoSource = videoSourceInternal + if (videoSource is ISurfaceSource) { + videoSource.outputSurface = null + } + videoSourceInternal?.release() + + // Outputs + outputs.keys.forEach { + try { + runBlocking { + removeOutputImpl(it) + } + it.release() + } catch (t: Throwable) { + Logger.w(TAG, "release: Can't release output $it: ${t.message}") + } + } + } + + companion object { + const val TAG = "StreamerPipeline" + } +} \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/outputs/IPipelineOutput.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/outputs/IPipelineOutput.kt new file mode 100644 index 000000000..472fa2882 --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/outputs/IPipelineOutput.kt @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2025 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.core.pipelines.outputs + +import android.util.Size +import android.view.Surface +import io.github.thibaultbee.streampack.core.elements.encoders.IEncoderInternal.IByteBufferInput.OnFrameRequestedListener +import io.github.thibaultbee.streampack.core.elements.utils.RotationValue +import io.github.thibaultbee.streampack.core.streamers.single.startStream +import kotlinx.coroutines.flow.StateFlow + + +/** + * An output component for a streamer. + */ +interface IPipelineOutput { + /** + * Whether the output has audio. + */ + val hasAudio: Boolean + + /** + * Whether the output has video. + */ + val hasVideo: Boolean + + /** + * Returns the last throwable that occurred. + */ + val throwable: StateFlow + + /** + * Returns true if stream is running. + */ + val isStreaming: StateFlow + + /** + * Starts audio/video stream. + * + * @see [stopStream] + */ + suspend fun startStream() + + /** + * Stops audio/video stream. + * + * @see [startStream] + */ + suspend fun stopStream() + + /** + * Clean and reset the output. + */ + fun release() +} + +/** + * A [Surface] with its resolution. + * + * @param surface The [Surface]. + * @param resolution The resolution of the [Surface]. + */ +data class SurfaceWithSize( + val surface: Surface, + val resolution: Size +) + +/** + * An internal video output component for a pipeline. + */ +interface IVideoPipelineOutputInternal : IPipelineOutput { + /** + * The rotation in one the [Surface] rotations from the device natural orientation. + */ + @RotationValue + var targetRotation: Int + + /** + * The [Surface] flow to render video. + * For surface mode video encoder. + */ + val surface: StateFlow + + /** + * The video source timestamp offset. + * Used to synchronize video and audio when video comes from the [surface].s + */ + var videoSourceTimestampOffset: Long + + /** + * The video [Frame] listener. + * For buffer mode video encoder. + */ + var videoFrameRequestedListener: OnFrameRequestedListener? +} + +/** + * An internal audio output component for a pipeline. + */ +interface IAudioPipelineOutputInternal : IPipelineOutput { + /** + * The audio [Frame] listener. + */ + var audioFrameRequestedListener: OnFrameRequestedListener? +} + diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/outputs/encoding/EncodingPipelineOutput.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/outputs/encoding/EncodingPipelineOutput.kt new file mode 100644 index 000000000..cad9aaee0 --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/outputs/encoding/EncodingPipelineOutput.kt @@ -0,0 +1,615 @@ +/* + * Copyright (C) 2025 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.core.pipelines.outputs.encoding + +import android.content.Context +import android.view.Surface +import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor +import io.github.thibaultbee.streampack.core.elements.data.Frame +import io.github.thibaultbee.streampack.core.elements.encoders.AudioCodecConfig +import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig +import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder +import io.github.thibaultbee.streampack.core.elements.encoders.IEncoderInternal +import io.github.thibaultbee.streampack.core.elements.encoders.IEncoderInternal.IByteBufferInput.OnFrameRequestedListener +import io.github.thibaultbee.streampack.core.elements.encoders.VideoCodecConfig +import io.github.thibaultbee.streampack.core.elements.encoders.mediacodec.AudioEncoderConfig +import io.github.thibaultbee.streampack.core.elements.encoders.mediacodec.MediaCodecEncoder +import io.github.thibaultbee.streampack.core.elements.encoders.mediacodec.VideoEncoderConfig +import io.github.thibaultbee.streampack.core.elements.encoders.rotateFromNaturalOrientation +import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint +import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpointInternal +import io.github.thibaultbee.streampack.core.elements.utils.RotationValue +import io.github.thibaultbee.streampack.core.elements.utils.extensions.displayRotation +import io.github.thibaultbee.streampack.core.logger.Logger +import io.github.thibaultbee.streampack.core.pipelines.StreamerPipeline.Companion.TAG +import io.github.thibaultbee.streampack.core.pipelines.outputs.SurfaceWithSize +import io.github.thibaultbee.streampack.core.regulator.controllers.IBitrateRegulatorController +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withContext + +/** + * An implementation of [IEncodingPipelineOutputInternal] that manages encoding and endpoint. + * + * @param context The application context + * @param endpointInternal The endpoint implementation + * @param defaultRotation The default rotation in [Surface] rotation ([Surface.ROTATION_0], ...). By default, it is the current device orientation. + * @param coroutineDispatcher The coroutine dispatcher to use. By default, it is [Dispatchers.Default] + */ +class EncodingPipelineOutput( + private val context: Context, + private val endpointInternal: IEndpointInternal, + @RotationValue defaultRotation: Int = context.displayRotation, + private val coroutineDispatcher: CoroutineDispatcher = Dispatchers.Default +) : IEncodingPipelineOutputInternal { + /** + * Mutex to avoid concurrent open/close operations. + */ + private val openCloseMutex = Mutex() + + /** + * Mutex to avoid concurrent audio configuration operations. + */ + private val audioConfigurationMutex = Mutex() + + /** + * Mutex to avoid concurrent video configuration operations. + */ + private val videoConfigurationMutex = Mutex() + + private var bitrateRegulatorController: IBitrateRegulatorController? = null + + private var audioStreamId: Int? = null + private var videoStreamId: Int? = null + + /** + * Whether the output has audio. + */ + override val hasAudio: Boolean + get() = audioEncoderInternal != null + + /** + * Whether the output has video. + */ + override val hasVideo: Boolean + get() = videoEncoderInternal != null + + // INPUTS + private val _surface = MutableStateFlow(null) + override val surface: StateFlow = _surface + + /** + * The video source. + * We need to know the timestamp offset to synchronize audio and video. + * This is set by the [StreamerPipeline]. + */ + override var videoSourceTimestampOffset = 0L + + // ENCODERS + private var audioEncoderInternal: IEncoderInternal? = null + override val audioEncoder: IEncoder? + get() = audioEncoderInternal + + private var videoEncoderInternal: IEncoderInternal? = null + override val videoEncoder: IEncoder? + get() = videoEncoderInternal + + // ENDPOINT + override val endpoint: IEndpoint + get() = endpointInternal + + /** + * Keep the target rotation if it can't be applied immediately. + * It will be applied when the stream is stopped. + */ + @RotationValue + private var pendingTargetRotation: Int? = null + + /** + * The target rotation in [Surface] rotation ([Surface.ROTATION_0], ...) + */ + @RotationValue + private var _targetRotation = defaultRotation + + override var targetRotation: Int + @RotationValue get() = _targetRotation + set(@RotationValue value) { + if (isStreaming.value) { + Logger.w(TAG, "Can't change rotation to $value while streaming") + pendingTargetRotation = value + return + } + + setTargetRotationInternal(value) + } + + private val _throwable = MutableStateFlow(null) + override val throwable: StateFlow = _throwable + + override val isOpen: StateFlow + get() = endpointInternal.isOpen + + private val _isStreaming = MutableStateFlow(false) + override val isStreaming: StateFlow = _isStreaming + + override var audioFrameRequestedListener: OnFrameRequestedListener? = null + override var videoFrameRequestedListener: OnFrameRequestedListener? = null + + /** + * Manages error on stream. + * Stops only stream. + * + * @param t triggered [Throwable] + */ + private fun onInternalError(t: Throwable) { + try { + runBlocking { + if (isStreaming.value) { + stopStream() + } + } + } catch (t: Throwable) { + Logger.e(TAG, "onStreamError: Can't stop stream", t) + } finally { + Logger.e(TAG, "onStreamError: ${t.message}", t) + _throwable.tryEmit(t) + } + } + + private val audioEncoderListener = object : IEncoderInternal.IListener { + override fun onError(t: Throwable) { + onInternalError(t) + } + + override fun onOutputFrame(frame: Frame) { + audioStreamId?.let { + runBlocking { + this@EncodingPipelineOutput.endpointInternal.write(frame, it) + } + } + } + } + + private val videoEncoderListener = object : IEncoderInternal.IListener { + override fun onError(t: Throwable) { + onInternalError(t) + } + + override fun onOutputFrame(frame: Frame) { + videoStreamId?.let { + frame.pts += videoSourceTimestampOffset + frame.dts = if (frame.dts != null) { + frame.dts!! + videoSourceTimestampOffset + } else { + null + } + runBlocking { + this@EncodingPipelineOutput.endpointInternal.write(frame, it) + } + } + } + } + + private val _audioCodecConfigFlow = MutableStateFlow(null) + override val audioCodecConfigFlow: StateFlow = _audioCodecConfigFlow + + private val audioCodecConfig: AudioCodecConfig? + get() = _audioCodecConfigFlow.value + + override suspend fun setAudioCodecConfig(audioCodecConfig: AudioCodecConfig) { + audioConfigurationMutex.withLock { + setAudioCodecConfigInternal(audioCodecConfig) + } + } + + private suspend fun setAudioCodecConfigInternal(audioCodecConfig: AudioCodecConfig) { + require(!isStreaming.value) { "Can't change audio configuration while streaming" } + + if (this.audioCodecConfig == audioCodecConfig) { + Logger.i(TAG, "Audio configuration is the same, skipping configuration") + return + } + + applyAudioCodecConfig(audioCodecConfig) + _audioCodecConfigFlow.emit(audioCodecConfig) + } + + private fun applyAudioCodecConfig(audioConfig: AudioCodecConfig) { + try { + audioEncoderInternal?.release() + audioEncoderInternal = MediaCodecEncoder( + AudioEncoderConfig( + audioConfig + ), listener = audioEncoderListener + ).apply { + if (input is MediaCodecEncoder.ByteBufferInput) { + input.listener = requireNotNull(audioFrameRequestedListener) { + "Audio frame requested listener is required" + } + } else { + throw UnsupportedOperationException("Audio encoder only support ByteBuffer mode") + } + configure() + } + } catch (t: Throwable) { + audioEncoderInternal?.release() + audioEncoderInternal = null + throw t + } + } + + private val _videoCodecConfigFlow = MutableStateFlow(null) + override val videoCodecConfigFlow: StateFlow = _videoCodecConfigFlow + + private val videoCodecConfig: VideoCodecConfig? + get() = _videoCodecConfigFlow.value + + override suspend fun setVideoCodecConfig(videoCodecConfig: VideoCodecConfig) { + videoConfigurationMutex.withLock { + setVideoCodecConfigInternal(videoCodecConfig) + } + } + + private suspend fun setVideoCodecConfigInternal(videoCodecConfig: VideoCodecConfig) { + require(!isStreaming.value) { "Can't change video configuration while streaming" } + + if (this.videoCodecConfig == videoCodecConfig) { + Logger.i(TAG, "Video configuration is the same, skipping configuration") + return + } + + applyVideoCodecConfig(videoCodecConfig) + + _videoCodecConfigFlow.emit(videoCodecConfig) + } + + private fun applyVideoCodecConfig(videoConfig: VideoCodecConfig) { + try { + videoEncoderInternal = buildAndConfigureVideoEncoder( + videoConfig, targetRotation + ) + } catch (t: Throwable) { + videoEncoderInternal?.release() + videoEncoderInternal = null + throw t + } + } + + private fun buildAndConfigureVideoEncoder( + videoConfig: VideoCodecConfig, @RotationValue targetRotation: Int + ): IEncoderInternal { + val rotatedVideoConfig = videoConfig.rotateFromNaturalOrientation(context, targetRotation) + + // Release codec instance + videoEncoderInternal?.let { encoder -> + val input = encoder.input + if (input is MediaCodecEncoder.SurfaceInput) { + _surface.tryEmit(null) + } + encoder.release() + } + + // Prepare new codec instance + return buildVideoEncoder(rotatedVideoConfig, videoFrameRequestedListener == null).apply { + configure() + } + } + + private fun buildVideoEncoder( + videoConfig: VideoCodecConfig, useSurfaceMode: Boolean + ): IEncoderInternal { + val videoEncoder = MediaCodecEncoder( + VideoEncoderConfig( + videoConfig, useSurfaceMode + ), listener = videoEncoderListener + ) + + when (videoEncoder.input) { + is MediaCodecEncoder.SurfaceInput -> { + videoEncoder.input.listener = + object : IEncoderInternal.ISurfaceInput.OnSurfaceUpdateListener { + override fun onSurfaceUpdated(surface: Surface) { + _surface.tryEmit(SurfaceWithSize(surface, videoConfig.resolution)) + } + } + } + + is MediaCodecEncoder.ByteBufferInput -> { + videoEncoder.input.listener = requireNotNull(videoFrameRequestedListener) { + "Video frame requested listener is required" + } + } + + else -> { + throw UnsupportedOperationException("Unknown input type: ${videoEncoder.input}") + } + } + + return videoEncoder + } + + /** + * Opens the output endpoint. + * + * @param descriptor Media descriptor to open + */ + override suspend fun open(descriptor: MediaDescriptor) = withContext(coroutineDispatcher) { + openCloseMutex.withLock { + endpointInternal.open(descriptor) + } + } + + /** + * Closes the streamer endpoint. + */ + override suspend fun close() = withContext(coroutineDispatcher) { + openCloseMutex.withLock { + if (!isOpen.value) { + Logger.i(TAG, "Endpoint is already closed") + return@withContext + } + stopStreamInternal() + endpointInternal.close() + } + } + + /** + * Starts audio and/or video streams without a concurrent lock. + * + * @see [stopStream] + */ + private suspend fun startStreamInternal() { + require(isOpen.value) { "Endpoint must be opened before starting stream" } + require(!isStreaming.value) { "Stream is already running" } + require(hasAudio || hasVideo) { "At least one of audio or video must be set" } + if (hasAudio) { + requireNotNull(audioCodecConfig) { "Audio configuration must be set" } + } + if (hasVideo) { + requireNotNull(videoCodecConfig) { "Video configuration must be set" } + } + + try { + val streams = mutableListOf() + val orientedVideoConfig = videoCodecConfig?.let { + /** + * We need to get oriented size for the muxer. + * For example, the [FlvMuxer] `onMetaData` event needs to know the oriented size. + */ + it.rotateFromNaturalOrientation(context, targetRotation).apply { + streams.add(this) + } + } + + audioCodecConfig?.let { + streams.add(it) + } + + val streamsIdMap = endpointInternal.addStreams(streams) + orientedVideoConfig?.let { + videoStreamId = streamsIdMap[it] + } + audioCodecConfig?.let { audioStreamId = streamsIdMap[it] } + + endpointInternal.startStream() + + audioEncoderInternal?.startStream() + + videoEncoderInternal?.startStream() + + bitrateRegulatorController?.start() + + _isStreaming.emit(true) + } catch (t: Throwable) { + stopStreamInternal() + throw t + } + } + + /** + * Starts audio and/or video streams. + * + * Before starting the stream, the endpoint must be opened with [open] and the audio and/or + * video configuration must be set. + * + * @see [stopStream] + */ + override suspend fun startStream() = withContext(coroutineDispatcher) { + openCloseMutex.lock() + audioConfigurationMutex.lock() + videoConfigurationMutex.lock() + + try { + startStreamInternal() + } finally { + audioConfigurationMutex.unlock() + videoConfigurationMutex.unlock() + openCloseMutex.unlock() + } + } + + /** + * Stops audio/video stream. + * + * Internally, it resets audio and video recorders and encoders to get them ready for another + * [startStream] session. It explains why preview could be restarted. + * + * @see [startStream] + */ + override suspend fun stopStream() = withContext(coroutineDispatcher) { + openCloseMutex.lock() + audioConfigurationMutex.lock() + videoConfigurationMutex.lock() + + try { + stopStreamInternal() + } finally { + audioConfigurationMutex.unlock() + videoConfigurationMutex.unlock() + openCloseMutex.unlock() + } + } + + /** + * Stops audio/video and reset stream implementation. + * + * @see [stopStream] + */ + private suspend fun stopStreamInternal() { + if (!isStreaming.value) { + Logger.i(TAG, "Stream is already stopped") + return + } + + stopStreamImpl() + + try { + audioEncoderInternal?.reset() + } catch (t: Throwable) { + Logger.w(TAG, "Can't reset audio encoder: ${t.message}") + } + try { + videoEncoderInternal?.reset() + } catch (t: Throwable) { + Logger.w(TAG, "Can't reset video encoder: ${t.message}") + } + + _isStreaming.emit(false) + } + + private suspend fun resetVideoEncoder() { + _surface.emit(null) + + val previousVideoEncoder = videoEncoderInternal + pendingTargetRotation?.let { + setTargetRotationInternal(it) + } + pendingTargetRotation = null + + // Only reset if the encoder is the same. Otherwise, it is already configured. + if (previousVideoEncoder == videoEncoderInternal) { + videoEncoderInternal?.reset() + } + } + + /** + * Stops audio/video stream implementation without a concurrent lock. + * + * @see [stopStream] + */ + private suspend fun stopStreamImpl() { + try { + bitrateRegulatorController?.stop() + } catch (t: Throwable) { + Logger.w(TAG, "Can't stop bitrate regulator controller: ${t.message}") + } + + // Encoders + try { + audioEncoderInternal?.stopStream() + } catch (t: Throwable) { + Logger.w(TAG, "Can't stop audio encoder: ${t.message}") + } + try { + videoEncoderInternal?.stopStream() + } catch (t: Throwable) { + Logger.w(TAG, "Can't stop video encoder: ${t.message}") + } + + // Endpoint + try { + endpointInternal.stopStream() + } catch (t: Throwable) { + Logger.w(TAG, "Can't stop endpoint: ${t.message}") + } + } + + /** + * Releases endpoint and encoders. + */ + override fun release() { + // Encoders + audioEncoderInternal?.release() + audioEncoderInternal = null + videoEncoderInternal?.release() + videoEncoderInternal = null + + _surface.tryEmit(null) + + // Endpoint + endpointInternal.release() + } + + /** + * Adds a bitrate regulator controller. + * + * Limitation: it is only available for SRT for now. + */ + override fun addBitrateRegulatorController(controllerFactory: IBitrateRegulatorController.Factory) { + bitrateRegulatorController?.stop() + bitrateRegulatorController = controllerFactory.newBitrateRegulatorController(this).apply { + if (isStreaming.value) { + this.start() + } + Logger.d( + TAG, "Bitrate regulator controller added: ${this.javaClass.simpleName}" + ) + } + } + + /** + * Removes the bitrate regulator controller. + */ + override fun removeBitrateRegulatorController() { + bitrateRegulatorController?.stop() + bitrateRegulatorController = null + Logger.d(TAG, "Bitrate regulator controller removed") + } + + private fun setTargetRotationInternal(@RotationValue newTargetRotation: Int) { + if (shouldUpdateRotation(newTargetRotation)) { + updateVideoEncoderForTransformation() + } + } + + private fun updateVideoEncoderForTransformation() { + if (hasVideo) { + val videoConfig = videoCodecConfig + if (videoConfig != null) { + videoEncoderInternal = buildAndConfigureVideoEncoder( + videoConfig, targetRotation + ) + } + } + } + + /** + * @return true if the target rotation has changed + */ + private fun shouldUpdateRotation(@RotationValue newTargetRotation: Int): Boolean { + return if (targetRotation != newTargetRotation) { + _targetRotation = newTargetRotation + true + } else { + false + } + } +} diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/outputs/encoding/IEncodingPipelineOutput.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/outputs/encoding/IEncodingPipelineOutput.kt new file mode 100644 index 000000000..1af8aa63c --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/outputs/encoding/IEncodingPipelineOutput.kt @@ -0,0 +1,193 @@ +/* + * Copyright (C) 2025 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.core.pipelines.outputs.encoding + +import android.net.Uri +import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor +import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.UriMediaDescriptor +import io.github.thibaultbee.streampack.core.elements.encoders.AudioCodecConfig +import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder +import io.github.thibaultbee.streampack.core.elements.encoders.VideoCodecConfig +import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint +import io.github.thibaultbee.streampack.core.pipelines.outputs.IAudioPipelineOutputInternal +import io.github.thibaultbee.streampack.core.pipelines.outputs.IPipelineOutput +import io.github.thibaultbee.streampack.core.pipelines.outputs.IVideoPipelineOutputInternal +import io.github.thibaultbee.streampack.core.regulator.controllers.IBitrateRegulatorController +import io.github.thibaultbee.streampack.core.streamers.single.open +import io.github.thibaultbee.streampack.core.streamers.single.startStream +import kotlinx.coroutines.flow.StateFlow + +/** + * An output component for a streamer. + */ +interface IEncodingPipelineOutput : IPipelineOutput { + /** + * Advanced settings for the endpoint. + */ + val endpoint: IEndpoint + + /** + * Returns true if output is opened. + * For example, if the streamer is connected to a server if the endpoint is SRT or RTMP. + */ + val isOpen: StateFlow + + /** + * Opens the streamer output. + * + * @param descriptor Media descriptor to open + */ + suspend fun open(descriptor: MediaDescriptor) + + /** + * Closes the streamer output. + */ + suspend fun close() + + /** + * Adds a bitrate regulator controller to the streamer. + */ + fun addBitrateRegulatorController(controllerFactory: IBitrateRegulatorController.Factory) + + /** + * Removes the bitrate regulator controller from the streamer. + */ + fun removeBitrateRegulatorController() +} + +/** + * Opens the streamer endpoint. + * + * @param uri The uri to open + */ +suspend fun IEncodingPipelineOutput.open(uri: Uri) = + open(UriMediaDescriptor(uri)) + +/** + * Opens the streamer endpoint. + * + * @param uriString The uri to open + */ +suspend fun IEncodingPipelineOutput.open(uriString: String) = + open(UriMediaDescriptor(Uri.parse(uriString))) + + +/** + * Starts audio/video stream. + * + * Same as doing [open] and [startStream]. + * + * @param descriptor The media descriptor to open + * @see [IEncodingPipelineOutput.stopStream] + */ +suspend fun IEncodingPipelineOutput.startStream(descriptor: MediaDescriptor) { + open(descriptor) + startStream() +} + +/** + * Starts audio/video stream. + * + * Same as doing [open] and [startStream]. + * + * @param uri The uri to open + * @see [IEncodingPipelineOutput.stopStream] + */ +suspend fun IEncodingPipelineOutput.startStream(uri: Uri) { + open(uri) + startStream() +} + +/** + * Starts audio/video stream. + * + * Same as doing [open] and [startStream]. + * + * @param uriString The uri to open + * @see [IEncodingPipelineOutput.stopStream] + */ +suspend fun IEncodingPipelineOutput.startStream(uriString: String) { + open(uriString) + startStream() +} + +/** + * An audio encoding output component for a pipeline. + */ +interface IAudioEncodingPipelineOutput { + /** + * The audio configuration flow. + */ + val audioCodecConfigFlow: StateFlow + + /** + * Advanced settings for the audio encoder. + */ + val audioEncoder: IEncoder? + + /** + * Configures only audio codec settings. + * + * @param audioCodecConfig The audio codec configuration + * + * @throws [Throwable] if configuration can not be applied. + */ + suspend fun setAudioCodecConfig(audioCodecConfig: AudioCodecConfig) +} + +/** + * An video encoding output component for a pipeline. + */ +interface IVideoEncodingPipelineOutput { + /** + * The video configuration flow. + */ + val videoCodecConfigFlow: StateFlow + + /** + * Advanced settings for the video encoder. + */ + val videoEncoder: IEncoder? + + /** + * Configures only video codec settings. + * + * @param videoCodecConfig The video codec configuration + * + * @throws [Throwable] if configuration can not be applied. + */ + suspend fun setVideoCodecConfig(videoCodecConfig: VideoCodecConfig) +} + +/** + * An internal output component for a streamer. + */ +interface IAudioEncodingPipelineOutputInternal : IEncodingPipelineOutput, + IAudioEncodingPipelineOutput, + IAudioPipelineOutputInternal + +/** + * An internal output component for a streamer. + */ +interface IVideoEncodingPipelineOutputInternal : IEncodingPipelineOutput, + IVideoEncodingPipelineOutput, + IVideoPipelineOutputInternal + +interface IEncodingPipelineOutputInternal : IAudioEncodingPipelineOutputInternal, + IVideoEncodingPipelineOutputInternal + + + diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/utils/SourceConfigUtils.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/utils/SourceConfigUtils.kt new file mode 100644 index 000000000..25e4a7504 --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/utils/SourceConfigUtils.kt @@ -0,0 +1,16 @@ +package io.github.thibaultbee.streampack.core.pipelines.utils + +import io.github.thibaultbee.streampack.core.elements.encoders.VideoCodecConfig +import io.github.thibaultbee.streampack.core.elements.sources.video.VideoSourceConfig + +object SourceConfigUtils { + internal fun buildVideoSourceConfig(videoCodecConfigs: List): VideoSourceConfig { + val maxResolution = + videoCodecConfigs.map { it.resolution }.maxWith(compareBy({ it.width }, { it.height })) + val fps = videoCodecConfigs.first().fps + require(videoCodecConfigs.all { it.fps == fps }) { "All video codec configs must have the same fps" } + val dynamicRangeProfile = videoCodecConfigs.first().dynamicRangeProfile + require(videoCodecConfigs.all { it.dynamicRangeProfile == dynamicRangeProfile }) { "All video codec configs must have the same dynamic range profile" } + return VideoSourceConfig(maxResolution, fps, dynamicRangeProfile) + } +} \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt index 23dc1d6a7..478da0956 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt @@ -19,6 +19,7 @@ import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfi import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint import io.github.thibaultbee.streampack.core.regulator.IBitrateRegulator +import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IEncodingPipelineOutput import io.github.thibaultbee.streampack.core.streamers.single.ICoroutineSingleStreamer /** @@ -41,9 +42,9 @@ abstract class BitrateRegulatorController( /** * Creates a [IBitrateRegulatorController] object from given parameters * - * @param streamer the [ICoroutineSingleStreamer] implementation. + * @param pipelineOutput the [ICoroutineSingleStreamer] implementation. * @return a [IBitrateRegulatorController] object */ - abstract override fun newBitrateRegulatorController(streamer: ICoroutineSingleStreamer): IBitrateRegulatorController + abstract override fun newBitrateRegulatorController(pipelineOutput: IEncodingPipelineOutput): IBitrateRegulatorController } } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/DefaultBitrateRegulatorController.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/DefaultBitrateRegulatorController.kt index a4a6b7846..592e5211f 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/DefaultBitrateRegulatorController.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/DefaultBitrateRegulatorController.kt @@ -19,8 +19,10 @@ import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfi import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint import io.github.thibaultbee.streampack.core.elements.utils.Scheduler +import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IAudioEncodingPipelineOutput +import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IEncodingPipelineOutput +import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IVideoEncodingPipelineOutput import io.github.thibaultbee.streampack.core.regulator.IBitrateRegulator -import io.github.thibaultbee.streampack.core.streamers.single.ICoroutineSingleStreamer /** * A [BitrateRegulatorController] implementation that triggers [IBitrateRegulator.update] every [delayTimeInMs]. @@ -34,7 +36,7 @@ import io.github.thibaultbee.streampack.core.streamers.single.ICoroutineSingleSt */ open class DefaultBitrateRegulatorController( audioEncoder: IEncoder?, - videoEncoder: IEncoder?, + videoEncoder: IEncoder, endpoint: IEndpoint, bitrateRegulatorFactory: IBitrateRegulator.Factory, bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(), @@ -46,17 +48,13 @@ open class DefaultBitrateRegulatorController( bitrateRegulatorFactory, bitrateRegulatorConfig ) { - init { - requireNotNull(videoEncoder) { "Video encoder is required" } - } - /** * Bitrate regulator. Calls regularly by [scheduler]. Don't call it otherwise or you might break regulation. */ private val bitrateRegulator = bitrateRegulatorFactory.newBitrateRegulator( bitrateRegulatorConfig, { - videoEncoder!!.bitrate = it + videoEncoder.bitrate = it }, { /* Do nothing for audio */ } ) @@ -67,7 +65,7 @@ open class DefaultBitrateRegulatorController( private val scheduler = Scheduler(delayTimeInMs) { bitrateRegulator.update( endpoint.metrics, - videoEncoder?.bitrate ?: 0, + videoEncoder.bitrate, audioEncoder?.bitrate ?: 0 ) } @@ -85,11 +83,24 @@ open class DefaultBitrateRegulatorController( private val bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(), private val delayTimeInMs: Long = 500 ) : BitrateRegulatorController.Factory() { - override fun newBitrateRegulatorController(streamer: ICoroutineSingleStreamer): BitrateRegulatorController { + override fun newBitrateRegulatorController(pipelineOutput: IEncodingPipelineOutput): BitrateRegulatorController { + require(pipelineOutput is IVideoEncodingPipelineOutput) { + "Pipeline output must be an video encoding output" + } + + val videoEncoder = requireNotNull(pipelineOutput.videoEncoder) { + "Video encoder must be set" + } + + val audioEncoder = if (pipelineOutput is IAudioEncodingPipelineOutput) { + pipelineOutput.audioEncoder + } else { + null + } return DefaultBitrateRegulatorController( - streamer.audioEncoder, - streamer.videoEncoder, - streamer.endpoint, + audioEncoder, + videoEncoder, + pipelineOutput.endpoint, bitrateRegulatorFactory, bitrateRegulatorConfig, delayTimeInMs diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IBitrateRegulatorController.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IBitrateRegulatorController.kt index cd6255e0c..fcea8f294 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IBitrateRegulatorController.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IBitrateRegulatorController.kt @@ -15,7 +15,7 @@ */ package io.github.thibaultbee.streampack.core.regulator.controllers -import io.github.thibaultbee.streampack.core.streamers.single.ICoroutineSingleStreamer +import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IEncodingPipelineOutput /** * Interface to implement a bitrate regulator controller. @@ -36,12 +36,12 @@ interface IBitrateRegulatorController { /** * Creates a [IBitrateRegulatorController] object from given parameters * - * @param streamer the [ICoroutineSingleStreamer] implementation. + * @param pipelineOutput the [IEncodingPipelineOutput] implementation. * * @return a [IBitrateRegulatorController] object */ fun newBitrateRegulatorController( - streamer: ICoroutineSingleStreamer + pipelineOutput: IEncodingPipelineOutput ): IBitrateRegulatorController } } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/AudioOnlySingleStreamer.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/AudioOnlySingleStreamer.kt index 357f32166..386512549 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/AudioOnlySingleStreamer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/AudioOnlySingleStreamer.kt @@ -25,6 +25,7 @@ import io.github.thibaultbee.streampack.core.elements.sources.audio.audiorecord. * A [SingleStreamer] that sends only microphone frames. * * @param context application context + * @param audioSourceInternal the audio source implementation * @param internalEndpoint the [IEndpointInternal] implementation */ open class AudioOnlySingleStreamer( diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/CameraSingleStreamer.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/CameraSingleStreamer.kt index 60d89d0e6..7eafac237 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/CameraSingleStreamer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/CameraSingleStreamer.kt @@ -153,10 +153,6 @@ open class CameraSingleStreamer( return CameraStreamerConfigurationInfo(endpointInfo) } - override fun isMirroringRequired(): Boolean { - return cameraSource.infoProvider.isFrontFacing - } - /** * Sets a preview surface. */ diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/ISingleStreamer.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/ISingleStreamer.kt index f7b004435..02464bd9f 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/ISingleStreamer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/ISingleStreamer.kt @@ -98,26 +98,6 @@ interface ISingleStreamer { */ fun getInfo(descriptor: MediaDescriptor): IConfigurationInfo - /** - * Configures only audio settings. - * - * @param audioConfig Audio configuration to set - * - * @throws [Throwable] if configuration can not be applied. - * @see [release] - */ - fun setAudioConfig(audioConfig: AudioConfig) - - /** - * Configures only video settings. - * - * @param videoConfig Video configuration to set - * - * @throws [Throwable] if configuration can not be applied. - * @see [release] - */ - fun setVideoConfig(videoConfig: VideoConfig) - /** * Clean and reset the streamer. */ @@ -141,29 +121,6 @@ val ISingleStreamer.targetRotationDegrees: Int @IntRange(from = 0, to = 359) get() = targetRotation.rotationToDegrees - -/** - * Configures both video and audio settings. - * It is the first method to call after a [SingleStreamer] instantiation. - * It must be call when both stream and audio and video capture are not running. - * - * Use [IConfigurationInfo] to get value limits. - * - * If video encoder does not support [VideoConfig.level] or [VideoConfig.profile], it fallbacks - * to video encoder default level and default profile. - * - * @param audioConfig Audio configuration to set - * @param videoConfig Video configuration to set - * - * @throws [Throwable] if configuration can not be applied. - * @see [ISingleStreamer.release] - */ -@RequiresPermission(Manifest.permission.RECORD_AUDIO) -fun ISingleStreamer.setConfig(audioConfig: AudioConfig, videoConfig: VideoConfig) { - setAudioConfig(audioConfig) - setVideoConfig(videoConfig) -} - /** * A single Streamer based on coroutines. */ @@ -184,6 +141,26 @@ interface ICoroutineSingleStreamer : ISingleStreamer { */ val isStreaming: StateFlow + /** + * Configures only audio settings. + * + * @param audioConfig Audio configuration to set + * + * @throws [Throwable] if configuration can not be applied. + * @see [release] + */ + suspend fun setAudioConfig(audioConfig: AudioConfig) + + /** + * Configures only video settings. + * + * @param videoConfig Video configuration to set + * + * @throws [Throwable] if configuration can not be applied. + * @see [release] + */ + suspend fun setVideoConfig(videoConfig: VideoConfig) + /** * Opens the streamer endpoint. * @@ -266,6 +243,28 @@ suspend fun ICoroutineSingleStreamer.startStream(uriString: String) { startStream() } +/** + * Configures both video and audio settings. + * It is the first method to call after a [SingleStreamer] instantiation. + * It must be call when both stream and audio and video capture are not running. + * + * Use [IConfigurationInfo] to get value limits. + * + * If video encoder does not support [VideoConfig.level] or [VideoConfig.profile], it fallbacks + * to video encoder default level and default profile. + * + * @param audioConfig Audio configuration to set + * @param videoConfig Video configuration to set + * + * @throws [Throwable] if configuration can not be applied. + * @see [ISingleStreamer.release] + */ +@RequiresPermission(Manifest.permission.RECORD_AUDIO) +suspend fun ICoroutineSingleStreamer.setConfig(audioConfig: AudioConfig, videoConfig: VideoConfig) { + setAudioConfig(audioConfig) + setVideoConfig(videoConfig) +} + interface ICallbackSingleStreamer : ISingleStreamer { /** * Returns true if endpoint is opened. @@ -278,6 +277,26 @@ interface ICallbackSingleStreamer : ISingleStreamer { */ val isStreaming: Boolean + /** + * Configures only audio settings. + * + * @param audioConfig Audio configuration to set + * + * @throws [Throwable] if configuration can not be applied. + * @see [release] + */ + fun setAudioConfig(audioConfig: AudioConfig) + + /** + * Configures only video settings. + * + * @param videoConfig Video configuration to set + * + * @throws [Throwable] if configuration can not be applied. + * @see [release] + */ + fun setVideoConfig(videoConfig: VideoConfig) + /** * Opens the streamer endpoint asynchronously. * @@ -402,3 +421,26 @@ fun ICallbackSingleStreamer.startStream(uri: Uri) = startStream(UriMediaDescript * @see [ICallbackSingleStreamer.stopStream] */ fun ICallbackSingleStreamer.startStream(uriString: String) = startStream(Uri.parse(uriString)) + + +/** + * Configures both video and audio settings. + * It is the first method to call after a [SingleStreamer] instantiation. + * It must be call when both stream and audio and video capture are not running. + * + * Use [IConfigurationInfo] to get value limits. + * + * If video encoder does not support [VideoConfig.level] or [VideoConfig.profile], it fallbacks + * to video encoder default level and default profile. + * + * @param audioConfig Audio configuration to set + * @param videoConfig Video configuration to set + * + * @throws [Throwable] if configuration can not be applied. + * @see [ISingleStreamer.release] + */ +@RequiresPermission(Manifest.permission.RECORD_AUDIO) +fun ICallbackSingleStreamer.setConfig(audioConfig: AudioConfig, videoConfig: VideoConfig) { + setAudioConfig(audioConfig) + setVideoConfig(videoConfig) +} \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/ScreenRecorderSingleStreamer.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/ScreenRecorderSingleStreamer.kt index 61d3907f8..91c7dd999 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/ScreenRecorderSingleStreamer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/ScreenRecorderSingleStreamer.kt @@ -75,7 +75,8 @@ open class ScreenRecorderSingleStreamer( (videoSourceInternal as MediaProjectionVideoSource).apply { listener = object : MediaProjectionVideoSource.Listener { override fun onStop() { - onStreamError(Exception("Screen source has been stopped")) + // TODO: handle error from source + // onStreamError(Exception("Screen source has been stopped")) } } } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/SingleStreamer.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/SingleStreamer.kt index 1efb8d959..51023bf72 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/SingleStreamer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/SingleStreamer.kt @@ -17,51 +17,29 @@ package io.github.thibaultbee.streampack.core.streamers.single import android.Manifest import android.content.Context -import android.util.Size import android.view.Surface import androidx.annotation.RequiresPermission import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor -import io.github.thibaultbee.streampack.core.elements.data.Frame -import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder -import io.github.thibaultbee.streampack.core.elements.encoders.IEncoderInternal -import io.github.thibaultbee.streampack.core.elements.encoders.mediacodec.AudioEncoderConfig -import io.github.thibaultbee.streampack.core.elements.encoders.mediacodec.MediaCodecEncoder -import io.github.thibaultbee.streampack.core.elements.encoders.mediacodec.VideoEncoderConfig -import io.github.thibaultbee.streampack.core.elements.encoders.rotateFromNaturalOrientation import io.github.thibaultbee.streampack.core.elements.endpoints.DynamicEndpoint import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpointInternal -import io.github.thibaultbee.streampack.core.elements.processing.video.SurfaceProcessor -import io.github.thibaultbee.streampack.core.elements.processing.video.outputs.AbstractSurfaceOutput -import io.github.thibaultbee.streampack.core.elements.processing.video.outputs.SurfaceOutput -import io.github.thibaultbee.streampack.core.elements.processing.video.source.ISourceInfoProvider import io.github.thibaultbee.streampack.core.elements.sources.audio.IAudioSource import io.github.thibaultbee.streampack.core.elements.sources.audio.IAudioSourceInternal -import io.github.thibaultbee.streampack.core.elements.sources.video.ISurfaceSource -import io.github.thibaultbee.streampack.core.elements.sources.video.IVideoFrameSource import io.github.thibaultbee.streampack.core.elements.sources.video.IVideoSource import io.github.thibaultbee.streampack.core.elements.sources.video.IVideoSourceInternal import io.github.thibaultbee.streampack.core.elements.utils.RotationValue +import io.github.thibaultbee.streampack.core.elements.utils.combineStates import io.github.thibaultbee.streampack.core.elements.utils.extensions.displayRotation -import io.github.thibaultbee.streampack.core.elements.utils.extensions.sourceConfig -import io.github.thibaultbee.streampack.core.logger.Logger +import io.github.thibaultbee.streampack.core.pipelines.StreamerPipeline +import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.EncodingPipelineOutput import io.github.thibaultbee.streampack.core.regulator.controllers.IBitrateRegulatorController import io.github.thibaultbee.streampack.core.streamers.infos.IConfigurationInfo import io.github.thibaultbee.streampack.core.streamers.infos.StreamerConfigurationInfo -import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext -import java.nio.ByteBuffer -import java.util.concurrent.Executors /** - * The single streamer implementation. - * - * A single streamer is a streamer that can handle only one stream at a time. + * Base class of all streamers. * * @param context the application context * @param videoSourceInternal the video source implementation @@ -71,153 +49,87 @@ import java.util.concurrent.Executors */ open class SingleStreamer( protected val context: Context, - protected val audioSourceInternal: IAudioSourceInternal?, - protected val videoSourceInternal: IVideoSourceInternal?, - protected val endpointInternal: IEndpointInternal = DynamicEndpoint(context), + audioSourceInternal: IAudioSourceInternal?, + videoSourceInternal: IVideoSourceInternal?, + endpointInternal: IEndpointInternal = DynamicEndpoint(context), @RotationValue defaultRotation: Int = context.displayRotation ) : ICoroutineSingleStreamer { - private val dispatcher: CoroutineDispatcher = - Executors.newSingleThreadExecutor().asCoroutineDispatcher() - - private val _throwable = MutableStateFlow(null) - override val throwable: StateFlow = _throwable - - private var audioStreamId: Int? = null - private var videoStreamId: Int? = null - - private var bitrateRegulatorController: IBitrateRegulatorController? = null - - // Keep configurations - private var _audioConfig: AudioConfig? = null - private var _videoConfig: VideoConfig? = null - - override val audioConfig: AudioConfig? - get() = _audioConfig - - override val videoConfig: VideoConfig? - get() = _videoConfig - - protected val sourceInfoProvider = videoSourceInternal?.infoProvider - - private val audioEncoderListener = object : IEncoderInternal.IListener { - override fun onError(t: Throwable) { - onStreamError(t) - } - - override fun onOutputFrame(frame: Frame) { - audioStreamId?.let { - runBlocking { - this@SingleStreamer.endpointInternal.write(frame, it) - } - } - } + private val streamerOutput = EncodingPipelineOutput( + context, + endpointInternal, + defaultRotation + ) + private val pipeline = StreamerPipeline( + context, + audioSourceInternal, + videoSourceInternal + ).apply { + addOutput(streamerOutput) } - private val videoEncoderListener = object : IEncoderInternal.IListener { - override fun onError(t: Throwable) { - onStreamError(t) + override val throwable: StateFlow = + combineStates(pipeline.throwable, streamerOutput.throwable) { throwableArray -> + throwableArray[0] ?: throwableArray[1] } - override fun onOutputFrame(frame: Frame) { - videoStreamId?.let { - val videoEncoder = - requireNotNull(videoEncoderInternal) { "Video encoder must not be null" } - val timestampOffset = if (videoEncoder is ISurfaceSource) { - videoEncoder.timestampOffset - } else { - 0L - } - frame.pts += timestampOffset - frame.dts = if (frame.dts != null) { - frame.dts!! + timestampOffset - } else { - null - } - runBlocking { - this@SingleStreamer.endpointInternal.write(frame, it) - } - } - } - } + override val isOpen: StateFlow + get() = streamerOutput.isOpen - /** - * Manages error on stream. - * Stops only stream. - * - * @param t triggered [Throwable] - */ - protected fun onStreamError(t: Throwable) { - try { - runBlocking { - stopStream() - } - } catch (t: Throwable) { - Logger.e(TAG, "onStreamError: Can't stop stream", t) - } finally { - Logger.e(TAG, "onStreamError: ${t.message}", t) - _throwable.tryEmit(t) - } + override val isStreaming: StateFlow = combineStates( + streamerOutput.isStreaming, + pipeline.isStreaming + ) { isStreamingArray -> + isStreamingArray[0] && isStreamingArray[1] } - // SOURCES - + // AUDIO /** * The audio source. - * It allows advanced audio settings. + * It allows advanced audio source settings. */ override val audioSource: IAudioSource? - get() = audioSourceInternal + get() = pipeline.audioSource + override val audioEncoder: IEncoder? + get() = streamerOutput.audioEncoder + // VIDEO /** * The video source. - * It allows advanced video settings. + * It allows advanced video source settings. */ override val videoSource: IVideoSource? - get() = videoSourceInternal - - // ENCODERS - - private var audioEncoderInternal: IEncoderInternal? = null - - /** - * The audio encoder. - * Only valid when audio has been [setAudioConfig]. It is null after [release]. - */ - override val audioEncoder: IEncoder? - get() = audioEncoderInternal - - private var videoEncoderInternal: IEncoderInternal? = null - - /** - * The video encoder. - * Only valid when audio has been [setAudioConfig]. It is null after [release]. - */ + get() = pipeline.videoSource override val videoEncoder: IEncoder? - get() = videoEncoderInternal + get() = streamerOutput.videoEncoder - private var surfaceProcessor: SurfaceProcessor? = null + // INTERNAL + protected val videoSourceInternal = pipeline.videoSource as IVideoSourceInternal? + protected val audioSourceInternal = pipeline.audioSource as IAudioSourceInternal? // ENDPOINT - override val endpoint: IEndpoint - get() = endpointInternal - - override val isOpen: StateFlow - get() = endpointInternal.isOpen - - - private val _isStreaming = MutableStateFlow(false) - override val isStreaming: StateFlow = _isStreaming + get() = streamerOutput.endpoint /** * Whether the streamer has audio. */ - val hasAudio = audioSourceInternal != null + val hasAudio: Boolean + get() = pipeline.hasAudio /** * Whether the streamer has video. */ - val hasVideo = videoSourceInternal != null + val hasVideo: Boolean + get() = pipeline.hasVideo + + /** + * The target rotation in [Surface] rotation ([Surface.ROTATION_0], ...) + */ + override var targetRotation: Int + @RotationValue get() = pipeline.targetRotation + set(@RotationValue newTargetRotation) { + pipeline.targetRotation = newTargetRotation + } /** * Gets configuration information. @@ -229,34 +141,6 @@ open class SingleStreamer( override val info: IConfigurationInfo get() = StreamerConfigurationInfo(endpoint.info) - /** - * The target rotation in [Surface] rotation ([Surface.ROTATION_0], ...) - */ - @RotationValue - private var _targetRotation = defaultRotation - - /** - * Keep the target rotation if it can't be applied immediately. - * It will be applied when the stream is stopped. - */ - @RotationValue - private var pendingTargetRotation: Int? = null - - /** - * The target rotation in [Surface] rotation ([Surface.ROTATION_0], ...) - */ - override var targetRotation: Int - @RotationValue get() = _targetRotation - set(@RotationValue newTargetRotation) { - if (isStreaming.value) { - Logger.w(TAG, "Can't change rotation while streaming") - pendingTargetRotation = newTargetRotation - return - } - - setTargetRotationInternal(newTargetRotation) - } - /** * Gets configuration information from [MediaDescriptor]. * @@ -274,6 +158,16 @@ open class SingleStreamer( return StreamerConfigurationInfo(endpointInfo) } + // CONFIGURATION + /** + * Whether the streamer has audio configuration. + */ + val hasAudioConfig: Boolean + get() = streamerOutput.audioCodecConfigFlow.value != null + + override val audioConfig: AudioConfig + get() = requireNotNull(streamerOutput.audioCodecConfigFlow.value) + /** * Configures audio settings. * It is the first method to call after a [SingleStreamer] instantiation. @@ -286,210 +180,18 @@ open class SingleStreamer( * @throws [Throwable] if configuration can not be applied. */ @RequiresPermission(Manifest.permission.RECORD_AUDIO) - override fun setAudioConfig(audioConfig: AudioConfig) { - require(hasAudio) { "Do not need to set audio as it is a video only streamer" } - requireNotNull(audioSourceInternal) { "Audio source must not be null" } - - if (this._audioConfig == audioConfig) { - Logger.i(TAG, "Audio configuration is the same, skipping configuration") - return - } - - this._audioConfig = audioConfig - - try { - audioSourceInternal.configure(audioConfig.sourceConfig) - - audioEncoderInternal?.release() - audioEncoderInternal = MediaCodecEncoder( - AudioEncoderConfig( - audioConfig - ), listener = audioEncoderListener - ).apply { - if (input is MediaCodecEncoder.ByteBufferInput) { - input.listener = - object : IEncoderInternal.IByteBufferInput.OnFrameRequestedListener { - override fun onFrameRequested(buffer: ByteBuffer): Frame { - return audioSourceInternal.getAudioFrame(buffer) - } - } - } else { - throw UnsupportedOperationException("Audio encoder only support ByteBuffer mode") - } - configure() - } - } catch (t: Throwable) { - release() - throw t - } + override suspend fun setAudioConfig(audioConfig: AudioConfig) { + streamerOutput.setAudioCodecConfig(audioConfig) } /** - * Creates a surface output for the given surface. - * - * Use it for additional processing. - * - * @param surface the encoder surface - * @param resolution the resolution of the surface - * @param infoProvider the source info provider for internal processing + * Whether the streamer has video configuration. */ - protected open fun buildSurfaceOutput( - surface: Surface, resolution: Size, infoProvider: ISourceInfoProvider - ): AbstractSurfaceOutput { - return SurfaceOutput( - surface, resolution, SurfaceOutput.TransformationInfo( - targetRotation, isMirroringRequired(), infoProvider - ) - ) - } + val hasVideoConfig: Boolean + get() = streamerOutput.videoCodecConfigFlow.value != null - /** - * Whether the output surface needs to be mirrored. - */ - protected open fun isMirroringRequired(): Boolean { - return false - } - - /** - * Updates the transformation of the surface output. - * To be called when the source info provider or [isMirroringRequired] is updated. - */ - protected fun updateTransformation() { - val sourceInfoProvider = requireNotNull(sourceInfoProvider) { - "Source info provider must not be null" - } - val videoConfig = requireNotNull(videoConfig) { "Video config must not be null" } - - val videoEncoder = requireNotNull(videoEncoderInternal) { "Video encoder must not be null" } - val input = videoEncoder.input as MediaCodecEncoder.SurfaceInput - - val surface = requireNotNull(input.surface) { "Surface must not be null" } - updateTransformation(surface, videoConfig.resolution, sourceInfoProvider) - } - - /** - * Updates the transformation of the surface output. - */ - protected open fun updateTransformation( - surface: Surface, resolution: Size, infoProvider: ISourceInfoProvider - ) { - Logger.i(TAG, "Updating transformation") - surfaceProcessor?.removeOutputSurface(surface) - surfaceProcessor?.addOutputSurface( - buildSurfaceOutput( - surface, resolution, infoProvider - ) - ) - } - - private fun buildOrUpdateSurfaceProcessor( - videoConfig: VideoConfig, videoSource: IVideoSourceInternal - ): SurfaceProcessor { - if (videoSource !is ISurfaceSource) { - throw IllegalStateException("Video source must have an output surface") - } - val previousSurfaceProcessor = surfaceProcessor - val newSurfaceProcessor = when { - previousSurfaceProcessor == null -> SurfaceProcessor(videoConfig.dynamicRangeProfile) - previousSurfaceProcessor.dynamicRangeProfile != videoConfig.dynamicRangeProfile -> { - videoSource.outputSurface?.let { - previousSurfaceProcessor.removeInputSurface(it) - } - previousSurfaceProcessor.removeAllOutputSurfaces() - previousSurfaceProcessor.release() - SurfaceProcessor(videoConfig.dynamicRangeProfile) - } - - else -> previousSurfaceProcessor - } - - if (newSurfaceProcessor != previousSurfaceProcessor) { - videoSource.outputSurface = newSurfaceProcessor.createInputSurface( - videoSource.infoProvider.getSurfaceSize( - videoConfig.resolution, targetRotation - ) - ) - } else { - newSurfaceProcessor.updateInputSurface( - videoSource.outputSurface!!, - videoSource.infoProvider.getSurfaceSize(videoConfig.resolution, targetRotation) - ) - } - - return newSurfaceProcessor - } - - private fun buildAndConfigureVideoEncoder( - videoConfig: VideoConfig, videoSource: IVideoSourceInternal - ): IEncoderInternal { - val videoEncoder = MediaCodecEncoder( - VideoEncoderConfig( - videoConfig, videoSource is ISurfaceSource - ), listener = videoEncoderListener - ) - - when (videoEncoder.input) { - is MediaCodecEncoder.SurfaceInput -> { - surfaceProcessor = buildOrUpdateSurfaceProcessor(videoConfig, videoSource) - - videoEncoder.input.listener = - object : IEncoderInternal.ISurfaceInput.OnSurfaceUpdateListener { - override fun onSurfaceUpdated(surface: Surface) { - val surfaceProcessor = requireNotNull(surfaceProcessor) { - "Surface processor must not be null" - } - // TODO: only remove previous encoder surface - surfaceProcessor.removeAllOutputSurfaces() - Logger.d(TAG, "Updating with new encoder surface input") - surfaceProcessor.addOutputSurface( - buildSurfaceOutput( - surface, videoConfig.resolution, videoSource.infoProvider - ) - ) - } - } - } - - is MediaCodecEncoder.ByteBufferInput -> { - videoEncoder.input.listener = - object : IEncoderInternal.IByteBufferInput.OnFrameRequestedListener { - override fun onFrameRequested(buffer: ByteBuffer): Frame { - return (videoSource as IVideoFrameSource).getVideoFrame(buffer) - } - } - } - - else -> { - throw UnsupportedOperationException("Unknown input type") - } - } - - videoEncoder.configure() - - return videoEncoder - } - - private fun buildAndConfigureVideoEncoderIfNeeded( - videoConfig: VideoConfig, - videoSource: IVideoSourceInternal, - @RotationValue targetRotation: Int - ): IEncoderInternal { - val rotatedVideoConfig = videoConfig.rotateFromNaturalOrientation(context, targetRotation) - - // Release codec instance - videoEncoderInternal?.let { encoder -> - val input = encoder.input - if (input is MediaCodecEncoder.SurfaceInput) { - input.surface?.let { surface -> - surfaceProcessor?.removeOutputSurface(surface) - } - } - encoder.release() - } - - // Prepare new codec instance - return buildAndConfigureVideoEncoder(rotatedVideoConfig, videoSource) - } + override val videoConfig: VideoConfig + get() = requireNotNull(streamerOutput.videoCodecConfigFlow.value) /** * Configures video settings. @@ -505,27 +207,12 @@ open class SingleStreamer( * * @throws [Throwable] if configuration can not be applied. */ - override fun setVideoConfig(videoConfig: VideoConfig) { - require(hasVideo) { "Do not need to set video as it is a audio only streamer" } - requireNotNull(videoSourceInternal) { "Video source must not be null" } - - if (this._videoConfig == videoConfig) { - Logger.i(TAG, "Video configuration is the same, skipping configuration") - return - } - - this._videoConfig = videoConfig - - try { - videoSourceInternal.configure(videoConfig.sourceConfig) + override suspend fun setVideoConfig(videoConfig: VideoConfig) { + streamerOutput.setVideoCodecConfig(videoConfig) + } - videoEncoderInternal = buildAndConfigureVideoEncoderIfNeeded( - videoConfig, videoSourceInternal, targetRotation - ) - } catch (t: Throwable) { - release() - throw t - } + protected fun updateTransformation() { + pipeline.resetSurfaceProcessorOutputSurface() } /** @@ -533,17 +220,12 @@ open class SingleStreamer( * * @param descriptor Media descriptor to open */ - override suspend fun open(descriptor: MediaDescriptor) = withContext(dispatcher) { - endpointInternal.open(descriptor) - } + override suspend fun open(descriptor: MediaDescriptor) = streamerOutput.open(descriptor) /** * Closes the streamer endpoint. */ - override suspend fun close() = withContext(dispatcher) { - stopStreamInternal() - endpointInternal.close() - } + override suspend fun close() = streamerOutput.close() /** * Starts audio/video stream. @@ -553,51 +235,7 @@ open class SingleStreamer( * * @see [stopStream] */ - override suspend fun startStream() = withContext(dispatcher) { - require(isOpen.value) { "Endpoint must be opened before starting stream" } - require(!isStreaming.value) { "Stream is already running" } - - try { - val streams = mutableListOf() - val orientedVideoConfig = if (hasVideo) { - val videoConfig = requireNotNull(_videoConfig) { "Requires video config" } - /** - * If sourceOrientationProvider is not null, we need to get oriented size. - * For example, the [FlvMuxer] `onMetaData` event needs to know the oriented size. - */ - videoConfig.rotateFromNaturalOrientation(context, targetRotation) - } else { - null - } - if (orientedVideoConfig != null) { - streams.add(orientedVideoConfig) - } - - if (hasAudio) { - val audioConfig = requireNotNull(_audioConfig) { "Requires audio config" } - streams.add(audioConfig) - } - - val streamsIdMap = endpointInternal.addStreams(streams) - orientedVideoConfig?.let { videoStreamId = streamsIdMap[orientedVideoConfig] } - _audioConfig?.let { audioStreamId = streamsIdMap[_audioConfig as CodecConfig] } - - endpointInternal.startStream() - - audioSourceInternal?.startStream() - audioEncoderInternal?.startStream() - - videoSourceInternal?.startStream() - videoEncoderInternal?.startStream() - - bitrateRegulatorController?.start() - - _isStreaming.emit(true) - } catch (t: Throwable) { - stopStreamInternal() - throw t - } - } + override suspend fun startStream() = streamerOutput.startStream() /** * Stops audio/video stream. @@ -607,97 +245,13 @@ open class SingleStreamer( * * @see [startStream] */ - override suspend fun stopStream() = withContext(dispatcher) { - stopStreamInternal() - } - - private fun resetVideoEncoder() { - val previousVideoEncoder = videoEncoderInternal - pendingTargetRotation?.let { - setTargetRotationInternal(it) - } - pendingTargetRotation = null - - // Only reset if the encoder is the same. Otherwise, it is already configured. - if (previousVideoEncoder == videoEncoderInternal) { - videoEncoderInternal?.reset() - } - } - - /** - * Stops audio/video and reset stream implementation. - * - * @see [stopStream] - */ - private suspend fun stopStreamInternal() { - stopStreamImpl() - - audioEncoderInternal?.reset() - resetVideoEncoder() - - _isStreaming.emit(false) - } + override suspend fun stopStream() = pipeline.stopStream() /** - * Stops audio/video stream implementation. - * - * @see [stopStream] - */ - private suspend fun stopStreamImpl() { - bitrateRegulatorController?.stop() - - // Sources - audioSourceInternal?.stopStream() - videoSourceInternal?.stopStream() - - // Encoders - try { - audioEncoderInternal?.stopStream() - } catch (e: IllegalStateException) { - Logger.w(TAG, "stopStreamImpl: Can't stop audio encoder: ${e.message}") - } - try { - videoEncoderInternal?.stopStream() - } catch (e: IllegalStateException) { - Logger.w(TAG, "stopStreamImpl: Can't stop video encoder: ${e.message}") - } - - // Endpoint - endpointInternal.stopStream() - } - - /** - * Releases recorders and encoders object. - * It also stops preview if needed - * - * @see [setAudioConfig] + * Releases the streamer. */ override fun release() { - // Sources - audioSourceInternal?.release() - val videoSource = videoSourceInternal - val outputSurface = if (videoSource is ISurfaceSource) { - val surface = videoSource.outputSurface - videoSource.outputSurface = null - surface - } else { - null - } - videoSourceInternal?.release() - outputSurface?.let { - surfaceProcessor?.removeInputSurface(it) - } - - surfaceProcessor?.release() - - // Encoders - audioEncoderInternal?.release() - audioEncoderInternal = null - videoEncoderInternal?.release() - videoEncoderInternal = null - - // Endpoint - endpointInternal.release() + pipeline.release() } /** @@ -705,57 +259,14 @@ open class SingleStreamer( * * Limitation: it is only available for SRT for now. */ - override fun addBitrateRegulatorController(controllerFactory: IBitrateRegulatorController.Factory) { - bitrateRegulatorController?.stop() - bitrateRegulatorController = controllerFactory.newBitrateRegulatorController(this).apply { - if (isStreaming.value) { - this.start() - } - Logger.d( - TAG, "Bitrate regulator controller added: ${this.javaClass.simpleName}" - ) - } - - } + override fun addBitrateRegulatorController(controllerFactory: IBitrateRegulatorController.Factory) = + streamerOutput.addBitrateRegulatorController(controllerFactory) /** * Removes the bitrate regulator controller. */ - override fun removeBitrateRegulatorController() { - bitrateRegulatorController?.stop() - bitrateRegulatorController = null - Logger.d(TAG, "Bitrate regulator controller removed") - } - - private fun setTargetRotationInternal(@RotationValue newTargetRotation: Int) { - if (shouldUpdateRotation(newTargetRotation)) { - sendTransformation() - } - } - - private fun sendTransformation() { - if (hasVideo) { - val videoConfig = videoConfig - if (videoConfig != null) { - videoSourceInternal?.configure(videoConfig.sourceConfig) - videoEncoderInternal = buildAndConfigureVideoEncoderIfNeeded( - videoConfig, requireNotNull(videoSourceInternal), targetRotation - ) - } - } - } - - /** - * @return true if the target rotation has changed - */ - private fun shouldUpdateRotation(@RotationValue newTargetRotation: Int): Boolean { - return if (targetRotation != newTargetRotation) { - _targetRotation = newTargetRotation - true - } else { - false - } - } + override fun removeBitrateRegulatorController() = + streamerOutput.removeBitrateRegulatorController() companion object { const val TAG = "SingleStreamer" diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/callbacks/CallbackSingleStreamer.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/callbacks/CallbackSingleStreamer.kt index cb32da6fb..1066ff072 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/callbacks/CallbackSingleStreamer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/callbacks/CallbackSingleStreamer.kt @@ -100,11 +100,15 @@ open class CallbackSingleStreamer(val streamer: ICoroutineSingleStreamer) : @RequiresPermission(Manifest.permission.RECORD_AUDIO) override fun setAudioConfig(audioConfig: AudioConfig) { - streamer.setAudioConfig(audioConfig) + coroutineScope.launch { + streamer.setAudioConfig(audioConfig) + } } override fun setVideoConfig(videoConfig: VideoConfig) { - streamer.setVideoConfig(videoConfig) + coroutineScope.launch { + streamer.setVideoConfig(videoConfig) + } } override fun open(descriptor: MediaDescriptor) { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/callbacks/CameraCallbackSingleStreamer.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/callbacks/CameraCallbackSingleStreamer.kt index 6dfad5a94..e4d76b9ce 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/callbacks/CameraCallbackSingleStreamer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/callbacks/CameraCallbackSingleStreamer.kt @@ -1,3 +1,18 @@ +/* + * Copyright (C) 2024 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.github.thibaultbee.streampack.core.streamers.single.callbacks import android.Manifest @@ -19,6 +34,7 @@ import io.github.thibaultbee.streampack.core.streamers.single.CameraSingleStream import io.github.thibaultbee.streampack.core.streamers.single.ICallbackSingleStreamer import io.github.thibaultbee.streampack.core.streamers.single.ICoroutineSingleStreamer import io.github.thibaultbee.streampack.core.streamers.single.SingleStreamer +import io.github.thibaultbee.streampack.core.streamers.single.open import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/DefaultSrtBitrateRegulatorController.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/DefaultSrtBitrateRegulatorController.kt index a28d18743..1f80caaa6 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/DefaultSrtBitrateRegulatorController.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/DefaultSrtBitrateRegulatorController.kt @@ -16,9 +16,11 @@ package io.github.thibaultbee.streampack.ext.srt.regulator.controllers import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig +import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IAudioEncodingPipelineOutput +import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IEncodingPipelineOutput +import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IVideoEncodingPipelineOutput import io.github.thibaultbee.streampack.core.regulator.controllers.BitrateRegulatorController import io.github.thibaultbee.streampack.core.regulator.controllers.DefaultBitrateRegulatorController -import io.github.thibaultbee.streampack.core.streamers.single.ICoroutineSingleStreamer import io.github.thibaultbee.streampack.ext.srt.regulator.DefaultSrtBitrateRegulator import io.github.thibaultbee.streampack.ext.srt.regulator.SrtBitrateRegulator @@ -31,11 +33,24 @@ class DefaultSrtBitrateRegulatorController { private val bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(), private val delayTimeInMs: Long = 500 ) : BitrateRegulatorController.Factory() { - override fun newBitrateRegulatorController(streamer: ICoroutineSingleStreamer): DefaultBitrateRegulatorController { + override fun newBitrateRegulatorController(pipelineOutput: IEncodingPipelineOutput): DefaultBitrateRegulatorController { + require(pipelineOutput is IVideoEncodingPipelineOutput) { + "Pipeline output must be an video encoding output" + } + + val videoEncoder = requireNotNull(pipelineOutput.videoEncoder) { + "Video encoder must be set" + } + + val audioEncoder = if (pipelineOutput is IAudioEncodingPipelineOutput) { + pipelineOutput.audioEncoder + } else { + null + } return DefaultBitrateRegulatorController( - streamer.audioEncoder, - streamer.videoEncoder, - streamer.endpoint, + audioEncoder, + videoEncoder, + pipelineOutput.endpoint, bitrateRegulatorFactory, bitrateRegulatorConfig, delayTimeInMs