Broker disconnects cause database to crash. #18

Closed
johnistan opened this issue May 26, 2016 · 29 comments

@johnistan

This is using the latest Dockerfile in PipelineDB.

Pipeline: 9.2
librdkafka: 0.8
kafka brokers: 0.9.0.1

LOG:  [kafka consumer]: intkafka12:9092/12: Receive failed: Disconnected
CONTEXT:  COPY summary_stream, line 1176: "{"XXXXX"
LOG:  [kafka consumer]: intkafka12:9092/12: Receive failed: Disconnected
CONTEXT:  COPY summary_stream, line 1176: "{"XXXXX"
LOG:  [kafka consumer]: intkafka11:9092/11: Receive failed: Disconnected
CONTEXT:  COPY summary_stream, line 1176: "{"XXXXX"
LOG:  [kafka consumer]: intkafka11:9092/11: Receive failed: Disconnected
CONTEXT:  COPY summary_stream, line 1176: "{"XXXXX"
LOG:  [kafka consumer]: intkafka12:9092/12: Receive failed: Disconnected
CONTEXT:  COPY summary_stream, line 1847: "{"XXXXX"
LOG:  [kafka consumer]: intkafka12:9092/12: Receive failed: Disconnected
CONTEXT:  COPY summary_stream, line 1847: "{"XXXXX"
LOG:  [kafka consumer]: intkafka11:9092/11: Receive failed: Disconnected
CONTEXT:  COPY summary_stream, line 4943: "{"XXXXXX"
LOG:  [kafka consumer]: intkafka11:9092/11: Receive failed: Disconnected
CONTEXT:  COPY summary_stream, line 4943: "{"XXXXXX"
LOG:  [kafka consumer]: intkafka41:9092/41: Receive failed: Disconnected
CONTEXT:  COPY summary_stream, line 1847: "{"XXXXX"
LOG:  [kafka consumer]: intkafka41:9092/41: Receive failed: Disconnected
CONTEXT:  COPY summary_stream, line 1847: "{"XXXXXX"
LOG:  [kafka consumer] summary_stream <- summary.missing.1 failed to process batch, dropped 10000 messages:
ERROR:  could not find block containing chunk 0x1bc5ba0
LOG:  [kafka consumer] match_summary_stream <- summary.missing.1 failed to process batch, dropped 10000 messages:
ERROR:  could not find block containing chunk 0x1bc6dd0
LOG:  worker process: [kafka consumer] summary_stream <- summary.missing.1 (PID 1075) was terminated by signal 11: Segmentation fault
LOG:  terminating any other active server processes
WARNING:  terminating connection because of crash of another server process
DETAIL:  The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
HINT:  In a moment you should be able to reconnect to the database and repeat your command.
WARNING:  terminating connection because of crash of another server process
DETAIL:  The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
HINT:  In a moment you should be able to reconnect to the database and repeat your command.
WARNING:  terminating connection because of crash of another server process
DETAIL:  The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
HINT:  In a moment you should be able to reconnect to the database and repeat your command.
WARNING:  terminating connection because of crash of another server process
DETAIL:  The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
HINT:  In a moment you should be able to reconnect to the database and repeat your command.
LOG:  all server processes terminated; reinitializing
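
For context, a minimal sketch of how a consumer like the one in this log is typically wired up with pipeline_kafka. The stream and topic names and the batch size are taken from the log above; the column definition, the continuous view, the broker address, and the remaining consume_begin parameters are assumptions, not the reporter's actual setup:

CREATE STREAM summary_stream (payload json);
-- a continuous view is assumed here only so that something reads from the stream
CREATE CONTINUOUS VIEW summary_counts AS SELECT COUNT(*) FROM summary_stream;
-- broker host is assumed; the log shows intkafka11/12/41 on port 9092
SELECT pipeline_kafka.add_broker('intkafka11:9092');
-- batchsize matches the "dropped 10000 messages" lines above; format and parallelism are guesses
SELECT pipeline_kafka.consume_begin('summary.missing.1', 'summary_stream', format := 'text', batchsize := 10000, parallelism := 1);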
@usmanm usmanm added the bug label May 26, 2016
@usmanm usmanm self-assigned this May 26, 2016
@johnistan
Author

Is this a 0.9 Kafka issue? What versions of brokers do you support?

@usmanm
Collaborator

usmanm commented May 26, 2016

Unsure what's causing this. I'll look into it and report back.

@vryzhov

vryzhov commented May 29, 2016

Two (different) instances of a segmentation fault. They may be related to the reported issue.

Segmentation fault (PID 15086)
version: PipelineDB at revision on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 4.8.4-2ubuntu1~14.04.3) 4.8.4, 64-bit backtrace:
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (debug_segfault+0x15)[0x60eb65]
/lib/x86_64-linux-gnu/libc.so.6(+0x36d40)[0x7f9ff0c43d40]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x7d56c2]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (palloc0+0x31)[0x7d65b1]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (StreamTupleStateCreate+0x32)[0x6628d2]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (SendTuplesToContWorkers+0x191)[0x652621]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (CopyIntoStream+0x47)[0x652777]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (DoCopy+0x669)[0x560f09]
/data/pipelinedb/lib/pipelinedb/pipeline_kafka.so(kafka_consume_main+0x9da)[0x7f9ff07bf46a]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (StartBackgroundWorker+0x19b)[0x66a3fb]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x676695]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x6771d5]
/lib/x86_64-linux-gnu/libc.so.6(+0x36d40)[0x7f9ff0c43d40]
/lib/x86_64-linux-gnu/libc.so.6(__select+0x13)[0x7f9ff0cfed83]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x470119]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (PostmasterMain+0xea4)[0x678364]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (main+0x3ee)[0x4713ce]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf5)[0x7f9ff0c2eec5]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x471435]

Segmentation fault (PID 25066)
version: PipelineDB at revision on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 4.8.4-2ubuntu1~14.04.3) 4.8.4, 64-bit backtrace:
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (debug_segfault+0x15)[0x60eb65]
/lib/x86_64-linux-gnu/libc.so.6(+0x36d40)[0x7f9ff0c43d40]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (MemoryContextDeleteChildren+0x4)[0x7d5ec4]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (MemoryContextDelete+0x9)[0x7d5e69]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (MemoryContextReset+0x15)[0x7d5ef5]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (errstart+0x2fe)[0x7b680e]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (elog_finish+0xb3)[0x7ba0c3]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x7d561d]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (replace_text+0x63)[0x797b03]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (DirectFunctionCall3Coll+0x77)[0x7bc847]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x7b5581]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (errcontext_msg+0xae)[0x7b9a7e]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (CopyFromErrorCallback+0x9c)[0x55cd1c]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (errfinish+0x67)[0x7b6b27]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x71bb6a]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x71cf02]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x71f423]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x7205c3]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x721f43]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x71ce94]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x71f423]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (pg_parse_json+0x204)[0x723554]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (json_in+0x2d)[0x724e9d]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (InputFunctionCall+0xa4)[0x7be324]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (NextCopyFrom+0x238)[0x560388]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (DoCopy+0x558)[0x560df8]
/data/pipelinedb/lib/pipelinedb/pipeline_kafka.so(kafka_consume_main+0x9da)[0x7f9ff07bf46a]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic (StartBackgroundWorker+0x19b)[0x66a3fb]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x676695]
pipeline-server: bgworker: [kafka consumer] ip_stream <- kafka_topic [0x6771d5]
/lib/x86_64-linux-gnu/libc.so.6(+0x36d40)[0x7f9ff0c43d40]
/lib/x86_64-linux-gnu/libc.so.6(__select+0x13)[0x7f9ff0cfed83]

@usmanm
Collaborator

usmanm commented May 29, 2016

Is there some way I can repro this?

@vryzhov

vryzhov commented May 30, 2016

I have restarted the consumer with format = 'json' and have not seen any errors so far. Our payload is quite big; messages of ~20K-30K characters are not unusual, sometimes even bigger. It could be some buffer overflow when format = 'text', but I am speculating.

I'll keep watching the stream overnight and update the thread.
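
For reference, a minimal sketch of the kind of restart described here. The stream and topic names are taken from the backtraces above, and the parameter names follow the consume_begin call quoted later in this thread; treat it as an assumption rather than the exact commands used:

-- stop the existing consumer (assumed to have been started with format := 'text')
SELECT pipeline_kafka.consume_end('kafka_topic', 'ip_stream');
-- restart it with the json format instead
SELECT pipeline_kafka.consume_begin('kafka_topic', 'ip_stream', format := 'json', parallelism := 1);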

@vryzhov

vryzhov commented May 31, 2016

I have been watching it for some time now, but can't pinpoint the source of the error.
Below are some observations that may be helpful:

1. Segfaults are sporadic, but almost always occur after a Receive failed: Disconnected message, as reported by @jofusa.
2. Not every "Receive failed" is followed by an error. There are extended periods when the system works fine despite these disconnects.
3. Receive failed: Disconnected messages happen at 10-minute intervals, starting from the pipeline_kafka.consume_begin() call. Some of them may be skipped, but the 10-minute pattern is very clear.

Here is an example. Notice the timestamp sequence.

[2016-05-30 09:21:55 PDT] LOG: 00000: [kafka consumer]: intkafka71:9092/71: Receive failed: Disconnected
[2016-05-30 09:31:55 PDT] LOG: 00000: [kafka consumer]: intkafka71:9092/71: Receive failed: Disconnected
[2016-05-30 09:41:55 PDT] LOG: 00000: [kafka consumer]: intkafka71:9092/71: Receive failed: Disconnected
[2016-05-30 09:51:55 PDT] LOG: 00000: [kafka consumer]: intkafka71:9092/71: Receive failed: Disconnected
[2016-05-30 10:01:55 PDT] LOG: 00000: [kafka consumer]: intkafka71:9092/71: Receive failed: Disconnected
[2016-05-30 10:11:55 PDT] LOG: 00000: [kafka consumer]: intkafka71:9092/71: Receive failed: Disconnected
[2016-05-30 10:21:55 PDT] LOG: 00000: [kafka consumer]: intkafka71:9092/71: Receive failed: Disconnected
[2016-05-30 10:31:55 PDT] LOG: 00000: [kafka consumer]: intkafka71:9092/71: Receive failed: Disconnected
[2016-05-30 10:31:55 PDT] LOG: 00000: worker process: [kafka consumer] ip_stream <- geoip-annotated.3 (PID 22050) was terminated by signal 11: Segmentation fault
[2016-05-30 10:31:55 PDT] LOCATION: LogChildExit, postmaster.c:3561
[2016-05-30 10:31:55 PDT] LOG: 00000: terminating any other active server processes
[2016-05-30 10:31:55 PDT] LOCATION: HandleChildCrash, postmaster.c:3249

4. Once in a while the error log contains more low-level information, with a backtrace identical (except for memory addresses) to the first snippet above (the one that references StreamTupleStateCreate and palloc).

5. Most crashes result in the database restarting in recovery mode.

6. Very rarely, I see lines suggesting binary data coming from the stream:

[2016-05-31 11:03:41 PDT] LOG: 00000: [kafka consumer]: intkafka71:9092/71: Receive failed: Disconnected
[2016-05-31 11:03:41 PDT] CONTEXT: COPY ip_stream, line 863: "<80><BD>6<F6>%<D7>^A"
[2016-05-31 11:03:41 PDT] LOCATION: consumer_logger, pipeline_kafka.c:321

Most of the time the CONTEXT: COPY line looks legitimate, but I can only see the beginning of the message.

7. The Receive failed: Disconnected text itself is generated by librdkafka when a broker error is encountered (https://github.com/edenhill/librdkafka/blob/52c3fa28f84cdca41e607def6e4035012b14f209/src/rdkafka_broker.c, lines 1168-1172). The code suggests it's not considered an error. I use the most recent versions of librdkafka and pipeline_kafka (I had to add -lssl to link against the new librdkafka).

8. The stable periodicity of these disconnects, together with possible garbage coming from the topic, makes me think of a Kafka process checking some condition every 10 minutes and abruptly shutting down brokers as needed. We use Kafka 0.9. Unfortunately I am unable to confirm this theory. :(

9. I may try to reduce the payload size (as I explained, it's quite big) and see if that makes any difference. It will take me a while, though. I will report back if something interesting pops up. Please let me know if you have any other ideas to test.

Cheers,
vr

@usmanm
Collaborator

usmanm commented Jun 1, 2016

Thanks so much for the postmortem report, @vryzhov! I'll look into this tomorrow and report my findings + fix the issue.

@loadzero

loadzero commented Jun 1, 2016

Hi @vryzhov, this could possibly be a memory leak issue; I am currently looking at this as well. To determine this, you can use htop and watch the memory being consumed. If it continually increases, then there is a leak.

You may also get messages like this in your kernel ring buffer:

ubuntu@host:~$ dmesg |grep Killed
[ 9360.365633] Killed process 9392 (pipelinedb) total-vm:4157816kB, anon-rss:3475152kB, file-rss:78228kB

@vryzhov

vryzhov commented Jun 1, 2016

Yes, you are right. I can see out-of-memory errors:

~$ dmesg |grep Killed
[2772235.351863] Killed process 7616 (pipeline-server) total-vm:39943008kB, anon-rss:30437932kB, file-rss:145176kB
[2772235.364636] Killed process 7618 (pipeline-server) total-vm:39943008kB, anon-rss:30438628kB, file-rss:145332kB
[2795265.537487] Killed process 14854 (pipeline-server) total-vm:40008584kB, anon-rss:30437832kB, file-rss:144924kB

~$ dmesg | grep -i memory
[2772235.340642] [<ffffffff811537c4>] out_of_memory+0x414/0x450
[2772235.340833] Out of memory: Kill process 7616 (pipeline-server) score 930 or sacrifice child
[2772235.360775] [<ffffffff811537c4>] out_of_memory+0x414/0x450
[2772235.360965] Out of memory: Kill process 7618 (pipeline-server) score 930 or sacrifice child
[2795265.533026] [<ffffffff811537c4>] out_of_memory+0x414/0x450
[2795265.533221] Out of memory: Kill process 14854 (pipeline-server) score 930 or sacrifice child

They are killed by signal 9:

~$ grep -E "7616|7618|14854" *.log
LOG: worker process: [kafka consumer] qos_stream <- geoip-annotated.3 (PID 7616) was terminated by signal 9: Killed
2016-05-30 01:55:51 PDT: LOG: worker process: [kafka consumer] qos_stream <- geoip-annotated.3 (PID 14854) was terminated by signal 9: Killed
PID: 17616
PID: 17618

Most of the errors are segmentation faults, signal 11:

~$ grep Segmentation *.log | tail -3
[2016-05-31 15:19:33 PDT] LOG: 00000: worker process: [kafka consumer] qos_stream <- geoip-annotated.3 (PID 24383) was terminated by signal 11: Segmentation fault
[2016-05-31 15:39:29 PDT] LOG: 00000: worker process: [kafka consumer] qos_stream <- geoip-annotated.3 (PID 26281) was terminated by signal 11: Segmentation fault
[2016-05-31 17:50:56 PDT] LOG: 00000: worker process: [kafka consumer] qos_stream <- geoip-annotated.3 (PID 26998) was terminated by signal 11: Segmentation fault

~$ grep " was terminated by signal" *.log | sed -e 's/.*\:\(.*\)/\1/g' | sort | uniq -c
1 Aborted
2 Killed
39 Segmentation fault

@usmanm
Collaborator

usmanm commented Jun 1, 2016

This explains the magic 10-minute interval you're seeing: confluentinc/librdkafka#437

@johnistan
Author

Thanks for digging. Can we confirm the failure is a memory leak? The 10-minute logging seems like a nuisance, but not fatal.

@vryzhov

vryzhov commented Jun 1, 2016

@usmanm: Thank you for the link and the reference on the librdkafka page.

Here are more logs illustrating the palloc() error:

[2016-06-01 03:58:57.521 PDT] LOG: 00000: [kafka consumer] qos_stream <- geoip-annotated.3 failed to process batch, dropped 3000 messages:
[2016-06-01 03:58:57.521 PDT] LOCATION: kafka_consume_main, pipeline_kafka.c:919
[2016-06-01 03:58:57.521 PDT] ERROR: XX000: invalid memory alloc request size 18446744073709551604
[2016-06-01 03:58:57.521 PDT] CONTEXT: COPY qos_stream, line 2524: "<F8>3ESC%4<D7>^A"
[2016-06-01 03:58:57.521 PDT] LOCATION: palloc, mcxt.c:824

[2016-06-01 03:58:57.698 PDT] LOG: 00000: worker process: [kafka consumer] qos_stream <- geoip-annotated.3 (PID 10079) was terminated by signal 11: Segmentation fault
[2016-06-01 03:58:57.698 PDT] LOCATION: LogChildExit, postmaster.c:3561

@loadzero, @jofusa: Yes, the memory leak is real.

@usmanm
Collaborator

usmanm commented Jun 1, 2016

I'm trying to repro the memory leak, but that doesn't seem to be the issue being caused by the timeouts. I think the segfaults are being caused by the Postgres logging system not being thread-safe (even though there's a mutex around the elog function call).

@usmanm
Collaborator

usmanm commented Jun 1, 2016

@vryzhov: Could you give me details of your setup? # of brokers, # partitions and parallelism in pipeline_kafka.consume_begin?

@vryzhov

vryzhov commented Jun 1, 2016

3 brokers, 3 partitions, parallel = 1
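
For completeness, a rough sketch of what a consume_begin call for that setup might look like; the topic and stream names are taken from log lines quoted elsewhere in this thread, and the format is an assumption:

-- with parallelism := 1, a single consumer worker is expected to handle all three partitions of the topic
SELECT pipeline_kafka.consume_begin('geoip-annotated.3', 'ip_stream', format := 'json', parallelism := 1);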

@vryzhov

vryzhov commented Jun 1, 2016

@usmanm: I suspected that there is something weird with the logging. Log messages are not always the same; some of them go missing. Last night I added some tracing to the code by calling elog(). There is no consistency in the results: some messages are there and some are not.

@usmanm
Collaborator

usmanm commented Jun 1, 2016

I'm still trying to repro the segfault and memory leak. Once @loadzero gets in, I'll ask him to help me repro the leak. Do these broker disconnects happen during idle periods?

@johnistan
Author

In my example it was during high load, replaying a consumer from the start of a topic.

@vryzhov

vryzhov commented Jun 1, 2016

I was also replaying from the beginning. Still catching up.

@usmanm
Collaborator

usmanm commented Jun 1, 2016

I'm going to push a fix out in a bit!

@vryzhov

vryzhov commented Jun 1, 2016

@jofusa An illustration of the memory leak: free memory sampled at 5-second intervals. The crashes are due to segfaults.
[chart: free memory over time, not reproduced here]

@usmanm
Collaborator

usmanm commented Jun 1, 2016

Can you guys try building the master branch and see if that resolves the segfault issue?

@vryzhov

vryzhov commented Jun 1, 2016

Will do later today.

@vryzhov

vryzhov commented Jun 2, 2016

I have had it running for a little over 2 hours now. It all looks pretty awesome so far: no issues at all, and memory is stable. I see "Receive failed: Disconnected" messages in the log, but no segfaults. I'll keep it running overnight and report tomorrow.

@usmanm
Collaborator

usmanm commented Jun 2, 2016

Great, let me know if you see any issues!

@vryzhov

vryzhov commented Jun 2, 2016

Everything is working flawlessly. No issues at all since my last comment.
@usmanm, @loadzero, Thank you so much for the quick response and the awesome work you do!!

@usmanm
Collaborator

usmanm commented Jun 2, 2016

You're welcome! Happy to hear things are working smoothly now. Thanks for all your help in figuring this out!

@gb198871

Hi,
When I only use pipeline_kafka.consume_begin there is no problem, but when I use pipeline_kafka.emit_tuple I have this problem.

CREATE STREAM kafka_userlog_stream (time_tamp bigint, uuid text, age bigint,a1 text,a2 bigint,a3 numeric,a4 bool);

CREATE CONTINUOUS VIEW LOG_COUNT_VIEW WITH (sw = '30 minute') as select count(*) from kafka_userlog_stream;

CREATE TABLE DIM_AGE(begin_age int,end_age int, catalog varchar(300),PRIMARY KEY(begin_age,end_age));
insert into DIM_AGE values(0,30,'少年');
insert into DIM_AGE values(30,60,'中年');
insert into DIM_AGE values(60,120,'老年');

CREATE CONTINUOUS TRANSFORM CT_USER_AGE_CATALOG_TOKAFKA AS SELECT s.time_tamp::bigint, s.uuid::text,a.begin_age::int,s.a2::text,a.catalog::text FROM kafka_userlog_stream s JOIN DIM_AGE a ON s.age >= a.begin_age and s.age >= a.end_age THEN EXECUTE PROCEDURE pipeline_kafka.emit_tuple('pipelinedbTriggerTest');

SELECT pipeline_kafka.consume_begin('pipelineUserlog', 'kafka_userlog_stream',format := 'text', delimiter := E'|',batchsize := 1000, maxbytes := 32000000, parallelism := 5,start_offset := '-1');

error log:

LOG: worker process: worker0 [pipeline] (PID 16887) was terminated by signal 11: Segmentation fault
DETAIL: Failed process was running: worker0 [pipeline]
LOG: terminating any other active server processes
WARNING: terminating connection because of crash of another server process
DETAIL: The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
HINT: In a moment you should be able to reconnect to the database and repeat your command.
WARNING: terminating connection because of crash of another server process
DETAIL: The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
HINT: In a moment you should be able to reconnect to the database and repeat your command.
WARNING: terminating connection because of crash of another server process
DETAIL: The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
HINT: In a moment you should be able to reconnect to the database and repeat your command.
LOG: all server processes terminated; reinitializing
LOG: database system was interrupted; last known up at 2017-11-06 18:14:24 CST
LOG: database system was not properly shut down; automatic recovery in progress
LOG: redo starts at 18/44E7DEE8
LOG: unexpected pageaddr 17/FA5CC000 in log segment 000000010000001800000045, offset 6078464
LOG: redo done at 18/455CBB68
LOG: last completed transaction was at log time 2017-11-06 18:16:55.555403+08
LOG: MultiXact member wraparound protections are now enabled

@derekjn
Contributor

derekjn commented Nov 17, 2017

@gb198871 let's use #72 for the remainder of the conversation as we get your issue fixed.
