Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

'File partition' option and 'document' directory specification #213

Merged
merged 21 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "dolma"
version = "1.0.9"
version = "1.0.14"
edition = "2021"
license = "Apache-2.0"

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "dolma"
version = "1.0.14.post1"
version = "1.0.5"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You want 1.0.15, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, actually — when I started making dev branches I misread the version. The most recent non-dev release is 1.0.4.

description = "Data filters"
license = { text = "Apache-2.0" }
readme = "README.md"
Expand Down
24 changes: 22 additions & 2 deletions python/dolma/cli/deduper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import fnmatch
import os
from contextlib import ExitStack
from dataclasses import dataclass
from pathlib import Path
Expand Down Expand Up @@ -99,6 +101,13 @@ class DedupeConfig:
partition_index: Optional[int] = field(
default=0, help="The index of the partition being processed, in the range [0, num_partitions)."
)
file_partition: Optional[bool] = field(
default=False, help="Whether or not to partition at the document level (vs at the span level)"
)
document_dir: Optional[str] = field(
default="documents",
help="The folder in source paths to replace with 'attributes' to store results, if not 'documents'",
)


@dataclass
Expand Down Expand Up @@ -135,7 +144,6 @@ def run(cls, parsed_config: DeduperConfig):
logger = get_logger("tagger")

dict_config: Dict[str, Any] = {}

with ExitStack() as stack:
work_dirs = stack.enter_context(make_workdirs(parsed_config.work_dir))

Expand All @@ -146,6 +154,8 @@ def run(cls, parsed_config: DeduperConfig):
"min_words": parsed_config.dedupe.min_words,
"num_partitions": parsed_config.dedupe.num_partitions,
"partition_index": parsed_config.dedupe.partition_index,
"file_partition": parsed_config.dedupe.file_partition,
"document_dir": parsed_config.dedupe.document_dir,
}
try_name = parsed_config.dedupe.name if not om.is_missing(parsed_config.dedupe, "name") else None

Expand Down Expand Up @@ -182,7 +192,17 @@ def run(cls, parsed_config: DeduperConfig):
# perform some path validation to make sure we don't call the mixer with invalid config
total_matching_documents = 0
for document in parsed_config.documents:
dict_config.setdefault("documents", []).append(str(document))

if not any(
fnmatch.fnmatch(dict_config["dedupe"]["document_dir"], part) for part in document.split(os.sep)
):
raise DolmaConfigError(
f"Path ({document}) does not contain expected document directory: '/{dict_config['dedupe']['document_dir']}/'. "
)

doc = str(document)

dict_config.setdefault("documents", []).append(doc)

current_matching_documents = sum(1 for _ in glob_path(document))
if current_matching_documents == 0:
Expand Down
28 changes: 26 additions & 2 deletions src/deduper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ use crate::s3_util;
use crate::shard::shard_config::{CompressionConfig, WorkDirConfig};
use crate::shard::{find_objects_matching_patterns, FileCache};
use crate::wimbd::tokens::tokenize;

use ahash::RandomState;
use deduper_config::*;
use std::hash::{BuildHasher, Hash, Hasher};

