From 9dea326250e7bb7968114c69ff9c3c789d51828a Mon Sep 17 00:00:00 2001 From: "brian.mulier" Date: Mon, 8 Jul 2024 13:09:04 +0200 Subject: [PATCH] feat: fetch & store metadata for blobs --- .../io/kestra/storage/gcs/GcsFileAttributes.java | 6 ++++++ .../java/io/kestra/storage/gcs/GcsStorage.java | 16 +++++++++++----- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/kestra/storage/gcs/GcsFileAttributes.java b/src/main/java/io/kestra/storage/gcs/GcsFileAttributes.java index e64f777..d211b1c 100644 --- a/src/main/java/io/kestra/storage/gcs/GcsFileAttributes.java +++ b/src/main/java/io/kestra/storage/gcs/GcsFileAttributes.java @@ -7,6 +7,7 @@ import java.time.Instant; import java.time.OffsetDateTime; +import java.util.Map; import java.util.Optional; @Value @@ -45,4 +46,9 @@ public FileType getType() { public long getSize() { return blobInfo.getSize(); } + + @Override + public Map getMetadata() { + return blobInfo.getMetadata(); + } } diff --git a/src/main/java/io/kestra/storage/gcs/GcsStorage.java b/src/main/java/io/kestra/storage/gcs/GcsStorage.java index 9464698..0952bbc 100644 --- a/src/main/java/io/kestra/storage/gcs/GcsStorage.java +++ b/src/main/java/io/kestra/storage/gcs/GcsStorage.java @@ -20,6 +20,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import io.kestra.core.storages.StorageObject; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; @@ -104,6 +105,11 @@ private void parentTraversalGuard(URI uri) { @Override public InputStream get(String tenantId, URI uri) throws IOException { + return getWithMetadata(tenantId, uri).inputStream(); + } + + @Override + public StorageObject getWithMetadata(String tenantId, URI uri) throws IOException { try { Blob blob = this.storage.get(this.blob(tenantId, URI.create(uri.getPath()))); @@ -112,7 +118,7 @@ public InputStream get(String tenantId, URI uri) throws IOException { } ReadableByteChannel reader = blob.reader(); - return Channels.newInputStream(reader); + return new StorageObject(blob.getMetadata(), Channels.newInputStream(reader)); } catch (StorageException e) { throw new IOException(e); } @@ -194,16 +200,18 @@ private FileAttributes getGcsFileAttributes(Blob blob) { } @Override - public URI put(String tenantId, URI uri, InputStream data) throws IOException { + public URI put(String tenantId, URI uri, StorageObject storageObject) throws IOException { try { String path = getPath(tenantId, uri); mkdirs(path); BlobInfo blobInfo = BlobInfo .newBuilder(this.blob(tenantId, uri)) + .setMetadata(storageObject.metadata()) .build(); - try (WriteChannel writer = this.storage.writer(blobInfo)) { + try (WriteChannel writer = this.storage.writer(blobInfo); + InputStream data = storageObject.inputStream()) { byte[] buffer = new byte[10_240]; int limit; @@ -212,8 +220,6 @@ public URI put(String tenantId, URI uri, InputStream data) throws IOException { } } - data.close(); - return URI.create("kestra://" + uri.getPath()); } catch (StorageException e) { throw new IOException(e);