Skip to content

Commit

Permalink
resolve non-exhaustive patterns
Browse files Browse the repository at this point in the history
  • Loading branch information
rawhat committed Dec 19, 2023
1 parent 654d007 commit da32902
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 121 deletions.
13 changes: 3 additions & 10 deletions src/glisten/acceptor.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ pub fn start(
}
}
}
msg -> {
logger.error(#("Unknown message type", msg))
actor.Stop(process.Abnormal("Unknown message type"))
}
}
},
))
Expand Down Expand Up @@ -115,12 +111,9 @@ pub fn start_pool(
frequency_period: 1,
init: fn(children) {
iterator.range(from: 0, to: pool.pool_count)
|> iterator.fold(
children,
fn(children, _index) {
supervisor.add(children, supervisor.worker(fn(_arg) { start(pool) }))
},
)
|> iterator.fold(children, fn(children, _index) {
supervisor.add(children, supervisor.worker(fn(_arg) { start(pool) }))
})
},
))
}
216 changes: 105 additions & 111 deletions src/glisten/handler.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import gleam/erlang/process.{type Selector, type Subject}
import gleam/function
import gleam/option.{type Option, Some}
import gleam/otp/actor
import gleam/otp/port.{type Port}
import gleam/result
import gleam/string
import glisten/socket.{type Socket}
Expand All @@ -17,9 +16,7 @@ pub type InternalMessage {
Close
Ready
ReceiveMessage(BitArray)
Ssl(socket: Port, data: BitArray)
SslClosed
Tcp(socket: Port, data: BitArray)
TcpClosed
}

