Skip to content

Commit

Permalink
poc
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sichen committed Oct 16, 2024
1 parent 8f92b10 commit 11bf9d9
Show file tree
Hide file tree
Showing 11 changed files with 545 additions and 7 deletions.
9 changes: 6 additions & 3 deletions src/snowflake/cli/_plugins/spcs/image_registry/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,12 @@ def get_registry_url(self) -> str:
if len(results) == 0:
raise NoImageRepositoriesFoundError()
sample_repository_url = results[0]["repository_url"]
if not self._has_url_scheme(sample_repository_url):
sample_repository_url = f"//{sample_repository_url}"
return urlparse(sample_repository_url).netloc
return self.get_registry_url_from_repo(sample_repository_url)

def get_registry_url_from_repo(self, repo_url) -> str:
if not self._has_url_scheme(repo_url):
repo_url = f"//{repo_url}"
return urlparse(repo_url).netloc

def docker_registry_login(self) -> str:
registry_url = self.get_registry_url()
Expand Down
127 changes: 127 additions & 0 deletions src/snowflake/cli/_plugins/spcs/image_registry/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import datetime
import json
import os
import subprocess
import uuid

import docker

TARGET_ARCH = "amd64"
DOCKER_BUILDER = "snowflake-cli-builder"


class Registry:
def __init__(self, registry_url, logger) -> None:
self._registry_url = registry_url
self._logger = logger
self._docker_client = docker.from_env(timeout=300)
self._is_arm = self._is_arch_arm()
if self._is_arm:
if os.system(f"docker buildx use {DOCKER_BUILDER}") != 0:
os.system(f"docker buildx create --name {DOCKER_BUILDER} --use")

def _is_arch_arm(self):
result = subprocess.run(["uname", "-m"], stdout=subprocess.PIPE)
arch = result.stdout.strip().decode("UTF-8")
self._logger.info("Detected machine architecture: %s", arch)
return arch == "arm64" or arch == "aarch64"

def _raise_error_from_output(self, output: str):
for line in output.splitlines():
try:
jsline = json.loads(line)
if "error" in jsline:
raise docker.errors.APIError(jsline["error"])
except json.JSONDecodeError:
pass # not a json, don't parse, assume no error

def _gen_image_tag(self) -> str:
ts = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
uid = str(uuid.uuid4()).split("-")[0]
return f"{ts}-{uid}"

def push(self, image_name):
self._logger.info("Uploading image %s", image_name)
output = self._docker_client.images.push(image_name)
self._raise_error_from_output(output)
return output

def pull(self, image_name):
self._logger.info("Pulling image %s", image_name)

n = image_name.rindex(":")
if n >= 0 and "/" not in image_name[n + 1 :]:
# if ':' is present in the last part in image name (separated by '/')
image = image_name[0:n]
tag = image_name[n + 1 :]
else:
image = image_name
tag = None
return self._docker_client.images.pull(image, tag, platform=TARGET_ARCH)

def build_and_push_image(
self,
image_source_local_path: str,
image_path: str,
tag: str = "",
generate_tag: bool = False,
):
"""
builds an image and push it to sf image registry
"""

docker_file_path = os.path.join(image_source_local_path, "Dockerfile")

if not tag and generate_tag:
tag = self._gen_image_tag()

# build and upload image to registry if running remotely
self._logger.info("registry: %s", self._registry_url)
tagged = self._registry_url + image_path
if tag is not None:
tagged = f"{tagged}:{tag}"

if self._is_arm:
self._logger.info("Using docker buildx for building image %s", tagged)

# emulate intel environment on arm - see https://github.com/docker/buildx/issues/464
# os.system(
# "docker run -it --rm --privileged tonistiigi/binfmt --install all"
# )

docker_build_cmd = f"""
docker buildx build --tag {tagged}
--load
--platform linux/amd64
{image_source_local_path}
-f {docker_file_path}
--builder {DOCKER_BUILDER}
--rm
"""

parts = list(
filter(
lambda part: part != "",
[part.strip() for part in docker_build_cmd.split("\n")],
)
)
docker_cmd = " ".join(parts)
self._logger.info("Executing: %s", docker_cmd)
if 0 != os.system(docker_cmd):
assert False, f"failed : unable to build image {tagged} with buildx"

push_output = self.push(tagged)
self._logger.info(push_output)
else:
# build and upload image to registry if running remotely
self._logger.info("Building image %s with docker python sdk", tagged)
_, output = self._docker_client.images.build(
path=image_source_local_path,
dockerfile=docker_file_path,
rm=True,
tag=tagged,
)
for o in output:
self._logger.info(o)
push_output = self.push(tagged)
self._logger.info(push_output)
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from urllib.parse import urlparse

from snowflake.cli._plugins.spcs.common import handle_object_already_exists
from snowflake.cli._plugins.spcs.image_registry.registry import Registry
from snowflake.cli.api.constants import ObjectType
from snowflake.cli.api.identifiers import FQN
from snowflake.cli.api.sql_execution import SqlExecutionMixin
Expand All @@ -32,7 +33,6 @@ def get_role(self):
return self._conn.role

def get_repository_url(self, repo_name: str, with_scheme: bool = True):

repo_row = self.show_specific_object(
"image repositories", repo_name, check_schema=True
)
Expand Down
32 changes: 31 additions & 1 deletion src/snowflake/cli/_plugins/spcs/services/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import typer
from click import ClickException

