Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support SRT listener mode. #1558

Merged
merged 1 commit into from
Sep 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Project name |Notes |License
- [x] Playback(beta)
- [ ] mode
- [x] caller
- [ ] listener
- [x] listener
- [ ] rendezvous

### Offscreen Rendering.
Expand Down
36 changes: 33 additions & 3 deletions SRTHaishinKit/SRTConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public final class SRTConnection: NSObject {
/// This instance connect to server(true) or not(false)
@objc public private(set) dynamic var connected = false

var mode: SRTMode = .caller
var socket: SRTSocket<SRTConnection>? {
didSet {
socket?.delegate = self
Expand Down Expand Up @@ -59,7 +60,13 @@ public final class SRTConnection: NSObject {
socket = .init()
try socket?.open(addr, mode: mode, options: options)
self.uri = uri
connected = socket?.status == SRTS_CONNECTED
switch mode {
case .listener:
break
default:
connected = socket?.status == SRTS_CONNECTED
}
self.mode = mode
continuation.resume()
} catch {
continuation.resume(throwing: error)
Expand Down Expand Up @@ -98,14 +105,37 @@ public final class SRTConnection: NSObject {
extension SRTConnection: SRTSocketDelegate {
// MARK: SRTSocketDelegate
func socket(_ socket: SRTSocket<SRTConnection>, status: SRT_SOCKSTATUS) {
connected = socket.status == SRTS_CONNECTED
switch mode {
case .caller:
connected = socket.status == SRTS_CONNECTED
case .listener:
let connected = socket.status == SRTS_CONNECTED
guard !connected else {
return
}
if let indexOf = clients.firstIndex(where: { $0.socket == socket.socket }) {
clients[indexOf].delegate = nil
clients[indexOf].close()
clients.remove(at: indexOf)
}
self.connected = false
}
}

func socket(_ socket: SRTSocket<SRTConnection>, incomingDataAvailabled data: Data, bytes: Int32) {
streams.first?.doInput(data.subdata(in: 0..<Data.Index(bytes)))
}

func socket(_ socket: SRTSocket<SRTConnection>, didAcceptSocket client: SRTSocket<SRTConnection>) {
clients.append(client)
// only one client can accept.
if clients.isEmpty {
client.delegate = self
clients.append(client)
connected = true
client.startRunning()
client.doInput()
} else {
client.reject()
}
}
}
24 changes: 17 additions & 7 deletions SRTHaishinKit/SRTSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,18 @@ final class SRTSocket<T: SRTSocketDelegate> {
}

func doInput() {
incomingQueue.async {
repeat {
let result = self.recvmsg()
if 0 < result {
self.delegate?.socket(self, incomingDataAvailabled: self.incomingBuffer, bytes: result)
}
} while self.isRunning.value
switch mode {
case .caller:
incomingQueue.async {
repeat {
let result = self.recvmsg()
if 0 < result {
self.delegate?.socket(self, incomingDataAvailabled: self.incomingBuffer, bytes: result)
}
} while self.isRunning.value
}
case .listener:
break
}
}

Expand All @@ -161,6 +166,11 @@ final class SRTSocket<T: SRTSocketDelegate> {
return srt_bstats(socket, &perf, 1)
}

func reject() {
srt_setrejectreason(socket, Int32(SRT_REJ_CLOSE.rawValue))
srt_close(socket)
}

private func accept() {
let socket = srt_accept(socket, nil, nil)
do {
Expand Down
8 changes: 8 additions & 0 deletions Sources/IO/IOStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ open class IOStream: NSObject {
/// The lockQueue.
public let lockQueue: DispatchQueue = .init(label: "com.haishinkit.HaishinKit.IOStream.lock", qos: .userInitiated)

/// The boolean value that indicates audio samples allow access or not.
public internal(set) var audioSampleAccess = true
/// The boolean value that indicates video samples allow access or not.
public internal(set) var videoSampleAccess = true

/// The offscreen rendering object.
public var screen: Screen {
return mixer.videoIO.screen
Expand Down Expand Up @@ -560,6 +565,9 @@ extension IOStream: IOTellyUnitDelegate {
// MARK: IOTellyUnitDelegate
func tellyUnit(_ tellyUnit: IOTellyUnit, dequeue sampleBuffer: CMSampleBuffer) {
mixer.videoIO.view?.enqueue(sampleBuffer)
if videoSampleAccess {
observers.forEach { $0.stream(self, didOutput: sampleBuffer) }
}
}

func tellyUnit(_ tellyUnit: IOTellyUnit, didBufferingChanged: Bool) {
Expand Down
4 changes: 0 additions & 4 deletions Sources/RTMP/RTMPStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,6 @@ open class RTMPStream: IOStream {
public internal(set) var info = RTMPStreamInfo()
/// The object encoding (AMF). Framework supports AMF0 only.
public private(set) var objectEncoding: RTMPObjectEncoding = RTMPConnection.defaultObjectEncoding
/// The boolean value that indicates audio samples allow access or not.
public private(set) var audioSampleAccess = true
/// The boolean value that indicates video samples allow access or not.
public private(set) var videoSampleAccess = true
/// Incoming audio plays on the stream or not.
public var receiveAudio = true {
didSet {
Expand Down