Skip to content

Commit

Permalink
Fix video decoder cache for multiple GPUs (#4517)
Browse files Browse the repository at this point in the history
- the video decoder cache is global and doesn't
  account for multiple GPUs in the system. As a result,
  a decoder created for one GPU could be used on another,
  causing failures. This PR adds device_id logic to the
  decoder cache

Signed-off-by: Janusz Lisiecki <[email protected]>
  • Loading branch information
JanuszL authored and stiepan committed Dec 16, 2022
1 parent ebaa9cf commit 5dfe17b
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 39 deletions.
75 changes: 43 additions & 32 deletions dali/operators/reader/loader/video/frames_decoder_gpu.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,44 @@ const char *codec_to_string(cudaVideoCodec in) {

class NVDECCache {
public:
static NVDECCache &GetCache() {
static NVDECCache cache_inst;
return cache_inst;
static NVDECCache &GetCache(int device_id = -1) {
static NVDECCache cache_inst[32];
if (device_id == -1) {
CUDA_CALL(cudaGetDevice(&device_id));
}
return cache_inst[device_id];
}

static NVDECLease GetDecoderFromCache(CUVIDEOFORMAT *video_format, int device_id = -1) {
if (device_id == -1) {
CUDA_CALL(cudaGetDevice(&device_id));
}
return GetCache(device_id).GetDecoder(video_format, device_id);
}

void ReturnDecoder(DecInstance &decoder) {
std::unique_lock lock(access_lock);
auto range = dec_cache.equal_range(decoder.codec_type);
for (auto it = range.first; it != range.second; ++it) {
if (it->second.decoder == decoder.decoder) {
it->second.used = false;
return;
}
}
DALI_FAIL("Cannot return decoder that is not from the cache");
}

 private:
  // Constructed only through GetCache() — one instance per device.
  NVDECCache() {}

  // Releases every cached NVDEC decoder handle at process teardown.
  ~NVDECCache() {
    std::lock_guard lock(access_lock);
    for (auto &it : dec_cache) {
      cuvidDestroyDecoder(it.second.decoder);
    }
  }

NVDECLease GetDecoder(CUVIDEOFORMAT *video_format) {
NVDECLease GetDecoder(CUVIDEOFORMAT *video_format, int device_id) {
std::unique_lock lock(access_lock);

auto codec_type = video_format->codec;
Expand All @@ -100,8 +132,7 @@ class NVDECCache {
num_decode_surfaces = 20;

auto range = dec_cache.equal_range(codec_type);

std::unordered_map<cudaVideoCodec, DecInstance>::iterator best_match = range.second;
codec_map::iterator best_match = range.second;
for (auto it = range.first; it != range.second; ++it) {
if (best_match == range.second && it->second.used == false) {
best_match = it;
Expand All @@ -111,6 +142,7 @@ class NVDECCache {
it->second.chroma_format == chroma_format &&
it->second.bit_depth_luma_minus8 == bit_depth_luma_minus8) {
it->second.used = true;
assert(it->second.device_id == device_id);
return NVDECLease(it->second);
}
}
Expand Down Expand Up @@ -209,6 +241,7 @@ class NVDECCache {
decoder_inst.chroma_format = chroma_format;
decoder_inst.bit_depth_luma_minus8 = bit_depth_luma_minus8;
decoder_inst.used = true;
decoder_inst.device_id = device_id;

lock.lock();
dec_cache.insert({codec_type, decoder_inst});
Expand All @@ -226,38 +259,16 @@ class NVDECCache {
return NVDECLease(decoder_inst);
}

void ReturnDecoder(DecInstance &decoder) {
std::unique_lock lock(access_lock);
auto range = dec_cache.equal_range(decoder.codec_type);
for (auto it = range.first; it != range.second; ++it) {
if (it->second.decoder == decoder.decoder) {
it->second.used = false;
return;
}
}
DALI_FAIL("Cannot return decoder that is not from the cache");
}

private:
NVDECCache() {}

~NVDECCache() {
std::scoped_lock lock(access_lock);
for (auto &it : dec_cache) {
cuvidDestroyDecoder(it.second.decoder);
}
}

std::unordered_multimap<cudaVideoCodec, DecInstance> dec_cache;

using codec_map = std::unordered_multimap<cudaVideoCodec, DecInstance>;
codec_map dec_cache;
std::mutex access_lock;

static constexpr int CACHE_SIZE_LIMIT = 100;
};

// A lease in the "used" state owns a cached decoder; on destruction it is
// returned to the cache of the device it was created on, so subsequent work
// on that GPU can reuse it.
NVDECLease::~NVDECLease() {
  if (!decoder.used)
    return;
  frame_dec_gpu_impl::NVDECCache::GetCache(decoder.device_id).ReturnDecoder(decoder);
}

Expand Down Expand Up @@ -332,7 +343,7 @@ cudaVideoCodec FramesDecoderGpu::GetCodecType() {
void FramesDecoderGpu::InitGpuDecoder(CUVIDEOFORMAT *video_format) {
if (!nvdecode_state_->decoder) {
is_full_range_ = video_format->video_signal_description.video_full_range_flag;
nvdecode_state_->decoder = frame_dec_gpu_impl::NVDECCache::GetCache().GetDecoder(video_format);
nvdecode_state_->decoder = frame_dec_gpu_impl::NVDECCache::GetDecoderFromCache(video_format);
}
}

Expand Down
1 change: 1 addition & 0 deletions dali/operators/reader/loader/video/frames_decoder_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct DecInstance {
unsigned max_width = 0;
unsigned int bit_depth_luma_minus8 = 0;
bool used = false;
int device_id = 0;
};

class NVDECLease {
Expand Down
43 changes: 41 additions & 2 deletions dali/test/python/decoder/test_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import cv2
import nvidia.dali.types as types
import glob
from test_utils import get_dali_extra_path
from itertools import cycle
from test_utils import get_dali_extra_path, is_mulit_gpu
from nvidia.dali.backend import TensorListGPU
from nose2.tools import params

from nose import SkipTest
from nose.plugins.attrib import attr

filenames = glob.glob(f'{get_dali_extra_path()}/db/video/[cv]fr/*.mp4')
# filter out HEVC because some GPUs do not support it
Expand Down Expand Up @@ -144,3 +146,40 @@ def test_pipeline():
right = out
absdiff = np.abs(left.astype(int) - right.astype(int))
assert np.mean(absdiff) < 2


@attr('multi_gpu')
@params('cpu', 'mixed')
def test_multi_gpu_video(device):
    """Runs the video decoder in two pipelines pinned to different GPUs.

    Regression test for the per-device decoder cache: before the fix, a
    decoder created for one GPU could be handed out on another and fail.
    Skipped when fewer than two GPUs are visible.
    """
    # NOTE: 'is_mulit_gpu' typo comes from test_utils; kept as-is.
    if not is_mulit_gpu():
        raise SkipTest()

    batch_size = 1

    def input_gen(batch_size):
        # Cycle endlessly over the test videos, excluding codecs that some
        # GPUs cannot decode (mpeg4, hevc); yields raw file bytes per sample.
        filenames = glob.glob(f'{get_dali_extra_path()}/db/video/[cv]fr/*.mp4')
        filenames = filter(lambda filename: 'mpeg4' not in filename, filenames)
        filenames = filter(lambda filename: 'hevc' not in filename, filenames)
        filenames = cycle(filenames)
        while True:
            batch = []
            for _ in range(batch_size):
                batch.append(np.fromfile(next(filenames), dtype=np.uint8))
            yield batch

    @pipeline_def
    def test_pipeline():
        # Encoded bytes come in on the CPU; decoding happens on `device`
        # ('cpu' or 'mixed', i.e. GPU output).
        vid = fn.external_source(device='cpu', source=input_gen(batch_size))
        seq = fn.experimental.decoders.video(vid, device=device)
        return seq

    # Same pipeline on two different devices — exercises the per-GPU cache.
    video_pipeline_0 = test_pipeline(batch_size=1, num_threads=1, device_id=0)
    video_pipeline_1 = test_pipeline(batch_size=1, num_threads=1, device_id=1)

    video_pipeline_0.build()
    video_pipeline_1.build()

    # Alternate runs between the devices; failure mode is a decode error.
    iters = 5
    for _ in range(iters):
        video_pipeline_0.run()
        video_pipeline_1.run()
11 changes: 11 additions & 0 deletions dali/test/python/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ def get_arch(device_id=0):
return compute_cap


def is_mulit_gpu():
    """Returns True when NVML reports more than one GPU in the system.

    Falls back to False when the NVML Python bindings are not installed.
    (The 'mulit' typo is preserved because callers import this exact name.)
    """
    # Fix: the variable must be bound before the try block — otherwise a
    # missing pynvml module led to a NameError at the return statement.
    is_mulit_gpu_var = False
    try:
        import pynvml
        pynvml.nvmlInit()
        is_mulit_gpu_var = pynvml.nvmlDeviceGetCount() != 1
    except ModuleNotFoundError:
        print("Python bindings for NVML not found")

    return is_mulit_gpu_var


def get_dali_extra_path():
try:
dali_extra_path = os.environ['DALI_EXTRA_PATH']
Expand Down
3 changes: 1 addition & 2 deletions qa/TL0_multigpu/test_body.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ test_py_with_framework() {
}

test_py() {
    # Run the decoder test suite, selecting only cases tagged 'multi_gpu'.
    ${python_new_invoke_test} -s decoder -A 'multi_gpu'
}

test_gtest() {
Expand Down
2 changes: 1 addition & 1 deletion qa/TL0_multigpu/test_nofw.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash -e
# used pip packages
pip_packages='${python_test_runner_package}'
pip_packages='${python_test_runner_package} numpy>=1.17 librosa==0.8.1 scipy nvidia-ml-py==11.450.51 psutil dill cloudpickle opencv-python'
target_dir=./dali/test/python

# test_body definition is in separate file so it can be used without setup
Expand Down
4 changes: 2 additions & 2 deletions qa/TL0_python-self-test-readers-decoders/test_body.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
test_py_with_framework() {
# numpy seems to be extremely slow with sanitizers, so disable it
if [ -n "$DALI_ENABLE_SANITIZERS" ]; then
FILTER_PATTERN="test_external_source_parallel.py\|test_external_source_parallel_custom_serialization\|test_external_source_parallel_garbage_collection_order"
FILTER_PATTERN="test_external_source_parallel.py\|test_external_source_parallel_custom_serialization\|test_external_source_parallel_garbage_collection_order"
else
FILTER_PATTERN="#"
fi
Expand Down Expand Up @@ -37,7 +37,7 @@ test_py_with_framework() {
${python_new_invoke_test} -s reader
fi


${python_new_invoke_test} -s decoder
}

Expand Down

0 comments on commit 5dfe17b

Please sign in to comment.