Skip to content

Commit

Permalink
Add support for parsing PDF pages in parallel (multiprocessing) (#17)
Browse files Browse the repository at this point in the history
Parse in parallel using multiprocessing library using available CPUs
  • Loading branch information
phoewass authored Apr 3, 2024
1 parent 567520b commit d606d88
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 11 deletions.
6 changes: 6 additions & 0 deletions camelot/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ def set_config(self, key, value):
default="1",
help="Comma-separated page numbers." " Example: 1,3,4 or 1,4-end or all.",
)
@click.option(
"--parallel",
is_flag=True,
default=False,
help="Read pdf pages in parallel using all CPU cores.",
)
@click.option("-pw", "--password", help="Password for decryption.")
@click.option("-o", "--output", help="Output file path.")
@click.option(
Expand Down
72 changes: 61 additions & 11 deletions camelot/handlers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import multiprocessing as mp
import os
import sys
from pathlib import Path
Expand Down Expand Up @@ -143,7 +144,12 @@ def _save_page(self, filepath: Union[StrByteType, Path], page, temp):
instream.close()

def parse(
self, flavor="lattice", suppress_stdout=False, layout_kwargs=None, **kwargs
self,
flavor="lattice",
suppress_stdout=False,
parallel=False,
layout_kwargs=None,
**kwargs
):
"""Extracts tables by calling parser.get_tables on all single
page PDFs.
Expand All @@ -153,8 +159,10 @@ def parse(
flavor : str (default: 'lattice')
The parsing method to use ('lattice' or 'stream').
Lattice is used by default.
suppress_stdout : str (default: False)
suppress_stdout : bool (default: False)
Suppress logs and warnings.
parallel : bool (default: False)
Process pages in parallel using all available cpu cores.
layout_kwargs : dict, optional (default: {})
A dict of `pdfminer.layout.LAParams
<https://github.com/euske/pdfminer/blob/master/pdfminer/layout.py#L33>`_ kwargs.
Expand All @@ -171,14 +179,56 @@ def parse(
layout_kwargs = {}

tables = []
parser = Lattice(**kwargs) if flavor == "lattice" else Stream(**kwargs)
with TemporaryDirectory() as tempdir:
for p in self.pages:
self._save_page(self.filepath, p, tempdir)
pages = [os.path.join(tempdir, f"page-{p}.pdf") for p in self.pages]
parser = Lattice(**kwargs) if flavor == "lattice" else Stream(**kwargs)
for p in pages:
t = parser.extract_tables(
p, suppress_stdout=suppress_stdout, layout_kwargs=layout_kwargs
)
tables.extend(t)
cpu_count = mp.cpu_count()
# Use multiprocessing only when more than one CPU is available: with a
# single CPU the worker pool can stall, so fall back to sequential parsing.
if parallel and len(self.pages) > 1 and cpu_count > 1:
with mp.get_context("spawn").Pool(processes=cpu_count) as pool:
jobs = []
for p in self.pages:
j = pool.apply_async(
self._parse_page,(p, tempdir, parser, suppress_stdout, layout_kwargs)
)
jobs.append(j)

for j in jobs:
t = j.get()
tables.extend(t)
else:
for p in self.pages:
t = self._parse_page(p, tempdir, parser, suppress_stdout, layout_kwargs)
tables.extend(t)

return TableList(sorted(tables))

def _parse_page(
    self, page, tempdir, parser, suppress_stdout, layout_kwargs
):
    """Extract the tables of a single page of the PDF.

    The page is first saved into ``tempdir`` through ``self._save_page``;
    the resulting single-page file is then handed to
    ``parser.extract_tables``.

    Parameters
    ----------
    page
        Page number to parse. It is interpolated into the name of the
        single-page file, ``page-<page>.pdf``.
    tempdir : str
        Directory in which the single-page PDF is looked up.
    parser : Lattice or Stream
        The parser to use (Lattice or Stream).
    suppress_stdout : bool
        Suppress logs and warnings.
    layout_kwargs : dict
        A dict of `pdfminer.layout.LAParams <https://github.com/euske/pdfminer/blob/master/pdfminer/layout.py#L33>`_ kwargs.

    Returns
    -------
    tables
        The tables returned by ``parser.extract_tables`` for this page.
    """
    # Save first, then parse: the parser reads the single-page file from disk.
    self._save_page(self.filepath, page, tempdir)
    single_page_pdf = os.path.join(tempdir, f"page-{page}.pdf")
    return parser.extract_tables(
        single_page_pdf,
        suppress_stdout=suppress_stdout,
        layout_kwargs=layout_kwargs,
    )
4 changes: 4 additions & 0 deletions camelot/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def read_pdf(
password=None,
flavor="lattice",
suppress_stdout=False,
parallel=False,
layout_kwargs=None,
**kwargs
):
Expand All @@ -37,6 +38,8 @@ def read_pdf(
Lattice is used by default.
suppress_stdout : bool, optional (default: False)
Suppress logs and warnings.
parallel : bool, optional (default: False)
Process pages in parallel using all available cpu cores.
layout_kwargs : dict, optional (default: {})
A dict of `pdfminer.layout.LAParams
<https://github.com/euske/pdfminer/blob/master/pdfminer/layout.py#L33>`_ kwargs.
Expand Down Expand Up @@ -122,6 +125,7 @@ def read_pdf(
tables = p.parse(
flavor=flavor,
suppress_stdout=suppress_stdout,
parallel=parallel,
layout_kwargs=layout_kwargs,
**kwargs
)
Expand Down
20 changes: 20 additions & 0 deletions docs/user/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,26 @@ By default, Camelot only uses the first page of the PDF to extract tables. To sp

The ``pages`` keyword argument accepts pages as comma-separated string of page numbers. You can also specify page ranges — for example, ``pages=1,4-10,20-30`` or ``pages=1,4-10,20-end``.

Extract tables in parallel
--------------------------

Camelot supports extracting tables in parallel using all the available CPU cores.

::

>>> tables = camelot.read_pdf('foo.pdf', pages='all', parallel=True)
>>> tables
<TableList n=1>

.. tip::
Here's how you can do the same with the :ref:`command-line interface <cli>`.
::
$ camelot --pages all --parallel lattice foo.pdf

.. note:: The reading of the PDF document is parallelized by processing its pages on different CPU cores.
Because starting the worker processes has a cost, a document with a low page count could be slower to process in parallel.

Reading encrypted PDFs
----------------------

Expand Down
Binary file added tests/files/diesel_engines.pdf
Binary file not shown.
24 changes: 24 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,30 @@ def test_cli_stream(testdir):
assert format_error in result.output


@skip_on_windows
def test_cli_parallel(testdir):
    """Check the CLI ``--parallel`` flag on a multi-page lattice extraction.

    Runs the ``lattice`` command on pages 1, 2 and 3 of
    ``diesel_engines.pdf`` with CSV output, then verifies that the command
    exits with code 0 and reports that two tables were found.
    """
    with TemporaryDirectory() as workdir:
        source_pdf = os.path.join(testdir, "diesel_engines.pdf")
        target_csv = os.path.join(workdir, "diesel_engines.csv")

        # Global options come first, followed by the flavor sub-command
        # and the input file.
        cli_args = ["--parallel"]
        cli_args += ["--pages", "1,2,3"]
        cli_args += ["--format", "csv"]
        cli_args += ["--output", target_csv]
        cli_args += ["lattice", source_pdf]

        outcome = CliRunner().invoke(cli, cli_args)

        assert outcome.exit_code == 0
        assert outcome.output == "Found 2 tables\n"


def test_cli_password(testdir):
with TemporaryDirectory() as tempdir:
infile = os.path.join(testdir, "health_protected.pdf")
Expand Down

0 comments on commit d606d88

Please sign in to comment.