Skip to content

Commit

Permalink
refactor(*): introduce streamer pipeline to enable multiple output
Browse files Browse the repository at this point in the history
  • Loading branch information
ThibaultBee committed Jan 20, 2025
1 parent 7920b7b commit eacd8be
Show file tree
Hide file tree
Showing 52 changed files with 3,432 additions and 1,078 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.github.thibaultbee.streampack.core.elements.endpoints

import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor
import io.github.thibaultbee.streampack.core.elements.data.Frame
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow

class DummyEndpoint : IEndpointInternal {
private val _isOpen = MutableStateFlow(false)
override val isOpen: StateFlow<Boolean> = _isOpen

var numOfAudioFramesWritten = 0
private set
var numOfVideoFramesWritten = 0
private set
val numOfFramesWritten: Int
get() = numOfAudioFramesWritten + numOfVideoFramesWritten

private val _isStreaming = MutableStateFlow(false)
val isStreaming: StateFlow<Boolean> = _isStreaming

override val info: IEndpoint.IEndpointInfo
get() = TODO("Not yet implemented")

override fun getInfo(type: MediaDescriptor.Type): IEndpoint.IEndpointInfo {
TODO("Not yet implemented")
}

override val metrics: Any
get() = TODO("Not yet implemented")

override suspend fun open(descriptor: MediaDescriptor) {
_isOpen.emit(true)
}

override suspend fun close() {
_isOpen.emit(false)
}

override suspend fun write(frame: Frame, streamPid: Int) {
when {
frame.isAudio -> numOfAudioFramesWritten++
frame.isVideo -> numOfVideoFramesWritten++
}
}

override fun addStreams(streamConfigs: List<CodecConfig>): Map<CodecConfig, Int> {
return streamConfigs.associateWith { it.hashCode() }
}

override fun addStream(streamConfig: CodecConfig): Int {
return streamConfig.hashCode()
}

override suspend fun startStream() {
_isStreaming.emit(true)
}

override suspend fun stopStream() {
_isStreaming.emit(false)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.github.thibaultbee.streampack.core.pipelines

import org.junit.Assert.*

class StreamerPipelineTest {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (C) 2025 Thibault B.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.core.pipelines.outputs

import io.github.thibaultbee.streampack.core.elements.encoders.IEncoderInternal
import io.github.thibaultbee.streampack.core.logger.Logger
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow

class DummyVideoPipelineOutput : DummyPipelineOutput(hasAudio = false, hasVideo = true), IVideoPipelineOutputInternal {
companion object {
private const val TAG = "DummyVideoPipelineOutput"
}

override var targetRotation: Int = 0
override val surface: StateFlow<SurfaceWithSize?>
get() = TODO("Not yet implemented")
override var videoSourceTimestampOffset: Long
get() = TODO("Not yet implemented")
set(value) {}
override var videoFrameRequestedListener: IEncoderInternal.IByteBufferInput.OnFrameRequestedListener?
get() = TODO("Not yet implemented")
set(value) {}
}

open class DummyPipelineOutput(override val hasAudio: Boolean, override val hasVideo: Boolean) :
IPipelineOutputInternal {

override var streamListener: IPipelineOutputInternal.Listener? = null

private val _throwable = MutableStateFlow<Throwable?>(null)
override val throwable: StateFlow<Throwable?> = _throwable

private val _isStreaming = MutableStateFlow(false)
override val isStreaming: StateFlow<Boolean> = _isStreaming

override suspend fun startStream() {
Logger.i(TAG, "Start stream")
_isStreaming.emit(true)
}

override suspend fun stopStream() {
Logger.i(TAG, "Stop stream")
_isStreaming.emit(false)
}

override suspend fun release() {
Logger.i(TAG, "Release")
_isStreaming.emit(false)
}

companion object {
private const val TAG = "DummyPipelineOutput"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.github.thibaultbee.streampack.core.pipelines.outputs.encoding

import android.content.Context
import androidx.test.platform.app.InstrumentationRegistry
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.UriMediaDescriptor
import io.github.thibaultbee.streampack.core.elements.endpoints.DummyEndpoint
import junit.framework.TestCase.assertTrue
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertFalse
import org.junit.Test

class EncodingPipelineOutputTest {
private val context: Context = InstrumentationRegistry.getInstrumentation().context

@Test
fun testOpenClose() = runTest {
val output = EncodingPipelineOutput(context, endpointInternal = DummyEndpoint())
output.open(UriMediaDescriptor("file://test.mp4"))
assertTrue(output.isOpen.value)
output.close()
assertFalse(output.isOpen.value)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.core.utils
package io.github.thibaultbee.streampack.core.pipelines.utils

import android.media.MediaFormat
import android.util.Size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.core.utils
package io.github.thibaultbee.streampack.core.pipelines.utils

import androidx.test.platform.app.InstrumentationRegistry
import java.io.File
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (C) 2025 Thibault B.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.core.pipelines.utils

import android.media.MediaFormat
import android.util.Size
import io.github.thibaultbee.streampack.core.elements.encoders.VideoCodecConfig
import org.junit.Assert.assertEquals
import org.junit.Assert.fail
import org.junit.Test

class SourceConfigUtilsTest {

@Test
fun videoSourceConfigFromEmpty() {
try {
SourceConfigUtils.buildVideoSourceConfig(emptySet())
fail("Video codec configs must not be empty")
} catch (_: Throwable) {
}
}

@Test
fun buildVideoSourceConfigWithSimple() {
// Given
val videoCodecConfigs = setOf(
VideoCodecConfig(
MediaFormat.MIMETYPE_VIDEO_AVC,
resolution = Size(1280, 720),
fps = 30
),
VideoCodecConfig(MediaFormat.MIMETYPE_VIDEO_AVC, resolution = Size(1280, 720), fps = 30)
)

// When
val videoSourceConfig = SourceConfigUtils.buildVideoSourceConfig(videoCodecConfigs)

// Then
assertEquals(1280, videoSourceConfig.resolution.width)
assertEquals(720, videoSourceConfig.resolution.height)
assertEquals(30, videoSourceConfig.fps)
}

@Test
fun buildVideoSourceConfigWithDifferentResolution() {
// Given
val videoCodecConfigs = setOf(
VideoCodecConfig(MediaFormat.MIMETYPE_VIDEO_AVC, resolution = Size(1280, 720)),
VideoCodecConfig(MediaFormat.MIMETYPE_VIDEO_AVC, resolution = Size(1920, 1080))
)

// When
val videoSourceConfig = SourceConfigUtils.buildVideoSourceConfig(videoCodecConfigs)

// Then
assertEquals(1920, videoSourceConfig.resolution.width)
assertEquals(1080, videoSourceConfig.resolution.height)
}

@Test
fun videoSourceConfigWithDifferentFps() {
// Given
val videoCodecConfigs = setOf(
VideoCodecConfig(MediaFormat.MIMETYPE_VIDEO_AVC, fps = 30),
VideoCodecConfig(MediaFormat.MIMETYPE_VIDEO_AVC, fps = 25)
)

// When
try {
SourceConfigUtils.buildVideoSourceConfig(videoCodecConfigs)
fail("All video codec configs must have the same fps")
} catch (e: IllegalArgumentException) {
assertEquals("All video codec configs must have the same fps", e.message)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.core.streamer.file
package io.github.thibaultbee.streampack.core.streamer.single.file

import android.Manifest
import android.content.Context
Expand All @@ -36,8 +36,9 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks
import io.github.thibaultbee.streampack.core.streamers.single.AudioConfig
import io.github.thibaultbee.streampack.core.streamers.single.CameraSingleStreamer
import io.github.thibaultbee.streampack.core.streamers.single.VideoConfig
import io.github.thibaultbee.streampack.core.streamers.single.setConfig
import io.github.thibaultbee.streampack.core.utils.FileUtils
import io.github.thibaultbee.streampack.core.pipelines.utils.FileUtils
import io.github.thibaultbee.streampack.core.streamer.utils.StreamerUtils
import io.github.thibaultbee.streampack.core.streamer.utils.VideoUtils
import kotlinx.coroutines.test.runTest
import org.junit.Rule
import org.junit.Test
Expand All @@ -48,7 +49,7 @@ import kotlin.time.Duration.Companion.seconds

@LargeTest
@RunWith(Parameterized::class)
class CameraStreamerFileTest(
class CameraSingleStreamerFileTest(
private val descriptor: MediaDescriptor,
private val verify: Boolean,
endpoint: IEndpointInternal?
Expand Down Expand Up @@ -105,7 +106,7 @@ class CameraStreamerFileTest(
)

// Run stream
StreamerUtils.runStream(
StreamerUtils.runSingleStream(
streamer,
descriptor,
STREAM_DURATION_MS.milliseconds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.core.streamer.file
package io.github.thibaultbee.streampack.core.streamer.single.file

import android.Manifest
import android.content.Context
Expand All @@ -25,11 +25,12 @@ import androidx.test.filters.LargeTest
import androidx.test.platform.app.InstrumentationRegistry
import androidx.test.rule.GrantPermissionRule
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.UriMediaDescriptor
import io.github.thibaultbee.streampack.core.pipelines.utils.FileUtils
import io.github.thibaultbee.streampack.core.streamer.utils.StreamerUtils
import io.github.thibaultbee.streampack.core.streamer.utils.VideoUtils
import io.github.thibaultbee.streampack.core.streamers.single.AudioConfig
import io.github.thibaultbee.streampack.core.streamers.single.CameraSingleStreamer
import io.github.thibaultbee.streampack.core.streamers.single.VideoConfig
import io.github.thibaultbee.streampack.core.streamers.single.setConfig
import io.github.thibaultbee.streampack.core.utils.FileUtils
import kotlinx.coroutines.test.runTest
import org.junit.Rule
import org.junit.Test
Expand All @@ -40,7 +41,7 @@ import kotlin.time.Duration.Companion.seconds
* Test [CameraSingleStreamer] with multiple streams.
*/
@LargeTest
class CameraStreamerMultiStreamTest {
class CameraSingleStreamerMultiStreamTest {
private val context: Context = InstrumentationRegistry.getInstrumentation().context
private val streamer = CameraSingleStreamer(context)

Expand Down Expand Up @@ -81,7 +82,7 @@ class CameraStreamerMultiStreamTest {
videoConfig: VideoConfig
) {
// Run stream
StreamerUtils.runStream(
StreamerUtils.runSingleStream(
streamer,
descriptor,
STREAM_DURATION_MS.milliseconds,
Expand Down
Loading

0 comments on commit eacd8be

Please sign in to comment.