Merge pull request #13 from Demokratis-ch/feature/publish-data
Publish preprocessed data
vitawasalreadytaken authored Dec 4, 2024
2 parents 99eab25 + 51cb2ea commit b31e231
Showing 8 changed files with 439 additions and 31 deletions.
3 changes: 3 additions & 0 deletions .env.example
@@ -6,3 +6,6 @@ DEMOKRATIS_API_PASSWORD="secret"
EXOSCALE_SOS_ACCESS_KEY="EXO..."
EXOSCALE_SOS_SECRET_KEY="secret"
EXOSCALE_SOS_ENDPOINT="https://sos-ch-dk-2.exo.io"

# Hugging Face API token used for uploading datasets
HF_TOKEN="hf_..."
19 changes: 11 additions & 8 deletions README.md
@@ -6,7 +6,8 @@

<a href="https://demokratis.ch">Demokratis.ch</a> |
<a href="https://join.slack.com/t/demokratispublic/shared_invite/zt-2r5uyt4j8-6U22Z53XkJakFkNYgpMm_A">Slack</a> |
<a href="mailto:[email protected]">[email protected]</a>
<a href="mailto:[email protected]">[email protected]</a> |
<a href="https://huggingface.co/demokratis">🤗 demokratis</a>

<a href="https://github.com/astral-sh/uv">
<img src="https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/uv/main/assets/badge/v0.json" alt="uv"></a>
@@ -87,8 +88,10 @@ To transform the web platform data into a dataset for training models, we run a
[demokratis_ml/pipelines/preprocess_consultation_documents.py](demokratis_ml/pipelines/preprocess_consultation_documents.py).
The result of this pipeline is a Parquet file conforming to the above-mentioned dataframe schema.

> [!NOTE]
> We are currently working with our data providers to make our compiled, enriched datasets publicly accessible while following all applicable laws. [Please talk to us on Slack #ml](https://join.slack.com/t/demokratispublic/shared_invite/zt-2r5uyt4j8-6U22Z53XkJakFkNYgpMm_A) to learn more about the data and gain early access.
### Our data is public
Our preprocessed dataset is automatically published to HuggingFace, and you can download it directly from
[🤗 demokratis/consultation-documents](https://huggingface.co/datasets/demokratis/consultation-documents).
Don't hesitate to [talk to us on Slack #ml](https://join.slack.com/t/demokratispublic/shared_invite/zt-2r5uyt4j8-6U22Z53XkJakFkNYgpMm_A) if you have any questions about the data!
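As an illustration (not part of this commit), a minimal sketch of loading the published Parquet file with pandas, assuming `huggingface_hub` is installed and that the file keeps the `consultation-documents-preprocessed.parquet` name used by the preprocessing pipeline:

```
import pandas as pd

# Hypothetical path: check the dataset page for the exact file name.
df = pd.read_parquet(
    "hf://datasets/demokratis/consultation-documents/consultation-documents-preprocessed.parquet"
)
print(df.head())
```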

<!--
### The federal dataset: Fedlex
@@ -105,13 +108,13 @@ After completing our initial research, we are now open-sourcing our ML work to e

For problems where solid solutions have been developed, we'll be productizing the models and displaying their outputs on the main [Demokratis.ch](https://demokratis.ch) website — always with a human reviewer involved.

We are also awaiting consent from our data providers before making the datasets publicly available.

| Problem | Public dataset? | Open-source code/model? | Initial research | Proof of concept model | Deployed in production |
|-|-|-|-|-|-|
| [I. Classifying consultation topics](#i-classifying-consultation-topics) | ❌ | ❌ | ✅ | ✅ | ❌
| [II. Extracting structure from documents](#ii-extracting-structure-from-documents) | ❌ | ❌ | ✅ | ❌ | ❌
| [III. Classifying document types](#iii-classifying-document-types) | ❌ | ❌ | ✅ | ✅ | ❌
| [I. Classifying consultation topics](#i-classifying-consultation-topics) | ✅ | ❌ | ✅ | ✅ | ❌
| [II. Extracting structure from documents](#ii-extracting-structure-from-documents) | 🟠(*) | ❌ | ✅ | ❌ | ❌
| [III. Classifying document types](#iii-classifying-document-types) | ✅ | ❌ | ✅ | ✅ | ❌

_*) We haven't published our copies of the source PDFs, but our [public dataset](#our-data-is-public) does include links to the original files hosted by cantons and the federal government._

### I. Classifying consultation topics
We need to classify each new consultation into one or more topics (such as *agriculture, energy, health, ...*) so that users can easily filter and browse consultations in their area of interest. We also support email notifications, where users can subscribe to receive new consultations on their selected topics by email.
2 changes: 1 addition & 1 deletion demokratis_ml/pipelines/README.md
@@ -19,7 +19,7 @@ uv run prefect block register --file demokratis_ml/pipelines/blocks.py
Run this to create block documents (configured block instances), and repeat every time they change:

```
PYTHONPATH=. uv run demokratis_ml/pipelines/create_blocks.py
PYTHONPATH=. uv run --env-file=.env demokratis_ml/pipelines/create_blocks.py
```

Note that you will need some environment variables defined to correctly configure some secrets.
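For illustration (placeholder values, not an exhaustive list), the variables referenced in `.env.example` and `create_blocks.py` in this commit include:

```
DEMOKRATIS_API_USERNAME="..."
DEMOKRATIS_API_PASSWORD="secret"
EXOSCALE_SOS_ACCESS_KEY="EXO..."
EXOSCALE_SOS_SECRET_KEY="secret"
EXOSCALE_SOS_ENDPOINT="https://sos-ch-dk-2.exo.io"
EXOSCALE_SOS_BUCKET="..."
HF_TOKEN="hf_..."
```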
9 changes: 9 additions & 0 deletions demokratis_ml/pipelines/blocks.py
@@ -20,6 +20,15 @@ class DemokratisAPICredentials(prefect.blocks.core.Block):
DemokratisAPICredentials.register_type_and_schema()


class HuggingFaceDatasetUploadCredentials(prefect.blocks.core.Block):
"""Authentication token for uploading datasets to HuggingFace."""

token: pydantic.SecretStr


HuggingFaceDatasetUploadCredentials.register_type_and_schema()


class ExtendedLocalFileSystem(prefect.filesystems.LocalFileSystem):
"""Local filesystem with extra methods."""

28 changes: 26 additions & 2 deletions demokratis_ml/pipelines/create_blocks.py
@@ -2,7 +2,13 @@

import os

from demokratis_ml.pipelines.blocks import DemokratisAPICredentials, ExtendedLocalFileSystem
import prefect.filesystems

from demokratis_ml.pipelines.blocks import (
DemokratisAPICredentials,
ExtendedLocalFileSystem,
HuggingFaceDatasetUploadCredentials,
)

demokratis_api_credentials = DemokratisAPICredentials(
username=os.environ["DEMOKRATIS_API_USERNAME"],
@@ -11,6 +17,12 @@
demokratis_api_credentials.save("demokratis-api-credentials", overwrite=True)


hf_credentials = HuggingFaceDatasetUploadCredentials(
token=os.environ["HF_TOKEN"],
)
hf_credentials.save("huggingface-dataset-upload-credentials", overwrite=True)


local_document_storage = ExtendedLocalFileSystem(basepath="data/consultation-documents")
local_document_storage.save("local-document-storage", overwrite=True)

@@ -19,4 +31,16 @@
local_dataframe_storage.save("local-dataframe-storage", overwrite=True)


# TODO: define equivalent storages on Exoscale SOS.
# TODO: define document storage on Exoscale SOS.

remote_dataframe_storage = prefect.filesystems.RemoteFileSystem(
basepath=f"s3://{os.environ['EXOSCALE_SOS_BUCKET']}/dataframes",
settings={
"key": os.environ["EXOSCALE_SOS_ACCESS_KEY"],
"secret": os.environ["EXOSCALE_SOS_SECRET_KEY"],
"client_kwargs": {
"endpoint_url": os.environ["EXOSCALE_SOS_ENDPOINT"],
},
},
)
remote_dataframe_storage.save("remote-dataframe-storage", overwrite=True)
74 changes: 63 additions & 11 deletions demokratis_ml/pipelines/preprocess_consultation_documents.py
@@ -5,8 +5,10 @@
import hashlib
import pathlib
import re
import sys

import httpx
import huggingface_hub
import lingua
import magic
import numpy as np
@@ -15,6 +17,7 @@
import prefect
import prefect.blocks.core
import prefect.cache_policies
import prefect.filesystems
import prefect.futures
import prefect.logging
import prefect.task_runners
@@ -26,13 +29,17 @@
""" For consultations reviewed after this date, the topics are considered to be manually
reviewed and of the highest quality. """

OPENPARLDATA_DOCUMENT_TYPE_MANUAL_REVIEW_SINCE_START_DATE = pd.Timestamp("2024-11-01T00:00:00")
""" For OpenParlData consultations ingested into the platform after this date, we can trust the document type.
Before this date, the document type wasn't consistently reviewed and defaulted to VARIOUS_TEXT."""


@prefect.flow(
# Max concurrency must be set, otherwise document extraction blows up on too many open files.
task_runner=prefect.task_runners.ThreadPoolTaskRunner(max_workers=8),
)
@pa.check_output(schemata.FullConsultationDocumentSchemaV1.to_schema())
def preprocess_data() -> pd.DataFrame:
def preprocess_data(publish: bool) -> pd.DataFrame:
"""Retrieve all available consultation documents from the Demokratis API and preprocess them.
Main steps:
@@ -42,6 +49,9 @@ def preprocess_data() -> pd.DataFrame:
- Store the resulting dataframe in a Parquet file.
Only documents with non-empty content are kept in the final dataframe.
:param publish: If true, upload the resulting dataframe to our remote S3-like storage and our public
HuggingFace dataset repository.
"""
logger = prefect.logging.get_run_logger()

@@ -52,16 +62,22 @@

# Language is unreliable for cantonal documents => detect it.
# This assumes that we do have the content of cantonal documents retrieved from the API.
index = df["document_source"] == "openparldata"
detected_languages = detect_document_language(df.loc[index])
# TODO: log the % difference between detected_languages and df.loc[index, "document_language"]
df.loc[index, "document_language"] = detected_languages
openparldata_index = df["document_source"] == "openparldata"
detected_languages = detect_document_language(df.loc[openparldata_index])
# TODO: log the % difference between detected_languages and df.loc[openparldata_index, "document_language"]
df.loc[openparldata_index, "document_language"] = detected_languages
# Remove document_type labels that are not guaranteed to be correct.
df.loc[
openparldata_index
& (df["consultation_start_date"] < OPENPARLDATA_DOCUMENT_TYPE_MANUAL_REVIEW_SINCE_START_DATE),
"document_type",
] = None

# Download Fedlex documents and extract text
index = df["document_source"] == "fedlex"
assert df.loc[index, "document_content_plain"].isna().all(), "Fedlex documents should not have content yet"
extracted_content = download_documents_and_extract_content(df.loc[index])
df.loc[index, "document_content_plain"] = extracted_content
fedlex_index = df["document_source"] == "fedlex"
assert df.loc[fedlex_index, "document_content_plain"].isna().all(), "Fedlex documents should not have content yet"
extracted_content = download_documents_and_extract_content(df.loc[fedlex_index])
df.loc[fedlex_index, "document_content_plain"] = extracted_content

# Drop documents that still don't have any content
missing_content = df[df["document_content_plain"].isna()]
@@ -70,11 +86,12 @@
"Missing content for %d documents (%.1f%%):\n%r",
len(missing_content),
100 * len(missing_content) / len(df),
missing_content.groupby("document_source").size(),
missing_content.groupby("document_source", observed=False).size(),
)
df = df[~df["document_content_plain"].isna()]

# Store the dataframe
logger.info("Serialising dataframe with %d rows to Parquet", len(df))
data = df.to_parquet(compression="gzip")
# TODO: switch between local and S3 storage based on the environment
fs = blocks.ExtendedLocalFileSystem.load("local-dataframe-storage")
@@ -85,9 +102,39 @@
logger.info("Writing %d rows, %.1f MiB to %r", len(df), len(data) / 1024**2, path)
fs.write_path(path, data)

if publish:
# Dispatch the HuggingFace upload task and the remote storage upload task in parallel.
hf_upload = upload_to_huggingface.submit(
repository_id="demokratis/consultation-documents",
# No need to include the date in the filename, as the HF dataset is a Git repository.
file_name="consultation-documents-preprocessed.parquet",
data=data,
)
# Remote storage upload
remote_fs = prefect.filesystems.RemoteFileSystem.load("remote-dataframe-storage")
logger.info("Uploading to %s/%s", remote_fs.basepath, path)
remote_fs.write_path(str(path), data)
# Wait for the HuggingFace upload to finish before returning
hf_upload.result()

return df


@prefect.task()
def upload_to_huggingface(repository_id: str, file_name: str, data: bytes) -> None:
"""Upload our resulting preprocessed dataframe to our HuggingFace dataset repository."""
logger = prefect.logging.get_run_logger()
logger.info("Uploading to HuggingFace repository %s, file %s", repository_id, file_name)
hf_token = blocks.HuggingFaceDatasetUploadCredentials.load("huggingface-dataset-upload-credentials").token
hf_api = huggingface_hub.HfApi(token=hf_token.get_secret_value())
hf_api.upload_file(
repo_id=repository_id,
path_in_repo=file_name,
path_or_fileobj=data,
repo_type="dataset",
)


def _get_document_storage() -> blocks.ExtendedLocalFileSystem:
# TODO: switch between local and S3 storage based on the environment
return blocks.ExtendedLocalFileSystem.load("local-document-storage")
@@ -137,6 +184,10 @@ def load_consultation_document_metadata() -> pd.DataFrame:
df.loc[~manual_index & (df["document_source"] == "fedlex"), column] = "organisation_rule"
df[column] = df[column].astype("category")
logger.info("Topics label source (documents):\n%r", df[column].value_counts())
logger.info(
"Topics label source (consultations):\n%r",
df.groupby("consultation_id").agg({column: "first"}).value_counts(),
)

# Cast to the correct types
for time_column in ("consultation_start_date", "consultation_end_date", "consultation_reviewed_at"):
Expand Down Expand Up @@ -366,5 +417,6 @@ def extract_text_from_pdf(local_path_pdf: pathlib.Path, local_path_txt: pathlib.


if __name__ == "__main__":
df = preprocess_data()
publish = len(sys.argv) > 1 and sys.argv[1] == "--publish"
df = preprocess_data(publish)
print(df)
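As a usage sketch (an assumption based on the `--publish` flag handling above and the invocation style shown in the pipelines README, not a command documented in this commit), the flow could be run locally with publishing enabled roughly like this:

```
PYTHONPATH=. uv run --env-file=.env demokratis_ml/pipelines/preprocess_consultation_documents.py --publish
```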
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -5,8 +5,8 @@ description = "Machine learning for Swiss consultation procedures"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"boto3>=1.35.69",
"httpx>=0.27.2",
"huggingface-hub>=0.26.3",
"lingua-language-detector>=2.0.2",
"numpy>=1.26.4",
"pandera>=0.20.4",
@@ -15,6 +15,8 @@ dependencies = [
"pymupdf>=1.24.14",
"python-dotenv>=1.0.1",
"python-magic>=0.4.27",
"s3fs[boto3]>=2024.10.0",
"tqdm>=4.67.1",
]

[tool.uv]