Skip to content

Commit

Permalink
Asynchronous Codec methods + updated (deprecated) message processing. (
Browse files Browse the repository at this point in the history
  • Loading branch information
isoos authored Sep 10, 2024
1 parent 88cb752 commit ccb3891
Show file tree
Hide file tree
Showing 10 changed files with 309 additions and 81 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
## 3.4.0-dev.2

- Support for binary `pgoutput` replication by [wolframm](https://github.com/Wolframm-Activities-OU).
- Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages).
- **Allowing custom type codecs**:
- `Codec` interface is used for encoding/decoding value by type OIDs or Dart values.
- `Codec.encode` and `Codec.decode` gets a reference to `CodecContext` which provides
Expand All @@ -13,6 +12,8 @@
- `RelationTracker` tracks information about relations (currently limited to `RelationMessage` caching).
- `RuntimeParameters` to access server-provided parameter status values.
- **Behaviour / soft-breaking changes**:
- Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages).
- Deprecated some logical replication message parsing method.
- Removed `@internal`-annotated methods from the public API of `ServerException` and `Severity`.
- `ServerException` may be transformed into `_PgTimeoutException` which is both `PgException` and `TimeoutException` (but no longer `ServerException`).
- The `timeout` parameters and the `SessionSettings.queryTimeout` has only a somewhat
Expand Down
5 changes: 3 additions & 2 deletions lib/messages.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ library messages;

export 'src/buffer.dart' show PgByteDataWriter;
export 'src/messages/client_messages.dart';
export 'src/messages/logical_replication_messages.dart';
export 'src/messages/server_messages.dart';
export 'src/messages/logical_replication_messages.dart'
hide tryAsyncParseLogicalReplicationMessage;
export 'src/messages/server_messages.dart' hide parseXLogDataMessage;
export 'src/messages/shared_messages.dart';
10 changes: 6 additions & 4 deletions lib/src/message_window.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:collection';
import 'dart:typed_data';

Expand All @@ -11,7 +12,7 @@ import 'messages/shared_messages.dart';

const int _headerByteSize = 5;

