diff --git a/scio-core/src/main/scala/com/spotify/scio/io/BinaryIO.scala b/scio-core/src/main/scala/com/spotify/scio/io/BinaryIO.scala index dfde1661d9..7fadeb95d8 100644 --- a/scio-core/src/main/scala/com/spotify/scio/io/BinaryIO.scala +++ b/scio-core/src/main/scala/com/spotify/scio/io/BinaryIO.scala @@ -56,7 +56,6 @@ final case class BinaryIO(path: String) extends ScioIO[Array[Byte]] { override protected def read(sc: ScioContext, params: ReadP): SCollection[Array[Byte]] = { val filePattern = ScioUtil.filePattern(path, params.suffix) - val desiredBundleSizeBytes = 64 * 1024 * 1024L // 64 mb val coder = ByteArrayCoder.of() val srcFn = Functions.serializableFn { path: String => new BinaryIO.BinarySource(path, params.emptyMatchTreatment, params.reader) @@ -74,7 +73,8 @@ final case class BinaryIO(path: String) extends ScioIO[Array[Byte]] { ) .applyTransform( "Read all via FileBasedSource", - new ReadAllViaFileBasedSource[Array[Byte]](desiredBundleSizeBytes, srcFn, coder) + // Setting desiredBundleSizeBytes to Long.MaxValue prevents Beam from trying to split files + new ReadAllViaFileBasedSource[Array[Byte]](Long.MaxValue, srcFn, coder) ) } @@ -326,6 +326,9 @@ object BinaryIO { false } } + + override def allowsDynamicSplitting(): Boolean = + false } } @@ -344,8 +347,13 @@ object BinaryIO { fileMetadata: Metadata, start: Long, end: Long - ): FileBasedSource[Array[Byte]] = + ): FileBasedSource[Array[Byte]] = { + require( + start == 0, + s"Range with offset $start requested, but BinaryIO is unsplittable" + ) new BinarySingleFileSource(binaryFileReader, fileMetadata, start, end) + } override def createSingleFileReader( options: PipelineOptions