Skip to content

Commit

Permalink
Merge branch 'google-drive-retrieve-permissions-for-users-with-read-o…
Browse files Browse the repository at this point in the history
…nly-access' of github.com:elastic/connectors-python into google-drive-retrieve-permissions-for-users-with-read-only-access
  • Loading branch information
jedrazb committed Jan 5, 2024
2 parents df66bca + 76c341d commit 911516e
Show file tree
Hide file tree
Showing 18 changed files with 532 additions and 348 deletions.
2 changes: 1 addition & 1 deletion connectors/cli/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import yaml
from elasticsearch import ApiError

from connectors.es.client import ESManagementClient
from connectors.es.management_client import ESManagementClient

CONFIG_FILE_PATH = ".cli/config.yml"

Expand Down
22 changes: 5 additions & 17 deletions connectors/cli/connector.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import asyncio
from collections import OrderedDict

from connectors.es.client import ESManagementClient
from connectors.es.settings import DEFAULT_LANGUAGE, Mappings, Settings
from connectors.es.management_client import ESManagementClient
from connectors.es.settings import DEFAULT_LANGUAGE
from connectors.protocol import (
CONCRETE_CONNECTORS_INDEX,
CONCRETE_JOBS_INDEX,
Expand Down Expand Up @@ -86,20 +86,6 @@ async def __create(
finally:
await self.es_management_client.close()

async def __create_search_index(self, index_name, language):
    """Create the index `index_name` with default text mappings and settings.

    The settings hash is built for `language` (ICU analysis disabled) and
    then overridden with fixed replica and shard values before the index is
    created through the management client.
    """
    mappings = Mappings.default_text_fields_mappings(is_connectors_index=True)
    settings = Settings(language_code=language, analysis_icu=False).to_hash()

    # Fixed overrides applied on top of the generated settings hash.
    overrides = (
        ("auto_expand_replicas", "0-3"),
        ("number_of_shards", 2),
    )
    for setting_name, setting_value in overrides:
        settings[setting_name] = setting_value

    indices_api = self.es_management_client.client.indices
    await indices_api.create(index=index_name, mappings=mappings, settings=settings)

async def __create_connector(
self, index_name, service_type, configuration, is_native, language, from_index
):
Expand All @@ -110,7 +96,9 @@ async def __create_connector(
timestamp = iso_utc()

if not from_index:
await self.__create_search_index(index_name, language)
await self.es_management_client.create_content_index(
index_name, language
)

api_key = await self.__create_api_key(index_name)

Expand Down
2 changes: 1 addition & 1 deletion connectors/cli/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from elasticsearch import ApiError

from connectors.es.client import ESManagementClient
from connectors.es.management_client import ESManagementClient
from connectors.protocol import ConnectorIndex


Expand Down
6 changes: 3 additions & 3 deletions connectors/cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from elasticsearch import ApiError

from connectors.es.client import ESManagementClient
from connectors.es.management_client import ESManagementClient
from connectors.protocol import (
CONCRETE_CONNECTORS_INDEX,
CONCRETE_JOBS_INDEX,
Expand Down Expand Up @@ -37,7 +37,7 @@ def job(self, job_id):
async def __async_job(self, job_id):
try:
await self.es_management_client.ensure_exists(
indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX]
indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX],
)
job = await self.sync_job_index.fetch_by_id(job_id)
return job
Expand All @@ -63,7 +63,7 @@ async def __async_start(self, connector_id, job_type):
async def __async_list_jobs(self, connector_id, index_name, job_id):
try:
await self.es_management_client.ensure_exists(
indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX]
indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX],
)
jobs = self.sync_job_index.get_all_docs(
query=self.__job_list_query(connector_id, index_name, job_id),
Expand Down
48 changes: 3 additions & 45 deletions connectors/es/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

from elastic_transport.client_utils import url_to_node_config
from elasticsearch import ApiError, AsyncElasticsearch, ConflictError
from elasticsearch import ConnectionError as ElasticConnectionError
from elasticsearch import (
ConnectionError as ElasticConnectionError,
)

from connectors import __version__
from connectors.logger import logger, set_extra_logger
Expand Down Expand Up @@ -161,50 +163,6 @@ async def wait(self):
return False


class ESManagementClient(ESClient):
    """
    Elasticsearch client for managing connector-related indices.

    On top of what ESClient offers, this client works with arbitrary indices:
    it can create missing indices, list them, delete them and wipe their data.

    ESClient is meant to back rich clients that operate on a "domain", such as:
        - a specific connector
        - a specific job

    This client, by contrast, manages a number of indices outside of
    connector protocol operations.
    """

    def __init__(self, config):
        """Log the configured host, then let ESClient build the client."""
        host = config["host"]
        logger.debug(f"ESManagementClient connecting to {host}")
        super().__init__(config)

    async def ensure_exists(self, indices=None):
        """Create each index in `indices` that does not exist yet."""
        index_names = [] if indices is None else indices

        for name in index_names:
            logger.debug(f"Checking index {name}")
            already_present = await self.client.indices.exists(index=name)
            if already_present:
                continue
            await self.client.indices.create(index=name)
            logger.debug(f"Created index {name}")

    async def delete_indices(self, indices):
        """Delete `indices`, ignoring any that are unavailable."""
        await self.client.indices.delete(
            index=indices,
            ignore_unavailable=True,
        )

    async def clean_index(self, index_name):
        """Delete every document in `index_name` and return the response."""
        match_everything = {"query": {"match_all": {}}}
        response = await self.client.delete_by_query(
            index=index_name,
            body=match_everything,
            ignore_unavailable=True,
        )
        return response

    async def list_indices(self):
        """Return index stats for all indices matching `search-*`."""
        stats = await self.client.indices.stats(index="search-*")
        return stats

    async def index_exists(self, index_name):
        """Return the result of the existence check for `index_name`."""
        exists = await self.client.indices.exists(index=index_name)
        return exists


def with_concurrency_control(retries=3):
def wrapper(func):
@functools.wraps(func)
Expand Down
136 changes: 136 additions & 0 deletions connectors/es/management_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#

from elasticsearch import (
NotFoundError as ElasticNotFoundError,
)
from elasticsearch.helpers import async_scan

from connectors.es.client import ESClient
from connectors.es.settings import TIMESTAMP_FIELD, Mappings, Settings
from connectors.logger import logger


class ESManagementClient(ESClient):
    """
    Elasticsearch client with methods to manage connector-related indices.

    In addition to the regular methods of ESClient, this client provides
    methods to work with arbitrary indices, for example listing indices,
    deleting indices and wiping data from indices.

    ESClient should be used to provide rich clients that operate on "domains",
    such as:
        - specific connector
        - specific job

    This client, on the contrary, is used to manage a number of indices
    outside of connector protocol operations.
    """

    def __init__(self, config):
        """Log the target host and delegate construction to ESClient.

        `config` must contain a "host" key: it is read for the debug message
        before `config` is passed unchanged to the parent constructor.
        """
        logger.debug(f"ESManagementClient connecting to {config['host']}")
        super().__init__(config)

    async def ensure_exists(self, indices=None):
        """Create every index in `indices` that does not exist yet.

        Indices are created without explicit mappings or settings.
        `indices` defaults to an empty list, in which case nothing happens.
        """
        if indices is None:
            indices = []

        for index in indices:
            logger.debug(f"Checking index {index}")
            if not await self.client.indices.exists(index=index):
                await self.client.indices.create(index=index)
                logger.debug(f"Created index {index}")

    async def create_content_index(self, search_index_name, language_code):
        """Create `search_index_name` with the default mappings and settings.

        Settings are generated for `language_code` with ICU analysis disabled;
        mappings are the default text-field mappings. Returns the response of
        the index creation call.
        """
        settings = Settings(language_code=language_code, analysis_icu=False).to_hash()
        mappings = Mappings.default_text_fields_mappings(is_connectors_index=True)

        return await self.client.indices.create(
            index=search_index_name, mappings=mappings, settings=settings
        )

    async def ensure_content_index_mappings(self, index, mappings):
        """Add `mappings` to `index` only when the index has no mappings yet.

        Nothing is written when the index already has mappings or when
        `mappings` is empty/None. When mappings are written, missing keys
        fall back to `dynamic=False`, no dynamic templates and no properties.
        """
        response = await self.client.indices.get_mapping(index=index)

        # NOTE(review): the response is looked up by the exact `index` string,
        # so this presumably expects a concrete index name rather than an
        # alias or pattern -- confirm against callers.
        existing_mappings = response[index].get("mappings", {})
        if existing_mappings:
            logger.debug(
                "Index %s already has mappings, skipping mappings creation", index
            )
            return

        if not mappings:
            # The index name was previously missing from this call, so the
            # message was logged with a literal "%s".
            logger.debug(
                "Index %s has no mappings but no mappings are provided, skipping mappings creation",
                index,
            )
            return

        logger.debug(
            "Index %s has no mappings or it's empty. Adding mappings...", index
        )
        await self.client.indices.put_mapping(
            index=index,
            dynamic=mappings.get("dynamic", False),
            dynamic_templates=mappings.get("dynamic_templates", []),
            properties=mappings.get("properties", {}),
        )
        logger.debug("Successfully added mappings for index %s", index)

    async def ensure_ingest_pipeline_exists(
        self, pipeline_id, version, description, processors
    ):
        """Create the ingest pipeline `pipeline_id` if it cannot be fetched.

        An existing pipeline is left untouched; `version`, `description` and
        `processors` are only used when the lookup raises a not-found error.
        """
        try:
            await self.client.ingest.get_pipeline(id=pipeline_id)
        except ElasticNotFoundError:
            await self.client.ingest.put_pipeline(
                id=pipeline_id,
                version=version,
                description=description,
                processors=processors,
            )

    async def delete_indices(self, indices):
        """Delete `indices`, ignoring any that are unavailable."""
        await self.client.indices.delete(index=indices, ignore_unavailable=True)

    async def clean_index(self, index_name):
        """Delete all documents in `index_name` via a match-all delete-by-query.

        Returns the delete-by-query response.
        """
        return await self.client.delete_by_query(
            index=index_name, body={"query": {"match_all": {}}}, ignore_unavailable=True
        )

    async def list_indices(self):
        """Return index stats for all indices matching `search-*`."""
        return await self.client.indices.stats(index="search-*")

    async def index_exists(self, index_name):
        """Return the result of the existence check for `index_name`."""
        return await self.client.indices.exists(index=index_name)

    async def upsert(self, _id, index_name, doc):
        """Index `doc` into `index_name` under the document id `_id`."""
        await self.client.index(
            id=_id,
            index=index_name,
            document=doc,
        )

    async def yield_existing_documents_metadata(self, index):
        """Yield `(doc_id, timestamp)` for every document found in `index`.

        `doc_id` is the `id` field of the document source, falling back to the
        document's `_id`; `timestamp` is the `_timestamp` field, or None when
        the document has none. Nothing is yielded when the index does not
        exist.

        WARNING
        Pairs are yielded one at a time, but a caller that collects all of
        them holds every id in memory -- on very large indices, depending on
        the id length, that can be quite large. The original note estimated
        around 50MiB for 300,000 ids.
        """
        logger.debug(f"Scanning existing index {index}")
        if not await self.index_exists(index):
            return

        async for doc in async_scan(
            client=self.client, index=index, _source=["id", TIMESTAMP_FIELD]
        ):
            source = doc["_source"]
            doc_id = source.get("id", doc["_id"])
            timestamp = source.get(TIMESTAMP_FIELD)

            yield doc_id, timestamp
10 changes: 10 additions & 0 deletions connectors/es/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import yaml

TIMESTAMP_FIELD = "_timestamp"

ENUM_IGNORE_ABOVE = 2048

DATE_FIELD_MAPPING = {"type": "date"}
Expand Down Expand Up @@ -275,6 +277,14 @@ def analyzer_definitions(self):

return definitions

@property
def auto_expand_replicas(self):
    """Return the fixed `auto_expand_replicas` value, "0-1".

    NOTE(review): presumably consumed when the settings hash is built --
    confirm against `to_hash`.
    """
    return "0-1"

@property
def number_of_shards(self):
    """Return the fixed `number_of_shards` value, 2.

    NOTE(review): presumably consumed when the settings hash is built --
    confirm against `to_hash`.
    """
    return 2

def __init__(self, *, language_code=None, analysis_icu=False):
self._language_data = None
self.language_code = language_code or DEFAULT_LANGUAGE
Expand Down
Loading

0 comments on commit 911516e

Please sign in to comment.