Skip to content

Commit

Permalink
Add Scalafix rule for LogicalTypeSupplier removal (#5178)
Browse files Browse the repository at this point in the history
Co-authored-by: Michel Davit <[email protected]>
  • Loading branch information
clairemcginty and RustedBones authored Jan 31, 2024
1 parent 5c112ff commit 994aff3
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 2 deletions.
3 changes: 2 additions & 1 deletion scalafix/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def scio(version: String): List[ModuleID] = {
case _ =>
List(
"scio-google-cloud-platform", // replaced scio-bigquery
"scio-extra" // new in 0.10
"scio-extra", // new in 0.10
"scio-smb"
)
})

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
rule = FixLogicalTypeSupplier
*/
package fix.v0_14_0

import com.spotify.scio.ScioContext
import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.parquet.avro._
import com.spotify.scio.parquet.avro.LogicalTypeSupplier
import com.spotify.scio.values.SCollection
import com.spotify.scio.avro._
import com.spotify.scio.coders.Coder
import org.apache.avro.generic.GenericRecord
import org.apache.beam.sdk.extensions.smb.AvroLogicalTypeSupplier
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}

object FixLogicalTypeSuppliers {
  // Scalafix test fixture (INPUT) for the FixLogicalTypeSupplier rule: every
  // statement below exercises one rewrite case; the expected result lives in
  // the matching output file under fix/v0_14_0.
  // NOTE(review): comments added here are carried verbatim into the rule's
  // rewritten output — they must stay in sync with the expected-output file.

  // Coder required by the parquet-avro syntax below; never evaluated when the
  // rule runs (scalafix only needs the file to typecheck).
  implicit val c: Coder[GenericRecord] = ???
  val sc = ScioContext()

  // Named `conf` arg whose ParquetConfiguration.of(...) holds ONLY the
  // supplier entry — the rule should drop the whole conf argument.
  sc.parquetAvroFile[GenericRecord](
    "input",
    conf = ParquetConfiguration.of(
      AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
    ))

  // Same case, but positional conf argument and tuple (not `->`) syntax.
  sc.parquetAvroFile[GenericRecord](
    "input",
    null,
    null,
    ParquetConfiguration.of(
      (AvroReadSupport.AVRO_DATA_SUPPLIER, classOf[LogicalTypeSupplier])
    ))

  // Mixed entries: only the supplier pair is removed; "foo" -> "bar" stays.
  sc.parquetAvroFile[GenericRecord](
    "input",
    conf = ParquetConfiguration.of(
      AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier],
      "foo" -> "bar"
    ))

  // Mixed entries, positional conf argument.
  sc.parquetAvroFile[GenericRecord](
    "input",
    null,
    null,
    ParquetConfiguration.of(
      AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier],
      "foo" -> "bar"
    ))

  val data: SCollection[GenericRecord] = ???

  // Write-side equivalents: AvroWriteSupport key on saveAsParquetAvroFile.
  data.saveAsParquetAvroFile(
    "output",
    conf = ParquetConfiguration.of(
      AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
    )
  )

  data.saveAsParquetAvroFile(
    "output",
    conf = ParquetConfiguration.of(
      AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier],
      "foo" -> "bar"
    )
  )

  // Hadoop Configuration.setClass calls referencing either supplier class
  // (scio's LogicalTypeSupplier or Beam's AvroLogicalTypeSupplier, in either
  // the value or interface position) are removed entirely; the final,
  // unrelated setClass call must be kept.
  val conf = new Configuration()
  conf.setClass(AvroReadSupport.AVRO_DATA_SUPPLIER, classOf[LogicalTypeSupplier], classOf[AvroDataSupplier])
  conf.setClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, classOf[LogicalTypeSupplier], classOf[LogicalTypeSupplier])
  conf.setClass(AvroReadSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])
  conf.setClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[LogicalTypeSupplier])
  conf.setClass("someClass", classOf[String], classOf[CharSequence])
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package fix.v0_14_0

import com.spotify.scio.ScioContext
import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.parquet.avro._
import com.spotify.scio.values.SCollection
import com.spotify.scio.avro._
import com.spotify.scio.coders.Coder
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}

