Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for CoAP transport #308

Open
wants to merge 68 commits into
base: develop_coap
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
71aa63e
First commit
Woguagua Jul 23, 2024
1f94e76
Add basic coap message definitions
Woguagua Jul 23, 2024
31a96e5
Complete CoapDecoder
Woguagua Jul 23, 2024
aeaaf92
Fix bug CoapDecoder
Woguagua Jul 23, 2024
5e978e1
Add CoapEncoder
Woguagua Jul 23, 2024
ff05063
Add coap relative handlers and processors
Woguagua Jul 23, 2024
addb17b
Add todo mark of coap handlers and processors
Woguagua Jul 23, 2024
5f1d3eb
fix bug
Woguagua Jul 24, 2024
18a32de
add component annotation
Woguagua Jul 24, 2024
2f238ae
Complete coap message storage without coap notification, now a new me…
Woguagua Jul 24, 2024
47fa71b
CoapSession relative, not completed
Woguagua Jul 29, 2024
150f12d
CoapSession relative, not completed
Woguagua Jul 29, 2024
3a06830
CoapSession relative, not completed
Woguagua Jul 29, 2024
fb2b860
CoapSession relative, not completed
Woguagua Jul 30, 2024
b19e623
CoapSession relative, complete basic subscribe and publish, can send …
Woguagua Jul 30, 2024
f328154
Add HashedWheeltimer in CoapSessionLoopImpl to check session alive pe…
Woguagua Jul 31, 2024
a45d2e9
Complete: send final message when removing the session.
Woguagua Jul 31, 2024
3cffd37
Complete: store retain message, send retain message when subscribe.
Woguagua Jul 31, 2024
b72774e
Add RPC forward for coap when receiving datagram packet. But need fur…
Woguagua Aug 1, 2024
67c0cf9
code formatting.
Woguagua Aug 1, 2024
d32b452
Complete RPC forward.
Woguagua Aug 1, 2024
d430178
Add CoapRetryManager
Woguagua Aug 2, 2024
599d2b2
Complete coap retry, but need formatting and removing the write actio…
Woguagua Aug 2, 2024
99efdb1
Fix bug: can not receive ack with empty message. And complete retry m…
Woguagua Aug 2, 2024
32932bd
Fix bug: old version will construct a new session each time receiving…
Woguagua Aug 2, 2024
cde70f9
Add response cache, can send the same response for already received r…
Woguagua Aug 2, 2024
6335693
move send message action to channelmanager, but bug exist
Woguagua Aug 2, 2024
05cac75
bug fix, complete: move send message action to channelmanager
Woguagua Aug 5, 2024
808471f
Move all writeAndFlush to DatagramChannelManager.
Woguagua Aug 5, 2024
358a379
code formatting.
Woguagua Aug 5, 2024
525ca0c
code formatting.
Woguagua Aug 5, 2024
a7a7cab
Complete: update to latest messageID in retry manager, to keep increa…
Woguagua Aug 5, 2024
01fa0c7
Complete: refresh subscription each time receiving an ACK
Woguagua Aug 6, 2024
972acaa
Complete: refresh subscription each time receiving an ACK
Woguagua Aug 6, 2024
9f219bc
Complete: remove session if exceed max retry time.
Woguagua Aug 6, 2024
09170a8
remove unused method
Woguagua Aug 6, 2024
f40a134
fix bug: do not update messageID in retry manager if session is null.
Woguagua Aug 6, 2024
b059f4b
Add comments and code formatting for CoapDecoder.
Woguagua Aug 6, 2024
7a848b8
Add comment to CoapEncoder.
Woguagua Aug 6, 2024
144befb
Add comments and code formatting.
Woguagua Aug 6, 2024
78c2a62
Add comments and code formatting.
Woguagua Aug 6, 2024
81c73af
Add comments and code formatting.
Woguagua Aug 6, 2024
6ca8fe5
Add comments and code formatting.
Woguagua Aug 7, 2024
f7fd9b2
Add comments and code formatting.
Woguagua Aug 7, 2024
5b6ea39
Fix bug: code style checking error.
Woguagua Aug 8, 2024
f4e4340
add test
Woguagua Aug 8, 2024
25cdbc3
resolve merge conflict
Woguagua Aug 8, 2024
29f6c20
Add test.
Woguagua Aug 8, 2024
d2bacb4
Add test
Woguagua Aug 8, 2024
387aeb9
Add test
Woguagua Aug 9, 2024
73a0e66
Add test
Woguagua Aug 9, 2024
477a383
Add coapTokenManager, but has not check yet.
Woguagua Aug 9, 2024
d99553d
Add test to CoapTokenManager.
Woguagua Aug 12, 2024
7e20f7b
Add CoapConnectHandler and relative test.
Woguagua Aug 12, 2024
807ccb5
Add CoapDisconnectHandler and relative test.
Woguagua Aug 12, 2024
ae2d6e2
Add CoapHeartbeatHandler and relative test. modify * imports.
Woguagua Aug 12, 2024
34c9e6a
Add PasswordHashUtil and test.
Woguagua Aug 13, 2024
be731de
Add CoapAuthManager and relative test.
Woguagua Aug 13, 2024
fe55309
CoapAuthManager added to relative handler
Woguagua Aug 13, 2024
3aebc91
Fix typo and code formatting.
Woguagua Aug 16, 2024
9a91b4a
Change CoapTokenManager to CoapTokenUtil. Todo: modify other Handler …
Woguagua Aug 19, 2024
7f2f69e
modify other Handler and check authToken in connection mode.
Woguagua Aug 20, 2024
4ad44d9
Remove CoapTokenManager.
Woguagua Aug 20, 2024
4b75813
fix typo
Woguagua Aug 20, 2024
d812bb5
change base64 decoder to urlDecoder.
Woguagua Aug 20, 2024
4c0a933
fix bug: CoapDecoder missing authToken.
Woguagua Aug 20, 2024
e864dc9
fix bug: should not add auth check in CoapAckHandler.
Woguagua Aug 20, 2024
8aafe25
Add response for unauthorized request in connection mode.
Woguagua Aug 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions distribution/conf/meta.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@
# limitations under the License.


