Reddit processing code #74

Merged 8 commits on Nov 30, 2023.
`sources/reddit/README.md` (new file, 84 additions)
# Reddit Source Data

[Reddit](https://www.reddit.com/) is a social news aggregation and discussion website where users can post links and take part in discussions. Until the spring of 2023, Reddit made its data publicly available through an API that many third-party developers built upon. The Pushshift effort was a social media data collection, analysis, and archiving platform that collected Reddit data through the API and made it available to researchers (details of this collection can be found in [The Pushshift Reddit Dataset](https://arxiv.org/abs/2001.08435)). Pushshift released hundreds of submission and comment dumps spanning Reddit’s creation in 2006 to the end of the public API in 2023. While these dumps are no longer available from Pushshift, they can still be found in a handful of web archives as of the fall of 2023.

Reddit content comes in two flavors related to the nature of the platform: **submissions** and **comments**. Submissions are variably links to articles or other external content, images or videos, or “selftext” (posts with only text written by the submitter to initiate a discussion thread). Comments are user-written dialogue that forms nested, hierarchical, conversational threads discussing a submission. The loosely structured nature of the data allows a fair amount of freedom when constructing a pretraining dataset, and several variations of the dataset were explored as pretraining data.
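
To make the structure concrete, here is a rough sketch of the raw records as they appear in the comment and submission dumps. The comment fields mirror those consumed by the processing scripts in this pull request; the submission fields (`title`, `selftext`, `url`) are assumptions based on the commonly documented Pushshift schema, and all values are invented for illustration.

```
# Representative, abridged records (one JSON object per line in the raw dumps).
# Values are invented; submission fields beyond id/author/subreddit/created_utc/score
# are assumed from the usual Pushshift schema rather than taken from this PR.
raw_comment = {
    "id": "c1abc",
    "link_id": "t3_xyz",       # submission that owns the thread
    "parent_id": "t1_c0aaa",   # parent comment (t1_*) or the submission itself (t3_*)
    "body": "Text of the comment.",
    "author": "some_user",
    "subreddit": "AskHistorians",
    "created_utc": 1577836800,
    "score": 42,
}

raw_submission = {
    "id": "xyz",
    "title": "An interesting article",
    "selftext": "",            # empty for link posts; author-written text for self posts
    "url": "https://example.com/article",
    "author": "another_user",
    "subreddit": "AskHistorians",
    "created_utc": 1577836800,
    "score": 311,
}
```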

# Dataset versions

At a high level, three architectures of the Reddit dataset were explored (with minor variations within these versions):

* **comment threads**
  * This format of the dataset assembles comments into conversational threads while leaving submissions unconnected. Comments from the same thread are combined into multi-round dialogues composed of multiple users’ utterances. This is done up to a maximum parent depth, and the assembled dialogues only contain a portion of their parent thread (i.e., documents are snippets of a complete thread).

* **atomic content**
  * The simplest form, this version of the dataset treats comments and submissions on equal footing and does not assemble comments into conversational threads. Comments and submissions are incorporated as complete documents.

* **complete threads**
  * The most complex and structured form, this version combines submissions and complete comment threads into a single document with code-like indentation indicating the position of a comment in the thread's hierarchy. This version most closely replicates the experience of browsing a Reddit thread and places content in its most complete context. All comments (and the originating submission text) from the same thread are incorporated in a single document; a brief format sketch follows this list.
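
As a rough illustration of the complete-thread format, the sketch below shows one way a submission and its nested comments could be combined into a single document with indentation marking thread depth. It is a minimal sketch of the idea only; the actual v4 delimiters, indentation width, and helper functions are not taken from this pull request.

```
# Minimal sketch (not the v4 implementation) of rendering a submission plus its
# full comment tree into one document with code-like indentation.
def render_document(submission_text, top_level_comments, replies):
    """`replies` maps a comment id to the list of comment dicts replying to it."""

    def render(comment, depth):
        lines = ["    " * depth + comment["body"]]
        for child in replies.get(comment["id"], []):
            lines.extend(render(child, depth + 1))
        return lines

    lines = [submission_text]
    for comment in top_level_comments:
        lines.extend(render(comment, depth=1))
    return "\n".join(lines)

# Resulting document shape:
#
# Submission selftext or title
#     top-level comment
#         reply to the top-level comment
#             reply to that reply
#     another top-level comment
```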


## comment_threads_v1

This version assembles all possible conversational threads to a depth of 12. This introduces a great deal of duplication in the dataset, since a single comment is potentially included multiple times in separate conversational paths. V1 was used to ablate various filtering and deduplication strategies, and it exists in several subversions.
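
A minimal sketch of the assembly idea, assuming comments are keyed by `id` and linked by `parent_id` as in the processing scripts below: every comment is expanded into the chain of its ancestors up to the maximum parent depth, which is why an early comment with many descendants appears in many overlapping paths.

```
# Illustrative only: walk from a comment up through its parents to form one
# conversational path, stopping at a maximum parent depth (12 in comment_threads_v1).
def build_dialogue(comment, comments_by_id, parent_depth=12):
    # Assumes parent_id values have been normalized to match the keys of comments_by_id.
    path = [comment]
    while len(path) < parent_depth:
        parent = comments_by_id.get(path[-1]["parent_id"])
        if parent is None:  # reached a top-level comment or a missing parent
            break
        path.append(parent)
    # Oldest utterance first, newest last, forming a multi-round dialogue.
    return [c["body"] for c in reversed(path)]
```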

## comment_threads_v2

Similar to v1 in most respects, this version dedupes comments at creation time by keeping only the longest conversational thread for each top-level comment.
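
A hedged sketch of the v2 deduplication idea, under the assumption that candidate paths are grouped by the id of their top-level ancestor comment: within each group only the longest path is kept, so each top-level comment contributes a single dialogue.

```
# Illustrative only: keep one conversational path per top-level comment,
# preferring the longest candidate.
def dedupe_threads(paths_by_top_level_comment):
    """`paths_by_top_level_comment` maps a top-level comment id to its candidate
    paths (each a list of comment bodies); return the longest path per id."""
    return {
        top_id: max(paths, key=len)
        for top_id, paths in paths_by_top_level_comment.items()
    }
```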


## atomic_content_v3

A first experiment with atomic comments.

## complete_threads_codelike_v4

Only one variation of this version was tried.

## atomic_content_v5

A refined version of atomic_content_v3, v5 uses different length and selection criteria for comments and submissions.
> **Contributor:** Is atomic_content_v5 the final one to be included in Dolma?
>
> **Member:** yes!

# Running dataflow scripts

After placing the collected comment and submission dumps in a Google Cloud Storage bucket, most versions of the dataset build comment and submission data separately by running `build_comment_data.py` and `build_submission_data.py`:


```
python build_comment_data.py \
--input_gcs_dir ${DATADIR} \
--output_dir ${OUTPUTDIR} \
--runner DataflowRunner \
--temp_location ${OUTPUTDIR}/temp \
--staging_location ${OUTPUTDIR}/staging \
--project ${PROJECT} \
--setup_file ./setup.py
```

```
python build_submission_data.py \
--input_gcs_dir ${DATADIR} \
--output_dir ${OUTPUTDIR} \
--runner DataflowRunner \
--temp_location ${OUTPUTDIR}/temp \
--staging_location ${OUTPUTDIR}/staging \
--project ${PROJECT} \
--setup_file ./setup.py
```

The exception is complete_threads_codelike_v4, which is created with a single script:

```
python build_combined_thread_data.py \
--input_gcs_dir_comments ${DATADIRCOMMENTS} \
--input_gcs_dir_submissions ${DATADIRSUBMISSIONS} \
--output_dir ${OUTPUTDIR} \
--runner DataflowRunner \
--temp_location ${OUTPUTDIR}/temp \
--staging_location ${OUTPUTDIR}/staging \
--project ${PROJECT} \
--setup_file ./setup.py
```

Once a dataflow job is running, you can continue to monitor it in the launching terminal or on the [Dataflow service console](https://console.cloud.google.com/dataflow).
`sources/reddit/atomic_content_v3/build_comment_data.py` (new file, 154 additions)
"""A Dataflow script for creating DOLMA formatted pretraining data from
reddit comment dumps.

For usage see README.md.

Adapted from work done at PolyAI:
https://github.com/PolyAI-LDN/conversational-datasets/tree/master/reddit
"""

import logging
from functools import partial
import uuid
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from utils.shared_utils import (
    read_content_from_source,
    build_base_parser,
    positive_int,
    normalize_id,
    normalize_string,
    load_filtered_subreddit_lists,
    write_to_gcs,
    isodatetime_from_epoch,
    DATA_ACQUISITION_DATE
)


def parse_args(argv=None):
    parser = build_base_parser()

    parser.add_argument(
        "--parent_depth",
        type=positive_int,
        default=12,
        help="How many levels of parent content to consider.",
    )
    parser.add_argument(
        "--max_length",
        type=positive_int,
        default=2048,
        help="Maximum characters in content to include.",
    )
    parser.add_argument(
        "--min_length",
        type=positive_int,
        default=400,
        help="Minimum characters in content to include.",
    )
    return parser.parse_known_args(argv)


def normalize_comment(comment, max_length):

    def trim(text, max_length):
        if len(text) <= max_length:
            return text
        text = text[:max_length + 1]

        # Trim until the last two characters are the boundary between an
        # alphanumeric character and a non-alphanumeric character.
        while len(text) > 1 and (text[-1].isalnum() == text[-2].isalnum()):
            text = text[:-1]
        return text[:-1]

    comment = {
        'id': comment['id'],
        'thread_id': normalize_id(comment['link_id']),
        'parent_id': normalize_id(comment['parent_id']),
        'body': trim(normalize_string(comment['body']), max_length),
        # Note: computed from the raw body, before normalization and trimming.
        'body_is_trimmed': len(comment['body']) > max_length,
        'author': normalize_string(comment['author']),
        'subreddit': normalize_string(comment['subreddit']),
        'created_utc': comment['created_utc'],
        'score': comment['score'],
        'link_id': comment['link_id']
    }
    return comment


def create_examples(comment, min_length):
    def _should_skip(comment, min_length):
        if comment['body_is_trimmed']:
            return True
        if comment['body'] in {
                "[deleted]",
                "[removed]",
                "[UNICODE ENCODE ERROR]"}:
            return True
        if comment['subreddit'] in {
                "[deleted]",
                "[removed]",
                "[UNICODE ENCODE ERROR]"}:
            return True
        if len(comment['body']) < min_length:
            return True
        if not comment['score'] or comment['score'] < 2:
            return True
        return False

    # Because this function is a generator, returning here simply yields nothing
    # for skipped comments.
    if _should_skip(comment, min_length):
        return None

    example = {
        'subreddit': comment['subreddit'],
        'thread_id': comment['thread_id'],
        'created': isodatetime_from_epoch(comment['created_utc']),
        'added': DATA_ACQUISITION_DATE,
        'id': uuid.uuid4().hex,
        'conversational_format': comment['body']
    }
    yield example


def run(argv=None, comments=None):
    args, pipeline_args = parse_args(argv)
    banned_subreddits_file = args.banned_subreddits_file
    banned_subreddits = load_filtered_subreddit_lists(banned_subreddits_file)
    # pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, min_ram="8GB")
    # pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)

    pipeline_options = PipelineOptions(
        pipeline_args, save_main_session=True,
        dataflow_service_options=['enable_prime'],
        experiments=['enable_batch_vmr',
                     'enable_vertical_memory_autoscaling',
                     'no_use_multiple_sdk_containers'])

    p = beam.Pipeline(options=pipeline_options)

    comments = read_content_from_source(comments, p, args)

    comments |= (
        "normalize content" >> beam.Map(
            partial(normalize_comment, max_length=args.max_length)))

    examples = comments | (
        "Create examples" >> beam.FlatMap(
            partial(create_examples, min_length=args.min_length)))

    examples = examples | (
        "Filter none content" >> beam.Filter(
            lambda c: c is not None))

    write_to_gcs(examples, banned_subreddits, args)

    result = p.run()
    result.wait_until_finish()


if __name__ == "__main__":
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    run()