Skip to content

Commit

Permalink
Merge pull request #4157 from hove-io/enrich_addresses/cache
Browse files Browse the repository at this point in the history
[tyr worker] enrich-ntfs-with-addresses : use previous ntfs as cache
  • Loading branch information
datanel authored Oct 30, 2023
2 parents 7525558 + 14c0727 commit 4cc64b6
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docker/debian8/Dockerfile-tyr-worker
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM navitia/master

# Install binary enrich-ntfs-with-addresses from tartare-tools
ENV TARTARE_TOOLS_VERSION="v0.36.4"
ENV TARTARE_TOOLS_VERSION="v0.37.0"
ARG GITHUB_TOKEN
RUN git config --global url."https://x-access-token:${GITHUB_TOKEN}@github.com/hove-io/".insteadOf "ssh://[email protected]/hove-io/"
RUN git clone -b ${TARTARE_TOOLS_VERSION} --depth 1 https://x-access-token:${GITHUB_TOKEN}@github.com/hove-io/tartare-tools
Expand Down
22 changes: 19 additions & 3 deletions source/tyr/tyr/binarisation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1125,11 +1125,11 @@ def poi2mimir(self, instance_name, input, autocomplete_version, job_id=None, dat
@celery.task(bind=True)
def fusio2s3(self, instance_config, filename, job_id, dataset_uid):
"""Zip fusio file and launch fusio2s3"""
filename = enrich_ntfs_with_addresses(filename, job_id, dataset_uid)
filename = enrich_ntfs_with_addresses("fusio", instance_config, filename, job_id, dataset_uid)
_inner_2s3(self, "fusio", instance_config, filename, job_id, dataset_uid)


def enrich_ntfs_with_addresses(filename, job_id, dataset_uid):
def enrich_ntfs_with_addresses(dataset_type, instance_config, filename, job_id, dataset_uid):
"""launch enrich-ntfs-with-addresses"""

job = models.Job.query.get(job_id)
Expand All @@ -1141,10 +1141,23 @@ def enrich_ntfs_with_addresses(filename, job_id, dataset_uid):

file_dir = os.path.dirname(filename)
file_basename = os.path.basename(filename)
output_dir = file_dir + "/enriched_with_addresses"
output_dir = file_dir + "/for_loki"
os.makedirs(output_dir, 0o755)
output = output_dir + "/" + file_basename

previous_ntfs_path = output_dir + "/previous_ntfs.zip"

file_key = "{coverage}/{dataset_type}.zip".format(coverage=instance_config.name, dataset_type=dataset_type)

use_previous_ntfs = True

try:
minio_wrapper = MinioWrapper()
minio_wrapper.get_file(file_key, previous_ntfs_path)
except:
logger.warning("no previous ntfs found")
use_previous_ntfs = False

try:
params = [
"--input",
Expand All @@ -1155,6 +1168,9 @@ def enrich_ntfs_with_addresses(filename, job_id, dataset_uid):
current_app.config['BRAGI_URL'],
]

if use_previous_ntfs:
params.extend(["--previous-ntfs", previous_ntfs_path])

res = None
with collect_metric("enrich-ntfs-with-addresses", job, dataset_uid):
res = launch_exec("enrich-ntfs-with-addresses", params, logger)
Expand Down
11 changes: 11 additions & 0 deletions source/tyr/tyr/minio.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ def upload_file(
)
client.fput_object(self.bucket_name, file_key, filename, metadata=metadata, content_type=content_type)

def get_file(self, object_name, file_path):
    """Download an object from the configured bucket to a local file.

    :param object_name: key of the object to fetch from ``self.bucket_name``
    :param file_path: local path handed to ``fget_object`` as the destination
    :return: whatever the client's ``fget_object`` call returns
    """
    if self.use_iam_provider:
        # Retrieve credentials from the ECS IAM role before building the client.
        self.retrieve_credentials()

    connection_settings = {
        "endpoint": self.endpoint,
        "access_key": self.access_key,
        "secret_key": self.secret_key,
        "session_token": self.session_token,
    }
    minio_client = Minio(**connection_settings)
    return minio_client.fget_object(self.bucket_name, object_name, file_path)

def retrieve_credentials(self):
"""Retrieve credentials from ECS IAM Role"""

Expand Down

0 comments on commit 4cc64b6

Please sign in to comment.