Skip to content

Commit

Permalink
feat(targets): Support a x-sql-datatype annotation to let targets customize SQL type handling
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Jan 15, 2025
1 parent 8638b07 commit 9adb647
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 10 deletions.
24 changes: 18 additions & 6 deletions singer_sdk/_singerlib/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
"anyOf",
"patternProperties",
"allOf",
# JSON Schema extensions
"x-sql-datatype",
]


Expand Down Expand Up @@ -84,6 +86,9 @@ class Schema:
contentMediaType: str | None = None # noqa: N815
contentEncoding: str | None = None # noqa: N815

# JSON Schema extensions
x_sql_datatype: str | None = None

def to_dict(self) -> dict[str, t.Any]:
"""Return the raw JSON Schema as a (possibly nested) dict.
Expand All @@ -99,12 +104,14 @@ def to_dict(self) -> dict[str, t.Any]:
result["items"] = self.items.to_dict()

for key in STANDARD_KEYS:
if self.__dict__.get(key) is not None:
result[key] = self.__dict__[key]
attr = key.replace("-", "_")
if (val := self.__dict__.get(attr)) is not None:
result[key] = val

for key in META_KEYS:
if self.__dict__.get(key) is not None:
result[f"${key}"] = self.__dict__[key]
attr = key.replace("-", "_")
if (val := self.__dict__.get(attr)) is not None:
result[f"${key}"] = val

return result

Expand Down Expand Up @@ -142,6 +149,7 @@ def from_dict(
... "description": "Age in years which must be equal to or greater than zero.",
... "type": "integer",
... "minimum": 0,
... "x-sql-datatype": "smallint",
... },
... },
... "required": ["firstName", "lastName"],
Expand All @@ -153,6 +161,8 @@ def from_dict(
"The person's first name."
>>> schema.properties["age"].minimum
0
>>> schema.properties["age"].x_sql_datatype
'smallint'
>>> schema.schema
'http://json-schema.org/draft/2020-12/schema'
""" # noqa: E501
Expand All @@ -168,12 +178,14 @@ def from_dict(
kwargs["items"] = cls.from_dict(items, **schema_defaults)

for key in STANDARD_KEYS:
attr = key.replace("-", "_")
if key in data:
kwargs[key] = data[key]
kwargs[attr] = data[key]

for key in META_KEYS:
attr = key.replace("-", "_")
if f"${key}" in data:
kwargs[key] = data[f"${key}"]
kwargs[attr] = data[f"${key}"]

return cls(**kwargs)

Expand Down
32 changes: 28 additions & 4 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ class JSONSchemaToSQL:
This class provides a mapping from JSON Schema types to SQLAlchemy types.
.. versionadded:: 0.42.0
.. versionchanged:: 0.44.0
Added the
:meth:`singer_sdk.connectors.sql.JSONSchemaToSQL.register_sql_datatype_handler`
method to map custom ``x-sql-datatype`` annotations into SQLAlchemy types.
"""

def __init__(self, *, max_varchar_length: int | None = None) -> None:
Expand Down Expand Up @@ -276,6 +280,8 @@ def __init__(self, *, max_varchar_length: int | None = None) -> None:
"ipv6": lambda _: sa.types.VARCHAR(45),
}

self._sql_datatype_mapping: dict[str, JSONtoSQLHandler] = {}

self._fallback_type: type[sa.types.TypeEngine] = sa.types.VARCHAR

def _invoke_handler( # noqa: PLR6301
Expand Down Expand Up @@ -338,6 +344,20 @@ def register_format_handler(
""" # noqa: E501
self._format_handlers[format_name] = handler

def register_sql_datatype_handler(
    self,
    sql_datatype: str,
    handler: JSONtoSQLHandler,
) -> None:
    """Associate an ``x-sql-datatype`` annotation value with a handler.

    Registering the same annotation value again replaces the earlier handler.

    Args:
        sql_datatype: The ``x-sql-datatype`` string to register.
        handler: Either a SQLAlchemy type class or a callable that takes a schema
            dict and returns a SQLAlchemy type instance.
    """
    self._sql_datatype_mapping.update({sql_datatype: handler})

def handle_multiple_types(self, types: t.Sequence[str]) -> sa.types.TypeEngine: # noqa: ARG002, PLR6301
"""Handle multiple types by returning a VARCHAR.
Expand Down Expand Up @@ -374,10 +394,14 @@ def _get_type_from_schema(self, schema: dict) -> sa.types.TypeEngine | None:
Returns:
SQL type if one can be determined, None otherwise.
"""
# Check if this is a string with format first
if schema.get("type") == "string" and "format" in schema:
format_type = self._handle_format(schema)
if format_type is not None:
# Check x-sql-datatype first
if x_sql_datatype := schema.get("x-sql-datatype"): # noqa: SIM102
if handler := self._sql_datatype_mapping.get(x_sql_datatype):
return self._invoke_handler(handler, schema)

# Then check if this is a string with a format
if schema.get("type") == "string" and "format" in schema: # noqa: SIM102
if (format_type := self._handle_format(schema)) is not None:
return format_type

# Then check regular types
Expand Down
7 changes: 7 additions & 0 deletions tests/core/test_connector_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,13 @@ def handle_raw_string(self, schema):
result = json_schema_to_sql.to_sql_type(image_type)
assert isinstance(result, sa.types.LargeBinary)

def test_annotation_sql_datatype(self):
    """A registered ``x-sql-datatype`` handler selects the SQL type."""
    converter = JSONSchemaToSQL()
    converter.register_sql_datatype_handler("json", sa.types.JSON)

    # The annotation is present alongside a regular JSON Schema type.
    annotated_schema = {"type": ["string"], "x-sql-datatype": "json"}
    sql_type = converter.to_sql_type(annotated_schema)

    assert isinstance(sql_type, sa.types.JSON)


def test_bench_discovery(benchmark, tmp_path: Path):
def _discover_catalog(connector):
Expand Down

0 comments on commit 9adb647

Please sign in to comment.