from snowflake.cli._plugins.object.command_aliases import (
add_object_command_aliases,
scope_option,
Expand All @@ -30,6 +31,9 @@
validate_and_set_instances,
)
from snowflake.cli._plugins.spcs.services.manager import ServiceManager
from snowflake.cli._plugins.spcs.services.spcs_processor import SpcsProcessor
from snowflake.cli.api.cli_global_context import get_cli_context
from snowflake.cli.api.commands.decorators import with_project_definition
from snowflake.cli.api.commands.flags import (
IfNotExistsOption,
OverrideableOption,
Expand All @@ -41,6 +45,7 @@
from snowflake.cli.api.identifiers import FQN
from snowflake.cli.api.output.types import (
CommandResult,
MessageResult,
QueryJsonValueResult,
QueryResult,
SingleQueryResult,
Expand Down Expand Up @@ -200,7 +205,16 @@ def status(name: FQN = ServiceNameArgument, **options) -> CommandResult:
Retrieves the status of a service.
"""
cursor = ServiceManager().status(service_name=name.identifier)
return QueryJsonValueResult(cursor)
return SingleQueryResult(cursor)


@app.command(requires_connection=True)
def container_status(name: FQN = ServiceNameArgument, **options) -> CommandResult:
"""
Retrieves the container status of a service.
"""
cursor = ServiceManager().container_status(service_name=name.identifier)
return QueryResult(cursor)


@app.command(requires_connection=True)
Expand Down Expand Up @@ -343,3 +357,19 @@ def unset_property(
comment=comment,
)
return SingleQueryResult(cursor)


@app.command("deploy", requires_connection=True)
@with_project_definition()
def service_deploy(
**options,
) -> CommandResult:
"""
Deploys the service in the current schema or creates a new service if it does not exist.
"""
cli_context = get_cli_context()
processor = SpcsProcessor(
project_definition=cli_context.project_definition.spcs,
project_root=cli_context.project_root,
)
return SingleQueryResult(processor.deploy())
9 changes: 7 additions & 2 deletions src/snowflake/cli/_plugins/spcs/services/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from snowflake.connector.cursor import SnowflakeCursor
from snowflake.connector.errors import ProgrammingError

from snowflake.cli.api.project.schemas.v1.spcs.service import Service


class ServiceManager(SqlExecutionMixin):
def create(
Expand Down Expand Up @@ -74,7 +76,7 @@ def create(
query.append(f"QUERY_WAREHOUSE = {query_warehouse}")

if comment:
query.append(f"COMMENT = {comment}")
query.append(f"COMMENT = $${comment}$$")

if tags:
tag_list = ",".join(f"{t.name}={t.value_string_literal()}" for t in tags)
Expand Down Expand Up @@ -130,7 +132,10 @@ def _read_yaml(self, path: Path) -> str:
return json.dumps(data)

def status(self, service_name: str) -> SnowflakeCursor:
return self._execute_query(f"CALL SYSTEM$GET_SERVICE_STATUS('{service_name}')")
return self._execute_query(f"DESC SERVICE {service_name}")

def container_status(self, service_name: str) -> SnowflakeCursor:
return self._execute_query(f"SHOW SERVICE CONTAINERS IN SERVICE {service_name}")

def logs(
self, service_name: str, instance_id: str, container_name: str, num_lines: int
Expand Down
122 changes: 122 additions & 0 deletions src/snowflake/cli/_plugins/spcs/services/project_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Copyright (c) 2024 Snowflake Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from functools import cached_property
from pathlib import Path
from typing import List

from snowflake.cli._plugins.nativeapp.artifacts import resolve_without_follow
from snowflake.cli.api.project.schemas.v1.native_app.path_mapping import PathMapping
from snowflake.cli.api.project.schemas.v1.spcs.service import Service
from snowflake.cli.api.project.util import (
to_identifier,
)


class ServiceProjectModel:
def __init__(
self,
project_definition: Service,
project_root: Path,
):
self._project_definition = project_definition
self._project_root = resolve_without_follow(project_root)

@property
def project_root(self) -> Path:
return self._project_root

@property
def definition(self) -> Service:
return self._project_definition

@cached_property
def service_name(self) -> str:
return self._project_definition.name

@cached_property
def spec(self) -> PathMapping:
return self.definition.spec

@cached_property
def images(self) -> List[PathMapping]:
return self.definition.images

@cached_property
def image_sources(self) -> List[PathMapping]:
source_image_paths = []
for image in self.images:
source_image_paths.append(PathMapping(src=image.src))
return source_image_paths

@cached_property
def source_repo_path(self) -> str:
return self.definition.source_repo

@cached_property
def source_repo_fqn(self) -> str:
repo_path = self.definition.source_repo
return repo_path.strip("/").replace("/", ".")

@cached_property
def source_stage_fqn(self) -> str:
return self.definition.source_stage

@cached_property
def bundle_root(self) -> Path:
return self.project_root / self.definition.bundle_root

@cached_property
def deploy_root(self) -> Path:
return self.project_root / self.definition.deploy_root

@cached_property
def generated_root(self) -> Path:
return self.deploy_root / self.definition.generated_root

@cached_property
def project_identifier(self) -> str:
return to_identifier(self.definition.name)

@cached_property
def query_warehouse(self) -> str:
return to_identifier(self.definition.query_warehouse)

@cached_property
def compute_pool(self) -> str:
return to_identifier(self.definition.compute_pool)

@cached_property
def min_instances(self) -> int:
return self.definition.min_instances

@cached_property
def max_instances(self) -> int:
return self.definition.max_instances

@cached_property
def comment(self) -> str:
return self.definition.comment

# def get_bundle_context(self) -> BundleContext:
# return BundleContext(
# package_name=self.package_name,
# artifacts=self.artifacts,
# project_root=self.project_root,
# bundle_root=self.bundle_root,
# deploy_root=self.deploy_root,
# generated_root=self.generated_root,
# )
Loading

0 comments on commit 11bf9d9

Please sign in to comment.