selfAddress=
membersAddress=
selfAddress=11.159.23.108:25000
membersAddress=11.159.23.108:25000,11.159.23.111:25000,11.159.23.104:25000
12 changes: 6 additions & 6 deletions distribution/conf/service.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
# limitations under the License.


username=
secretKey=
username=test
secretKey=test

NAMESRV_ADDR=
eventNotifyRetryTopic=
clientRetryTopic=
NAMESRV_ADDR=11.159.23.108:9876
eventNotifyRetryTopic=eventNotifyRetryTopic
clientRetryTopic=clientRetryTopic

metaAddr=
metaAddr=11.159.23.108:25000,11.159.23.111:25000,11.159.23.104:25000
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.common.model;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CoapMessage {
private int version;
private CoapMessageType type;
private int tokenLength;
private CoapMessageCode code;
private int messageId;
private byte[] token;
private List<CoapMessageOption> options = new ArrayList<>();
private byte[] payload;
private InetSocketAddress remoteAddress;

public CoapMessage(int version, CoapMessageType type, int tokenLength, CoapMessageCode code, int messageId, byte[] token, byte[] payload, InetSocketAddress remoteAddress) {
this.version = version;
this.type = type;
this.tokenLength = tokenLength;
this.code = code;
this.messageId = messageId;
this.token = token;
this.payload = payload;
this.remoteAddress = remoteAddress;
}

public CoapMessage(int version, CoapMessageType type, int tokenLength, CoapMessageCode code, int messageId, byte[] token, InetSocketAddress remoteAddress) {
this.version = version;
this.type = type;
this.tokenLength = tokenLength;
this.code = code;
this.messageId = messageId;
this.token = token;
this.remoteAddress = remoteAddress;
}

public int getVersion() {
return version;
}

public void setVersion(int version) {
this.version = version;
}

public CoapMessageType getType() {
return type;
}

public void setType(CoapMessageType type) {
this.type = type;
}

public int getTokenLength() {
return tokenLength;
}

public void setTokenLength(int tokenLength) {
this.tokenLength = tokenLength;
}

public CoapMessageCode getCode() {
return code;
}

public void setCode(CoapMessageCode code) {
this.code = code;
}

public int getMessageId() {
return messageId;
}

public void setMessageId(int messageId) {
this.messageId = messageId;
}

public byte[] getToken() {
return token;
}

public void setToken(byte[] token) {
this.token = token;
}

public List<CoapMessageOption> getOptions() {
return options;
}

public void clearOptions() {
this.options.clear();
}

public void setOptions(List<CoapMessageOption> options) {
this.options = options;
}

public void addOption(CoapMessageOption option) {
this.options.add(option);
}

public void addObserveOption(int value) {
this.options.add(new CoapMessageOption(CoapMessageOptionNumber.OBSERVE, intToByteArray(value)));
}

public byte[] getPayload() {
return payload;
}

public void setPayload(byte[] payload) {
this.payload = payload;
}

public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}

