Bug: MongoDB CDC source dealing with canonical timestamps and other structural data types #20249

Open
ClSlaid opened this issue Jan 21, 2025 · 4 comments

ClSlaid commented Jan 21, 2025

Describe the bug

The current MongoDB CDC implementation doesn't handle MongoDB timestamps and datetimes well.

MongoDB struct:

name: string
email: string
last_login: timestamp

-- data definition in RisingWave
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) WITH (
    connector = 'kafka',
    kafka.topic = 'dbserver1.test.users',
    kafka.brokers = 'message_queue:29092',
    kafka.scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM_MONGO ENCODE JSON;

CREATE MATERIALIZED VIEW normalized_users AS
SELECT
    payload ->> 'name' as name,
    payload ->> 'email' as email,
    payload ->> 'last_login' as last_login
FROM
    users;

Run the following query:

SELECT DISTINCT nu.last_login
FROM normalized_users nu
JOIN (
    SELECT payload->>'name' as name
    FROM users
    ORDER BY _id DESC
) u ON nu.name = u.name LIMIT 10;

(screenshot of the query result)

P.S. When generating data, I put both timestamp and date values into the last_login field.

Error message/log

Likely a logical bug; there are no actual error messages.

To Reproduce

For deployment details, see the "How did you deploy RisingWave?" section below.

Expected behavior

MongoDB stores its data as BSON, which encodes each value together with its type:

{ "_id": {"$numberLong": "7355608"}, "name": "evawgnisir", "released": { "$date": "1145141919810" }}

It is then encapsulated in Debezium messages as:

{ "payload": { "before": null, "after": { "_id": {"$numberLong": "7355608"}, "name": "evawgnisir", "released": { "$date": "1145141919810" } } } }

RisingWave parses these strings without any errors. Unfortunately, we currently parse and store the data as JSON and let users access those fields in materialized views, which inevitably leads to printing JSON objects instead of timestamps, datetimes, long integers, and so on.
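
As a possible interim workaround with the current JSONB representation, the extended-JSON annotation can be unwrapped by hand in the materialized view. The sketch below is only illustrative: the view name is made up, it assumes last_login arrives as a relaxed extended-JSON object like { "$date": "1734607421137" } (epoch milliseconds, as in the example above), and it assumes to_timestamp accepts fractional epoch seconds.

-- hypothetical workaround view (assumes the $date field holds epoch milliseconds as text)
CREATE MATERIALIZED VIEW normalized_users_ts AS
SELECT
    payload ->> 'name'  AS name,
    payload ->> 'email' AS email,
    -- unwrap the annotation, then convert epoch milliseconds to a timestamp
    to_timestamp((payload -> 'last_login' ->> '$date')::double precision / 1000) AS last_login
FROM
    users;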

How did you deploy RisingWave?

This can be reproduced with a Docker Compose project.

  data/
  │ configdb/
  │ db/
  │ | keyfile
  │ └ insert.js
  secrets/
  │ create_mv.sql
  │ create_source.sql
  │ data_check
  │ docker-compose.yaml
  │ query.sql
  └ register-mongodb-connector.sh

The project is largely edited from https://github.com/risingwavelabs/risingwave/tree/711fedf72865c5a799ae0b6d1c892c84d1aff4de/integration_tests/debezium-mongo .

keyfile is a MongoDB keyfile; you can generate it simply by base64-encoding some random bytes.

// insert.js
// Define a function that generates random user data
function getRandomUser() {
  const names = [
    "Alice",
    "Bob",
    "Charlie",
    "David",
    "Eva",
    "Frank",
    "Grace",
    "Hank",
    "Ivy",
    "Jack",
  ];
  const domains = ["example.com", "test.com", "demo.com", "sample.com"];

  const randomName = names[Math.floor(Math.random() * names.length)];
  const randomEmail = `${randomName.toLowerCase()}@${domains[Math.floor(Math.random() * domains.length)]}`;

  const randomSeconds = Math.floor(
    Math.random() * (1704067199 - 1577836800) + 1577836800,
  );

  // Generate a random increment counter in the range 1 to 100
  const randomIncrement = Math.floor(Math.random() * 100) + 1;

  // Create a BSON Timestamp
  const randomTimestamp = new Timestamp(randomSeconds, randomIncrement);
  return {
    name: randomName,
    email: randomEmail,
    last_login: randomTimestamp,
  };
}

db.users.drop();
print("'users' collection dropped.");
// Insert 1000 random documents
let users = [];
for (let i = 0; i < 1000; i++) {
  users.push(getRandomUser());
}
db.users.insertMany(users);

// Print a message once insertion completes
print(db.users.count(), "documents inserted into 'users' collection.");
-- create-source.sql
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) WITH (
    connector = 'kafka',
    kafka.topic = 'dbserver1.test.users',
    kafka.brokers = 'message_queue:29092',
    kafka.scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM_MONGO ENCODE JSON;
-- create-mv.sql
CREATE MATERIALIZED VIEW normalized_users AS
SELECT
    payload ->> 'name' as name,
    payload ->> 'email' as email,
    payload ->> 'last_login' as last_login
FROM
    users;
# docker-compose.yaml
services:
  message_queue:
    image: "redpandadata/redpanda:latest"
    command:
      - redpanda
      - start
      - "--smp"
      - "1"
      - "--reserve-memory"
      - 0M
      - "--memory"
      - 4G
      - "--overprovisioned"
      - "--node-id"
      - "0"
      - "--check=false"
      - "--kafka-addr"
      - "PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092"
      - "--advertise-kafka-addr"
      - "PLAINTEXT://message_queue:29092,OUTSIDE://localhost:9092"
    expose:
      - "29092"
      - "9092"
      - "9644"
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9644:9644"
      - "8081:8081"
    depends_on: []
    volumes:
      - "message_queue:/var/lib/redpanda/data"
    environment: {}
    container_name: message_queue
    healthcheck:
      test: curl -f localhost:9644/v1/status/ready
      interval: 1s
      timeout: 5s
      retries: 5
    restart: always
  mongodb1:
    image: mongo:5.0
    container_name: mongodb1
    restart: always
    ports:
      - "27017:27017"
    healthcheck:
      test: ["CMD", "mongo", "--eval", "db.adminCommand('ping')"]
      interval: 10s
      timeout: 10s
      retries: 5
      start_period: 10s
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
    command: mongod --replSet rs0 --bind_ip_all --keyFile /data/keyfile/mongodb-keyfile
    volumes:
      - ./data:/data
      - ./secrets:/data/keyfile
    depends_on:
      - mongodb2
      - mongodb3

  mongodb2:
    image: mongo:5.0
    container_name: mongodb2
    restart: always
    ports:
      - "27018:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
    command: mongod --replSet rs0 --bind_ip_all --keyFile /data/keyfile/mongodb-keyfile
    volumes:
      - ./data:/data
      - ./secrets:/data/keyfile

  mongodb3:
    image: mongo:5.0
    container_name: mongodb3
    restart: always
    ports:
      - "27019:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
    command: mongod --replSet rs0 --bind_ip_all --keyFile /data/keyfile/mongodb-keyfile
    volumes:
      - ./data:/data
      - ./secrets:/data/keyfile

  debezium:
    image: debezium/connect:1.9
    container_name: debezium
    depends_on:
      - message_queue
      - mongodb1
    ports:
      - "8083:8083"
    healthcheck:
      test: curl -f http://localhost:8083
      interval: 1s
      start_period: 120s
    environment:
      BOOTSTRAP_SERVERS: message_queue:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: debezium_mongodb_config
      OFFSET_STORAGE_TOPIC: debezium_mongodb_offset
      STATUS_STORAGE_TOPIC: debezium_mongodb_status
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://message_queue:8081

  register-mongodb-connector:
    image: curlimages/curl:7.79.1
    container_name: register-mongodb-connector
    depends_on:
      debezium:
        condition: service_healthy
    command: >
      /bin/sh /register-mongodb-connector.sh
    restart: on-failure
    volumes:
      - ./register-mongodb-connector.sh:/register-mongodb-connector.sh

  kafka-connect-ui:
    image: landoop/kafka-connect-ui:0.9.7
    container_name: kafka-connect-ui
    depends_on:
      debezium:
        condition: service_healthy
    ports:
      - "8000:8000"
    environment:
      CONNECT_URL: http://debezium:8083

  risingwave:
    image: risingwavelabs/risingwave:latest
    container_name: risingwave
    ports:
      - "4566:4566"
    environment:
      RUST_LOG: info
      RW_PG_LISTEN_ADDR: 0.0.0.0:4566
    # depends_on:
    # - kafka

  # Init container
  init-mongo:
    image: mongo:5.0
    container_name: init_mongo
    volumes:
      - ./data:/data
    depends_on:
      mongodb1:
        condition: service_healthy
    entrypoint: [
        "sh",
        "-c",
        '
        echo ''Waiting for MongoDB and Debezium to be ready...'';
        sleep 50;
        echo ''Configuring MongoDB replica set...'';
        mongo --host mongodb1:27017 -u root -p example --eval ''rs.initiate({
        _id: "rs0",
        members: [
        { _id: 0, host: "mongodb1:27017" },
        { _id: 1, host: "mongodb2:27017" },
        { _id: 2, host: "mongodb3:27017" }
        ]
        })'';
        echo ''MongoDB replica set configured.'';
        echo ''Import data'';

        mongoimport --host mongodb1:27017 \
        -u root \
        -p example \
        --db test \
        --collection users \
        --file /data/users.json \
        --jsonArray
        ',
      ]

volumes:
  message_queue:

The version of RisingWave

No response

Additional context

No response

ClSlaid added the type/bug and type/enhancement labels on Jan 21, 2025
github-actions bot added this to the release-2.3 milestone on Jan 21, 2025
ClSlaid changed the title from "[cdc] MongoDB CDC source dealing with canonical timestamps and other structural data types" to "Bug: MongoDB CDC source dealing with canonical timestamps and other structural data types" on Jan 21, 2025
hzxa21 (Collaborator) commented Jan 22, 2025

Good finding. Though it won't cause any errors in the parsing logic, I think this is a logical bug that needs to be fixed.

xiangjinwu (Contributor) commented Jan 22, 2025

Looks related to #19982.
While the JSON implementation may be as raw as possible (see #17650 (comment) for a discussion on DynamoDB, though it was not merged), the strongly typed syntax in #19982 should handle these and provide a better experience.

hzxa21 (Collaborator) commented Jan 22, 2025

+1 for recommending users to use a strongly typed schema after #19982.

the JSON implementation may be as raw as possible
@xiangjinwu Do you think we need to convert JSON with relaxed BSON fields to a more friendly JSON?

"released": { "$date": "1145141919810" }

to

"released": "1145141919810"

xiangjinwu (Contributor) commented Jan 22, 2025

the JSON implementation may be as raw as possible

@xiangjinwu Do you think we need to convert JSON with relaxed BSON fields to a more friendly JSON?

No. These extra annotations exist for a reason, and stripping them is a lossy conversion. In other words, this "friendly JSON" can be a third option after strong types and raw JSON. Essentially we are inventing another format: why not use "released": "2024-12-19T11:23:41.137Z" instead of "released": "1734607421137"?

Update:

Essentially we are inventing another format: why not use "released": "2024-12-19T11:23:41.137Z" instead of "released": "1734607421137"?

If MongoDB has formally defined such an annotation-free format already, then yes we can support converting to it.
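
For concreteness, both spellings above denote the same instant, which can be checked with a quick query (just an illustration; it assumes to_timestamp accepts fractional epoch seconds and that the session time zone is UTC):

SELECT
    to_timestamp(1734607421137 / 1000.0) AS iso_style,        -- 2024-12-19 11:23:41.137+00:00
    '1734607421137'                      AS epoch_millis_style;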
