Skip to content

Commit

Permalink
Miguration #1558
Browse files Browse the repository at this point in the history
  • Loading branch information
shogo4405 committed Sep 10, 2024
1 parent 0b7c998 commit 3d06ea5
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 12 deletions.
46 changes: 34 additions & 12 deletions SRTHaishinKit/SRTConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public actor SRTConnection {
/// This instance connect to server(true) or not(false)
@Published public private(set) var connected = false

private var mode: SRTMode = .caller
private var socket: SRTSocket?
private var streams: [SRTStream] = []
private var clients: [SRTSocket] = []
Expand Down Expand Up @@ -69,13 +70,24 @@ public actor SRTConnection {
}
}
}
Task {
let networkMonitor = await socket.makeNetworkMonitor()
self.networkMonitor = networkMonitor
await networkMonitor.startRunning()
for await event in await networkMonitor.event {
for stream in streams {
await stream.dispatch(event)
self.mode = mode
switch mode {
case .caller:
Task {
let networkMonitor = await socket.makeNetworkMonitor()
self.networkMonitor = networkMonitor
await networkMonitor.startRunning()
for await event in await networkMonitor.event {
for stream in streams {
await stream.dispatch(event)
}
}
}
case .listener:
Task {
for await client in await socket.accept {
connected = true
clients.append(client)
}
}
}
Expand Down Expand Up @@ -107,11 +119,21 @@ public actor SRTConnection {

func recv() {
Task {
guard let socket else {
return
}
for await data in await socket.inputs {
await streams.first?.doInput(data)
switch mode {
case .caller:
guard let socket else {
return
}
for await data in await socket.inputs {
await streams.first?.doInput(data)
}
case .listener:
guard let socket = clients.first else {
return
}
for await data in await socket.inputs {
await streams.first?.doInput(data)
}
}
}
}
Expand Down
38 changes: 38 additions & 0 deletions SRTHaishinKit/SRTSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,22 @@ final actor SRTSocket {
}
}

var accept: AsyncStream<SRTSocket> {
AsyncStream<SRTSocket> { condination in
Task.detached {
repeat {
do {
let client = try await self.accept()
condination.yield(client)
try await Task.sleep(nanoseconds: 1_000_000_000)
} catch {
condination.finish()
}
} while await self.connected
}
}
}

private(set) var mode: SRTMode = .caller
private(set) var perf: CBytePerfMon = .init()
private(set) var socket: SRTSOCKET = SRT_INVALID_SOCK
Expand Down Expand Up @@ -79,6 +95,23 @@ final actor SRTSocket {
init() {
}

init(socket: SRTSOCKET) async throws {
self.socket = socket
guard configure(.post) else {
throw makeSocketError()
}
if incomingBuffer.count < windowSizeC {
incomingBuffer = .init(count: Int(windowSizeC))
}
status = srt_getsockstate(socket)
switch status {
case SRTS_CONNECTED:
connected = true
default:
break
}
}

func open(_ addr: sockaddr_in, mode: SRTMode, options: [SRTSocketOption: any Sendable] = [:]) throws {
guard socket == SRT_INVALID_SOCK else {
return
Expand Down Expand Up @@ -158,6 +191,11 @@ final actor SRTSocket {
return SRTError.illegalState(message: error_message)
}

private func accept() async throws -> SRTSocket {
let accept = srt_accept(socket, nil, nil)
return try await SRTSocket(socket: accept)
}

@inline(__always)
private func sendmsg2(_ data: Data) -> Int32 {
return data.withUnsafeBytes { pointer in
Expand Down

0 comments on commit 3d06ea5

Please sign in to comment.