Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improving stability and cleanup in case of disconnection / improving report in case of disconnection #131

Merged
merged 7 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@
from rstream import (
AMQPMessage,
Consumer,
DisconnectionErrorInfo,
MessageContext,
amqp_decoder,
)

STREAM = "my-test-stream"


async def on_connection_closed(reason: Exception) -> None:
print("connection has been closed for reason: " + str(reason))
async def on_connection_closed(disconnection_info: DisconnectionErrorInfo) -> None:
print(
"connection has been closed from stream: "
+ str(disconnection_info.streams)
+ " for reason: "
+ disconnection_info.reason
)


async def consume():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import asyncio
import time

from rstream import (
AMQPMessage,
DisconnectionErrorInfo,
Producer,
)

STREAM = "my-test-stream"
MESSAGES = 10000000
connection_is_closed = False


async def publish():
async def on_connection_closed(disconnection_info: DisconnectionErrorInfo) -> None:
print(
"connection has been closed from stream: "
+ str(disconnection_info.streams)
+ " for reason: "
+ disconnection_info.reason
)

global connection_is_closed
connection_is_closed = True

await producer.close()

# avoid to use async context in this case as we are closing the producer ourself in the callback
# in this case we avoid double closing
producer = Producer(
"localhost", username="guest", password="guest", connection_closed_handler=on_connection_closed
)

await producer.start()
# create a stream if it doesn't already exist
await producer.create_stream(STREAM, exists_ok=True)

# sending a million of messages in AMQP format
start_time = time.perf_counter()

for i in range(MESSAGES):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
)
# send is asynchronous
if connection_is_closed is False:
await producer.send(stream=STREAM, message=amqp_message)
else:
break

end_time = time.perf_counter()
print(f"Sent {MESSAGES} messages in {end_time - start_time:0.4f} seconds")


asyncio.run(publish())
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
import asyncio
import time

from rstream import AMQPMessage, Producer
from rstream import (
AMQPMessage,
DisconnectionErrorInfo,
Producer,
)

STREAM = "my-test-stream"
MESSAGES = 1000000


async def on_connection_closed(reason: Exception) -> None:
print("connection has been closed for reason: " + str(reason))
connection_is_closed = False


async def publish():
async def on_connection_closed(disconnection_info: DisconnectionErrorInfo) -> None:
print(
"connection has been closed from stream: "
+ str(disconnection_info.streams)
+ " for reason: "
+ disconnection_info.reason
)

# clean close or reconnect
await producer.close()
global connection_is_closed
connection_is_closed = True

async with Producer(
"localhost", username="guest", password="guest", connection_closed_handler=on_connection_closed
Expand All @@ -21,13 +34,17 @@ async def publish():

# sending a million of messages in AMQP format
start_time = time.perf_counter()
global connection_is_closed

for i in range(MESSAGES):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
)
# send is asynchronous
await producer.send(stream=STREAM, message=amqp_message)
if connection_is_closed is False:
await producer.send(stream=STREAM, message=amqp_message)
else:
break

end_time = time.perf_counter()
print(f"Sent {MESSAGES} messages in {end_time - start_time:0.4f} seconds")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import asyncio
import signal

from rstream import (
AMQPMessage,
ConsumerOffsetSpecification,
DisconnectionErrorInfo,
MessageContext,
OffsetType,
SuperStreamConsumer,
amqp_decoder,
)

count = 0


async def on_message(msg: AMQPMessage, message_context: MessageContext):
global count
count += 1
if (count % 100000) == 0:
stream = await message_context.consumer.stream(message_context.subscriber_name)
offset = message_context.offset
print("Received message: {} from stream: {} - message offset: {}".format(msg, stream, offset))


async def consume():
async def on_connection_closed(disconnection_info: DisconnectionErrorInfo) -> None:
print(
"connection has been closed from stream: "
+ str(disconnection_info.streams)
+ " for reason: "
+ disconnection_info.reason
)
await consumer.close()

consumer = SuperStreamConsumer(
host="localhost",
port=5552,
vhost="/",
username="guest",
password="guest",
super_stream="invoices",
connection_closed_handler=on_connection_closed,
)

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))
offset_specification = ConsumerOffsetSpecification(OffsetType.FIRST, None)
await consumer.start()
await consumer.subscribe(
callback=on_message, decoder=amqp_decoder, offset_specification=offset_specification
)
await consumer.run()


asyncio.run(consume())
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import asyncio
import time

from rstream import (
AMQPMessage,
DisconnectionErrorInfo,
RouteType,
SuperStreamProducer,
)

SUPER_STREAM = "invoices"
MESSAGES = 10000000

connection_is_closed = False


async def publish():
# this value will be hashed using mumh3 hashing algorithm to decide the partition resolution for the message
async def routing_extractor(message: AMQPMessage) -> str:
return message.application_properties["id"]

async def on_connection_closed(disconnection_info: DisconnectionErrorInfo) -> None:

print(
"connection has been closed from stream: "
+ str(disconnection_info.streams)
+ " for reason: "
+ disconnection_info.reason
)
global connection_is_closed
connection_is_closed = True

await super_stream_producer.close()

# avoiding using async context as we close the producer ourself in on_connection_closed callback
super_stream_producer = SuperStreamProducer(
"localhost",
username="guest",
password="guest",
routing_extractor=routing_extractor,
routing=RouteType.Hash,
connection_closed_handler=on_connection_closed,
super_stream=SUPER_STREAM,
)

await super_stream_producer.start()

# sending a million of messages in AMQP format
start_time = time.perf_counter()
global connection_is_closed

for i in range(MESSAGES):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
application_properties={"id": "{}".format(i)},
)

# send is asynchronous
if connection_is_closed is False:
await super_stream_producer.send(message=amqp_message)
else:
break

end_time = time.perf_counter()
print(f"Sent {MESSAGES} messages in {end_time - start_time:0.4f} seconds")


asyncio.run(publish())
3 changes: 3 additions & 0 deletions rstream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

from importlib import metadata

from .utils import DisconnectionErrorInfo

try:
__version__ = metadata.version(__package__)
__license__ = metadata.metadata(__package__)["license"]
Expand Down Expand Up @@ -59,4 +61,5 @@
"StreamDoesNotExist",
"OffsetSpecification",
"EventContext",
"DisconnectionErrorInfo",
]
Loading