public void setRemoteAddress(InetSocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}

private byte[] intToByteArray(int value) {
byte[] byteArray = new byte[3];
byteArray[0] = (byte) (value >> 16);
byteArray[1] = (byte) (value >> 8);
byteArray[2] = (byte) value;
return byteArray;
}


@Override
public String toString() {
return "CoapMessage{" +
"version=" + version +
", type=" + type +
", tokenLength=" + tokenLength +
", code=" + code +
", messageId=" + messageId +
", token=" + Arrays.toString(token) +
", options=" + options +
", payload=" + Arrays.toString(payload) +
", remoteAddress=" + remoteAddress +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.common.model;

public enum CoapMessageCode {
EMPTY(0),

// Request Code, 0.xx
GET(1),
POST(2),
PUT(3),
DELETE(4),

// Response Success Code, 2.xx
CREATED(65),
DELETED(66),
Valid(67),
CHANGED(68),
CONTENT(69),

// Response Client Error Code, 4.xx
BAD_REQUEST(128),
UNAUTHORIZED(129),
BAD_OPTION(130),
FORBIDDEN(131),
NOT_FOUND(132),
METHOD_NOT_ALLOWED(133),
NOT_ACCEPTABLE(134),
PRECONDITION_FAILED(140),
REQUEST_ENTITY_TOO_LARGE(141),
UNSUPPORTED_CONTENT_FORMAT(143),

// Response Server Error Code, 5.xx
INTERNAL_SERVER_ERROR(160),
NOT_IMPLEMENTED(161),
BAD_GATEWAY(162),
SERVICE_UNAVAILABLE(163),
GATEWAY_TIMEOUT(164),
PROXYING_NOT_SUPPORTED(165);

private static final CoapMessageCode[] VALUES;
private final int value;

private CoapMessageCode(int value) {
this.value = value;
}

public int value() {
return this.value;
}

public static CoapMessageCode valueOf(int code) {
if (code >= 0 && code < VALUES.length && VALUES[code] != null) {
return VALUES[code];
} else {
throw new IllegalArgumentException("Unknown CoapMessageCode " + code);
}
}

public static boolean isRequestCode(CoapMessageCode code) {
return (code == GET) || (code == POST) || (code == PUT) || (code == DELETE);
}

public static boolean isEmptyCode(CoapMessageCode code) {
return code == EMPTY;
}

static {
CoapMessageCode[] values = values();
VALUES = new CoapMessageCode[192]; // Using 192 since the highest defined code is 192

for (CoapMessageCode code : values) {
int value = code.value;
if (VALUES[value] != null) {
throw new AssertionError("Value already in use: " + value + " by " + VALUES[value]);
}
VALUES[value] = code;
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.common.model;

public class CoapMessageOption {
private CoapMessageOptionNumber optionNumber;
private byte[] optionValue;

public CoapMessageOption(CoapMessageOptionNumber optionNumber, byte[] optionValue) {
this.optionNumber = optionNumber;
this.optionValue = optionValue;
}

public CoapMessageOption(int optionNumber, byte[] optionValue) {
this(CoapMessageOptionNumber.valueOf(optionNumber), optionValue);
}

public CoapMessageOptionNumber getOptionNumber() {
return optionNumber;
}

public void setOptionNumber(CoapMessageOptionNumber optionNumber) {
this.optionNumber = optionNumber;
}

public byte[] getOptionValue() {
return optionValue;
}

public void setOptionValue(byte[] optionValue) {
this.optionValue = optionValue;
}
}
Loading