From 572cb904cf8ea27f1ef699b45cfb683d062f1a59 Mon Sep 17 00:00:00 2001
From: Christophe Haen
Date: Fri, 29 Sep 2023 17:02:15 +0200
Subject: [PATCH] feat: Enable downloading sandbox from S3

---
 src/DIRAC/FrameworkSystem/Utilities/diracx.py | 96 +++++++++++++++++++
 src/DIRAC/Resources/Storage/StorageBase.py    |  2 +-
 .../Service/SandboxStoreHandler.py            | 23 +++++
 3 files changed, 120 insertions(+), 1 deletion(-)
 create mode 100644 src/DIRAC/FrameworkSystem/Utilities/diracx.py

diff --git a/src/DIRAC/FrameworkSystem/Utilities/diracx.py b/src/DIRAC/FrameworkSystem/Utilities/diracx.py
new file mode 100644
index 00000000000..0bcbe129931
--- /dev/null
+++ b/src/DIRAC/FrameworkSystem/Utilities/diracx.py
@@ -0,0 +1,96 @@
+#### FACTORIZE WITH PROXY MANAGER
+
+import requests
+
+from cachetools import TTLCache, cached
+from pathlib import Path
+from tempfile import NamedTemporaryFile
+from typing import Any
+from DIRAC import gConfig, S_ERROR, S_OK
+from DIRAC.ConfigurationSystem.Client.Helpers import Registry
+
+
+from diracx.core.preferences import DiracxPreferences
+
+from diracx.core.utils import write_credentials
+
+from diracx.core.models import TokenResponse
+from diracx.client import DiracClient
+
+# How long tokens are kept
+DEFAULT_TOKEN_CACHE_TTL = 5 * 60
+
+# Add a cache not to query the token all the time
+_token_cache = TTLCache(maxsize=100, ttl=DEFAULT_TOKEN_CACHE_TTL)
+
+
+@cached(_token_cache, key=lambda x, y: repr(x))
+def _get_token(credDict, diracxUrl, /) -> Path:
+    """
+    Request a DiracX token for the user described by ``credDict``,
+    write it to a temporary file and return the path to that file.
+
+    The result is cached for ``DEFAULT_TOKEN_CACHE_TTL`` seconds; the cache
+    key is ``repr(credDict)`` only, ``diracxUrl`` is not part of it.
+
+    :param credDict: credentials dictionary, ``username`` and ``group`` are
+        mandatory, ``groupProperties`` and ``properties`` are optional
+    :param diracxUrl: base URL of the DiracX service
+    :return: path to the file the token was written to
+    :raises ValueError: if /DiracX/LegacyExchangeApiKey is not configured
+    :raises requests.HTTPError: if the exchange endpoint answers with an error status
+    """
+
+    apiKey = gConfig.getValue("/DiracX/LegacyExchangeApiKey")
+    if not apiKey:
+        raise ValueError("Missing mandatory /DiracX/LegacyExchangeApiKey configuration")
+
+    vo = Registry.getVOForGroup(credDict["group"])
+    dirac_properties = list(set(credDict.get("groupProperties", [])) | set(credDict.get("properties", [])))
+    group = credDict["group"]
+    scopes = [f"vo:{vo}", f"group:{group}"] + [f"property:{prop}" for prop in dirac_properties]
+
+    r = requests.get(
+        f"{diracxUrl}/auth/legacy-exchange",
+        params={
+            "preferred_username": credDict["username"],
+            "scope": " ".join(scopes),
+        },
+        headers={"Authorization": f"Bearer {apiKey}"},
+        timeout=10,
+    )
+
+    r.raise_for_status()
+
+    # Keep the file created by NamedTemporaryFile (delete=False) rather than
+    # reusing the name of an already deleted one: the path stays reserved
+    # for this process until the token is written to it
+    with NamedTemporaryFile(delete=False) as tokenFile:
+        token_location = Path(tokenFile.name)
+
+    write_credentials(TokenResponse(**r.json()), location=token_location)
+
+    return token_location
+
+
+def TheImpersonator(credDict: dict[str, Any]) -> DiracClient:
+    """
+    Client to be used by DIRAC server needing to impersonate
+    a user for diracx.
+    It queries a token, places it in a file, and returns a `DiracClient`
+    using that file as credentials.
+
+    Use as a context manager (``with TheImpersonator(credDict) as client:``)
+
+    :param credDict: credentials dictionary of the user to impersonate
+    :return: `DiracClient` for the service configured in /DiracX/URL
+    :raises ValueError: if /DiracX/URL is not configured
+    """
+    diracxUrl = gConfig.getValue("/DiracX/URL")
+    if not diracxUrl:
+        raise ValueError("Missing mandatory /DiracX/URL configuration")
+
+    token_location = _get_token(credDict, diracxUrl)
+    pref = DiracxPreferences(url=diracxUrl, credentials_path=token_location)
+
+    return DiracClient(diracx_preferences=pref)
diff --git a/src/DIRAC/Resources/Storage/StorageBase.py b/src/DIRAC/Resources/Storage/StorageBase.py
index 0aef091039b..1fd321ef2b5 100755
--- a/src/DIRAC/Resources/Storage/StorageBase.py
+++ b/src/DIRAC/Resources/Storage/StorageBase.py
@@ -338,7 +338,7 @@ def constructURLFromLFN(self, lfn, withWSUrl=False):
         voLFN = lfnSplitList[1]
         # TODO comparison to Sandbox below is for backward compatibility, should
         # be removed in the next release
-        if voLFN != self.se.vo and voLFN != "SandBox" and voLFN != "Sandbox":
+        if voLFN not in (self.se.vo, "SandBox", "Sandbox", "S3"):
             return S_ERROR(f"LFN ({lfn}) path must start with VO name ({self.se.vo})")
 
         urlDict = dict(self.protocolParameters)
diff --git a/src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py
index 8f20b96ad33..67d20fa7ae7 100755
--- a/src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py
+++ b/src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py
@@ -9,6 +9,7 @@
 """
 import hashlib
 import os
+import requests
 import tempfile
 import threading
 import time
@@ -26,6 +27,7 @@
 from DIRAC.RequestManagementSystem.Client.Request import Request
 from DIRAC.Resources.Storage.StorageElement import StorageElement
 from DIRAC.Core.Utilities.File import getGlobbedTotalSize
+from DIRAC.FrameworkSystem.Utilities.diracx import TheImpersonator
 
 
 class SandboxStoreHandlerMixin:
@@ -431,6 +433,27 @@ def _sendToClient(self, fileID, token, fileHelper=None, raw=False):
         credDict = self.getRemoteCredentials()
         serviceURL = self.serviceInfoDict["URL"]
         filePath = fileID.replace(serviceURL, "")
+
+        # If the PFN starts with S3, we know it has been uploaded to the
+        # S3 sandbox store, so download it from there before sending it
+        if filePath.startswith("/S3"):
+            with TheImpersonator(credDict) as client:
+                res = client.jobs.get_sandbox_file(filePath)
+                # Without a timeout a stalled S3 connection would block
+                # this call indefinitely
+                r = requests.get(res.url, timeout=60)
+                r.raise_for_status()
+                sbData = r.content
+                if fileHelper:
+                    from io import BytesIO
+
+                    result = fileHelper.FDToNetwork(BytesIO(sbData))
+                    fileHelper.oFile.close()
+                    return result
+                if raw:
+                    return sbData
+                return S_OK(sbData)
+
         result = self.sandboxDB.getSandboxId(self.__localSEName, filePath, credDict["username"], credDict["group"])
         if not result["OK"]:
             return result