Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use channel.address instead of id #582

Merged
merged 4 commits into from
Nov 22, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 59 additions & 25 deletions src/adapters/mqtt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ class MqttAdapter extends Adapter {
}

private getSecurityReqs() {

let userAndPasswordSecurityReq
let X509SecurityReq

const securityRequirements = this.AsyncAPIServer.security().map(e => e.all().map(e => e.scheme()))
const securityRequirements = this.AsyncAPIServer.security().map((e) =>
e.all().map((e) => e.scheme())
)

securityRequirements.forEach(security => {
securityRequirements.forEach((security) => {
for (const sec of security) {
const securityType = sec.type().toLocaleLowerCase()
switch (securityType) {
Expand All @@ -62,14 +63,23 @@ class MqttAdapter extends Adapter {
X509SecurityReq = sec
break
default:
this.emit("error", new Error(`Invalid security type '${securityType}' specified for server '${this.serverName}'. Please double-check your configuration to ensure you're using a supported security type. Here is a list of supported types: ${Object.values(SecurityTypes)}`))
this.emit(
'error',
new Error(
`Invalid security type '${securityType}' specified for server '${
this.serverName
}'. Please double-check your configuration to ensure you're using a supported security type. Here is a list of supported types: ${Object.values(
SecurityTypes
)}`
)
)
}
}
})

return {
userAndPasswordSecurityReq,
X509SecurityReq
X509SecurityReq,
}
}

Expand Down Expand Up @@ -110,7 +120,9 @@ class MqttAdapter extends Adapter {
this.client.on('close', () => {
this.emit('close', {
connection: this.client,
channels: this.channelNames,
channels: this.channelNames.map((channelName) =>
this.parsedAsyncAPI.channels().get(channelName).address()
),
})
})

Expand Down Expand Up @@ -139,23 +151,40 @@ class MqttAdapter extends Adapter {

private subscribe(channels: string[]) {
channels.forEach((channel) => {
const binding = this.parsedAsyncAPI.channels().get(channel).bindings().get('mqtt')?.value()
this.client.subscribe(channel, {
qos: binding?.qos ? binding.qos : 0,
}, (err, granted) => {
if (err) {
logLineWithIcon('x', `Error while trying to subscribe to \`${channel}\` topic.`, {
highlightedWords: [channel],
iconColor: '#f00',
disableEmojis: true,
})
console.log(err.message)
return
const binding = this.parsedAsyncAPI
.channels()
.get(channel)
.bindings()
.get('mqtt')
?.value()
this.client.subscribe(
channel,
{
qos: binding?.qos ? binding.qos : 0,
},
(err, granted) => {
if (err) {
logLineWithIcon(
'x',
`Error while trying to subscribe to \`${channel}\` topic.`,
{
highlightedWords: [channel],
iconColor: '#f00',
disableEmojis: true,
}
)
console.log(err.message)
return
}
logLineWithIcon(
':zap:',
`Subscribed to \`${channel}\` topic with QoS ${granted?.[0].qos}`,
{
highlightedWords: [channel],
}
)
}
logLineWithIcon(':zap:', `Subscribed to \`${channel}\` topic with QoS ${granted?.[0].qos}`, {
highlightedWords: [channel],
})
})
)
})
}

Expand Down Expand Up @@ -216,7 +245,12 @@ class MqttAdapter extends Adapter {

_send(message: GleeMessage): Promise<void> {
return new Promise((resolve, reject) => {
const binding = this.parsedAsyncAPI.channels().get(message.channel).bindings().get('mqtt')?.value()
const binding = this.parsedAsyncAPI
.channels()
.get(message.channel)
.bindings()
.get('mqtt')
?.value()
this.client.publish(
message.channel,
message.payload,
Expand Down Expand Up @@ -244,11 +278,11 @@ class MqttAdapter extends Adapter {
dup: packet.dup,
length: packet.length,
}

const id = this.parsedAsyncAPI.channels().filter(channel => channel.address() === packet.topic)[0].id()
return new GleeMessage({
payload: packet.payload,
headers,
channel: packet.topic,
channel: id,
})
}

Expand Down