Skip to content

Commit

Permalink
fixed SRTSocket would deadlock.
Browse files Browse the repository at this point in the history
  • Loading branch information
shogo4405 committed Aug 18, 2024
1 parent 56a7638 commit 92a62e1
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 143 deletions.
2 changes: 1 addition & 1 deletion Examples/NetStreamSwitcher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ actor NetStreamSwitcher {
guard let connection = connection as? SRTConnection else {
return
}
await connection.close()
try? await connection.close()
logger.info("conneciton.close")
}
}
Expand Down
12 changes: 8 additions & 4 deletions HaishinKit.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@
BC3802192AB6AD79001AE399 /* AudioMixerTrackTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = BC3802182AB6AD79001AE399 /* AudioMixerTrackTests.swift */; };
BC3C56712C3F75B200C83107 /* RTMPStatus.swift in Sources */ = {isa = PBXBuildFile; fileRef = BC3C56702C3F75B200C83107 /* RTMPStatus.swift */; };
BC3E384429C216BB007CD972 /* ADTSReaderTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = BC3E384329C216BB007CD972 /* ADTSReaderTests.swift */; };
BC3E49D02C45520B00A9C5B6 /* HKStreamPublisher.swift in Sources */ = {isa = PBXBuildFile; fileRef = BC3E49CF2C45520B00A9C5B6 /* HKStreamPublisher.swift */; };
BC3E49D02C45520B00A9C5B6 /* HKStreamIngestor.swift in Sources */ = {isa = PBXBuildFile; fileRef = BC3E49CF2C45520B00A9C5B6 /* HKStreamIngestor.swift */; };
BC3E49D12C455A1B00A9C5B6 /* NetStreamSwitcher.swift in Sources */ = {isa = PBXBuildFile; fileRef = BCE0E33B2AD369410082C16F /* NetStreamSwitcher.swift */; };
BC3E49D22C45617100A9C5B6 /* SRTHaishinKit.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = BCCC45962AA289FA0016EFE8 /* SRTHaishinKit.framework */; };
BC3E49D32C45617100A9C5B6 /* SRTHaishinKit.framework in Embed Frameworks */ = {isa = PBXBuildFile; fileRef = BCCC45962AA289FA0016EFE8 /* SRTHaishinKit.framework */; settings = {ATTRIBUTES = (CodeSignOnCopy, RemoveHeadersOnCopy, ); }; };
Expand Down Expand Up @@ -208,6 +208,7 @@
BC72EF2C25F24E480068F040 /* Logboard.xcframework in Embed Frameworks */ = {isa = PBXBuildFile; fileRef = BC34DFD125EBB12C005F975A /* Logboard.xcframework */; settings = {ATTRIBUTES = (CodeSignOnCopy, RemoveHeadersOnCopy, ); }; };
BC72EF3225F24E500068F040 /* Logboard.xcframework in Frameworks */ = {isa = PBXBuildFile; fileRef = BC34DFD125EBB12C005F975A /* Logboard.xcframework */; };
BC72EF3325F24E500068F040 /* Logboard.xcframework in Embed Frameworks */ = {isa = PBXBuildFile; fileRef = BC34DFD125EBB12C005F975A /* Logboard.xcframework */; settings = {ATTRIBUTES = (CodeSignOnCopy, RemoveHeadersOnCopy, ); }; };
BC77ED6A2C7261880003427A /* HKStreamReadyState.swift in Sources */ = {isa = PBXBuildFile; fileRef = BC77ED692C7261800003427A /* HKStreamReadyState.swift */; };
BC7A0E442B088FA7005FB2F7 /* Example_visionOSApp.swift in Sources */ = {isa = PBXBuildFile; fileRef = BC7A0E432B088FA7005FB2F7 /* Example_visionOSApp.swift */; };
BC7A0E462B088FA7005FB2F7 /* ContentView.swift in Sources */ = {isa = PBXBuildFile; fileRef = BC7A0E452B088FA7005FB2F7 /* ContentView.swift */; };
BC7A0E552B0894B9005FB2F7 /* HaishinKit.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 2945CBBD1B4BE66000104112 /* HaishinKit.framework */; };
Expand Down Expand Up @@ -669,7 +670,7 @@
BC3802182AB6AD79001AE399 /* AudioMixerTrackTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AudioMixerTrackTests.swift; sourceTree = "<group>"; };
BC3C56702C3F75B200C83107 /* RTMPStatus.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RTMPStatus.swift; sourceTree = "<group>"; };
BC3E384329C216BB007CD972 /* ADTSReaderTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ADTSReaderTests.swift; sourceTree = "<group>"; };
BC3E49CF2C45520B00A9C5B6 /* HKStreamPublisher.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = HKStreamPublisher.swift; sourceTree = "<group>"; };
BC3E49CF2C45520B00A9C5B6 /* HKStreamIngestor.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = HKStreamIngestor.swift; sourceTree = "<group>"; };
BC3E49D72C466B5D00A9C5B6 /* RTMPAuthenticator.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RTMPAuthenticator.swift; sourceTree = "<group>"; };
BC4231632BCA5F28003A80DC /* AudioMixerByMultiTrack.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AudioMixerByMultiTrack.swift; sourceTree = "<group>"; };
BC4231692BCA8BE5003A80DC /* AudioMixerBySingleTrack.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AudioMixerBySingleTrack.swift; sourceTree = "<group>"; };
Expand All @@ -695,6 +696,7 @@
BC6499A82C3C4E77002E8186 /* RTMPResponse.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RTMPResponse.swift; sourceTree = "<group>"; };
BC6889772B011AEB0026A4C2 /* CaptureSession.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CaptureSession.swift; sourceTree = "<group>"; };
BC6FC91D29609A6800A746EE /* ShapeFactory.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ShapeFactory.swift; sourceTree = "<group>"; };
BC77ED692C7261800003427A /* HKStreamReadyState.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = HKStreamReadyState.swift; sourceTree = "<group>"; };
BC7A0E3D2B088FA7005FB2F7 /* Example visionOS.app */ = {isa = PBXFileReference; explicitFileType = wrapper.application; includeInIndex = 0; path = "Example visionOS.app"; sourceTree = BUILT_PRODUCTS_DIR; };
BC7A0E432B088FA7005FB2F7 /* Example_visionOSApp.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Example_visionOSApp.swift; sourceTree = "<group>"; };
BC7A0E452B088FA7005FB2F7 /* ContentView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ContentView.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -1333,9 +1335,10 @@
BC37F0B72C4185B7001428F0 /* AudioPlayerNode.swift */,
29AF3FCE1D7C744C00E41212 /* HKStream.swift */,
BCF13C582C666E6400ADDBA9 /* HKStreamBitRateStrategy.swift */,
BC3E49CF2C45520B00A9C5B6 /* HKStreamIngestor.swift */,
BCD8702A2BC266CD009E495B /* HKStreamOutput.swift */,
BC2170672C721034007CF0BF /* HKStreamPlayer.swift */,
BC3E49CF2C45520B00A9C5B6 /* HKStreamPublisher.swift */,
BC77ED692C7261800003427A /* HKStreamReadyState.swift */,
2976A47D1D48C5C700B53EF2 /* HKStreamRecorder.swift */,
BCA604D02C4FC43C00C25989 /* MediaLink.swift */,
29B8768D1CD70AFE00FC07DA /* SoundTransform.swift */,
Expand Down Expand Up @@ -1881,6 +1884,7 @@
isa = PBXSourcesBuildPhase;
buildActionMask = 2147483647;
files = (
BC77ED6A2C7261880003427A /* HKStreamReadyState.swift in Sources */,
BC4914AE28DDF445009E2DF6 /* VTDecompressionSession+Extension.swift in Sources */,
29B876B11CD70B2800FC07DA /* RTMPMessage.swift in Sources */,
BCB9773F2621812800C9A649 /* ISOTypeBufferUtil.swift in Sources */,
Expand Down Expand Up @@ -1941,7 +1945,7 @@
BC42316A2BCA8BE5003A80DC /* AudioMixerBySingleTrack.swift in Sources */,
BC562DCB29576D220048D89A /* AVCaptureSession.Preset+Extension.swift in Sources */,
BC2170682C72103C007CF0BF /* HKStreamPlayer.swift in Sources */,
BC3E49D02C45520B00A9C5B6 /* HKStreamPublisher.swift in Sources */,
BC3E49D02C45520B00A9C5B6 /* HKStreamIngestor.swift in Sources */,
29B876AB1CD70B2800FC07DA /* AMF0Serializer.swift in Sources */,
BC4231642BCA5F28003A80DC /* AudioMixerByMultiTrack.swift in Sources */,
29B8765B1CD70A7900FC07DA /* AudioCodec.swift in Sources */,
Expand Down
23 changes: 14 additions & 9 deletions SRTHaishinKit/SRTConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ import Foundation
import HaishinKit
import libsrt

/// The SRTConnection class create a two-way SRT connection.
/// An actor that provides the interface to control a two-way SRT connection.
public actor SRTConnection {
/// The error comain codes.
/// The error domain codes.
public enum Error: Swift.Error {
/// An invalid internal stare.
case invalidState
/// The uri isn’t supported.
case unsupportedUri(_ uri: URL?)
/// The fail to connect.
Expand Down Expand Up @@ -82,29 +84,32 @@ public actor SRTConnection {
}

/// Closes the connection from the server.
public func close() async {
public func close() async throws {
guard connected else {
throw Error.invalidState
}
await networkMonitor?.stopRunning()
for client in clients {
await client.close()
}
clients.removeAll()
for stream in streams {
await stream.close()
}
await socket?.close()
clients.removeAll()
await networkMonitor?.stopRunning()
connected = false
}

func output(_ data: Data) async {
func send(_ data: Data) async {
await socket?.send(data)
}

func listen() {
func recv() {
Task {
guard let stream = await socket?.recv() else {
guard let socket else {
return
}
for await data in stream {
for await data in await socket.inputs {
await streams.first?.doInput(data)
}
}
Expand Down
38 changes: 20 additions & 18 deletions SRTHaishinKit/SRTSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,22 @@ private let kSRTSOcket_payloadSize: Int = 1316
final actor SRTSocket {
static let payloadSize: Int = 1316

var timeout: Int = 0
var options: [SRTSocketOption: Any] = [:]
var inputs: AsyncStream<Data> {
AsyncStream<Data> { condination in
// If Task.detached is not used, closing will result in a deadlock.
Task.detached {
while await self.connected {
let result = await self.recvmsg()
if 0 < result {
condination.yield(await self.incomingBuffer.subdata(in: 0..<Data.Index(result)))
} else {
condination.finish()
}
}
}
}
}

private(set) var mode: SRTMode = .caller
private(set) var perf: CBytePerfMon = .init()
private(set) var socket: SRTSOCKET = SRT_INVALID_SOCK
Expand Down Expand Up @@ -53,6 +67,7 @@ final actor SRTSocket {
}
}
}
private(set) var options: [SRTSocketOption: Any] = [:]
private(set) var connected = false
private var totalBytesIn: Int = 0
private var totalBytesOut: Int = 0
Expand Down Expand Up @@ -121,21 +136,6 @@ final actor SRTSocket {
}
}

func recv() -> AsyncStream<Data> {
return AsyncStream<Data> { condination in
Task {
repeat {
let result = recvmsg()
if 0 < result {
condination.yield(incomingBuffer.subdata(in: 0..<Data.Index(result)))
} else {
condination.finish()
}
} while connected
}
}
}

func configure(_ binding: SRTSocketOption.Binding) -> Bool {
let failures = SRTSocketOption.configure(socket, binding: binding, options: options)
guard failures.isEmpty else {
Expand Down Expand Up @@ -170,12 +170,14 @@ final actor SRTSocket {

@inline(__always)
private func recvmsg() -> Int32 {
return incomingBuffer.withUnsafeMutableBytes { pointer in
let result = incomingBuffer.withUnsafeMutableBytes { pointer in
guard let buffer = pointer.baseAddress?.assumingMemoryBound(to: CChar.self) else {
return SRT_ERROR
}
return srt_recvmsg(socket, buffer, windowSizeC)
}
totalBytesIn += Int(result)
return result
}
}

Expand Down
78 changes: 30 additions & 48 deletions SRTHaishinKit/SRTStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ public actor SRTStream {
public private(set) var readyState: HKStreamReadyState = .idle
private var name: String?
private var action: (() async -> Void)?
private weak var connection: SRTConnection?
private var outputs: [any HKStreamOutput] = []
private var bitrateStorategy: (any HKStreamBitRateStrategy)?
private lazy var writer = TSWriter()
private lazy var reader = TSReader()
private lazy var player = HKStreamPlayer(self)
private lazy var publisher = HKStreamPublisher()
private var outputs: [any HKStreamOutput] = []
private var videoFormat: CMFormatDescription?
private var audioFormat: CMFormatDescription?
private var bitrateStorategy: (any HKStreamBitRateStrategy)?
private lazy var ingestor = HKStreamIngestor()
private weak var connection: SRTConnection?

/// Creates a new stream object.
public init(connection: SRTConnection) {
Expand All @@ -33,35 +31,35 @@ public actor SRTStream {
guard let name else {
switch readyState {
case .publishing:
readyState = .idle
await close()
default:
break
}
return
}
if await connection?.connected == true {
readyState = .publishing
publisher.startRunning()
ingestor.startRunning()
writer.expectedMedias.removeAll()
if let videoFormat {
if ingestor.videoInputFormat != nil {
writer.expectedMedias.insert(.video)
}
if let audioFormat {
if ingestor.audioInputFormat != nil {
writer.expectedMedias.insert(.audio)
}
Task {
for try await buffer in publisher.video where publisher.isRunning {
for try await buffer in ingestor.video where ingestor.isRunning {
append(buffer)
}
}
Task {
for await buffer in publisher.audio where publisher.isRunning {
for await buffer in ingestor.audio where ingestor.isRunning {
append(buffer.0, when: buffer.1)
}
}
Task {
for await data in writer.output where publisher.isRunning {
await connection?.output(data)
for await data in writer.output where ingestor.isRunning {
await connection?.send(data)
}
}
} else {
Expand All @@ -74,15 +72,15 @@ public actor SRTStream {
guard let name else {
switch readyState {
case .playing:
readyState = .idle
await close()
default:
break
}
return
}
if await connection?.connected == true {
await player.startRunning()
await connection?.listen()
await connection?.recv()
Task {
for try await buffer in reader.output where await player.isRunning {
await player.append(buffer.1)
Expand All @@ -99,7 +97,7 @@ public actor SRTStream {
if readyState == .idle {
return
}
publisher.stopRunning()
ingestor.stopRunning()
Task { await player.stopRunning() }
readyState = .idle
}
Expand All @@ -112,19 +110,19 @@ public actor SRTStream {
extension SRTStream: HKStream {
// MARK: HKStream
public var audioSettings: AudioCodecSettings {
publisher.audioSettings
ingestor.audioSettings
}

public var videoSettings: VideoCodecSettings {
publisher.videoSettings
ingestor.videoSettings
}

public func setAudioSettings(_ audioSettings: AudioCodecSettings) {
publisher.audioSettings = audioSettings
ingestor.audioSettings = audioSettings
}

public func setVideoSettings(_ videoSettings: VideoCodecSettings) {
publisher.videoSettings = videoSettings
ingestor.videoSettings = videoSettings
}

public func setBitrateStorategy(_ bitrateStorategy: (some HKStreamBitRateStrategy)?) {
Expand All @@ -134,19 +132,11 @@ extension SRTStream: HKStream {
public func append(_ sampleBuffer: CMSampleBuffer) {
switch sampleBuffer.formatDescription?.mediaType {
case .video:
switch readyState {
case .publishing:
if sampleBuffer.formatDescription?.isCompressed == true {
writer.videoFormat = sampleBuffer.formatDescription
if sampleBuffer.formatDescription?.isCompressed == true {
writer.append(sampleBuffer)
} else {
publisher.append(sampleBuffer)
}
default:
break
}
if sampleBuffer.formatDescription?.isCompressed == false {
videoFormat = sampleBuffer.formatDescription
writer.append(sampleBuffer)
} else {
ingestor.append(sampleBuffer)
outputs.forEach { $0.stream(self, didOutput: sampleBuffer) }
}
default:
Expand All @@ -155,24 +145,16 @@ extension SRTStream: HKStream {
}

public func append(_ audioBuffer: AVAudioBuffer, when: AVAudioTime) {
switch readyState {
case .publishing:
switch audioBuffer {
case let audioBuffer as AVAudioPCMBuffer:
publisher.append(audioBuffer, when: when)
case let audioBuffer as AVAudioCompressedBuffer:
writer.audioFormat = audioBuffer.format
writer.append(audioBuffer, when: when)
default:
break
}
switch audioBuffer {
case let audioBuffer as AVAudioPCMBuffer:
ingestor.append(audioBuffer, when: when)
outputs.forEach { $0.stream(self, didOutput: audioBuffer, when: when) }
case let audioBuffer as AVAudioCompressedBuffer:
writer.audioFormat = audioBuffer.format
writer.append(audioBuffer, when: when)
default:
break
}
if audioBuffer is AVAudioPCMBuffer {
audioFormat = audioBuffer.format.formatDescription
outputs.forEach { $0.stream(self, didOutput: audioBuffer, when: when) }
}
}

public func attachAudioPlayer(_ audioPlayer: AudioPlayer?) {
Expand Down
14 changes: 0 additions & 14 deletions Sources/HKStream/HKStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,3 @@ public protocol HKStream: Actor, MediaMixerOutput {
/// Dispatch a network monitor event.
func dispatch(_ event: NetworkMonitorEvent)
}

/// The enumeration defines the state a HKStream client is in.
public enum HKStreamReadyState: Int, Sendable {
/// The stream is idling.
case idle
/// The stream has sent a request to play and is waiting for approval from the server.
case play
/// The stream is playing.
case playing
/// The streamhas sent a request to publish and is waiting for approval from the server.
case publish
/// The stream is publishing.
case publishing
}
Loading

0 comments on commit 92a62e1

Please sign in to comment.