object FixLogicalTypeSuppliers {
  // Scalafix test fixture (EXPECTED OUTPUT) for the FixLogicalTypeSupplier
  // rule. NOTE(review): this file must match the rewritten input fixture
  // byte-for-byte, so any comment added here must also exist in the input.
  // The run of blank lines before the last setClass call is intentional:
  // Patch.removeTokens deletes the removed statements' tokens but leaves
  // their line breaks behind.
  implicit val c: Coder[GenericRecord] = ???
  val sc = ScioContext()

  sc.parquetAvroFile[GenericRecord]("input")

  sc.parquetAvroFile[GenericRecord]("input", null, null)

  sc.parquetAvroFile[GenericRecord]("input", conf = ParquetConfiguration.of("foo" -> "bar"))

  sc.parquetAvroFile[GenericRecord]("input", null, null, ParquetConfiguration.of("foo" -> "bar"))

  val data: SCollection[GenericRecord] = ???
  data.saveAsParquetAvroFile("output")

  data.saveAsParquetAvroFile("output", conf = ParquetConfiguration.of("foo" -> "bar"))

  val conf = new Configuration()




  conf.setClass("someClass", classOf[String], classOf[CharSequence])
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ fix.v0_14_0.FixAvroSchemasPackage
fix.v0_14_0.FixAvroCoder
fix.v0_14_0.FixDynamicAvro
fix.v0_14_0.FixGenericAvro
fix.v0_14_0.FixLogicalTypeSupplier

Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package fix.v0_14_0

import scalafix.v1._
import scala.meta._

/** Symbol matchers shared by the [[FixLogicalTypeSupplier]] rule. */
object FixLogicalTypeSupplier {

  /** Root of scio's parquet-avro package, used to assemble symbols below. */
  private val ParquetAvroRoot: String = "com/spotify/scio/parquet/avro"

  /** Matches `ParquetConfiguration.of(...)` invocations. */
  val ParquetConfigurationMatcher: SymbolMatcher =
    SymbolMatcher.normalized("com/spotify/scio/parquet/package/ParquetConfiguration#of")

  /** Matches Hadoop's `Configuration.setClass(...)`. */
  val SetClassMatcher: SymbolMatcher =
    SymbolMatcher.normalized("org/apache/hadoop/conf/Configuration#setClass")

  /** Matches `java.lang.Class`, used to recognize `Class[...]`-typed terms. */
  val JavaClassMatcher: SymbolMatcher =
    SymbolMatcher.normalized("java/lang/Class")

  /** Matches both supplier classes slated for removal. */
  val LogicalTypeSupplierMatcher: SymbolMatcher =
    SymbolMatcher.normalized(
      ParquetAvroRoot + "/LogicalTypeSupplier",
      "org/apache/beam/sdk/extensions/smb/AvroLogicalTypeSupplier"
    )

  /** Matches the parquet-avro read/write entry points whose args we rewrite. */
  private val ParquetAvroMatcher =
    SymbolMatcher.normalized(
      ParquetAvroRoot + "/syntax/ScioContextOps#parquetAvroFile",
      ParquetAvroRoot + "/syntax/SCollectionOps#saveAsParquetAvroFile"
    )
}

/**
 * Removes usages of scio's `LogicalTypeSupplier` and Beam's
 * `AvroLogicalTypeSupplier`: strips matching entries out of
 * `ParquetConfiguration.of(...)` arguments passed to parquetAvroFile /
 * saveAsParquetAvroFile (dropping the whole conf argument when nothing
 * remains), deletes `Configuration.setClass` statements that mention either
 * class, and removes the now-unused importees.
 */
class FixLogicalTypeSupplier extends SemanticRule("FixLogicalTypeSupplier") {
  import FixLogicalTypeSupplier._

