
Support Spark including Spark SQL #297

Open · 2 of 10 tasks
anjackson opened this issue Sep 27, 2022 · 5 comments

anjackson (Contributor) commented Sep 27, 2022

To support more modern patterns of usage, and more complex processing, it would be good to support Spark.

Long term, this should likely integrate with the Archives Unleashed Toolkit, but at the moment it would not be easy for us to transition to that. This is mostly because of how it handles record contents, which get embedded in the data frames and lead to some heavy memory pressure (notes TBA).

The current hadoop3 branch WarcLoader provides an initial implementation. It builds an RDD stream of WARC records and supports running the analyser on that stream, which can work on the full 'local' byte streams as long as no re-partitioning has happened. The analyser outputs a stream of objects that contain the extracted metadata fields and are no longer tied to the original WARC input streams. That stream can then be turned into a DataFrame, queried with SQL, and exported as Parquet etc.

  • Convert from input to POJO (a POJO allows mapping a JavaRDD to a DataFrame): a Memento with:
    • Named fields for core properties, naming consistent with CDX etc.
    • The source file name and offset etc.
    • A @transient reference to the underlying WritableArchiveRecord wrapped as HashCached etc.
    • A HashMap<String,String> for arbitrary extracted metadata fields.
  • Add a mechanism for configuring the analyser process behind loadAndAnalyse and createDataFrame (short term).
  • Reconsider #107 (Review configuration mechanism and consider a 'fluent' API) and maybe separate out the different analysis steps.
  • Possibly an 'enrich' convention, a bit like ArchiveSpark, with the WARC Indexer being wrapped to create an enriched Memento from a basic one.
    • e.g. rdd.mapPartitions(EnricherFunction) wrapped as...
    • JavaRDD rdd = WebArchiveLoader.load("paths", JavaSparkContext).enrich(WarcIndexerEnricherFunction);
  • Use Spark SQL:
    • Register the DataFrame as a temp table: df.createOrReplaceTempView("mementos")
  • Use Spark SQL + Iceberg to take the temp table and MERGE INTO a destination MegaTable.
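
For the Iceberg step, a rough sketch of what that MERGE might look like, assuming the Iceberg Spark SQL extensions are enabled and using placeholder table and key column names:

// Hypothetical sketch: merge the 'mementos' temp view into an Iceberg table,
// keyed on the source file name and offset (placeholder column names).
spark.sql(
    "MERGE INTO iceberg.warc.mega_table t " +
    "USING mementos s " +
    "ON t.sourceFile = s.sourceFile AND t.sourceOffset = s.sourceOffset " +
    "WHEN MATCHED THEN UPDATE SET * " +
    "WHEN NOT MATCHED THEN INSERT *");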

The POJO approach does mean we end up with a very wide schema with a lot of nulls if there hasn't been much analysis. Supporting a more dynamic schema would be nice, but then again a fixed schema aligns with the Solr schema.
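
For reference, a rough sketch of the kind of POJO this implies (field names and types are illustrative, not the actual hadoop3-branch class; the @transient mentioned above corresponds to Java's transient keyword here):

// Illustrative only: a bean-style Memento that Spark can map to a DataFrame.
public class Memento implements java.io.Serializable {
    // Core properties, named consistently with CDX etc.:
    private String url;
    private String timestamp;
    private String contentType;
    private Integer statusCode;
    // Where the record came from:
    private String sourceFile;
    private Long sourceOffset;
    // Reference to the underlying record; not part of the schema, dropped on serialisation:
    private transient WritableArchiveRecord record;
    // Arbitrary extracted metadata fields:
    private java.util.HashMap<String, String> metadata = new java.util.HashMap<>();
    // Getters and setters omitted for brevity (Spark's bean mapping needs them).
}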

anjackson (Contributor, Author) commented:

Example of usage:

// Load the WARCs:
Dataset<Row> df = WarcLoader.createDataFrame("test.warc.gz", spark);
df.createOrReplaceTempView("mementos");

// Run queries as SQL:
Dataset<Row> dft = spark.sql("SELECT url, `content.*` FROM mementos WHERE contentType IS NOT NULL");
dft.show();
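
And since this is just a regular DataFrame, exporting as Parquet should be the usual Spark call, e.g. (output path is just an example):

// Export the analysed records as Parquet:
df.write().mode("overwrite").parquet("mementos.parquet");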

anjackson self-assigned this Sep 27, 2022
ruebot (Contributor) commented Sep 27, 2022

@anjackson we moved over to using Sparkling for our processing earlier this year. It's been really great for us. aut is basically a utility for creating a common set of DataFrames from what Sparkling processes.

...and I'm reminded of conversations we had 5 or 6 years ago about our projects working towards some common code base.

anjackson (Contributor, Author) commented:

Thanks @ruebot - I have spent some time looking at Sparkling, but right now it feels like the gap is too big for me to switch over. Our current implementation is based on locally caching and repeatedly processing the payload (without passing large byte[] arrays around) because we're tight for memory. My understanding is that Sparkling can support working in that way, but porting over to it would be quite a lot of work, and right now I don't really understand what benefits it would bring. I suspect this is because I've not fully grasped what Spark and Sparkling can do!

As I don't have much time to work on this, I'm starting off by learning how to use Spark, and porting the current process over with minimal changes. As I learn more I can hopefully bring things closer together, but it looks like being a long road.

anjackson (Contributor, Author) commented Sep 28, 2022

Some experimentation with how to set up the extraction without knowing all the fields ahead of time... See 62de913.

As per https://spark.apache.org/docs/latest/sql-getting-started.html#programmatically-specifying-the-schema this can work, but needs a bit of help. The composite Map function (or functions) that gets applied via mapPartitions needs to declare which fields it is going to add and what their types are. (Note that I tried doing this by grabbing the .first() item in the RDD, but that's rather costly as the source file gets reprocessed.)

The wrapper can then declare the schema, and the WARC-based RDD can be transformed into a JavaRDD<Row> that matches the declared schema, both based on the sequence of field names.

The current implementation hardcodes the mapping and stores the types as metadata classes, so it can cope when values are null. A more elegant implementation would require changes to the current indexer so that the analysis declares the expected fields ahead of time.

Or we could just declare them all for now.
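
To make that concrete, a rough sketch of the 'declare them all' approach, assuming the loader gives us a JavaRDD<Memento> called records (the field names and getters are placeholders, not the actual indexer output; StructType/StructField/DataTypes live in org.apache.spark.sql.types, RowFactory in org.apache.spark.sql):

// Declare the expected fields and their types ahead of time:
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("url", DataTypes.StringType, false),
    DataTypes.createStructField("contentType", DataTypes.StringType, true),
    DataTypes.createStructField("sourceOffset", DataTypes.LongType, true)
});

// Map each analysed record to a Row, in the same order as the schema fields,
// so nulls are fine as long as the field is declared nullable:
JavaRDD<Row> rows = records.map(m ->
    RowFactory.create(m.getUrl(), m.getContentType(), m.getSourceOffset()));

// Build the DataFrame against the declared schema:
Dataset<Row> df = spark.createDataFrame(rows, schema);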

anjackson (Contributor, Author) commented Sep 29, 2022

I realised that older Hadoop support was still needed, so I experimented with some of these ideas there. But after hacking things together, it's clear the Parquet writer is going to be painful to run under old Hadoop:

attempt_20220929220613710_0001_r_000000_0: 2022-09-29 22:06:34 ERROR TaskTracker:203 - Error running child : java.lang.NoSuchMethodError: 'long org.apache.hadoop.fs.FileSystem.getDefaultBlockSize(org.apache.hadoop.fs.Path)'
attempt_20220929220613710_0001_r_000000_0: 	at org.apache.parquet.hadoop.util.HadoopOutputFile.defaultBlockSize(HadoopOutputFile.java:93)
attempt_20220929220613710_0001_r_000000_0: 	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:338)
attempt_20220929220613710_0001_r_000000_0: 	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:314)
attempt_20220929220613710_0001_r_000000_0: 	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:480)
attempt_20220929220613710_0001_r_000000_0: 	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:435)
attempt_20220929220613710_0001_r_000000_0: 	at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat$RecordWriterWrapper.<init>(DeprecatedParquetOutputFormat.java:92)
attempt_20220929220613710_0001_r_000000_0: 	at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat.getRecordWriter(DeprecatedParquetOutputFormat.java:77)
attempt_20220929220613710_0001_r_000000_0: 	at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:433)
attempt_20220929220613710_0001_r_000000_0: 	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
attempt_20220929220613710_0001_r_000000_0: 	at org.apache.hadoop.mapred.Child.main(Child.java:170)

So the idea is that old Hadoop will just output JSONL, which can then be transferred to the newer cluster as needed.
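
Picking that JSONL up on the newer cluster and converting it should then be straightforward, something like (paths are placeholders):

// On the newer cluster: read the JSONL produced by the old Hadoop job
// (Spark infers the schema from the JSON) and write it back out as Parquet:
Dataset<Row> mementos = spark.read().json("hdfs:///transfer/mementos.jsonl");
mementos.write().mode("overwrite").parquet("hdfs:///warehouse/mementos.parquet");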
