-
Notifications
You must be signed in to change notification settings - Fork 120
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* initial commit of reddit processing scripts * Minor cleanup and added Readme * removed programming subreddits from one stray dir * Update sources/reddit/README.md Co-authored-by: Niklas Muennighoff <[email protected]> * Apply suggestions from code review --------- Co-authored-by: Luca Soldaini <[email protected]> Co-authored-by: Niklas Muennighoff <[email protected]>
- Loading branch information
1 parent
2872730
commit afab18c
Showing
35 changed files
with
133,239 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
# Reddit Source Data | ||
|
||
[Reddit](https://www.reddit.com/) is a social news aggregation and discussion website where users can post links and take part in discussions. Until the spring of 2023, Reddit made its data publicly available through an API that many 3rd party developers built upon. The Pushshift effort was a social media data collection, analysis, and archiving platform that collected Reddit data through the API and made it available to researchers (details of this collection can be found in [The Pushshift Reddit Dataset](https://arxiv.org/abs/2001.08435)). Pushshift released hundreds of collected submission and comment and dumps spanning Reddit’s creation in 2006 to the end of the public API in 2023. While these dumps are no longer available from Pushshift, they can still be found in handful of web archives as of the fall of 2023. | ||
|
||
Reddit content comes in two flavors related to the nature of the platform: **submissions** and **comments**. Submissions are variably links to articles or other external content, images or videos, or “selftext” (posts with only text written by the submitter to initiate a discussion thread). Comments are user-written dialogue that form a nested, hierarchical, conversational thread discussing a submission. The indeterminate nature of the data allows for a fair amount of freedom when constructing a pretraining dataset, and several variations of the dataset were explored for pretraining data. | ||
|
||
# Dataset versions | ||
|
||
At a high level, three architectures of the Reddit dataset were explored (with minor variations within these versions): | ||
|
||
* **comment threads** | ||
* This format of the dataset assembles comments into conversational threads while leaving submissions unconnected. Comments from the same thread are combined into multi-round dialogue composed of multiple user’s utterances. This is done up to a maximum parent depth, and the assembled dialogues only contain a portion of their parent thread (i.e. documents are snippets of complete thread). | ||
|
||
* **atomic content** | ||
* The simplest form, this version of the dataset treats comments and submissions on equal footing and does not assemble comments into conversational threads. Comments and submissions are incorporated as complete documents. | ||
|
||
* **complete threads** | ||
* The most complex and structured form, this version combines submissions and complete comment threads into a single document with code-like indentation indicating the position of a comment in the thread's hierarchy. This version most closely replicates the experience of browsing a reddit thread and places content in its most complete context. All comments (and the originating submission text) from the same thread are incorporated in a single document. | ||
|
||
|
||
## comment_threads_v1 | ||
|
||
This version assembles all possible conversational threads to a depth of 12. This introduces a great deal of duplication in the dataset, since a single comment is potentially included multiple times in separate conversational paths. V1 was used to ablate various filtering and deduping strategies and it exists in several subversions. | ||
|
||
## comment_threads_v2 | ||
|
||
Similar to v1 in most respects, this version dedupes comments at creation time by keeping only the longest conversational thread for each top-level comment. | ||
|
||
|
||
## atomic_content_v3 | ||
|
||
A first experiment with atomic comments. | ||
|
||
## complete_threads_codelike_v4 | ||
|
||
Only one variation of this version was tried. | ||
|
||
## atomic_content_v5 (*included in Dolma v 1.5*) | ||
|
||
A refined version of atomic_content_v3, v5 uses different length and selection criteria for comments and submissions. | ||
|
||
|
||
# Running dataflow scripts | ||
|
||
After placing the collected comment and submission dumps in a google cloud bucket, most versions of the dataset build comment and submission data seperately by running build_comment_data.py and build_submission_data.py: | ||
|
||
|
||
``` | ||
python build_comment_data.py \ | ||
--input_gcs_dir ${DATADIR} \ | ||
--output_dir ${OUTPUTDIR} \ | ||
--runner DataflowRunner \ | ||
--temp_location ${OUTPUTDIR}/temp \ | ||
--staging_location ${OUTPUTDIR}/staging \ | ||
--project ${PROJECT} \ | ||
--setup_file ./setup.py | ||
``` | ||
|
||
``` | ||
python build_submission_data.py \ | ||
--input_gcs_dir ${DATADIR} \ | ||
--output_dir ${OUTPUTDIR} \ | ||
--runner DataflowRunner \ | ||
--temp_location ${OUTPUTDIR}/temp \ | ||
--staging_location ${OUTPUTDIR}/staging \ | ||
--project ${PROJECT} \ | ||
--setup_file ./setup.py | ||
``` | ||
|
||
The exception is complete_threads_codelike_v4, which is created with a single script: | ||
|
||
``` | ||
python build_combined_thread_data.py \ | ||
--input_gcs_dir_comments ${DATADIRCOMMENTS} \ | ||
--input_gcs_dir_submissions ${DATADIRSUBMISSIONS} \ | ||
--output_dir ${OUTPUTDIR} \ | ||
--runner DataflowRunner \ | ||
--temp_location ${OUTPUTDIR}/temp \ | ||
--staging_location ${OUTPUTDIR}/staging \ | ||
--project ${PROJECT} \ | ||
--setup_file ./setup.py | ||
``` | ||
|
||
Once a dataflow job is running, you can continue to monitor it in the launching terminal or on the [Dataflow service console](https://console.cloud.google.com/dataflow). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
"""A Dataflow script for creating DOLMA formatted pretraining data from | ||
reddit comment dumps. | ||
For usage see README.md. | ||
Adapted from work done at PolyAI: | ||
https://github.com/PolyAI-LDN/conversational-datasets/tree/master/reddit | ||
""" | ||
|
||
import logging | ||
from functools import partial | ||
import uuid | ||
import apache_beam as beam | ||
from apache_beam.options.pipeline_options import PipelineOptions | ||
from utils.shared_utils import ( | ||
read_content_from_source, | ||
build_base_parser, | ||
positive_int, | ||
normalize_id, | ||
normalize_string, | ||
load_filtered_subreddit_lists, | ||
write_to_gcs, | ||
isodatetime_from_epoch, | ||
DATA_ACQUISITION_DATE | ||
) | ||
|
||
|
||
def parse_args(argv=None): | ||
parser = build_base_parser() | ||
|
||
parser.add_argument( | ||
"--parent_depth", | ||
type=positive_int, | ||
default=12, | ||
help="How many parent content to consider.", | ||
) | ||
parser.add_argument( | ||
"--max_length", | ||
type=positive_int, | ||
default=2048, | ||
help="Maximum letters in content to include.", | ||
) | ||
parser.add_argument( | ||
"--min_length", | ||
type=positive_int, | ||
default=400, | ||
help="Minimum characters in content to include.", | ||
) | ||
return parser.parse_known_args(argv) | ||
|
||
|
||
def normalize_comment(comment, max_length): | ||
|
||
def trim(text, max_length): | ||
if len(text) <= max_length: | ||
return text | ||
text = text[:max_length + 1] | ||
|
||
# Trim until the last two characters are the boundary between an | ||
# alphanumeric character, and a non-alphanumeric character. | ||
while len(text) > 1 and (text[-1].isalnum() == text[-2].isalnum()): | ||
text = text[:-1] | ||
return text[:-1] | ||
|
||
comment = { | ||
'id': comment['id'], | ||
'thread_id': normalize_id(comment['link_id']), | ||
'parent_id': normalize_id(comment['parent_id']), | ||
'body': trim(normalize_string(comment['body']), max_length), | ||
'body_is_trimmed': len(comment['body']) > max_length, | ||
'author': normalize_string(comment['author']), | ||
'subreddit': normalize_string(comment['subreddit']), | ||
'created_utc': comment['created_utc'], | ||
'score': comment['score'], | ||
'link_id': comment['link_id'] | ||
} | ||
return comment | ||
|
||
|
||
def create_examples(comment, min_length): | ||
def _should_skip(comment, min_length): | ||
if comment['body_is_trimmed']: | ||
return True | ||
if comment['body'] in { | ||
"[deleted]", | ||
"[removed]", | ||
"[UNICODE ENCODE ERROR]"}: | ||
return True | ||
if comment['subreddit'] in { | ||
"[deleted]", | ||
"[removed]", | ||
"[UNICODE ENCODE ERROR]"}: | ||
return True | ||
if len(comment['body']) < min_length: | ||
return True | ||
if not comment['score'] or comment['score'] < 2: | ||
return True | ||
return False | ||
|
||
if _should_skip(comment, min_length): | ||
return None | ||
|
||
example = { | ||
'subreddit': comment['subreddit'], | ||
'thread_id': comment['thread_id'], | ||
'created': isodatetime_from_epoch(comment['created_utc']), | ||
'added': DATA_ACQUISITION_DATE, | ||
'id': uuid.uuid4().hex, | ||
'conversational_format': comment['body'] | ||
} | ||
yield example | ||
|
||
|
||
def run(argv=None, comments=None): | ||
args, pipeline_args = parse_args(argv) | ||
banned_subreddits_file = args.banned_subreddits_file | ||
banned_subreddits = load_filtered_subreddit_lists(banned_subreddits_file) | ||
# pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, min_ram="8GB") | ||
# pipeline_options = PipelineOptions(pipeline_args, save_main_session=True) | ||
|
||
pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, | ||
dataflow_service_options=['enable_prime'], | ||
experiments=['enable_batch_vmr', | ||
'enable_vertical_memory_autoscaling', | ||
'no_use_multiple_sdk_containers']) | ||
|
||
p = beam.Pipeline(options=pipeline_options) | ||
|
||
comments = read_content_from_source(comments, p, args) | ||
|
||
comments |= ( | ||
"normalize content" >> beam.Map( | ||
partial(normalize_comment, max_length=args.max_length))) | ||
|
||
examples = comments | ( | ||
"Create examples" >> beam.FlatMap( | ||
partial(create_examples, | ||
min_length=args.min_length, | ||
))) | ||
|
||
examples = examples | ( | ||
"Filter none content" >> beam.Filter( | ||
lambda c: c is not None)) | ||
|
||
write_to_gcs(examples, banned_subreddits, args) | ||
|
||
result = p.run() | ||
result.wait_until_finish() | ||
|
||
|
||
if __name__ == "__main__": | ||
logger = logging.getLogger() | ||
logger.setLevel(logging.INFO) | ||
run() |
Oops, something went wrong.