-
Notifications
You must be signed in to change notification settings - Fork 0
GCP database API 예제
jaeseok.an edited this page Nov 19, 2021
·
2 revisions
- db.collection(컬렉션이름).document(문서키).collection(컬렉션명)
- db.collection("company").document("cxxxxxxx").collection("source_meta")
- stream으로 문서 쿼리
- docs = 컬렉션.where(조건).limit(문서수).stream()
- 문서를 list로 저장
for doc in docs:
    print(f'Document data: {doc.to_dict()}')
    return doc.to_dict()
- example
class CompanyService:
    """Firestore lookups for company documents.

    Every helper works through ``COMPANY_COLLECTION`` and uses no instance
    state, so both are static methods.
    """

    # Reference to the root company collection, created once when the class
    # body is executed.
    COMPANY_COLLECTION = db.collection(ROOT_COLLECTION)

    @staticmethod
    def get(id, name):
        """Return the first company document matching ``id`` and ``name``.

        Args:
            id: Value compared for equality with the document's ``id`` field.
            name: Value compared for equality with the document's ``name``
                field.

        Returns:
            The matching document converted with ``to_dict()``, or ``None``
            when the query yields no documents.
        """
        query = (
            CompanyService.COMPANY_COLLECTION
            .where('id', '==', id)
            .where('name', '==', name)
            .limit(1)
        )
        # limit(1) caps the stream at a single snapshot, so only the first
        # item is ever needed.
        snapshot = next(iter(query.stream()), None)
        if snapshot is None:
            print(u'No such document!')
            return None
        print(f'Document data: {snapshot.to_dict()}')
        return snapshot.to_dict()

    # NOTE(review): the decorator was missing here; without it the function
    # only works when called on the class, not on an instance.
    @staticmethod
    def get_source_metas_ref(customer_code):
        """Return a reference to a company's source-meta subcollection.

        Args:
            customer_code: Key of the company document inside
                ``COMPANY_COLLECTION``.

        Returns:
            The ``SOURCE_META_COLLECTION`` subcollection reference nested
            under that company document. No query is run here.
        """
        company_doc = CompanyService.COMPANY_COLLECTION.document(customer_code)
        return company_doc.collection(SOURCE_META_COLLECTION)
def get_all_history_by_ingestion(customer_code, source_type, source_id, ingestion_id, table_name) -> QueryJob:
    """Submit a parameterized BigQuery query for the rows of one ingestion.

    The query always filters on ``customer_code`` and ``ingestion_id``.
    ``source_type`` and ``source_id`` filters are added only when the
    corresponding argument is truthy.

    Args:
        customer_code: Bound to ``@customer_code`` as a STRING parameter.
        source_type: Optional filter; skipped when falsy.
        source_id: Optional filter; skipped when falsy.
        ingestion_id: Passed through ``convert_ingestion_id_to_timestamp_str``
            and bound to ``@ingestion_id`` as a STRING parameter.
        table_name: Table to select from. It is interpolated directly into
            the SQL text (not bound as a query parameter), so it must come
            from a trusted source.

    Returns:
        The job object returned by ``client.query``.

    Raises:
        The exception built by ``response_exc(ServerError(...))`` when any
        step fails; the original error is attached as its cause.
    """
    try:
        client = BigQueryClient.instance().get_client()
        # The SQL text is kept flush-left so that no source indentation
        # leaks into the query string.
        query = f"""
SELECT *
FROM `{table_name}`
WHERE customer_code=@customer_code
{"AND source_type=@source_type" if source_type else ""}
{"AND source_id=@source_id" if source_id else ""}
AND ingestion_id=@ingestion_id
"""
        query_params = [
            bigquery.ScalarQueryParameter("customer_code", "STRING", customer_code),
            bigquery.ScalarQueryParameter(
                "ingestion_id",
                "STRING",
                convert_ingestion_id_to_timestamp_str(ingestion_id),
            ),
        ]
        # Bind an optional parameter only when its clause is present in the
        # SQL above, so the two stay in sync.
        if source_type:
            query_params.append(bigquery.ScalarQueryParameter("source_type", "STRING", source_type))
        if source_id:
            query_params.append(bigquery.ScalarQueryParameter("source_id", "STRING", source_id))
        job_config = bigquery.QueryJobConfig(
            query_parameters=query_params
        )
        return client.query(query, job_config=job_config)
    except Exception as e:
        # Chain explicitly so the underlying error is kept as __cause__.
        raise response_exc(ServerError(f"Encountered errors while execute query: {e}")) from e
test