From db08aac6182f845904a09cb97821702800bc6916 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 17 May 2024 09:31:58 +0200 Subject: [PATCH] [scio-core](feature) Add readFiles and readFilesWithPath apis (#5350) --- .../scio/avro/syntax/SCollectionSyntax.scala | 68 ++++- .../spotify/scio/avro/AvroJobTestTest.scala | 62 ++++- .../values/FileSCollectionFunctions.scala | 237 ++++++++++++++++++ .../com/spotify/scio/values/SCollection.scala | 120 ++++++--- .../values/FileSCollectionFunctionsTest.scala | 90 +++++++ .../spotify/scio/values/SCollectionTest.scala | 1 - .../spotify/scio/testing/JobTestTest.scala | 59 +++++ site/src/main/paradox/io/Binary.md | 2 +- site/src/main/paradox/io/ReadFiles.md | 74 ++++-- 9 files changed, 651 insertions(+), 62 deletions(-) create mode 100644 scio-core/src/main/scala/com/spotify/scio/values/FileSCollectionFunctions.scala create mode 100644 scio-core/src/test/scala/com/spotify/scio/values/FileSCollectionFunctionsTest.scala diff --git a/scio-avro/src/main/scala/com/spotify/scio/avro/syntax/SCollectionSyntax.scala b/scio-avro/src/main/scala/com/spotify/scio/avro/syntax/SCollectionSyntax.scala index 129613ef0f..8a09bf241d 100644 --- a/scio-avro/src/main/scala/com/spotify/scio/avro/syntax/SCollectionSyntax.scala +++ b/scio-avro/src/main/scala/com/spotify/scio/avro/syntax/SCollectionSyntax.scala @@ -22,13 +22,13 @@ import com.spotify.scio.avro._ import com.spotify.scio.avro.types.AvroType.HasAvroAnnotation import com.spotify.scio.coders.Coder import com.spotify.scio.io.ClosedTap -import com.spotify.scio.util.FilenamePolicySupplier +import com.spotify.scio.util.{FilenamePolicySupplier, ScioUtil} import com.spotify.scio.values._ import org.apache.avro.Schema import org.apache.avro.file.CodecFactory -import org.apache.avro.specific.SpecificRecord +import org.apache.avro.specific.{SpecificData, SpecificRecord} import org.apache.avro.generic.GenericRecord -import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory +import org.apache.beam.sdk.extensions.avro.io.{AvroDatumFactory, AvroIO => BAvroIO, AvroSource} import scala.reflect.ClassTag import scala.reflect.runtime.universe._ @@ -207,6 +207,63 @@ final class ProtobufSCollectionOps[T <: Message](private val self: SCollection[T } } +final class FilesSCollectionOps(private val self: SCollection[String]) extends AnyVal { + + def readAvroGenericFiles( + schema: Schema, + datumFactory: AvroDatumFactory[GenericRecord] = GenericRecordIO.ReadParam.DefaultDatumFactory + ): SCollection[GenericRecord] = { + val df = Option(datumFactory).getOrElse(GenericRecordDatumFactory) + implicit val coder: Coder[GenericRecord] = avroCoder(df, schema) + val transform = BAvroIO + .readFilesGenericRecords(schema) + .withDatumReaderFactory(df) + self.readFiles(filesTransform = transform) + } + + def readAvroSpecificFiles[T <: SpecificRecord: ClassTag]( + datumFactory: AvroDatumFactory[T] = SpecificRecordIO.ReadParam.DefaultDatumFactory + ): SCollection[T] = { + val recordClass = ScioUtil.classOf[T] + val schema = SpecificData.get().getSchema(recordClass) + val df = Option(datumFactory).getOrElse(new SpecificRecordDatumFactory(recordClass)) + implicit val coder: Coder[T] = avroCoder(df, schema) + val transform = BAvroIO + .readFiles(recordClass) + .withDatumReaderFactory(df) + self.readFiles(filesTransform = transform) + } + + def readAvroGenericFilesWithPath( + schema: Schema, + datumFactory: AvroDatumFactory[GenericRecord] = GenericRecordIO.ReadParam.DefaultDatumFactory + ): SCollection[(String, GenericRecord)] = { + val df = 
Option(datumFactory).getOrElse(GenericRecordDatumFactory) + implicit val coder: Coder[GenericRecord] = avroCoder(df, schema) + self.readFilesWithPath() { f => + AvroSource + .from(f) + .withSchema(schema) + .withDatumReaderFactory(df) + } + } + + def readAvroSpecificFilesWithPath[T <: SpecificRecord: ClassTag]( + datumFactory: AvroDatumFactory[T] = SpecificRecordIO.ReadParam.DefaultDatumFactory + ): SCollection[(String, T)] = { + val recordClass = ScioUtil.classOf[T] + val schema = SpecificData.get().getSchema(recordClass) + val df = Option(datumFactory).getOrElse(new SpecificRecordDatumFactory(recordClass)) + implicit val coder: Coder[T] = avroCoder(df, schema) + self.readFilesWithPath() { f => + AvroSource + .from(f) + .withSchema(recordClass) + .withDatumReaderFactory(df) + } + } +} + /** Enhanced with Avro methods. */ trait SCollectionSyntax { implicit def avroGenericRecordSCollectionOps( @@ -228,4 +285,9 @@ trait SCollectionSyntax { implicit def avroProtobufSCollectionOps[T <: Message]( c: SCollection[T] ): ProtobufSCollectionOps[T] = new ProtobufSCollectionOps[T](c) + + implicit def avroFilesSCollectionOps[T]( + c: SCollection[T] + )(implicit ev: T <:< String): FilesSCollectionOps = + new FilesSCollectionOps(c.covary_) } diff --git a/scio-avro/src/test/scala/com/spotify/scio/avro/AvroJobTestTest.scala b/scio-avro/src/test/scala/com/spotify/scio/avro/AvroJobTestTest.scala index 7c0423be9b..447e769922 100644 --- a/scio-avro/src/test/scala/com/spotify/scio/avro/AvroJobTestTest.scala +++ b/scio-avro/src/test/scala/com/spotify/scio/avro/AvroJobTestTest.scala @@ -20,6 +20,7 @@ package com.spotify.scio.avro import com.spotify.scio._ import com.spotify.scio.avro.AvroUtils._ import com.spotify.scio.coders.Coder +import com.spotify.scio.io.ReadIO import com.spotify.scio.testing.PipelineSpec import org.apache.avro.generic.GenericRecord @@ -54,6 +55,29 @@ object GenericAvroFileJob { } } +object ReadGenericAvroFilesJob { + def main(cmdlineArgs: Array[String]): Unit = { + val (sc, args) = ContextAndArgs(cmdlineArgs) + sc.parallelize(args.list("input")) + .readAvroGenericFiles(AvroUtils.schema) + .saveAsAvroFile(args("output"), schema = AvroUtils.schema) + sc.run() + () + } +} + +object ReadSpecificAvroFilesWithPathJob { + def main(cmdlineArgs: Array[String]): Unit = { + val (sc, args) = ContextAndArgs(cmdlineArgs) + sc.parallelize(args.list("input")) + .readAvroSpecificFilesWithPath[TestRecord]() + .map { case (f, r) => TestRecord.newBuilder(r).setStringField(f).build() } + .saveAsAvroFile(args("output")) + sc.run() + () + } +} + object GenericParseFnAvroFileJob { implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(AvroUtils.schema) @@ -114,7 +138,8 @@ class AvroJobTestTest extends PipelineSpec { } def testGenericAvroFileJob(xs: Seq[GenericRecord]): Unit = { - implicit val coder = avroGenericRecordCoder + implicit val coder: Coder[GenericRecord] = + avroGenericRecordCoder(AvroUtils.schema) JobTest[GenericAvroFileJob.type] .args("--input=in.avro", "--output=out.avro") .input(AvroIO[GenericRecord]("in.avro"), (1 to 3).map(newGenericRecord)) @@ -137,7 +162,8 @@ class AvroJobTestTest extends PipelineSpec { def testGenericParseAvroFileJob(xs: Seq[GenericRecord]): Unit = { import GenericParseFnAvroFileJob.PartialFieldsAvro - implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder + implicit val coder: Coder[GenericRecord] = + avroGenericRecordCoder(AvroUtils.schema) JobTest[GenericParseFnAvroFileJob.type] .args("--input=in.avro", "--output=out.avro") 
.input(AvroIO[PartialFieldsAvro]("in.avro"), (1 to 3).map(PartialFieldsAvro))
@@ -148,7 +174,7 @@
       .run()
   }
 
-  it should "pass when correct generic parsed records" in {
+  it should "pass when correct parsed generic records" in {
     testGenericParseAvroFileJob((1 to 3).map(newGenericRecord))
   }
 
@@ -160,4 +186,34 @@ class AvroJobTestTest extends PipelineSpec {
       testGenericParseAvroFileJob((1 to 4).map(newGenericRecord))
     }
   }
+
+  "Read avro files" should "pass when correct generic records" in {
+    implicit val coder: Coder[GenericRecord] =
+      avroGenericRecordCoder(AvroUtils.schema)
+    val expected = (1 to 6).map(newGenericRecord)
+    val (part1, part2) = expected.splitAt(3)
+    JobTest[ReadGenericAvroFilesJob.type]
+      .args("--input=in1.avro", "--input=in2.avro", "--output=out.avro")
+      .input(ReadIO[GenericRecord]("in1.avro"), part1)
+      .input(ReadIO[GenericRecord]("in2.avro"), part2)
+      .output(AvroIO[GenericRecord]("out.avro"))(coll => coll should containInAnyOrder(expected))
+      .run()
+  }
+
+  "Read avro files with path" should "pass when correct specific records" in {
+    val input = (1 to 6).map(newSpecificRecord)
+    val (part1, part2) = input.splitAt(3)
+    val expected = (1 to 6).map { i =>
+      val r = newSpecificRecord(i)
+      r.setStringField(if (i <= 3) "in1.avro" else "in2.avro")
+      r
+    }
+
+    JobTest[ReadSpecificAvroFilesWithPathJob.type]
+      .args("--input=in1.avro", "--input=in2.avro", "--output=out.avro")
+      .input(ReadIO[TestRecord]("in1.avro"), part1)
+      .input(ReadIO[TestRecord]("in2.avro"), part2)
+      .output(AvroIO[TestRecord]("out.avro"))(coll => coll should containInAnyOrder(expected))
+      .run()
+  }
 }
diff --git a/scio-core/src/main/scala/com/spotify/scio/values/FileSCollectionFunctions.scala b/scio-core/src/main/scala/com/spotify/scio/values/FileSCollectionFunctions.scala
new file mode 100644
index 0000000000..ee5bd512c0
--- /dev/null
+++ b/scio-core/src/main/scala/com/spotify/scio/values/FileSCollectionFunctions.scala
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2024 Spotify AB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.spotify.scio.values + +import com.spotify.scio.coders.{Coder, CoderMaterializer} +import com.spotify.scio.io.ReadIO +import com.spotify.scio.testing.TestDataManager +import com.spotify.scio.util.Functions +import com.spotify.scio.util.TupleFunctions.kvToTuple +import org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment +import org.apache.beam.sdk.{io => beam} +import org.apache.beam.sdk.io.{ + Compression, + FileBasedSource, + ReadAllViaFileBasedSource, + ReadAllViaFileBasedSourceWithFilename +} +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider +import org.apache.beam.sdk.transforms.{PTransform, ParDo} +import org.apache.beam.sdk.values.{KV, PCollection} + +object FileSCollectionFunctions { + + // from beam TextIO/AvroIO + // 64MB is a reasonable value that allows to amortize the cost of opening files, + // but is not so large as to exhaust a typical runner's maximum amount of output per ProcessElement call. + val DefaultBundleSizeBytes: Long = 64 * 1024 * 1024L +} + +class FileSCollectionFunctions(self: SCollection[String]) { + + import FileSCollectionFunctions._ + + /** + * Reads each file, represented as a pattern, in this [[SCollection]]. + * + * @return + * each line of the input files. + */ + def readTextFiles(): SCollection[String] = + readFiles(beam.TextIO.readFiles()) + + /** + * Reads each file, represented as a pattern, in this [[SCollection]]. + * + * @return + * each file fully read as [[Array[Byte]]. + */ + def readFilesAsBytes(): SCollection[Array[Byte]] = + readFiles(_.readFullyAsBytes()) + + /** + * Reads each file, represented as a pattern, in this [[SCollection]]. + * + * @return + * each file fully read as [[String]]. + */ + def readFilesAsString(): SCollection[String] = + readFiles(_.readFullyAsUTF8String()) + + /** + * Reads each file, represented as a pattern, in this [[SCollection]]. + * + * @see + * [[readFilesAsBytes]], [[readFilesAsString]] + */ + def readFiles[A: Coder]( + f: beam.FileIO.ReadableFile => A + ): SCollection[A] = + readFiles(DirectoryTreatment.SKIP, Compression.AUTO)(f) + + /** + * Reads each file, represented as a pattern, in this [[SCollection]]. + * + * @see + * [[readFilesAsBytes]], [[readFilesAsString]] + * + * @param directoryTreatment + * Controls how to handle directories in the input. + * @param compression + * Reads files using the given [[org.apache.beam.sdk.io.Compression]]. + */ + def readFiles[A: Coder](directoryTreatment: DirectoryTreatment, compression: Compression)( + f: beam.FileIO.ReadableFile => A + ): SCollection[A] = { + val transform = ParDo.of(Functions.mapFn[beam.FileIO.ReadableFile, A](f)) + readFiles(transform, directoryTreatment, compression) + } + + /** + * Reads each file, represented as a pattern, in this [[SCollection]]. Files are split into + * multiple offset ranges and read with the [[FileBasedSource]]. + * + * @param desiredBundleSizeBytes + * Desired size of bundles read by the sources. + * @param directoryTreatment + * Controls how to handle directories in the input. + * @param compression + * Reads files using the given [[org.apache.beam.sdk.io.Compression]]. 
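+   *
+   * An illustrative sketch (here `paths` is a hypothetical `SCollection[String]` of file
+   * patterns), reusing the `TextSource` construction from [[readTextFilesWithPath]]:
+   * {{{
+   * paths.readFiles(DefaultBundleSizeBytes, DirectoryTreatment.SKIP, Compression.AUTO) { f =>
+   *   new beam.TextSource(
+   *     StaticValueProvider.of(f),
+   *     EmptyMatchTreatment.DISALLOW,
+   *     Array('\n'.toByte),
+   *     0
+   *   )
+   * }
+   * }}}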
+   */
+  def readFiles[A: Coder](
+    desiredBundleSizeBytes: Long,
+    directoryTreatment: DirectoryTreatment,
+    compression: Compression
+  )(f: String => FileBasedSource[A]): SCollection[A] = {
+    val createSource = Functions.serializableFn(f)
+    val bcoder = CoderMaterializer.beam(self.context, Coder[A])
+    val fileTransform = new ReadAllViaFileBasedSource(desiredBundleSizeBytes, createSource, bcoder)
+    readFiles(fileTransform, directoryTreatment, compression)
+  }
+
+  /**
+   * Reads each file, represented as a pattern, in this [[SCollection]].
+   *
+   * @see
+   *   [[readFilesAsBytes]], [[readFilesAsString]], [[readFiles]]
+   *
+   * @param directoryTreatment
+   *   Controls how to handle directories in the input.
+   * @param compression
+   *   Reads files using the given [[org.apache.beam.sdk.io.Compression]].
+   */
+  def readFiles[A: Coder](
+    filesTransform: PTransform[_ >: PCollection[beam.FileIO.ReadableFile], PCollection[A]],
+    directoryTreatment: DirectoryTreatment = DirectoryTreatment.SKIP,
+    compression: Compression = Compression.AUTO
+  ): SCollection[A] =
+    if (self.context.isTest) {
+      val id = self.context.testId.get
+      self.flatMap(s => TestDataManager.getInput(id)(ReadIO[A](s)).asIterable.get)
+    } else {
+      self
+        .applyTransform(new PTransform[PCollection[String], PCollection[A]]() {
+          override def expand(input: PCollection[String]): PCollection[A] =
+            input
+              .apply(beam.FileIO.matchAll())
+              .apply(
+                beam.FileIO
+                  .readMatches()
+                  .withCompression(compression)
+                  .withDirectoryTreatment(directoryTreatment)
+              )
+              .apply(filesTransform)
+        })
+    }
+
+  /**
+   * Reads each file, represented as a pattern, in this [[SCollection]]. Files are split into
+   * multiple offset ranges and read with the [[FileBasedSource]].
+   *
+   * @return
+   *   origin file name paired with read line.
+   *
+   * @param desiredBundleSizeBytes
+   *   Desired size of bundles read by the sources.
+   * @param directoryTreatment
+   *   Controls how to handle directories in the input.
+   * @param compression
+   *   Reads files using the given [[org.apache.beam.sdk.io.Compression]].
+   */
+  def readTextFilesWithPath(
+    desiredBundleSizeBytes: Long = DefaultBundleSizeBytes,
+    directoryTreatment: DirectoryTreatment = DirectoryTreatment.SKIP,
+    compression: Compression = Compression.AUTO
+  ): SCollection[(String, String)] = {
+    readFilesWithPath(
+      desiredBundleSizeBytes,
+      directoryTreatment,
+      compression
+    ) { f =>
+      new beam.TextSource(
+        StaticValueProvider.of(f),
+        EmptyMatchTreatment.DISALLOW,
+        Array('\n'.toByte),
+        0
+      )
+    }
+  }
+
+  /**
+   * Reads each file, represented as a pattern, in this [[SCollection]]. Files are split into
+   * multiple offset ranges and read with the [[FileBasedSource]].
+   *
+   * @return
+   *   origin file name paired with read element.
+   *
+   * @param desiredBundleSizeBytes
+   *   Desired size of bundles read by the sources.
+   * @param directoryTreatment
+   *   Controls how to handle directories in the input.
+   * @param compression
+   *   Reads files using the given [[org.apache.beam.sdk.io.Compression]].
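+   *
+   * An illustrative sketch (`paths` is a hypothetical `SCollection[String]` of file patterns);
+   * with a `TextSource` this is equivalent to [[readTextFilesWithPath]]:
+   * {{{
+   * val linesWithPath: SCollection[(String, String)] =
+   *   paths.readFilesWithPath() { f =>
+   *     new beam.TextSource(
+   *       StaticValueProvider.of(f),
+   *       EmptyMatchTreatment.DISALLOW,
+   *       Array('\n'.toByte),
+   *       0
+   *     )
+   *   }
+   * }}}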
+ */ + def readFilesWithPath[A: Coder]( + desiredBundleSizeBytes: Long = DefaultBundleSizeBytes, + directoryTreatment: DirectoryTreatment = DirectoryTreatment.SKIP, + compression: Compression = Compression.AUTO + )( + f: String => FileBasedSource[A] + ): SCollection[(String, A)] = { + if (self.context.isTest) { + val id = self.context.testId.get + self.flatMap { s => + TestDataManager + .getInput(id)(ReadIO[A](s)) + .asIterable + .get + .map(x => s -> x) + } + } else { + val createSource = Functions.serializableFn(f) + val bcoder = CoderMaterializer.beam(self.context, Coder[KV[String, A]]) + val fileTransform = new ReadAllViaFileBasedSourceWithFilename( + desiredBundleSizeBytes, + createSource, + bcoder + ) + readFiles(fileTransform, directoryTreatment, compression).map(kvToTuple) + } + } + +} diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala index 80700b3c1a..6c1ddff736 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala @@ -37,7 +37,7 @@ import com.spotify.scio.util.random.{BernoulliSampler, PoissonSampler} import com.twitter.algebird.{Aggregator, Monoid, MonoidAggregator, Semigroup} import org.apache.beam.sdk.coders.{ByteArrayCoder, Coder => BCoder} import org.apache.beam.sdk.schemas.SchemaCoder -import org.apache.beam.sdk.io.Compression +import org.apache.beam.sdk.io.{Compression, FileBasedSource} import org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment import org.apache.beam.sdk.transforms.DoFn.{Element, OutputReceiver, ProcessElement, Timestamp} import org.apache.beam.sdk.transforms._ @@ -1359,38 +1359,37 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { // Read operations // ======================================================================= + /** @deprecated Use readTextFiles */ + @deprecated("Use readTextFiles", "0.14.5") + def readFiles(implicit ev: T <:< String): SCollection[String] = + readFiles(beam.TextIO.readFiles()) + /** * Reads each file, represented as a pattern, in this [[SCollection]]. * * @return * each line of the input files. - * @see - * [[readFilesAsBytes]], [[readFilesAsString]] */ - def readFiles(implicit ev: T <:< String): SCollection[String] = - readFiles(beam.TextIO.readFiles()) + def readTextFiles(implicit ev: T <:< String): SCollection[String] = + new FileSCollectionFunctions(this.covary_).readTextFiles() /** * Reads each file, represented as a pattern, in this [[SCollection]]. * * @return * each file fully read as [[Array[Byte]]. - * @see - * [[readFilesAsBytes]], [[readFilesAsString]] */ def readFilesAsBytes(implicit ev: T <:< String): SCollection[Array[Byte]] = - readFiles(_.readFullyAsBytes()) + new FileSCollectionFunctions(this.covary_).readFilesAsBytes() /** * Reads each file, represented as a pattern, in this [[SCollection]]. * * @return * each file fully read as [[String]]. - * @see - * [[readFilesAsBytes]], [[readFilesAsString]] */ def readFilesAsString(implicit ev: T <:< String): SCollection[String] = - readFiles(_.readFullyAsUTF8String()) + new FileSCollectionFunctions(this.covary_).readFilesAsString() /** * Reads each file, represented as a pattern, in this [[SCollection]]. 
@@ -1401,7 +1400,7 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { def readFiles[A: Coder]( f: beam.FileIO.ReadableFile => A )(implicit ev: T <:< String): SCollection[A] = - readFiles(DirectoryTreatment.SKIP, Compression.AUTO)(f) + new FileSCollectionFunctions(this.covary_).readFiles(f) /** * Reads each file, represented as a pattern, in this [[SCollection]]. @@ -1416,13 +1415,27 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { */ def readFiles[A: Coder](directoryTreatment: DirectoryTreatment, compression: Compression)( f: beam.FileIO.ReadableFile => A - )(implicit ev: T <:< String): SCollection[A] = { - val transform = - ParDo - .of(Functions.mapFn[beam.FileIO.ReadableFile, A](f)) - .asInstanceOf[PTransform[PCollection[beam.FileIO.ReadableFile], PCollection[A]]] - readFiles(transform, directoryTreatment, compression) - } + )(implicit ev: T <:< String): SCollection[A] = + new FileSCollectionFunctions(this.covary_).readFiles(directoryTreatment, compression)(f) + + /** + * Reads each file, represented as a pattern, in this [[SCollection]]. Files are split into + * multiple offset ranges and read with the [[FileBasedSource]]. + * + * @param desiredBundleSizeBytes + * Desired size of bundles read by the sources. + * @param directoryTreatment + * Controls how to handle directories in the input. + * @param compression + * Reads files using the given [[org.apache.beam.sdk.io.Compression]]. + */ + def readFiles[A: Coder]( + desiredBundleSizeBytes: Long, + directoryTreatment: DirectoryTreatment, + compression: Compression + )(f: String => FileBasedSource[A])(implicit ev: T <:< String): SCollection[A] = + new FileSCollectionFunctions(this.covary_) + .readFiles(desiredBundleSizeBytes, directoryTreatment, compression)(f) /** * Reads each file, represented as a pattern, in this [[SCollection]]. @@ -1436,29 +1449,58 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { * Reads files using the given [[org.apache.beam.sdk.io.Compression]]. */ def readFiles[A: Coder]( - filesTransform: PTransform[PCollection[beam.FileIO.ReadableFile], PCollection[A]], + filesTransform: PTransform[_ >: PCollection[beam.FileIO.ReadableFile], PCollection[A]], directoryTreatment: DirectoryTreatment = DirectoryTreatment.SKIP, compression: Compression = Compression.AUTO )(implicit ev: T <:< String): SCollection[A] = - if (context.isTest) { - val id = context.testId.get - this.flatMap(s => TestDataManager.getInput(id)(ReadIO[A](ev(s))).asIterable.get) - } else { - this - .covary_[String] - .applyTransform(new PTransform[PCollection[String], PCollection[A]]() { - override def expand(input: PCollection[String]): PCollection[A] = - input - .apply(beam.FileIO.matchAll()) - .apply( - beam.FileIO - .readMatches() - .withCompression(compression) - .withDirectoryTreatment(directoryTreatment) - ) - .apply(filesTransform) - }) - } + new FileSCollectionFunctions(this.covary_) + .readFiles(filesTransform, directoryTreatment, compression) + + /** + * Reads each file, represented as a pattern, in this [[SCollection]]. Files are split into + * multiple offset ranges and read with the [[FileBasedSource]]. + * + * @return + * origin file name paired with read line. + * + * @param desiredBundleSizeBytes + * Desired size of bundles read by the sources. + * @param directoryTreatment + * Controls how to handle directories in the input. + * @param compression + * Reads files using the given [[org.apache.beam.sdk.io.Compression]]. 
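+   *
+   * An illustrative sketch (`paths` is a hypothetical `SCollection[String]` of file patterns):
+   * {{{
+   * val paths: SCollection[String] = ???
+   * val linesWithPath: SCollection[(String, String)] = paths.readTextFilesWithPath()
+   * val tagged: SCollection[String] = linesWithPath.map { case (file, line) => s"$file:$line" }
+   * }}}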
+ */ + def readTextFilesWithPath( + desiredBundleSizeBytes: Long = FileSCollectionFunctions.DefaultBundleSizeBytes, + directoryTreatment: DirectoryTreatment = DirectoryTreatment.SKIP, + compression: Compression = Compression.AUTO + )(implicit ev: T <:< String): SCollection[(String, String)] = + new FileSCollectionFunctions(this.covary_) + .readTextFilesWithPath(desiredBundleSizeBytes, directoryTreatment, compression) + + /** + * Reads each file, represented as a pattern, in this [[SCollection]]. Files are split into + * multiple offset ranges and read with the [[FileBasedSource]]. + * + * @return + * origin file name paired with read element. + * + * @param desiredBundleSizeBytes + * Desired size of bundles read by the sources. + * @param directoryTreatment + * Controls how to handle directories in the input. + * @param compression + * Reads files using the given [[org.apache.beam.sdk.io.Compression]]. + */ + def readFilesWithPath[A: Coder]( + desiredBundleSizeBytes: Long = FileSCollectionFunctions.DefaultBundleSizeBytes, + directoryTreatment: DirectoryTreatment = DirectoryTreatment.SKIP, + compression: Compression = Compression.AUTO + )( + f: String => FileBasedSource[A] + )(implicit ev: T <:< String): SCollection[(String, A)] = + new FileSCollectionFunctions(this.covary_) + .readFilesWithPath(desiredBundleSizeBytes, directoryTreatment, compression)(f) /** * Pairs each element with the value of the provided [[SideInput]] in the element's window. diff --git a/scio-core/src/test/scala/com/spotify/scio/values/FileSCollectionFunctionsTest.scala b/scio-core/src/test/scala/com/spotify/scio/values/FileSCollectionFunctionsTest.scala new file mode 100644 index 0000000000..75abea0bcb --- /dev/null +++ b/scio-core/src/test/scala/com/spotify/scio/values/FileSCollectionFunctionsTest.scala @@ -0,0 +1,90 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.spotify.scio.values + +import com.spotify.scio.testing.PipelineSpec +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider +import org.apache.beam.sdk.{io => beam} +import org.apache.commons.io.FileUtils + +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path} + +class FileSCollectionFunctionsTest extends PipelineSpec { + + private def withTempDir(test: Path => Any): Unit = { + val dir = Files.createTempDirectory("scio-test-") + try { + test(dir) + } finally { + FileUtils.deleteDirectory(dir.toFile) + } + } + + private def writerTestFiles(dir: Path): Unit = { + for { + partition <- Seq("a", "b") + idx <- Seq(1, 2) + } yield { + val p = dir.resolve(partition) + Files.createDirectories(p) + val f = p.resolve(s"part-$idx.txt") + val data = s"$partition$idx" + Files.write(f, data.getBytes(StandardCharsets.UTF_8)) + } + } + + "FileSCollection" should "support reading file patterns" in withTempDir { dir => + writerTestFiles(dir) + runWithRealContext() { sc => + val actual = sc + .parallelize(Seq(s"$dir/a/part-*", s"$dir/b/part-*")) + .readFiles(beam.TextIO.readFiles()) + actual should containInAnyOrder(Seq("a1", "a2", "b1", "b2")) + } + } + + it should "support reading file patterns with retaining path" in withTempDir { dir => + writerTestFiles(dir) + runWithRealContext() { sc => + val actual = sc + .parallelize(Seq(s"$dir/a/part-*", s"$dir/b/part-*")) + .readFilesWithPath( + 100L, + beam.FileIO.ReadMatches.DirectoryTreatment.PROHIBIT, + beam.Compression.AUTO + ) { f => + new beam.TextSource( + StaticValueProvider.of(f), + beam.fs.EmptyMatchTreatment.DISALLOW, + Array('\n'.toByte), + 0 + ) + } + + actual should containInAnyOrder( + Seq( + s"$dir/a/part-1.txt" -> "a1", + s"$dir/a/part-2.txt" -> "a2", + s"$dir/b/part-1.txt" -> "b1", + s"$dir/b/part-2.txt" -> "b2" + ) + ) + } + } + +} diff --git a/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala b/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala index 3a67a58d59..1b33835818 100644 --- a/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala +++ b/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala @@ -949,5 +949,4 @@ class SCollectionTest extends PipelineSpec { coll should containInAnyOrder(Seq(Seq.empty[Int])) } } - } diff --git a/scio-test/core/src/test/scala/com/spotify/scio/testing/JobTestTest.scala b/scio-test/core/src/test/scala/com/spotify/scio/testing/JobTestTest.scala index 038567f600..03b6fa6476 100644 --- a/scio-test/core/src/test/scala/com/spotify/scio/testing/JobTestTest.scala +++ b/scio-test/core/src/test/scala/com/spotify/scio/testing/JobTestTest.scala @@ -25,12 +25,14 @@ import com.spotify.scio.transforms.DoFnWithResource.ResourceType import com.spotify.scio.transforms.{BaseAsyncLookupDoFn, GuavaAsyncDoFn} import com.spotify.scio.values.SCollection import org.apache.beam.sdk.Pipeline.PipelineExecutionException +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment import org.apache.beam.sdk.metrics.DistributionResult import org.apache.beam.sdk.{io => beam} import org.scalatest.exceptions.TestFailedException import scala.io.Source import org.apache.beam.sdk.metrics.{Counter, Distribution, Gauge} +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider import org.apache.beam.sdk.transforms.ParDo import org.apache.beam.sdk.values.KV @@ -248,6 +250,26 @@ object ReadAllBytesJob { } } +object ReadAllWithPathJob { + def main(cmdlineArgs: Array[String]): Unit = { + val (sc, args) = 
ContextAndArgs(cmdlineArgs)
+    val bundleSizeBytes = 64 * 1024 * 1024L
+    sc.textFile(args("input"))
+      .readFilesWithPath(bundleSizeBytes) { f =>
+        new beam.TextSource(
+          StaticValueProvider.of(f),
+          EmptyMatchTreatment.DISALLOW,
+          Array('\n'.toByte),
+          0
+        )
+      }
+      .map { case (f, x) => s"$f:$x" }
+      .saveAsTextFile(args("output"))
+    sc.run()
+    ()
+  }
+}
+
 object JobWithoutRun {
   def main(cmdlineArgs: Array[String]): Unit = {
     val (sc, args) = ContextAndArgs(cmdlineArgs)
@@ -442,6 +464,43 @@ class JobTestTest extends PipelineSpec {
       s"can't be converted to Iterable[T] to test this ScioIO type"
   }
 
+  def testReadAllWithPathJob(xs: String*): Unit =
+    JobTest[ReadAllWithPathJob.type]
+      .args("--input=in1.txt", "--output=out.txt")
+      .input(TextIO("in1.txt"), Seq("a", "b"))
+      .input(ReadIO("a"), Seq("a1", "a2"))
+      .input(ReadIO("b"), Seq("b1", "b2"))
+      .output(TextIO("out.txt"))(coll => coll should containInAnyOrder(xs))
+      .run()
+
+  it should "pass correct path ReadIO" in {
+    testReadAllWithPathJob("a:a1", "a:a2", "b:b1", "b:b2")
+  }
+
+  it should "fail incorrect path ReadIO" in {
+    an[AssertionError] should be thrownBy { testReadAllWithPathJob("a:a1", "a:a2") }
+    an[AssertionError] should be thrownBy {
+      testReadAllWithPathJob("a:a1", "a:a2", "b:b1", "b:b2", "c:c1")
+    }
+  }
+
+  it should "fail path ReadIO used with TestStream input" in {
+    val testStream = testStreamOf[Array[Byte]]
+      .addElements("a1".getBytes, "a2".getBytes)
+      .advanceWatermarkToInfinity()
+
+    the[PipelineExecutionException] thrownBy {
+      JobTest[ReadAllWithPathJob.type]
+        .args("--input=in.txt", "--output=out.txt")
+        .input(TextIO("in.txt"), Seq("a"))
+        .inputStream(ReadIO("a"), testStream)
+        .output(TextIO("out.txt")) { _ => }
+        .run()
+    } should have message
+      s"java.lang.UnsupportedOperationException: Test input TestStream(${testStream.getEvents}) " +
+        s"can't be converted to Iterable[T] to test this ScioIO type"
+  }
+
   // =======================================================================
   // Handling incorrect test wiring
   // =======================================================================
diff --git a/site/src/main/paradox/io/Binary.md b/site/src/main/paradox/io/Binary.md
index a0f7d14724..1fb2f958ac 100644
--- a/site/src/main/paradox/io/Binary.md
+++ b/site/src/main/paradox/io/Binary.md
@@ -2,7 +2,7 @@
 
 ## Read Binary files
 
-See @ref:[read as binary](ReadFiles.md#read-as-binary) for reading an entire file as a single binary record.
+See @ref:[read entire file as binary](ReadFiles.md#read-entire-file-as-binary) for reading an entire file as a single binary record.
 
 Binary reads are supported via the @scaladoc[binaryFile](com.spotify.scio.ScioContext#binaryFile(path:String,reader:com.spotify.scio.io.BinaryIO.BinaryFileReader,compression:org.apache.beam.sdk.io.Compression,emptyMatchTreatment:org.apache.beam.sdk.io.fs.EmptyMatchTreatment,suffix:String):com.spotify.scio.values.SCollection[Array[Byte]]), with a @scaladoc[BinaryFileReader](com.spotify.scio.io.BinaryIO.BinaryFileReader) instance provided that can parse the underlying binary file format.
 
diff --git a/site/src/main/paradox/io/ReadFiles.md b/site/src/main/paradox/io/ReadFiles.md
index e82d85f682..b975db1aeb 100644
--- a/site/src/main/paradox/io/ReadFiles.md
+++ b/site/src/main/paradox/io/ReadFiles.md
@@ -1,10 +1,10 @@
 # ReadFiles
 
-Scio supports reading file paths from an `SCollection[String]` into various formats.
+Scio supports reading file paths/patterns from an `SCollection[String]` into various formats.
## Read as text lines -Reading to `String` text lines via @scaladoc[readFiles](com.spotify.scio.values.SCollection#readFiles(implicitev:T%3C:%3CString):com.spotify.scio.values.SCollection[String]): +Reading to `String` text lines via @scaladoc[readTextFiles](com.spotify.scio.values.SCollection#readTextFiles(implicitev:T%3C:%3CString):com.spotify.scio.values.SCollection[String]): ```scala mdoc:compile-only import com.spotify.scio.ScioContext @@ -12,12 +12,13 @@ import com.spotify.scio.values.SCollection val sc: ScioContext = ??? val paths: SCollection[String] = ??? -val fileBytes: SCollection[String] = paths.readFiles + +val lines: SCollection[String] = paths.readTextFiles ``` ## Read entire file as String -Reading to `String` text lines via @scaladoc[readFilesAsString](com.spotify.scio.values.SCollection#readFilesAsString(implicitev:T%3C:%3CString):com.spotify.scio.values.SCollection[String]): +Reading entire files to `String` via @scaladoc[readFilesAsString](com.spotify.scio.values.SCollection#readFilesAsString(implicitev:T%3C:%3CString):com.spotify.scio.values.SCollection[String]): ```scala mdoc:compile-only import com.spotify.scio.ScioContext @@ -25,12 +26,13 @@ import com.spotify.scio.values.SCollection val sc: ScioContext = ??? val paths: SCollection[String] = ??? -val fileBytes: SCollection[String] = paths.readFiles + +val files: SCollection[String] = paths.readFilesAsString ``` -## Read as binary +## Read entire file as binary -Reading to binary `Array[Byte]` via @scaladoc[readFilesAsBytes](com.spotify.scio.values.SCollection#readFilesAsBytes(implicitev:T%3C:%3CString):com.spotify.scio.values.SCollection[Array[Byte]]): +Reading entire files to binary `Array[Byte]` via @scaladoc[readFilesAsBytes](com.spotify.scio.values.SCollection#readFilesAsBytes(implicitev:T%3C:%3CString):com.spotify.scio.values.SCollection[Array[Byte]]): ```scala mdoc:compile-only import com.spotify.scio.ScioContext @@ -38,12 +40,13 @@ import com.spotify.scio.values.SCollection val sc: ScioContext = ??? val paths: SCollection[String] = ??? -val fileBytes: SCollection[Array[Byte]] = paths.readFilesAsBytes + +val files: SCollection[Array[Byte]] = paths.readFilesAsBytes ``` -## Read as a custom type +## Read entire file as a custom type -Reading to a custom type with a user-defined function from `FileIO.ReadableFile` to the output type via @scaladoc[readFiles](com.spotify.scio.values.SCollection#readFiles[A](f:org.apache.beam.sdk.io.FileIO.ReadableFile=%3EA)(implicitevidence$24:com.spotify.scio.coders.Coder[A],implicitev:T%3C:%3CString):com.spotify.scio.values.SCollection[A]): +Reading entire files to a custom type with a user-defined function from `FileIO.ReadableFile` to the output type via @scaladoc[readFiles](com.spotify.scio.values.SCollection#readFiles[A](f:org.apache.beam.sdk.io.FileIO.ReadableFile=%3EA)(implicitevidence$24:com.spotify.scio.coders.Coder[A],implicitev:T%3C:%3CString):com.spotify.scio.values.SCollection[A]): ```scala mdoc:compile-only import com.spotify.scio.ScioContext @@ -53,13 +56,15 @@ import org.apache.beam.sdk.{io => beam} case class A(i: Int, s: String) val sc: ScioContext = ??? val paths: SCollection[String] = ??? + val userFn: beam.FileIO.ReadableFile => A = ??? 
val fileBytes: SCollection[A] = paths.readFiles(userFn)
 ```
 
 ## Read with a Beam transform
 
-If there is an existing beam `PTransform` from `FileIO.ReadableFile` to `A` (as an example, beam's `TextIO.readFiles()`), this can be reused via another variant of @scaladoc[readFiles](com.spotify.scio.values.SCollection#readFiles[A](filesTransform:org.apache.beam.sdk.transforms.PTransform[org.apache.beam.sdk.values.PCollection[org.apache.beam.sdk.io.FileIO.ReadableFile],org.apache.beam.sdk.values.PCollection[A]],directoryTreatment:org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment,compression:org.apache.beam.sdk.io.Compression)(implicitevidence$26:com.spotify.scio.coders.Coder[A],implicitev:T%3C:%3CString):com.spotify.scio.values.SCollection[A])
+Reading a file can be done with a beam `PTransform` from a `PCollection[FileIO.ReadableFile]` to a `PCollection[T]` (as an example, beam's `TextIO.readFiles()`),
+via another variant of @scaladoc[readFiles](com.spotify.scio.values.SCollection#readFiles[A](filesTransform:org.apache.beam.sdk.transforms.PTransform[org.apache.beam.sdk.values.PCollection[org.apache.beam.sdk.io.FileIO.ReadableFile],org.apache.beam.sdk.values.PCollection[A]],directoryTreatment:org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment,compression:org.apache.beam.sdk.io.Compression)(implicitevidence$26:com.spotify.scio.coders.Coder[A],implicitev:T%3C:%3CString):com.spotify.scio.values.SCollection[A])
 
 ```scala mdoc:compile-only
 import com.spotify.scio.ScioContext
@@ -68,10 +73,49 @@ import org.apache.beam.sdk.{io => beam}
 import org.apache.beam.sdk.transforms.PTransform
 import org.apache.beam.sdk.values.PCollection
 
-case class A(i: Int, s: String)
+case class Record(i: Int, s: String)
+
+val sc: ScioContext = ???
+val paths: SCollection[String] = ???
+
+val userTransform: PTransform[PCollection[beam.FileIO.ReadableFile], PCollection[Record]] = ???
+val records: SCollection[Record] = paths.readFiles(userTransform)
+```
+
+## Read with a Beam source
+
+Reading a file can be done with a beam `FileBasedSource[T]` (as an example, beam's `TextSource`)
+via another variant of @scaladoc[readFiles](com.spotify.scio.values.SCollection#readFiles[A](desiredBundleSizeBytes:Long,directoryTreatment:org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment,compression:org.apache.beam.sdk.io.Compression)(f:String=%3Eorg.apache.beam.sdk.io.FileBasedSource[A])(implicitevidence$27:com.spotify.scio.coders.Coder[A],implicitev:T%3C:%3CString):com.spotify.scio.values.SCollection[A]).
+
+When using @scaladoc[readFilesWithPath](com.spotify.scio.values.SCollection#readFilesWithPath[A](desiredBundleSizeBytes:Long,directoryTreatment:org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment,compression:org.apache.beam.sdk.io.Compression)(filesSource:String=%3Eorg.apache.beam.sdk.io.FileBasedSource[A])(implicitevidence$29:com.spotify.scio.coders.Coder[A],implicitev:T%3C:%3CString):com.spotify.scio.values.SCollection[(String,A)]), the origin file path
+will be passed along with all elements emitted by the source.
+
+The source will be created with the given file paths, and then split into sub-ranges depending on the desired bundle size.
+
+```scala mdoc:compile-only
+import com.spotify.scio.ScioContext
+import com.spotify.scio.values.SCollection
+import org.apache.beam.sdk.{io => beam}
+
+case class Record(i: Int, s: String)
 val sc: ScioContext = ???
 val paths: SCollection[String] = ???
-val userTransform: PTransform[PCollection[beam.FileIO.ReadableFile], PCollection[A]] = ???
-val fileBytes: SCollection[A] = paths.readFiles(userTransform) -``` \ No newline at end of file + +val desiredBundleSizeBytes: Long = ??? +val directoryTreatment: beam.FileIO.ReadMatches.DirectoryTreatment = ??? +val compression: beam.Compression = ??? +val createSource: String => beam.FileBasedSource[Record] = ??? + +val records: SCollection[Record] = paths.readFiles( + desiredBundleSizeBytes, + directoryTreatment, + compression +) { file => createSource(file) } + +val recordsWithPath: SCollection[(String, Record)] = paths.readFilesWithPath( + desiredBundleSizeBytes, + directoryTreatment, + compression +) { file => createSource(file) } +```
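+
+## Read Avro files
+
+The scio-avro module layers Avro-specific variants on top of these APIs. A minimal sketch,
+assuming `import com.spotify.scio.avro._` brings the file-reading syntax into scope and that
+`schema` matches the Avro files being read:
+
+```scala mdoc:compile-only
+import com.spotify.scio.ScioContext
+import com.spotify.scio.avro._
+import com.spotify.scio.values.SCollection
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+
+val sc: ScioContext = ???
+val paths: SCollection[String] = ???
+val schema: Schema = ???
+
+val records: SCollection[GenericRecord] = paths.readAvroGenericFiles(schema)
+val recordsWithPath: SCollection[(String, GenericRecord)] =
+  paths.readAvroGenericFilesWithPath(schema)
+```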