From e48ae599d4fe6bba0c7434e7c7a1f154839b505d Mon Sep 17 00:00:00 2001 From: hufeng Date: Thu, 8 Sep 2022 20:13:26 +0800 Subject: [PATCH] implements seata-tcp-transport and format some codes --- .../seata-js/src/seata-common/byte-buffer.ts | 5 +- .../seata-js/src/seata-common/retry.test.ts | 20 ++++ packages/seata-js/src/seata-common/retry.ts | 99 +++++++++++++++++++ .../config-core/abstract-configuration.ts | 2 +- .../config-core/configuration-cache.ts | 5 +- .../src/seata-config/config-core/index.ts | 3 +- .../config-nacos/nacos-configuration.ts | 2 +- .../discovery-core/registry/index.ts | 4 +- .../nacos-registry-provider.ts | 4 +- .../discovery-nacos/nacos-registry-service.ts | 5 +- .../src/seata-rpc-client/seata-queue.test.ts | 2 +- .../src/seata-rpc-client/seata-queue.ts | 6 +- .../transport/seata-abstract-remoting.ts | 33 +++---- .../transport/seata-tcp-buffer.test.ts | 5 +- .../transport/seata-tcp-buffer.ts | 7 +- .../transport/seata-tcp-heartbeat.ts | 5 +- .../transport/seata-tcp-transport.ts | 94 +++++++++++++----- .../v1/protocol-v1-decoder.ts | 4 +- .../v1/protocol-v1-encoder.ts | 4 +- scripts/check-license.py | 2 +- 20 files changed, 233 insertions(+), 78 deletions(-) create mode 100644 packages/seata-js/src/seata-common/retry.test.ts create mode 100644 packages/seata-js/src/seata-common/retry.ts diff --git a/packages/seata-js/src/seata-common/byte-buffer.ts b/packages/seata-js/src/seata-common/byte-buffer.ts index c9d696e..df8f5c5 100644 --- a/packages/seata-js/src/seata-common/byte-buffer.ts +++ b/packages/seata-js/src/seata-common/byte-buffer.ts @@ -17,11 +17,12 @@ import { Buffer } from 'node:buffer' +const DEFAULT_ALLOC_SIZE = 1024 + export interface ByteBufferProp { buffer?: Buffer defaultAllocSize?: number } - export interface ReadWriteProp { /** * set read or write index @@ -44,8 +45,6 @@ export interface ReadWriteProp { len?: number } -const DEFAULT_ALLOC_SIZE = 1024 - /** * BufferBuffer is a buffer wrapper class * which can be used to read and write data to buffer. diff --git a/packages/seata-js/src/seata-common/retry.test.ts b/packages/seata-js/src/seata-common/retry.test.ts new file mode 100644 index 0000000..8042df8 --- /dev/null +++ b/packages/seata-js/src/seata-common/retry.test.ts @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +describe('restry test suite', () => { + it('test init retry', () => {}) +}) diff --git a/packages/seata-js/src/seata-common/retry.ts b/packages/seata-js/src/seata-common/retry.ts new file mode 100644 index 0000000..acc5605 --- /dev/null +++ b/packages/seata-js/src/seata-common/retry.ts @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import debug from 'debug' +import { noop } from './util' + +const MAX_RETRIES = 10 +const dlog = debug('seata-common:retry~') + +export interface RetryProps { + initialDelay?: number + period?: number + maxRetry?: number + onFailedEnd?: Function + run: (onSuccess: Function, onFailed: Function) => void +} + +export class Retry { + private retryTime: number + private readonly initialDelay: number + private readonly period: number + private readonly maxRetryCount: number + + private run: (onSuccess: Function, onFailed: Function) => void + private onFailedEnd: Function + + static from(props: RetryProps) { + return new Retry(props) + } + + constructor(props: RetryProps) { + this.maxRetryCount = props.maxRetry || MAX_RETRIES + this.retryTime = this.maxRetryCount + + this.initialDelay = props.initialDelay || 0 + this.period = props.period || 1000 + + dlog(`init props: %j`, { + initialDelay: this.initialDelay, + period: this.period, + maxRetry: this.maxRetryCount, + }) + + this.run = props.run + this.onFailedEnd = props.onFailedEnd || noop + } + + start() { + dlog(`starting retry, current retry num:%d`, this.retryTime) + + // first run + if (this.retryTime === this.maxRetryCount) { + setTimeout(() => { + this.run(this.onSuccess, this.onFailed) + this.retryTime-- + }, this.initialDelay) + return + } + + // stop retry + if (this.retryTime === 0) { + this.onFailedEnd() + return + } + + // retry + setTimeout(() => { + this.run(this.onSuccess, this.onFailed) + this.retryTime-- + }, this.period) + } + + reset() { + dlog('reset') + this.retryTime = this.maxRetryCount + } + + private onSuccess() { + this.reset() + } + + private onFailed() { + this.start() + } +} diff --git a/packages/seata-js/src/seata-config/config-core/abstract-configuration.ts b/packages/seata-js/src/seata-config/config-core/abstract-configuration.ts index f0509b5..6644aae 100644 --- a/packages/seata-js/src/seata-config/config-core/abstract-configuration.ts +++ b/packages/seata-js/src/seata-config/config-core/abstract-configuration.ts @@ -46,4 +46,4 @@ export class AbstractConfiguration { getConfig(dataId: string, defaultValue: string) { return '' } -} \ No newline at end of file +} diff --git a/packages/seata-js/src/seata-config/config-core/configuration-cache.ts b/packages/seata-js/src/seata-config/config-core/configuration-cache.ts index 36cf8e0..b86bb02 100644 --- a/packages/seata-js/src/seata-config/config-core/configuration-cache.ts +++ b/packages/seata-js/src/seata-config/config-core/configuration-cache.ts @@ -15,7 +15,4 @@ * limitations under the License. */ - -export class ConfigurationCache { - -} \ No newline at end of file +export class ConfigurationCache {} diff --git a/packages/seata-js/src/seata-config/config-core/index.ts b/packages/seata-js/src/seata-config/config-core/index.ts index b5785aa..e7c848f 100644 --- a/packages/seata-js/src/seata-config/config-core/index.ts +++ b/packages/seata-js/src/seata-config/config-core/index.ts @@ -37,8 +37,7 @@ export class ConfigurationFactory { static instance = null - private static load() { - } + private static load() {} /** * Gets instance. diff --git a/packages/seata-js/src/seata-config/config-nacos/nacos-configuration.ts b/packages/seata-js/src/seata-config/config-nacos/nacos-configuration.ts index 445ba57..908b9b3 100644 --- a/packages/seata-js/src/seata-config/config-nacos/nacos-configuration.ts +++ b/packages/seata-js/src/seata-config/config-nacos/nacos-configuration.ts @@ -19,7 +19,7 @@ import { AbstractConfiguration } from '../config-core/abstract-configuration' /** * the type Nacos configuration - * + * * @author godkun */ export class NacosConfiguration extends AbstractConfiguration { diff --git a/packages/seata-js/src/seata-discovery/discovery-core/registry/index.ts b/packages/seata-js/src/seata-discovery/discovery-core/registry/index.ts index 069cfe4..74ad8ac 100644 --- a/packages/seata-js/src/seata-discovery/discovery-core/registry/index.ts +++ b/packages/seata-js/src/seata-discovery/discovery-core/registry/index.ts @@ -15,7 +15,7 @@ * limitations under the License. */ -import { RegistryService, RegistryType } from "../../discovery" +import { RegistryService, RegistryType } from '../../discovery' import { NacosRegistryProvider } from '../../discovery-nacos/nacos-registry-provider' @@ -37,6 +37,6 @@ export default class RegistryFactory { private static buildRegistryService(): RegistryService { let registryType: RegistryType registryType = RegistryType.Nacos - return (new NacosRegistryProvider).provide() + return new NacosRegistryProvider().provide() } } diff --git a/packages/seata-js/src/seata-discovery/discovery-nacos/nacos-registry-provider.ts b/packages/seata-js/src/seata-discovery/discovery-nacos/nacos-registry-provider.ts index 0607a21..f3f547f 100644 --- a/packages/seata-js/src/seata-discovery/discovery-nacos/nacos-registry-provider.ts +++ b/packages/seata-js/src/seata-discovery/discovery-nacos/nacos-registry-provider.ts @@ -20,11 +20,11 @@ import { NacosRegistryService } from './nacos-registry-service' /** * the nacos registry provider - * + * * @author godkun */ export class NacosRegistryProvider implements RegistryProvider { provide(): RegistryService { return NacosRegistryService.getInstance() } -} \ No newline at end of file +} diff --git a/packages/seata-js/src/seata-discovery/discovery-nacos/nacos-registry-service.ts b/packages/seata-js/src/seata-discovery/discovery-nacos/nacos-registry-service.ts index 72fc13b..4db6b55 100644 --- a/packages/seata-js/src/seata-discovery/discovery-nacos/nacos-registry-service.ts +++ b/packages/seata-js/src/seata-discovery/discovery-nacos/nacos-registry-service.ts @@ -44,7 +44,6 @@ export class NacosRegistryService implements RegistryService { private static instance: NacosRegistryService - static getInstance(): NacosRegistryService { if (!NacosRegistryService.instance) { NacosRegistryService.instance = new NacosRegistryService() @@ -67,10 +66,10 @@ export class NacosRegistryService implements RegistryService { unsubscribe(cluster: string, listener) { // TODO: } - + lookup(key: string): Array { return ['1'] } close() {} -} \ No newline at end of file +} diff --git a/packages/seata-js/src/seata-rpc-client/seata-queue.test.ts b/packages/seata-js/src/seata-rpc-client/seata-queue.test.ts index 6951ceb..6b54f06 100644 --- a/packages/seata-js/src/seata-rpc-client/seata-queue.test.ts +++ b/packages/seata-js/src/seata-rpc-client/seata-queue.test.ts @@ -15,9 +15,9 @@ * limitations under the License. */ +import { SeataQueue } from './seata-queue' import config from '../seata-config/config' import { RpcMessage } from '../seata-protocol/rpc-message' -import { SeataQueue } from './seata-queue' describe('seata queue test suites', () => { // set max req timeout diff --git a/packages/seata-js/src/seata-rpc-client/seata-queue.ts b/packages/seata-js/src/seata-rpc-client/seata-queue.ts index 3cd6d03..77be784 100644 --- a/packages/seata-js/src/seata-rpc-client/seata-queue.ts +++ b/packages/seata-js/src/seata-rpc-client/seata-queue.ts @@ -21,6 +21,9 @@ import config from '../seata-config/config' import { RpcMessage } from '../seata-protocol/rpc-message' import { SeataContext } from './seata-context' +// init log +const log = debug('seata:rpc:seata-queue') + export type SeataQueueId = number export type SeataRpcResponse = { err: Error | null @@ -28,9 +31,6 @@ export type SeataRpcResponse = { } export type SeataQueueSubscribe = (id: SeataQueueId, msg: SeataContext) => void -// init log -const log = debug('seata:rpc:seata-queue') - /** * seata-rpc-queue */ diff --git a/packages/seata-js/src/seata-rpc-client/transport/seata-abstract-remoting.ts b/packages/seata-js/src/seata-rpc-client/transport/seata-abstract-remoting.ts index be8ce5a..199b555 100644 --- a/packages/seata-js/src/seata-rpc-client/transport/seata-abstract-remoting.ts +++ b/packages/seata-js/src/seata-rpc-client/transport/seata-abstract-remoting.ts @@ -15,14 +15,14 @@ * limitations under the License. */ +import { Socket } from 'net' import { genNextId } from '../seata-id' import prot from '../../seata-protocol/protocol-constants' import { RpcMessage } from '../../seata-protocol/rpc-message' -import { Socket } from 'net' -import { HeartbeatMessage } from '../../seata-protocol/heartbeat-message' import { ProtocolV1Encoder } from '../v1/protocol-v1-encoder' +import { HeartbeatMessage } from '../../seata-protocol/heartbeat-message' -export default class AbstractSeataRemoting { +export default abstract class AbstractSeataRemoting { protected transport: Socket constructor(transport: Socket) { @@ -30,13 +30,12 @@ export default class AbstractSeataRemoting { } protected buildRequestMessage(msg: Object, messageType: number) { - const rpcMessage = new RpcMessage() - rpcMessage.setId(genNextId()) - rpcMessage.setMessageType(messageType) - rpcMessage.setCodec(prot.CONFIGURED_CODEC) - rpcMessage.setCompressor(prot.CONFIGURED_COMPRESSOR) - rpcMessage.setBody(msg) - return rpcMessage + return new RpcMessage() + .setId(genNextId()) + .setMessageType(messageType) + .setCodec(prot.CONFIGURED_CODEC) + .setCompressor(prot.CONFIGURED_COMPRESSOR) + .setBody(msg) } protected buildResponseMessage( @@ -44,19 +43,19 @@ export default class AbstractSeataRemoting { msg: Object, messageType: number, ) { - const rpcMsg = new RpcMessage() - rpcMsg.setMessageType(messageType) - rpcMsg.setCodec(rpcMessage.getCodec()) // same with request - rpcMsg.setCompressor(rpcMessage.getCompressor()) - rpcMsg.setBody(msg) - rpcMsg.setId(rpcMessage.getId()) - return rpcMsg + return new RpcMessage() + .setMessageType(messageType) + .setCodec(rpcMessage.getCodec()) // same with request + .setCompressor(rpcMessage.getCompressor()) + .setBody(msg) + .setId(rpcMessage.getId()) } protected send(msg: Object) { if (!this.transport) { return } + const rpcMessage = this.buildRequestMessage( msg, msg instanceof HeartbeatMessage diff --git a/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-buffer.test.ts b/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-buffer.test.ts index 008d67e..0564f3d 100644 --- a/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-buffer.test.ts +++ b/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-buffer.test.ts @@ -15,9 +15,8 @@ * limitations under the License. */ -import { Socket } from 'net' -import { EventEmitter } from 'events' - +import { Socket } from 'node:net' +import { EventEmitter } from 'node:events' import SeataTcpBuffer from './seata-tcp-buffer' import { RpcMessage } from '../../seata-protocol/rpc-message' import { MessageType } from '../../seata-protocol/message-type' diff --git a/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-buffer.ts b/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-buffer.ts index 80f9c2d..24d7acb 100644 --- a/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-buffer.ts +++ b/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-buffer.ts @@ -15,20 +15,19 @@ * limitations under the License. */ -import { Socket } from 'net' +import { Socket } from 'node:net' import { Buffer } from 'node:buffer' import debug from 'debug' - import { noop } from '../../seata-common/util' import ByteBuffer from '../../seata-common/byte-buffer' import prot from '../../seata-protocol/protocol-constants' +const log = debug('seata:tcp-buffer') + export interface SeataTcpBufferSubscriber { (data: Buffer): void } -const log = debug('seata:tcp-buffer') - /** * 在并发的tcp数据传输中,会出现少包,粘包的现象 * 好在tcp的传输是可以保证顺序的 diff --git a/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-heartbeat.ts b/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-heartbeat.ts index 4e3f723..ce0b4f6 100644 --- a/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-heartbeat.ts +++ b/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-heartbeat.ts @@ -16,9 +16,11 @@ */ import { Socket } from 'net' +import debug from 'debug' import { HeartbeatMessage } from '../../seata-protocol/heartbeat-message' import AbstractSeataRemoting from './seata-abstract-remoting' +const log = debug('seata:heartbeat~') // reference: NettyBasicConfig.java const DEFAULT_WRITE_IDLE_SECONDS = 5_000 @@ -60,8 +62,9 @@ export class SeataHeartBeat extends AbstractSeataRemoting { /** * receive heartbeat response message */ - async receive(): Promise { + receive() { this.lastActivityTime = Date.now() + log(`receive heartbeat pong`) } destroy() { diff --git a/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-transport.ts b/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-transport.ts index 7beb067..d42786b 100644 --- a/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-transport.ts +++ b/packages/seata-js/src/seata-rpc-client/transport/seata-tcp-transport.ts @@ -17,62 +17,104 @@ import { Socket } from 'net' import debug from 'debug' +import { SeataTransport } from './transport' +import SeataTcpBuffer from './seata-tcp-buffer' +import { Retry } from '../../seata-common/retry' +import { SeataHeartBeat } from './seata-tcp-heartbeat' import { RpcMessage } from '../../seata-protocol/rpc-message' import { ProtocolV1Decoder } from '../v1/protocol-v1-decoder' import { ProtocolV1Encoder } from '../v1/protocol-v1-encoder' -import SeataTcpBuffer from './seata-tcp-buffer' -import { SeataHeartBeat } from './seata-tcp-heartbeat' -import { SeataTransport } from './transport' +import { TransportStatus } from './seata-transport-status' -type id = number +const log = debug(`seata:rpc:transport`) +type id = number interface SeataTransportPromise { resolve: (value?: any) => void reject: (reason?: any) => void } -const log = debug(`seata:rpc:transport`) - export class SeataTcpTransport implements SeataTransport { - private readonly requestQueue: Map - private readonly transport: Socket private readonly host: string - - private heartBeat: SeataHeartBeat + private readonly requestQueue: Map + private status: TransportStatus + private transport: Socket + private heartBeat!: SeataHeartBeat constructor(hostname: string, port: number) { this.host = `${hostname}:${port}` - this.requestQueue = new Map() - - // set transport + this.status = TransportStatus.PADDING this.transport = new Socket() - this.transport.setNoDelay() - this.transport - .connect(port, hostname, this.handleSocketConnected) - .on('error', this.handleSocketErr) - .on('close', this.handleSocketClose) + this.requestQueue = new Map() - // set tcp buffer - new SeataTcpBuffer(this.transport).subscribe(this.handleTcpBuffer) + Retry.from({ + initialDelay: 0, + maxRetry: 100, + period: 100, // 100ms + run: async (onSuccess, onFailed) => { + try { + await this.initTransport(hostname, port) + onSuccess() + } catch (err) { + onFailed() + } + }, + onFailedEnd: () => { + // TODO emit status + this.status = TransportStatus.CLOSED + }, + }) + } - // set heartbeat - this.heartBeat = new SeataHeartBeat(this.transport) + getStatus() { + return this.status } send(msg: RpcMessage): Promise { return new Promise((resolve, reject) => { const id = msg.getId() this.requestQueue.set(id, { resolve, reject }) - this.transport.write(ProtocolV1Encoder.encode(msg)) + this.transport.write(ProtocolV1Encoder.encode(msg), (err) => { + log(`write error %s`, err) + }) this.heartBeat.setLastActivityTime(Date.now()) }) } + private initTransport(hostname: string, port: number) { + return new Promise((resolve, reject) => { + // set transport + this.transport.setNoDelay() + this.transport + .connect(port, hostname, () => { + this.handleSocketConnected() + resolve(null) + }) + .on('error', (err) => { + this.handleSocketErr(err) + reject(err) + }) + .on('close', this.handleSocketClose) + }) + } + private handleSocketConnected = () => { - log('tcp-transport = connecting => %s', this.host) + log('tcp-transport = connected => %s', this.host) + this.status = TransportStatus.CONNECTED + // stop retry + this.heartBeat = new SeataHeartBeat(this.transport) + // set tcp buffer + new SeataTcpBuffer(this.transport).subscribe(this.handleTcpBuffer) + } + + private handleSocketErr = (err: Error) => { + log(`connecting => ${this.host} error %s`, err) + this.status = TransportStatus.RETRY + } + + private handleSocketClose = () => { + log(`socket transport close => ${this.host}`) } - private handleSocketErr = () => {} - private handleSocketClose = () => {} private handleTcpBuffer = (data: Buffer) => { // decode message diff --git a/packages/seata-js/src/seata-rpc-client/v1/protocol-v1-decoder.ts b/packages/seata-js/src/seata-rpc-client/v1/protocol-v1-decoder.ts index 8164f40..7947cd0 100644 --- a/packages/seata-js/src/seata-rpc-client/v1/protocol-v1-decoder.ts +++ b/packages/seata-js/src/seata-rpc-client/v1/protocol-v1-decoder.ts @@ -24,6 +24,8 @@ import { HeartbeatMessage } from '../../seata-protocol/heartbeat-message' import { CompressorFactory } from '../../seata-compressor' import SerializerFactory from '../../seata-serializer' +const log = debug(`prot:v1:decoder`) + /** *
  * 0     1     2     3     4     5     6     7     8     9    10     11    12    13    14    15    16
@@ -48,8 +50,6 @@ import SerializerFactory from '../../seata-serializer'
  * https://github.com/seata/seata/issues/893
  */
 
-const log = debug(`prot:v1:decoder`)
-
 export class ProtocolV1Decoder {
   static decode(buffer: Buffer) {
     const frame = new ByteBuffer({ buffer })
diff --git a/packages/seata-js/src/seata-rpc-client/v1/protocol-v1-encoder.ts b/packages/seata-js/src/seata-rpc-client/v1/protocol-v1-encoder.ts
index 71c55d6..14f5217 100644
--- a/packages/seata-js/src/seata-rpc-client/v1/protocol-v1-encoder.ts
+++ b/packages/seata-js/src/seata-rpc-client/v1/protocol-v1-encoder.ts
@@ -23,6 +23,8 @@ import SerializerFactory from '../../seata-serializer'
 import { CompressorFactory } from '../../seata-compressor'
 import { HeadMapSerializer } from './headmap-serializer'
 
+const log = debug(`seata:prot:v1:encoder`)
+
 /**
  * 
  * 0     1     2     3     4     5     6     7     8     9    10     11    12    13    14    15    16
@@ -47,8 +49,6 @@ import { HeadMapSerializer } from './headmap-serializer'
  * https://github.com/seata/seata/issues/893
  */
 
-const log = debug(`seata:prot:v1:encoder`)
-
 export class ProtocolV1Encoder {
   static encode(msg: RpcMessage) {
     log(`encode rpc message %j`, msg)
diff --git a/scripts/check-license.py b/scripts/check-license.py
index 975ad35..58f7146 100644
--- a/scripts/check-license.py
+++ b/scripts/check-license.py
@@ -50,7 +50,7 @@ def fixed_license_file(file):
     with open(file, 'r+') as f:
         raw = f.read()
         f.seek(0)
-        f.write(LICENSE + '\n' + raw)
+        f.write(f"{LICENSE}\n\n{raw}")
 
 
 if __name__ == '__main__':