typedef _ServerMessageFn = ServerMessage Function(
typedef _ServerMessageFn = FutureOr<ServerMessage> Function(
PgByteDataReader reader, int length);

Map<int, _ServerMessageFn> _messageTypeMap = {
Expand Down Expand Up @@ -50,7 +51,7 @@ class MessageFramer {
bool get _isComplete =>
_expectedLength == 0 || _expectedLength <= _reader.remainingLength;

void addBytes(Uint8List bytes) {
Future<void> addBytes(Uint8List bytes) async {
_reader.add(bytes);

while (true) {
Expand All @@ -76,7 +77,7 @@ class MessageFramer {
}

final targetRemainingLength = _reader.remainingLength - _expectedLength;
final msg = msgMaker(_reader, _expectedLength);
final msg = await msgMaker(_reader, _expectedLength);
if (_reader.remainingLength > targetRemainingLength) {
throw StateError(
'Message parser consumed more bytes than expected. type=$_type expectedLength=$_expectedLength');
Expand Down Expand Up @@ -111,7 +112,8 @@ class MessageFramer {
/// such as replication messages.
/// Returns a [ReplicationMessage] if the message contains such message.
/// Otherwise, it'll just return the provided bytes as [CopyDataMessage].
ServerMessage _parseCopyDataMessage(PgByteDataReader reader, int length) {
Future<ServerMessage> _parseCopyDataMessage(
PgByteDataReader reader, int length) async {
final code = reader.readUint8();
if (code == ReplicationMessageId.primaryKeepAlive) {
return PrimaryKeepAliveMessage.parse(reader);
Expand Down
201 changes: 190 additions & 11 deletions lib/src/messages/logical_replication_messages.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:typed_data';

import 'package:buffer/buffer.dart';
import 'package:meta/meta.dart';
import 'package:postgres/src/types/codec.dart';

import '../buffer.dart';
Expand Down Expand Up @@ -47,6 +48,8 @@ class XLogDataLogicalMessage implements XLogDataMessage {

/// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so,
/// [LogicalReplicationMessage] is returned, otherwise `null` is returned.
@Deprecated('This method will be removed from public API. '
'Please file a new issue on GitHub if you are using it.')
LogicalReplicationMessage? tryParseLogicalReplicationMessage(
PgByteDataReader reader, int length) {
// the first byte is the msg type
Expand All @@ -69,13 +72,66 @@ LogicalReplicationMessage? tryParseLogicalReplicationMessage(
return TypeMessage._parse(reader);

case LogicalReplicationMessageTypes.insert:
return InsertMessage._parse(reader);
return InsertMessage._syncParse(reader);

case LogicalReplicationMessageTypes.update:
return UpdateMessage._parse(reader);
return UpdateMessage._syncParse(reader);

case LogicalReplicationMessageTypes.delete:
return DeleteMessage._parse(reader);
return DeleteMessage._syncParse(reader);

case LogicalReplicationMessageTypes.truncate:
return TruncateMessage._parse(reader);

case LogicalReplicationMessageTypes.unsupported:
// wal2json messages starts with `{` as the first byte
if (firstByte == '{'.codeUnits.single) {
// note this needs the full set of bytes unlike other cases
final bb = BytesBuffer();
bb.addByte(firstByte);
bb.add(reader.read(length - 1));
try {
return JsonMessage(reader.encoding.decode(bb.toBytes()));
} catch (_) {
// ignore
}
}
return null;
}
}

/// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so,
/// [LogicalReplicationMessage] is returned, otherwise `null` is returned.
@internal
Future<LogicalReplicationMessage?> tryAsyncParseLogicalReplicationMessage(
PgByteDataReader reader, int length) async {
// the first byte is the msg type
final firstByte = reader.readUint8();
final msgType = LogicalReplicationMessageTypes.fromByte(firstByte);
switch (msgType) {
case LogicalReplicationMessageTypes.begin:
return BeginMessage._parse(reader);

case LogicalReplicationMessageTypes.commit:
return CommitMessage._parse(reader);

case LogicalReplicationMessageTypes.origin:
return OriginMessage._parse(reader);

case LogicalReplicationMessageTypes.relation:
return RelationMessage._parse(reader);

case LogicalReplicationMessageTypes.type:
return TypeMessage._parse(reader);

case LogicalReplicationMessageTypes.insert:
return await InsertMessage._parse(reader);

case LogicalReplicationMessageTypes.update:
return await UpdateMessage._parse(reader);

case LogicalReplicationMessageTypes.delete:
return await DeleteMessage._parse(reader);

case LogicalReplicationMessageTypes.truncate:
return TruncateMessage._parse(reader);
Expand Down Expand Up @@ -381,7 +437,9 @@ class TupleData {
/// TupleData does not consume the entire bytes
///
/// It'll read until the types are generated.
factory TupleData._parse(PgByteDataReader reader, int relationId) {
///
/// NOTE: do not use, will be removed.
factory TupleData._syncParse(PgByteDataReader reader, int relationId) {
final columnCount = reader.readUint16();
final columns = <TupleDataColumn>[];
for (var i = 0; i < columnCount; i++) {
Expand Down Expand Up @@ -437,6 +495,66 @@ class TupleData {
return TupleData(columns: columns);
}

/// TupleData does not consume the entire bytes
///
/// It'll read until the types are generated.
static Future<TupleData> _parse(
PgByteDataReader reader, int relationId) async {
final columnCount = reader.readUint16();
final columns = <TupleDataColumn>[];
for (var i = 0; i < columnCount; i++) {
// reading order matters
final typeId = reader.readUint8();
final tupleDataType = TupleDataType.fromByte(typeId);
late final int length;
late final String data;
final typeOid = reader.codecContext.relationTracker
.getCachedTypeOidForRelationColumn(relationId, i);
Object? value;
switch (tupleDataType) {
case TupleDataType.text:
length = reader.readUint32();
data = reader.encoding.decode(reader.read(length));
value = data;
break;
case TupleDataType.binary:
length = reader.readUint32();
final bytes = reader.read(length);
value = typeOid == null
? UndecodedBytes(
typeOid: 0,
isBinary: true,
bytes: bytes,
encoding: reader.codecContext.encoding,
)
: await reader.codecContext.typeRegistry.decode(
EncodedValue.binary(
bytes,
typeOid: typeOid,
),
reader.codecContext,
);
data = value.toString();
break;
case TupleDataType.null_:
case TupleDataType.toast:
length = 0;
data = '';
break;
}
columns.add(
TupleDataColumn(
typeId: typeId,
length: length,
typeOid: typeOid,
data: data,
value: value,
),
);
}
return TupleData(columns: columns);
}

late final int columnCount = columns.length;

@override
Expand All @@ -451,13 +569,26 @@ class InsertMessage implements LogicalReplicationMessage {
late final int relationId;
late final TupleData tuple;

InsertMessage._parse(PgByteDataReader reader) {
InsertMessage._(this.relationId, this.tuple);

/// NOTE: do not use, will be removed.
InsertMessage._syncParse(PgByteDataReader reader) {
relationId = reader.readUint32();
final tupleType = reader.readUint8();
if (tupleType != 'N'.codeUnitAt(0)) {
throw Exception("InsertMessage must have 'N' tuple type");
}
tuple = TupleData._parse(reader, relationId);
tuple = TupleData._syncParse(reader, relationId);
}

static Future<InsertMessage> _parse(PgByteDataReader reader) async {
final relationId = reader.readUint32();
final tupleType = reader.readUint8();
if (tupleType != 'N'.codeUnitAt(0)) {
throw Exception("InsertMessage must have 'N' tuple type");
}
final tuple = await TupleData._parse(reader, relationId);
return InsertMessage._(relationId, tuple);
}

@override
Expand Down Expand Up @@ -511,28 +642,58 @@ class UpdateMessage implements LogicalReplicationMessage {
/// Byte1('N'): Identifies the following TupleData message as a new tuple.
late final TupleData? newTuple;

UpdateMessage._parse(PgByteDataReader reader) {
UpdateMessage._(
this.relationId, this.oldTupleType, this.oldTuple, this.newTuple);

/// NOTE: do not use, will be removed.
UpdateMessage._syncParse(PgByteDataReader reader) {
// reading order matters
relationId = reader.readUint32();
var tupleType = UpdateMessageTuple.fromByte(reader.readUint8());

if (tupleType == UpdateMessageTuple.oldType ||
tupleType == UpdateMessageTuple.keyType) {
oldTupleType = tupleType;
oldTuple = TupleData._parse(reader, relationId);
oldTuple = TupleData._syncParse(reader, relationId);
tupleType = UpdateMessageTuple.fromByte(reader.readUint8());
} else {
oldTupleType = null;
oldTuple = null;
}

if (tupleType == UpdateMessageTuple.newType) {
newTuple = TupleData._parse(reader, relationId);
newTuple = TupleData._syncParse(reader, relationId);
} else {
throw Exception('Invalid Tuple Type for UpdateMessage');
}
}

static Future<UpdateMessage> _parse(PgByteDataReader reader) async {
// reading order matters
final relationId = reader.readUint32();
UpdateMessageTuple? oldTupleType;
TupleData? oldTuple;
TupleData? newTuple;
var tupleType = UpdateMessageTuple.fromByte(reader.readUint8());

if (tupleType == UpdateMessageTuple.oldType ||
tupleType == UpdateMessageTuple.keyType) {
oldTupleType = tupleType;
oldTuple = await TupleData._parse(reader, relationId);
tupleType = UpdateMessageTuple.fromByte(reader.readUint8());
} else {
oldTupleType = null;
oldTuple = null;
}

if (tupleType == UpdateMessageTuple.newType) {
newTuple = await TupleData._parse(reader, relationId);
} else {
throw Exception('Invalid Tuple Type for UpdateMessage');
}
return UpdateMessage._(relationId, oldTupleType, oldTuple, newTuple);
}

@override
String toString() {
return 'UpdateMessage(relationId: $relationId, oldTupleType: $oldTupleType, oldTuple: $oldTuple, newTuple: $newTuple)';
Expand Down Expand Up @@ -583,18 +744,36 @@ class DeleteMessage implements LogicalReplicationMessage {
/// Byte1('N'): Identifies the following TupleData message as a new tuple.
late final TupleData oldTuple;

DeleteMessage._parse(PgByteDataReader reader) {
DeleteMessage._(this.relationId, this.oldTupleType, this.oldTuple);

/// NOTE: do not use, will be removed.
DeleteMessage._syncParse(PgByteDataReader reader) {
relationId = reader.readUint32();
oldTupleType = DeleteMessageTuple.fromByte(reader.readUint8());

switch (oldTupleType) {
case DeleteMessageTuple.keyType:
case DeleteMessageTuple.oldType:
oldTuple = TupleData._parse(reader, relationId);
oldTuple = TupleData._syncParse(reader, relationId);
break;
case DeleteMessageTuple.unknown:
throw Exception('Unknown tuple type for DeleteMessage');
}
}

static Future<DeleteMessage> _parse(PgByteDataReader reader) async {
final relationId = reader.readUint32();
final oldTupleType = DeleteMessageTuple.fromByte(reader.readUint8());
TupleData? oldTuple;
switch (oldTupleType) {
case DeleteMessageTuple.keyType:
case DeleteMessageTuple.oldType:
oldTuple = await TupleData._parse(reader, relationId);
break;
case DeleteMessageTuple.unknown:
throw Exception('Unknown tuple type for DeleteMessage');
}
return DeleteMessage._(relationId, oldTupleType, oldTuple);
}

@override
Expand Down
Loading

0 comments on commit ccb3891

Please sign in to comment.