Expand Down Expand Up @@ -73,128 +70,125 @@ pub type Handler(user_message, data) {
pub fn start(
handler: Handler(user_message, data),
) -> Result(Subject(Message(user_message)), actor.StartError) {
actor.start_spec(actor.Spec(
init: fn() {
let subject = process.new_subject()

let #(initial_state, user_selector) = handler.on_init()

let selector =
process.new_selector()
|> process.selecting_record3(
atom.create_from_string("tcp"),
fn(_sock, data) {
actor.start_spec(
actor.Spec(
init: fn() {
let subject = process.new_subject()
let #(initial_state, user_selector) = handler.on_init()
let selector =
process.new_selector()
|> process.selecting_record3(atom.create_from_string("tcp"), fn(
_sock,
data,
) {
data
|> dynamic.bit_array
|> result.unwrap(<<>>)
|> ReceiveMessage
},
)
|> process.selecting_record3(
atom.create_from_string("ssl"),
fn(_sock, data) {
})
|> process.selecting_record3(atom.create_from_string("ssl"), fn(
_sock,
data,
) {
data
|> dynamic.bit_array
|> result.unwrap(<<>>)
|> ReceiveMessage
},
)
|> process.selecting_record2(
atom.create_from_string("ssl_closed"),
fn(_nil) { SslClosed },
)
|> process.selecting_record2(
atom.create_from_string("tcp_closed"),
fn(_nil) { TcpClosed },
)
|> process.map_selector(Internal)
|> process.selecting(subject, function.identity)

let selector = case user_selector {
Some(sel) ->
sel
|> process.map_selector(User)
|> process.merge_selector(selector, _)
_ -> selector
}

actor.Ready(
LoopState(
client_ip: handler.transport.peername(handler.socket),
socket: handler.socket,
sender: subject,
transport: handler.transport,
data: initial_state,
),
selector,
)
},
init_timeout: 1000,
loop: fn(msg, state) {
let connection =
Connection(
socket: state.socket,
client_ip: state.client_ip,
transport: state.transport,
sender: state.sender,
})
|> process.selecting_record2(atom.create_from_string("ssl_closed"), fn(
_nil,
) {
SslClosed
})
|> process.selecting_record2(atom.create_from_string("tcp_closed"), fn(
_nil,
) {
TcpClosed
})
|> process.map_selector(Internal)
|> process.selecting(subject, function.identity)
let selector = case user_selector {
Some(sel) ->
sel
|> process.map_selector(User)
|> process.merge_selector(selector, _)
_ -> selector
}
actor.Ready(
LoopState(
client_ip: handler.transport.peername(handler.socket),
socket: handler.socket,
sender: subject,
transport: handler.transport,
data: initial_state,
),
selector,
)
case msg {
Internal(TcpClosed) | Internal(SslClosed) | Internal(Close) ->
case state.transport.close(state.socket) {
Ok(Nil) -> {
let _ = case handler.on_close {
Some(on_close) -> on_close(state.data)
_ -> Nil
},
init_timeout: 1000,
loop: fn(msg, state) {
let connection =
Connection(
socket: state.socket,
client_ip: state.client_ip,
transport: state.transport,
sender: state.sender,
)
case msg {
Internal(TcpClosed) | Internal(SslClosed) | Internal(Close) ->
case state.transport.close(state.socket) {
Ok(Nil) -> {
let _ = case handler.on_close {
Some(on_close) -> on_close(state.data)
_ -> Nil
}
actor.Stop(process.Normal)
}
actor.Stop(process.Normal)
Error(err) -> actor.Stop(process.Abnormal(string.inspect(err)))
}
Error(err) -> actor.Stop(process.Abnormal(string.inspect(err)))
}
Internal(Ready) ->
state.socket
|> state.transport.handshake
|> result.replace_error("Failed to handshake socket")
|> result.then(fn(_ok) {
state.transport.set_opts(
state.socket,
[options.ActiveMode(options.Once)],
)
|> result.replace_error("Failed to set socket active")
})
|> result.replace(actor.continue(state))
|> result.map_error(fn(reason) {
actor.Stop(process.Abnormal(reason))
})
|> result.unwrap_both
User(msg) -> {
let msg = Custom(msg)
case handler.loop(msg, state.data, connection) {
actor.Continue(next_state, _selector) -> {
let assert Ok(Nil) =
state.transport.set_opts(
state.socket,
[options.ActiveMode(options.Once)],
)
actor.continue(LoopState(..state, data: next_state))
Internal(Ready) ->
state.socket
|> state.transport.handshake
|> result.replace_error("Failed to handshake socket")
|> result.then(fn(_ok) {
state.transport.set_opts(state.socket, [
options.ActiveMode(options.Once),
])
|> result.replace_error("Failed to set socket active")
})
|> result.replace(actor.continue(state))
|> result.map_error(fn(reason) {
actor.Stop(process.Abnormal(reason))
})
|> result.unwrap_both
User(msg) -> {
let msg = Custom(msg)
case handler.loop(msg, state.data, connection) {
actor.Continue(next_state, _selector) -> {
let assert Ok(Nil) =
state.transport.set_opts(state.socket, [
options.ActiveMode(options.Once),
])
actor.continue(LoopState(..state, data: next_state))
}
actor.Stop(reason) -> actor.Stop(reason)
}
actor.Stop(reason) -> actor.Stop(reason)
}
}
Internal(ReceiveMessage(msg)) -> {
let msg = Packet(msg)
case handler.loop(msg, state.data, connection) {
actor.Continue(next_state, _selector) -> {
let assert Ok(Nil) =
state.transport.set_opts(
state.socket,
[options.ActiveMode(options.Once)],
)
actor.continue(LoopState(..state, data: next_state))
Internal(ReceiveMessage(msg)) -> {
let msg = Packet(msg)
case handler.loop(msg, state.data, connection) {
actor.Continue(next_state, _selector) -> {
let assert Ok(Nil) =
state.transport.set_opts(state.socket, [
options.ActiveMode(options.Once),
])
actor.continue(LoopState(..state, data: next_state))
}
actor.Stop(reason) -> actor.Stop(reason)
}
actor.Stop(reason) -> actor.Stop(reason)
}
}
}
},
))
},
),
)
}

0 comments on commit da32902

Please sign in to comment.