Hr/streaming #87
Conversation
Please fix timestamps and test cases.
Supported: HLS, DASH, MP4. On SRT support: we cannot install PyAV with poetry with this flag: --no-binary
.devcontainer/devcontainer.json
Outdated
```diff
@@ -20,7 +21,8 @@
   },
   "mounts": [
     "source=${localEnv:PATH_NAS:/nas},target=/nas,type=bind",
-    "source=${localEnv:PATH_NAS2:/nas2},target=/nas2,type=bind"
+    "source=${localEnv:PATH_NAS2:/nas2},target=/nas2,type=bind",
+    "source=${localEnv:HOME}/.ssh,target=/root/.ssh,readonly,type=bind"
```
Why is .ssh mounted?
It provides the GitHub SSH key, so we can push commits from inside the container (in case of using a GitHub key).
I can remove it.
Removed.
aana/models/pydantic/stream_input.py
Outdated
```python
@field_validator("media_id")
@classmethod
def media_id_must_not_be_empty(cls, media_id):
```
Isn't it already checked in the MediaId type?
It seems you are right. But we also have it inside image_input. I will remove both if the functionality works with media_id itself.
Done.
aana/models/pydantic/stream_input.py
Outdated
```python
@field_validator("url")
@classmethod
def check_url(cls, url: str) -> str:
```
Pydantic has Url type: https://docs.pydantic.dev/latest/api/networks/
Can we use it instead?
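A minimal sketch of how pydantic's URL type could replace the custom validator (the field name `url` matches the snippet above; `AnyUrl` is pydantic's general URL type and accepts raw IPs and explicit ports, though whether it fits all of the project's stream URL formats is an assumption to verify):

```python
from pydantic import AnyUrl, BaseModel, ValidationError


class StreamInput(BaseModel):
    # AnyUrl validates scheme and host, and accepts IP addresses and ports.
    url: AnyUrl


# An IP-and-port stream URL passes validation.
ok = StreamInput(url="rtsp://192.168.1.10:554/channel/1")
print(ok.url)

# A malformed value is rejected without any custom validator.
try:
    StreamInput(url="not a url")
except ValidationError:
    print("invalid URL rejected")
```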
Yes, we can do that. I will check whether it allows an IP and port or not.
Done
aana/utils/video.py
Outdated
```python
# Check that the stream channel is valid
if len(available_streams) == 0 or channel >= len(available_streams):
    raise StreamReadingException(stream_url, msg="selected channel does not exist")
```
It would be good to mention which channel you are trying to use and how many channels the stream has.
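A hedged sketch of such an error message. `StreamReadingException` is the project's own exception type; the stand-in class and the `check_channel` wrapper below are hypothetical, for illustration only:

```python
class StreamReadingException(Exception):
    """Stand-in for the project's exception; the real signature may differ."""

    def __init__(self, stream_url: str, msg: str):
        super().__init__(f"{stream_url}: {msg}")


def check_channel(stream_url: str, available_streams: list, channel: int) -> None:
    # Include both the requested channel and the channel count in the message.
    if len(available_streams) == 0 or channel >= len(available_streams):
        raise StreamReadingException(
            stream_url,
            msg=(
                f"requested channel {channel} does not exist; "
                f"the stream has {len(available_streams)} video channel(s)"
            ),
        )


# Example: requesting any channel from a stream with no video channels fails
# with a message that states both numbers.
try:
    check_channel("rtsp://example.com/stream", [], 0)
except StreamReadingException as e:
    print(e)
```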
done
aana/models/pydantic/stream_input.py
Outdated
```python
    description=("the desired channel of stream"),
)

extract_fps: float = Field(
```
I'm not sure extract_fps belongs here. It's rather specific to frame extraction; if we want to do audio transcription, it wouldn't even be necessary. We can keep it here for now, but I don't think it belongs here.
It was a single value, so I did not add another class for it.
I've tried the example project for streaming on a news stream, and I think it lags behind, but I'm not sure.
aana/utils/video.py
Outdated
```python
for frame in packet.decode():
    if frame_number % frame_rate == 0:
        img = Image(numpy=frame.to_rgb().to_ndarray())
        packet_timestamp = packet.dts
```
What is the unit for the timestamps? It is definitely not seconds; the diff between adjacent frames is 28800. I suggest we convert it into seconds for standardization.
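For context, PyAV timestamps (`packet.dts`, `packet.pts`) are integer ticks in units of the stream's `time_base`, a `Fraction` giving seconds per tick; multiplying by `time_base` converts to seconds. A standalone arithmetic sketch (the 90 kHz time base is an assumption, but it is common for video and is consistent with the 28800-tick delta mentioned above working out to 0.32 s):

```python
from fractions import Fraction

# Seconds per tick; MPEG-TS/HLS video streams commonly use a 90 kHz time base.
time_base = Fraction(1, 90000)

dts_first = 0
dts_next = 28800  # the tick delta observed between adjacent extracted frames

# In real code this would be packet.dts * packet.time_base.
seconds = (dts_next - dts_first) * time_base
print(float(seconds))  # → 0.32
```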
Done
I think I found a way to calculate the lag with the existing output:

```python
import requests, json, time

data = {
    "stream": {
        # "url": "https://www.youtube.com/watch?v=9bZkp7q19f0",
        "url": "https://tagesschau.akamaized.net/hls/live/2020117/tagesschau/tagesschau_3/master-720p-3200.m3u8",
        "media_id": "9bZkp7q19f0",
    }
}
response = requests.post(
    "http://localhost:8000/stream/caption_stream",
    data={"body": json.dumps(data)},
    stream=True,
)
init_timestamp = None
init_time = None
for chunk in response.iter_content(chunk_size=None):
    chunk = json.loads(chunk)
    if not init_timestamp:
        init_timestamp = chunk["timestamps"][0] / 100000
        init_time = time.time()
    # calculate lag
    current_time = time.time()
    time_diff = current_time - init_time
    timestamp_diff = chunk["timestamps"][0] / 100000 - init_timestamp
    lag = time_diff - timestamp_diff
    print(f"Time diff: {time_diff:.2f}s")
    print(f"Timestamp diff: {timestamp_diff:.2f}s")
    print(f"Lag: {lag:.2f}s")
    print(chunk)
```

And it does lag behind. I even tried changing extract_fps to 1, but it still lags.
If I understand correctly, the problem is that frames come in faster than the pipeline can process them, correct? Then we need to either make the pipeline faster on average (e.g. larger batches) or process fewer frames. Processing fewer frames can mean janky results, but it may not be possible to make the pipeline sufficiently fast in any case.
Generally, yes. A faster pipeline means less lag. But that's not the only way to reduce lag. Right now, we are processing one batch of images after another sequentially. The more efficient way would be to process multiple batches at once, like we did with Mobius Pipeline. I actually tried an older commit (bd49a27) that still runs on Mobius Pipeline, and it's much faster: there is no lag (it's actually negative because of the way I calculate it), even at 3 fps.
Update on the lag issues. Here is a function that runs multiple concurrent tasks on the generator yields:

```python
import asyncio


async def run_async_generator_concurrently(async_generator, process, batch_size=2):
    queue = asyncio.Queue(batch_size * 2)
    result_queue = asyncio.Queue()
    num_consumers = batch_size

    async def producer():
        async for i in async_generator:
            await queue.put(i)
        # Signal all consumers to shut down by putting a sentinel
        # for each consumer in the queue
        for _ in range(num_consumers):
            await queue.put(None)

    async def consumer():
        while True:
            item = await queue.get()
            if item is None:  # Check for the sentinel
                queue.task_done()
                break
            result = await process(item)
            await result_queue.put(result)
            queue.task_done()

    consumers = [consumer() for _ in range(num_consumers)]
    producer_task = asyncio.create_task(producer())
    consumer_tasks = [asyncio.create_task(c) for c in consumers]

    # Yield all results as they are processed
    while True:
        if (
            result_queue.empty()
            and all(c.done() for c in consumer_tasks)
            and producer_task.done()
        ):
            break
        result = await result_queue.get()
        yield result
        result_queue.task_done()

    # Wait for all tasks to complete
    await producer_task
    for task in consumer_tasks:
        await task
```

You can try it on the toy example:

```python
import asyncio


async def async_generator(n):
    for i in range(n):
        yield i


async def process(i):
    await asyncio.sleep(0.1)
    print(f"i * i = {i * i}")
    return i * i


gen = async_generator(10)
async for item in run_async_generator_concurrently(gen, process, 5):
    print(item)
```

With 5 consumers it runs 5x faster, so it seems to work. But when I add it to the endpoint, it doesn't seem to help much:

```python
class CaptionStreamEndpoint(Endpoint):
    """Transcribe video in chunks endpoint."""

    async def initialize(self):
        """Initialize the endpoint."""
        self.captioning_handle = await AanaDeploymentHandle.create(
            "captioning_deployment"
        )

    async def run(
        self,
        stream: StreamInput,
        batch_size: int,
    ) -> AsyncGenerator[CaptionStreamOutput, None]:
        """Transcribe video in chunks."""

        async def predict_captions(frames_dict):
            captioning_output = await self.captioning_handle.generate_batch(
                images=frames_dict["frames"]
            )
            return {
                "captions": captioning_output["captions"],
                "timestamps": frames_dict["timestamps"],
            }

        gen = run_remote(fetch_stream_frames)(stream_input=stream, batch_size=2)
        async for item in run_async_generator_concurrently(
            gen, predict_captions, batch_size
        ):
            yield item
```

The version with Mobius Pipeline still has a lower lag. But when I run it for longer, the lag always goes up. I've tried reducing FPS, using the version with Mobius Pipeline, and adding more consumers. Nothing works; the lag starts to increase after some time. I've tried to debug it by looking at the BLIP2 latency in the metrics, but it's not clear to me what it means. BTW, Ray added a command to start Prometheus locally, but it's only available in the latest versions, so we need to update Ray. See https://docs.ray.io/en/latest/cluster/metrics.html#quickstart-running-prometheus-locally. What it all means I'm not sure yet. There is definitely something wrong, and we need to figure it out before we can say that we have streaming support.
Temporarily paused.
Adding streaming support to Aana.
The stream supports FPS and channel number.
I added fps and channel inside the stream input, but we could do the same with a separate input, something like video_params.