Skip to content

Commit

Permalink
Merge pull request #264 from bjwswang/dp
Browse files Browse the repository at this point in the history
feat: add kubeenv for data processing
  • Loading branch information
bjwswang authored Nov 22, 2023
2 parents 26373e6 + a821d92 commit a751edf
Show file tree
Hide file tree
Showing 13 changed files with 131 additions and 21 deletions.
7 changes: 4 additions & 3 deletions data-processing/data_manipulation/db/data_process_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
# 1) 基本功能实现
###

import ulid
from datetime import datetime

import ujson
import ulid
from sanic.response import json

from datetime import datetime
from utils import pg_utils
from sanic.response import json


async def list_by_page(request, opt={}):
Expand Down
4 changes: 2 additions & 2 deletions data-processing/data_manipulation/file_handle/csv_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
# 1) 基本功能实现
###

import asyncio
import csv
import logging
import os

import asyncio

import pandas as pd
import ulid

from transform.text import clean_transform, privacy_transform
from utils import date_time_utils, file_utils

Expand Down
11 changes: 5 additions & 6 deletions data-processing/data_manipulation/file_handle/pdf_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,16 @@

import logging
import os
import pandas as pd

from common import config
from file_handle import csv_handle
import pandas as pd
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import SpacyTextSplitter
from pypdf import PdfReader
from transform.text import clean_transform, privacy_transform, QA_transform
from utils import file_utils


from common import config
from file_handle import csv_handle
from transform.text import QA_transform, clean_transform, privacy_transform
from utils import file_utils

logger = logging.getLogger('pdf_handle')

Expand Down
60 changes: 60 additions & 0 deletions data-processing/data_manipulation/kube/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import os

from custom_resources import (arcadia_resource_datasets,
arcadia_resource_datasources,
arcadia_resource_versioneddatasets)
from kubernetes import client, config
from kubernetes.client import CustomObjectsApi


class NamespacedName:
def __init__(self, namespace, name):
self.namespace = namespace
self.name = name

def get_namespace(self):
return self.namespace

def get_name(self):
return self.name


class KubeEnv:
def __init__(self):
self.pod_namespace = os.environ.get('POD_NAMESPACE')
self.kubeconfig_path = os.environ.get('KUBECONFIG')
if self.kubeconfig_path:
print("load kubeconfig from ", self.kubeconfig_path)
config.load_kube_config(self.kubeconfig_path)
else:
try:
print("try load kubeconfig from incluster config")
config.load_incluster_config()
except config.ConfigException:
raise RuntimeError(
"Failed to load incluster config. Make sure the code is running inside a Kubernetes cluster.")

def list_datasources(self, namespace: str, **kwargs):
return CustomObjectsApi().list_namespaced_custom_object(
arcadia_resource_datasources.get_group(),
arcadia_resource_datasources.get_version(),
namespace,
arcadia_resource_datasources.get_name(),
**kwargs
)

def list_datasets(self, namespace: str, **kwargs):
return CustomObjectsApi().list_namespaced_custom_object(
arcadia_resource_datasets.get_group(),
arcadia_resource_datasets.get_version(),
namespace, arcadia_resource_datasets.get_name(),
**kwargs
)

def list_versioneddatasets(self, namespace: str, **kwargs):
return CustomObjectsApi().list_namespaced_custom_object(
arcadia_resource_versioneddatasets.get_group(),
arcadia_resource_versioneddatasets.get_version(),
namespace, arcadia_resource_versioneddatasets.get_name(),
**kwargs
)
30 changes: 30 additions & 0 deletions data-processing/data_manipulation/kube/custom_resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
class GroupVersion:
def __init__(self, name, version):
self.name = name
self.version = version


class CustomResource:
def __init__(self, group_version, name):
self.group_version = group_version
self.name = name

def get_group(self):
return self.group_version.name

def get_version(self):
return self.group_version.version

def get_name(self):
return self.name


# Arcadia
arcadia_group = GroupVersion("arcadia.kubeagi.k8s.com.cn", "v1alpha1")
# CRD Datasource
arcadia_resource_datasources = CustomResource(arcadia_group, "datasources")
# CRD Dataset
arcadia_resource_datasets = CustomResource(arcadia_group, "datasets")
# CRD Versioneddataset
arcadia_resource_versioneddatasets = CustomResource(
arcadia_group, "versioneddatasets")
21 changes: 17 additions & 4 deletions data-processing/data_manipulation/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,26 @@

import asyncio
import logging
import psycopg2

from common import config
import psycopg2
from sanic import Sanic
from sanic.response import json
from sanic_cors import CORS
from service import minio_store_process_service, data_process_service

from common import config
from kube import client
from service import data_process_service, minio_store_process_service
from transform.text import support_type
from utils import log_utils

###
# Initialize kubernetes client
###
kube = client.KubeEnv()
# have a try!
# print(kube.list_versioneddatasets("arcadia"))


###
# 初始化日志配置
###
Expand All @@ -55,6 +65,7 @@
app.config['RESPONSE_TIMEOUT'] = 60 * 60 * 60
app.config['KEEP_ALIVE_TIMEOUT'] = 60 * 60 * 60


@app.listener('before_server_start')
async def init_web_server(app, loop):
app.config['conn'] = get_connection()
Expand Down Expand Up @@ -220,7 +231,8 @@ def get_connection():
:param database:
:return:
'''
conn = psycopg2.connect(database=config.pg_database, user=config.pg_user, password=config.pg_password, host=config.pg_host, port=config.pg_port)
conn = psycopg2.connect(database=config.pg_database, user=config.pg_user,
password=config.pg_password, host=config.pg_host, port=config.pg_port)

# while True:
# cur = conn.cursor()
Expand All @@ -230,6 +242,7 @@ def get_connection():

return conn


if __name__ == '__main__':
app.run(host='0.0.0.0',
port=28888,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@

import asyncio
import logging

import ulid
from sanic.response import json

from db import data_process_task
from sanic.response import json
from service import minio_store_process_service

logger = logging.getLogger('data_process_service')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
import io
import logging
import os
import pandas as pd

from db import data_process_task
from file_handle import csv_handle, pdf_handle
import pandas as pd
from minio import Minio
from minio.commonconfig import Tags
from minio.error import S3Error
from sanic.response import json, raw

from db import data_process_task
from file_handle import csv_handle, pdf_handle
from utils import file_utils, minio_utils

logger = logging.getLogger('minio_store_process_service')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import re

import zhipuai

from common import config

###
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
###

import re

from common import special_characters


###
# 去除不可见字符
# @author: wangxinbiao
Expand Down
2 changes: 1 addition & 1 deletion data-processing/data_manipulation/utils/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import datetime
import logging
import os
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler


Expand Down
3 changes: 2 additions & 1 deletion data-processing/data_manipulation/utils/minio_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
# MinIO
###

from common import config
from minio import Minio

from common import config


async def create_client():
return Minio(
Expand Down
1 change: 1 addition & 0 deletions data-processing/data_manipulation/utils/pg_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import psycopg2.extras


async def execute_sql(conn,sql,record_to_select):
'''
执行sql语句
Expand Down

0 comments on commit a751edf

Please sign in to comment.