pub fn run(config: DeduperConfig) -> Result<u32, u32> {
let bloom_filter = BloomFilter::initialize(&config.bloom_filter).unwrap();
Expand All @@ -33,7 +34,20 @@ pub fn run(config: DeduperConfig) -> Result<u32, u32> {
let threadpool = ThreadPool::new(config.processes);
let failed_shard_count = AtomicU32::new(0);
let failed_shard_count_ref = Arc::new(failed_shard_count);
let hash_builder = RandomState::with_seeds(0, 1, 2, 3);

for p in paths {
let mut hasher = hash_builder.build_hasher();
p.hash(&mut hasher);
let hashed_path = hasher.finish();

if config.dedupe.file_partition.unwrap_or(false)
&& hashed_path % config.dedupe.num_partitions.unwrap_or(1)
!= config.dedupe.partition_index.unwrap_or(0)
{
continue;
}

let path = p.clone();
let work_dirs = config.work_dir.clone();
let dedupe = config.dedupe.clone();
Expand Down Expand Up @@ -123,7 +137,15 @@ fn write_attributes(

let attrs_location = {
let attr_prefix = format!("/attributes/{}/", attr_key);
docs_location.replace("/documents/", &attr_prefix)
docs_location.replace(
&format!(
"/{}/",
dedupe_config
.document_dir
.unwrap_or(String::from("documents"))
),
&attr_prefix,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a test for this?

};
let local_output = cache.prepare_output(&attrs_location, label_temp)?;
let mut num_processed = 0;
Expand Down Expand Up @@ -546,6 +568,8 @@ pub mod deduper_config {
pub skip_empty: Option<bool>,
pub num_partitions: Option<u64>,
pub partition_index: Option<u64>,
pub file_partition: Option<bool>,
pub document_dir: Option<String>,
}

#[derive(Serialize, Deserialize, Clone)]
Expand Down
28 changes: 28 additions & 0 deletions tests/config/filepath-bad.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"documents": [
"tests/data/provided/deduper/pathnotd0cumentz/000.json.gz"
],
"work_dir": {
"input": "tests/work/temp/dedupe-para/input",
"output": "tests/work/temp/dedupe-para/output"
},
"dedupe": {
"name": "dedupe_paragraph_ngrams",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans",
"by_ngram": {
"ngram_length": 6,
"stride": 3,
"overlap_threshold": 0.5
}
}
},
"bloom_filter": {
"file": "tests/work/para_bloom_filter.bin",
"size_in_bytes": 0,
"read_only": false,
"estimated_doc_count": 1000,
"desired_false_positive_rate": 0.001
},
"processes": 1
}
29 changes: 29 additions & 0 deletions tests/config/filepath-good.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"documents": [
"tests/data/provided/deduper/pathnotd0cumentz/000.json.gz"
],
"work_dir": {
"input": "tests/work/temp/dedupe-para/input",
"output": "tests/work/temp/dedupe-para/output"
},
"dedupe": {
"name": "dedupe_paragraph_ngrams",
"document_dir": "pathnotd0cumentz",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans",
"by_ngram": {
"ngram_length": 6,
"stride": 3,
"overlap_threshold": 0.5
}
}
},
"bloom_filter": {
"file": "tests/work/para_bloom_filter.bin",
"size_in_bytes": 0,
"read_only": false,
"estimated_doc_count": 1000,
"desired_false_positive_rate": 0.001
},
"processes": 1
}
Binary file not shown.
37 changes: 34 additions & 3 deletions tests/python/test_deduper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing_extensions import TypedDict

from dolma.cli.__main__ import main
from dolma.core.errors import DolmaConfigError
from dolma.core.utils import split_words

from .utils import (
Expand All @@ -24,6 +25,9 @@

TEST_DIR = Path(__file__).parent.parent
DEDUPE_BY_URL = TEST_DIR / "config/dedupe-by-url.json"
DEDUPE_BAD_FILENAME = TEST_DIR / "config/filepath-bad.json"
DEDUPE_GOOD_FILENAME = TEST_DIR / "config/filepath-good.json"

DEDUPE_PARAGRAPHS = TEST_DIR / "config/dedupe-paragraphs.json"
DEDUPE_PARAGRAPH_NGRAMS = TEST_DIR / "config/dedupe-paragraph-ngrams.json"

Expand All @@ -48,13 +52,13 @@ def setUp(self) -> None:

# upload test data
upload_s3_prefix(
s3_prefix=f"{self.remote_test_prefix}", local_prefix="tests/data/provided/deduper/documents/*.gz"
s3_prefix=f"{self.remote_test_prefix}", local_prefix="tests/data/provided/deduper/*/*.gz"
)

# copy provided config files to local temp dir
shutil.copytree(
"tests/data/provided/deduper/documents",
f"{self.local_temp_dir}/tests/data/provided/deduper/documents",
"tests/data/provided/deduper",
f"{self.local_temp_dir}/tests/data/provided/deduper",
dirs_exist_ok=True,
)

Expand Down Expand Up @@ -82,6 +86,33 @@ def test_dedupe_by_url(self):
)
return self._compare_dedupe_output(expected, computed) # pyright: ignore

def test_dedupe_bad_filepath(self):
with open(DEDUPE_BAD_FILENAME, "r") as f:
config = json.load(f)

config["documents"][0] = f'{self.local_temp_dir}/{config["documents"][0]}'
config["bloom_filter"]["file"] = f'{self.local_temp_dir}/{config["bloom_filter"]["file"]}'

with NamedTemporaryFile("w") as f:
json.dump(config, f)
f.flush()

with self.assertRaises(DolmaConfigError):
main(argv=["-c", f.name, "dedupe"])

def test_dedupe_good_filepath(self):
with open(DEDUPE_GOOD_FILENAME, "r") as f:
config = json.load(f)

config["documents"][0] = f'{self.local_temp_dir}/{config["documents"][0]}'
config["bloom_filter"]["file"] = f'{self.local_temp_dir}/{config["bloom_filter"]["file"]}'

with NamedTemporaryFile("w") as f:
json.dump(config, f)
f.flush()

main(argv=["-c", f.name, "dedupe"])

def test_dedupe_paragraphs(self):
with open(DEDUPE_PARAGRAPHS, "r") as f:
config = json.load(f)
Expand Down
Loading