Reddit processing code (#74)
* initial commit of reddit processing scripts

* Minor cleanup and added Readme

* removed programming subreddits from one stray dir

* Update sources/reddit/README.md

Co-authored-by: Niklas Muennighoff <[email protected]>

* Apply suggestions from code review

---------

Co-authored-by: Luca Soldaini <[email protected]>
Co-authored-by: Niklas Muennighoff <[email protected]>
3 people committed Dec 20, 2023
1 parent 9c80a8b commit b9dca47
Showing 35 changed files with 133,239 additions and 0 deletions.
84 changes: 84 additions & 0 deletions sources/reddit/README.md
@@ -0,0 +1,84 @@
# Reddit Source Data

[Reddit](https://www.reddit.com/) is a social news aggregation and discussion website where users can post links and take part in discussions. Until the spring of 2023, Reddit made its data publicly available through an API that many third-party developers built upon. The Pushshift effort was a social media data collection, analysis, and archiving platform that collected Reddit data through the API and made it available to researchers (details of this collection can be found in [The Pushshift Reddit Dataset](https://arxiv.org/abs/2001.08435)). Pushshift released hundreds of collected submission and comment dumps spanning Reddit’s creation in 2006 to the end of the public API in 2023. While these dumps are no longer available from Pushshift, they can still be found in a handful of web archives as of the fall of 2023.

Reddit content comes in two flavors related to the nature of the platform: **submissions** and **comments**. Submissions are variously links to articles or other external content, images or videos, or “selftext” (posts containing only text written by the submitter to initiate a discussion thread). Comments are user-written dialogue forming the nested, hierarchical, conversational threads that discuss a submission. The open-ended nature of the data allows for a fair amount of freedom when constructing a pretraining dataset, and several variations of the dataset were explored.
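
For orientation, the sketch below shows the rough shape of the two record types as they appear in the dumps. The field names mirror those referenced by the processing scripts in this directory; the values and the exact field set are illustrative only.

```
# Illustrative (not exhaustive) record shapes; field names mirror those used
# by the processing scripts in this directory.
example_comment = {
    "id": "def456",
    "link_id": "t3_abc123",    # the submission this comment belongs to
    "parent_id": "t1_xyz789",  # the parent comment (or the submission, for top-level comments)
    "body": "Comment text written by a user.",
    "author": "some_user",
    "subreddit": "askscience",
    "created_utc": 1577836800,
    "score": 42,
}

example_submission = {
    "id": "abc123",
    "title": "An interesting article",
    "selftext": "",            # non-empty only for text ("self") posts
    "url": "https://example.com/article",
    "author": "some_user",
    "subreddit": "askscience",
    "created_utc": 1577836800,
    "score": 128,
}
```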

# Dataset versions

At a high level, three architectures of the Reddit dataset were explored, with minor variations within each:

* **comment threads**
  * This format of the dataset assembles comments into conversational threads while leaving submissions unconnected. Comments from the same thread are combined into multi-round dialogues composed of multiple users’ utterances. This is done up to a maximum parent depth, and the assembled dialogues contain only a portion of their parent thread (i.e., documents are snippets of a complete thread).

* **atomic content**
  * The simplest form, this version of the dataset treats comments and submissions on equal footing and does not assemble comments into conversational threads. Comments and submissions are incorporated as complete documents.

* **complete threads**
  * The most complex and structured form, this version combines submissions and complete comment threads into a single document, with code-like indentation indicating each comment's position in the thread's hierarchy (illustrated in the sketch below). This version most closely replicates the experience of browsing a Reddit thread and places content in its most complete context. All comments (and the originating submission text) from the same thread are incorporated into a single document.
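
As a rough illustration of the code-like layout, the Python sketch below renders a thread by indenting each comment according to its depth below the submission. This is a sketch only, with an illustrative helper name; the exact formatting produced by the complete_threads_codelike_v4 scripts may differ.

```
def render_thread(submission, comments, indent="    "):
    """Render a submission and its comments with depth-based indentation.

    Sketch only: assumes comment dicts with 'id', 'parent_id', and 'body'
    keys whose ids have already been normalized (no t1_/t3_ prefixes); the
    layout may differ from what complete_threads_codelike_v4 produces.
    """
    lines = [submission.get("title", ""), submission.get("selftext", "")]

    def walk(parent_id, depth):
        for c in comments:
            if c["parent_id"] == parent_id:
                lines.append(indent * depth + c["body"])
                walk(c["id"], depth + 1)

    # Top-level comments have the submission itself as their parent.
    walk(submission["id"], 1)
    return "\n".join(lines)
```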


## comment_threads_v1

This version assembles all possible conversational threads to a depth of 12. This introduces a great deal of duplication in the dataset, since a single comment is potentially included multiple times in separate conversational paths. V1 was used to ablate various filtering and deduping strategies, and it exists in several subversions.
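
The source of that duplication can be seen in a small sketch (the helper name is illustrative, not the pipeline code, which runs inside Dataflow): for every comment, the chain of its ancestors is assembled up to a fixed parent depth, so an ancestor shared by many descendants is emitted once per descendant.

```
def ancestor_chain(comment, comments_by_id, parent_depth=12):
    """Collect a comment plus up to parent_depth ancestors, oldest first.

    Sketch only; the v1 pipeline assembles these chains inside Beam. Because
    every comment yields its own chain, a shared ancestor reappears in the
    chain of each of its descendants, producing the duplication noted above.
    """
    chain = [comment]
    current = comment
    for _ in range(parent_depth):
        parent = comments_by_id.get(current["parent_id"])
        if parent is None:
            break
        chain.append(parent)
        current = parent
    return list(reversed(chain))
```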

## comment_threads_v2

Similar to v1 in most respects, this version dedupes comments at creation time by keeping only the longest conversational thread for each top-level comment.
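
The dedup idea, again as a rough sketch with an illustrative helper name rather than the pipeline code: group the assembled chains by their top-level comment and keep only the longest chain for each.

```
def keep_longest_per_top_level(chains):
    """Keep only the longest chain rooted at each top-level comment.

    Sketch only; the v2 pipeline applies this selection at creation time.
    Chains are assumed to be ordered oldest first, so chain[0] is the
    top-level comment.
    """
    best = {}
    for chain in chains:
        top_id = chain[0]["id"]
        if top_id not in best or len(chain) > len(best[top_id]):
            best[top_id] = chain
    return list(best.values())
```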


## atomic_content_v3

A first experiment with atomic comments.

## complete_threads_codelike_v4

Only one variation of this version was tried.

## atomic_content_v5 (*included in Dolma v1.5*)

A refined version of atomic_content_v3, v5 uses different length and selection criteria for comments and submissions.


# Running Dataflow scripts

After placing the collected comment and submission dumps in a Google Cloud Storage bucket, most versions of the dataset build comment and submission data separately by running `build_comment_data.py` and `build_submission_data.py`:


```
python build_comment_data.py \
--input_gcs_dir ${DATADIR} \
--output_dir ${OUTPUTDIR} \
--runner DataflowRunner \
--temp_location ${OUTPUTDIR}/temp \
--staging_location ${OUTPUTDIR}/staging \
--project ${PROJECT} \
--setup_file ./setup.py
```

```
python build_submission_data.py \
--input_gcs_dir ${DATADIR} \
--output_dir ${OUTPUTDIR} \
--runner DataflowRunner \
--temp_location ${OUTPUTDIR}/temp \
--staging_location ${OUTPUTDIR}/staging \
--project ${PROJECT} \
--setup_file ./setup.py
```

The exception is `complete_threads_codelike_v4`, which is created with a single script:

```
python build_combined_thread_data.py \
--input_gcs_dir_comments ${DATADIRCOMMENTS} \
--input_gcs_dir_submissions ${DATADIRSUBMISSIONS} \
--output_dir ${OUTPUTDIR} \
--runner DataflowRunner \
--temp_location ${OUTPUTDIR}/temp \
--staging_location ${OUTPUTDIR}/staging \
--project ${PROJECT} \
--setup_file ./setup.py
```

Once a dataflow job is running, you can continue to monitor it in the launching terminal or on the [Dataflow service console](https://console.cloud.google.com/dataflow).
154 changes: 154 additions & 0 deletions sources/reddit/atomic_content_v3/build_comment_data.py
@@ -0,0 +1,154 @@
"""A Dataflow script for creating DOLMA formatted pretraining data from
reddit comment dumps.
For usage see README.md.
Adapted from work done at PolyAI:
https://github.com/PolyAI-LDN/conversational-datasets/tree/master/reddit
"""

import logging
from functools import partial
import uuid
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from utils.shared_utils import (
    read_content_from_source,
    build_base_parser,
    positive_int,
    normalize_id,
    normalize_string,
    load_filtered_subreddit_lists,
    write_to_gcs,
    isodatetime_from_epoch,
    DATA_ACQUISITION_DATE
)


def parse_args(argv=None):
    parser = build_base_parser()

    parser.add_argument(
        "--parent_depth",
        type=positive_int,
        default=12,
        help="How many levels of parent content to consider.",
    )
    parser.add_argument(
        "--max_length",
        type=positive_int,
        default=2048,
        help="Maximum characters in content to include.",
    )
    parser.add_argument(
        "--min_length",
        type=positive_int,
        default=400,
        help="Minimum characters in content to include.",
    )
    return parser.parse_known_args(argv)


def normalize_comment(comment, max_length):

    def trim(text, max_length):
        if len(text) <= max_length:
            return text
        text = text[:max_length + 1]

        # Trim until the last two characters are the boundary between an
        # alphanumeric character, and a non-alphanumeric character.
        while len(text) > 1 and (text[-1].isalnum() == text[-2].isalnum()):
            text = text[:-1]
        return text[:-1]

    comment = {
        'id': comment['id'],
        'thread_id': normalize_id(comment['link_id']),
        'parent_id': normalize_id(comment['parent_id']),
        'body': trim(normalize_string(comment['body']), max_length),
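        # Note: checked against the original body, before normalization and trimming.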
        'body_is_trimmed': len(comment['body']) > max_length,
        'author': normalize_string(comment['author']),
        'subreddit': normalize_string(comment['subreddit']),
        'created_utc': comment['created_utc'],
        'score': comment['score'],
        'link_id': comment['link_id']
    }
    return comment


def create_examples(comment, min_length):
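    # Skip trimmed, deleted/removed, too-short, and low-score comments;
    # a skipped comment makes this generator yield nothing.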
    def _should_skip(comment, min_length):
        if comment['body_is_trimmed']:
            return True
        if comment['body'] in {
                "[deleted]",
                "[removed]",
                "[UNICODE ENCODE ERROR]"}:
            return True
        if comment['subreddit'] in {
                "[deleted]",
                "[removed]",
                "[UNICODE ENCODE ERROR]"}:
            return True
        if len(comment['body']) < min_length:
            return True
        if not comment['score'] or comment['score'] < 2:
            return True
        return False

    if _should_skip(comment, min_length):
        return None

    example = {
        'subreddit': comment['subreddit'],
        'thread_id': comment['thread_id'],
        'created': isodatetime_from_epoch(comment['created_utc']),
        'added': DATA_ACQUISITION_DATE,
        'id': uuid.uuid4().hex,
        'conversational_format': comment['body']
    }
    yield example


def run(argv=None, comments=None):
    args, pipeline_args = parse_args(argv)
    banned_subreddits_file = args.banned_subreddits_file
    banned_subreddits = load_filtered_subreddit_lists(banned_subreddits_file)
    # pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, min_ram="8GB")
    # pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)

    pipeline_options = PipelineOptions(pipeline_args, save_main_session=True,
                                       dataflow_service_options=['enable_prime'],
                                       experiments=['enable_batch_vmr',
                                                    'enable_vertical_memory_autoscaling',
                                                    'no_use_multiple_sdk_containers'])

    p = beam.Pipeline(options=pipeline_options)

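    # Read the comment dumps from the configured input source.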
    comments = read_content_from_source(comments, p, args)

    comments |= (
        "normalize content" >> beam.Map(
            partial(normalize_comment, max_length=args.max_length)))

    examples = comments | (
        "Create examples" >> beam.FlatMap(
            partial(create_examples,
                    min_length=args.min_length,
                    )))

    examples = examples | (
        "Filter none content" >> beam.Filter(
            lambda c: c is not None))

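    # Write the examples to GCS, excluding content from banned subreddits.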
    write_to_gcs(examples, banned_subreddits, args)

    result = p.run()
    result.wait_until_finish()


if __name__ == "__main__":
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    run()