  /**
   * True when `term` statically denotes one of the supplier classes: either a
   * literal `classOf[...]` of a matching type, or a reference whose declared
   * result type is `Class[<matching type>]`.
   */
  private def isLogicalTypeSupplier(term: Term)(implicit doc: SemanticDocument): Boolean =
    term match {
      // Direct `classOf[LogicalTypeSupplier]` / `classOf[AvroLogicalTypeSupplier]`.
      case q"classOf[$tpe]" => LogicalTypeSupplierMatcher.matches(tpe.symbol)
      case _ =>
        // Otherwise inspect the term's signature: unwrap a Class[...] result
        // type and test its single type argument against the matcher.
        term.symbol.info
          .map(_.signature)
          .collect { case MethodSignature(_, _, returnedType) => returnedType }
          .collect { case TypeRef(_, sym, tpe :: Nil) if JavaClassMatcher.matches(sym) => tpe }
          .collect { case TypeRef(_, sym, _) => sym }
          .exists(LogicalTypeSupplierMatcher.matches)
    }

  /**
   * Filters supplier entries out of a `ParquetConfiguration.of(...)` argument
   * list; both `(key, value)` tuple and `key -> value` arrow forms are
   * recognized. Unrelated entries are kept.
   */
  private def parquetConfigurationArgs(
    confArgs: List[Term]
  )(implicit doc: SemanticDocument): List[Term] = confArgs.filterNot {
    case q"($_, $rhs)" => isLogicalTypeSupplier(rhs)
    case q"$_ -> $rhs" => isLogicalTypeSupplier(rhs)
    case _ => false
  }

  /**
   * Rewrites the argument list of a parquet-avro IO call: each
   * `ParquetConfiguration.of(...)` argument (named `conf = ...` or positional)
   * has its supplier entries removed, and is dropped entirely when empty.
   * All other arguments pass through unchanged.
   */
  private def updateIOArgs(fnArgs: List[Term])(implicit doc: SemanticDocument): List[Term] = {
    fnArgs.flatMap {
      // Named form: `conf = ParquetConfiguration.of(...)`.
      case q"$lhs = $fn(..$confArgs)" if ParquetConfigurationMatcher.matches(fn.symbol) =>
        val filtered = parquetConfigurationArgs(confArgs)
        if (filtered.isEmpty) None else Some(q"$lhs = ParquetConfiguration.of(..$filtered)")
      // Positional form: `ParquetConfiguration.of(...)`.
      case q"$fn(..$confArgs)" if ParquetConfigurationMatcher.matches(fn.symbol) =>
        val filtered = parquetConfigurationArgs(confArgs)
        if (filtered.isEmpty) None else Some(q"ParquetConfiguration.of(..$filtered)")
      case a =>
        Some(a)
    }
  }

  override def fix(implicit doc: SemanticDocument): Patch = {
    doc.tree.collect {
      // parquetAvroFile / saveAsParquetAvroFile calls: rewrite the arg list.
      case method @ q"$fn(..$args)" if ParquetAvroMatcher.matches(fn.symbol) =>
        val newArgs = updateIOArgs(args)
        Patch.replaceTree(method, q"$fn(..$newArgs)".syntax)
      // Configuration.setClass mentioning a supplier in either the value or
      // interface position: remove the whole statement's tokens.
      // NOTE(review): removeTokens leaves the statement's newlines behind;
      // the expected-output fixture accounts for this with blank lines.
      case method @ q"$_.$fn($_, $theClass, $xface)" if SetClassMatcher.matches(fn.symbol) =>
        if (isLogicalTypeSupplier(theClass) || isLogicalTypeSupplier(xface)) {
          Patch.removeTokens(method.tokens)
        } else {
          Patch.empty
        }
      // Drop the now-dead importees from both packages.
      case importer"com.spotify.scio.parquet.avro.{..$importees}" =>
        importees.collect {
          case i @ importee"LogicalTypeSupplier" => Patch.removeImportee(i)
          case _ => Patch.empty
        }.asPatch
      case importer"org.apache.beam.sdk.extensions.smb.{..$importees}" =>
        importees.collect {
          case i @ importee"AvroLogicalTypeSupplier" => Patch.removeImportee(i)
          case _ => Patch.empty
        }.asPatch
    }.asPatch
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ final class ConsistenceJoinNames extends SemanticRule("ConsistenceJoinNames") {
val updatedArgs = renameNamedArgs(args)
Patch.replaceTree(t, q"$qual.$updatedFn(..$updatedArgs)".syntax)
case t @ q"$qual.$fn(..$args)" =>
println(fn.symbol)
Patch.empty
}
}.asPatch
Expand Down

0 comments on commit 994aff3

Please sign in to comment.