Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(fix #5472) Set desiredBundleSizeBytes to Long.MaxValue for BinaryIO reads to prevent file subrange-splitting #5473

Merged
merged 2 commits into from
Sep 10, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions scio-core/src/main/scala/com/spotify/scio/io/BinaryIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.beam.sdk.transforms.SerializableFunctions
import org.apache.beam.sdk.util.MimeTypes
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions
import org.apache.commons.compress.compressors.CompressorStreamFactory
import org.typelevel.scalaccompat.annotation.unused

Expand All @@ -56,7 +57,6 @@ final case class BinaryIO(path: String) extends ScioIO[Array[Byte]] {

override protected def read(sc: ScioContext, params: ReadP): SCollection[Array[Byte]] = {
val filePattern = ScioUtil.filePattern(path, params.suffix)
val desiredBundleSizeBytes = 64 * 1024 * 1024L // 64 mb
val coder = ByteArrayCoder.of()
val srcFn = Functions.serializableFn { path: String =>
new BinaryIO.BinarySource(path, params.emptyMatchTreatment, params.reader)
Expand All @@ -74,7 +74,8 @@ final case class BinaryIO(path: String) extends ScioIO[Array[Byte]] {
)
.applyTransform(
"Read all via FileBasedSource",
new ReadAllViaFileBasedSource[Array[Byte]](desiredBundleSizeBytes, srcFn, coder)
// Setting desiredBundleSizeBytes to Long.MaxValue prevents Beam from trying to split files
new ReadAllViaFileBasedSource[Array[Byte]](Long.MaxValue, srcFn, coder)
)
}

Expand Down Expand Up @@ -326,6 +327,9 @@ object BinaryIO {
false
}
}

override def allowsDynamicSplitting(): Boolean =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is redundant (it's inherited from FileBasedSource#isSplittable if not set), but I noticed that Beam IOs set this just to be explicit, so why not.

false
}
}

Expand All @@ -344,8 +348,14 @@ object BinaryIO {
fileMetadata: Metadata,
start: Long,
end: Long
): FileBasedSource[Array[Byte]] =
): FileBasedSource[Array[Byte]] = {
Preconditions.checkArgument(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Scala land, we've always used require. IMHO we should keep checkArgument for Java land.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, thanks!

start == 0,
"Range with offset {} requested, but BinaryIO is unsplittable",
start
)
new BinarySingleFileSource(binaryFileReader, fileMetadata, start, end)
}

override def createSingleFileReader(
options: PipelineOptions
Expand Down