Skip to content

Commit

Permalink
Upload to S3
Browse files Browse the repository at this point in the history
- Upload a timestamped file
- Update a stable copy to the latest file
  • Loading branch information
ptitfred committed Jan 29, 2025
1 parent 6d43eea commit bfffbfc
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 9 deletions.
87 changes: 87 additions & 0 deletions apps/transport/lib/S3/aggregates_uploader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
defmodule Transport.S3.AggregatesUploader do
  @moduledoc """
  Helpers to upload a file to S3 together with its sha256 checksum, and to
  refresh a stable "latest" copy of both via server-side copies.
  """

  @doc """
  Uploads `file` under `remote_file` along with a `.sha256sum` companion file,
  then copies both to the stable `remote_latest_file` names.

  Example:

      with_tmp_file(fn file ->
        File.write!(file, "some relevant data")
        upload_aggregate!(file, "aggregate-20250127193035.csv", "aggregate-latest.csv")
      end)
  """
  @spec upload_aggregate!(Path.t(), String.t(), String.t()) :: :ok
  def upload_aggregate!(file, remote_file, remote_latest_file) do
    with_tmp_file(fn checksum_file ->
      sha256!(file, checksum_file)

      file
      |> upload_files!(checksum_file, remote_file)
      |> update_latest_files!(remote_latest_file)
    end)
  end

  @doc """
  Creates an empty temporary file, passes its path to `cb`, and removes the
  file afterwards — even if `cb` raises. Returns whatever `cb` returns.
  """
  @spec with_tmp_file((Path.t() -> any())) :: any()
  def with_tmp_file(cb) do
    file = mk_tmp_file()

    try do
      cb.(file)
    after
      # Assert the deletion: a leftover tmp file would silently leak disk space.
      :ok = File.rm(file)
    end
  end

  # Creates an empty, randomly-named file in the system tmp directory.
  defp mk_tmp_file do
    path = System.tmp_dir!() |> Path.join(Ecto.UUID.generate())

    File.touch!(path)

    path
  end

  # Streams `file` in 2048-byte chunks, computes its sha256 and writes the
  # lowercase hex digest to `checksum_file`. Raises if the write fails —
  # the original non-bang `File.write/2` silently discarded errors, which
  # could upload an empty or stale checksum file.
  defp sha256!(file, checksum_file) do
    digest =
      File.stream!(file, 2048)
      |> Enum.reduce(:crypto.hash_init(:sha256), &:crypto.hash_update(&2, &1))
      |> :crypto.hash_final()
      |> Base.encode16(case: :lower)

    File.write!(checksum_file, digest)
  end

  # Uploads the aggregate and its checksum file; returns both remote names
  # so the "latest" copies can be derived from them.
  defp upload_files!(file, checksum_file, remote_file) do
    remote_checksum_file = checksum_filename(remote_file)

    stream_upload!(file, remote_file)
    stream_upload!(checksum_file, remote_checksum_file)

    {remote_file, remote_checksum_file}
  end

  # Server-side copies of the freshly uploaded files to their stable names.
  defp update_latest_files!({remote_file, remote_checksum_file}, remote_latest_file) do
    remote_latest_checksum_file = checksum_filename(remote_latest_file)

    copy!(remote_file, remote_latest_file)
    copy!(remote_checksum_file, remote_latest_checksum_file)

    :ok
  end

  # Conventional name of the checksum companion of a remote file.
  defp checksum_filename(base_filename), do: "#{base_filename}.sha256sum"

  defp stream_upload!(file, filename) do
    Transport.S3.stream_to_s3!(:aggregates, file, filename)
  end

  defp copy!(s3_path, filename) do
    Transport.S3.remote_copy_file!(:aggregates, s3_path, filename)
  end
end
10 changes: 9 additions & 1 deletion apps/transport/lib/S3/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Transport.S3 do
This module contains common code related to S3 object storage.
"""
require Logger
@type bucket_feature :: :history | :on_demand_validation | :gtfs_diff | :logos
@type bucket_feature :: :history | :on_demand_validation | :gtfs_diff | :logos | :aggregates

@spec bucket_name(bucket_feature()) :: binary()
def bucket_name(feature) do
Expand Down Expand Up @@ -55,4 +55,12 @@ defmodule Transport.S3 do
|> ExAws.S3.download_file(remote_path, local_path)
|> Transport.Wrapper.ExAWS.impl().request!()
end

@doc """
Performs a server-side S3 copy of `remote_path_src` to `remote_path_dest`
within the bucket associated to `feature`.
"""
@spec remote_copy_file!(bucket_feature(), binary(), binary()) :: any()
def remote_copy_file!(feature, remote_path_src, remote_path_dest) do
  # We are inside Transport.S3, so the local bucket_name/1 is enough.
  bucket = bucket_name(feature)

  bucket
  |> ExAws.S3.put_object_copy(remote_path_dest, bucket, remote_path_src)
  |> Transport.Wrapper.ExAWS.impl().request!()
end
end
19 changes: 15 additions & 4 deletions apps/transport/lib/jobs/stops_registry_snapshot_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,25 @@ defmodule Transport.Jobs.StopsRegistrySnapshotJob do
@moduledoc """
Job in charge of building a snapshot of the stops registry.
"""

use Oban.Worker, unique: [period: {1, :days}], tags: ["registry"], max_attempts: 3
require Logger
import Transport.S3.AggregatesUploader

@impl Oban.Worker
def perform(_job) do
file = "#{System.tmp_dir!()}/registre-arrets.csv"
# Builds the stops registry CSV into a temporary file, then uploads it to S3
# both under a timestamped name and as the stable "latest" copy.
def perform(%Oban.Job{}) do
with_tmp_file(fn file ->
# Assert the registry engine succeeded before uploading anything.
:ok = Transport.Registry.Engine.execute(file)

upload_aggregate!(
file,
"stops_registry_#{timestamp()}.csv",
"stops_registry_latest.csv"
)
end)
end

Transport.Registry.Engine.execute(file)
# UTC timestamp fragment used in uploaded file names,
# e.g. "20250129.174502.123456" (microsecond precision avoids collisions).
defp timestamp do
  Calendar.strftime(DateTime.utc_now(), "%Y%m%d.%H%M%S.%f")
end
end
27 changes: 27 additions & 0 deletions apps/transport/test/support/s3_test_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ defmodule Transport.Test.S3TestUtils do
end)
end

# Expects exactly one streamed S3 upload matching the given path, bucket
# and ACL; the mocked request returns :ok.
def s3_mock_stream_file(path: expected_path, bucket: expected_bucket, acl: expected_acl) do
  expect(Transport.ExAWS.Mock, :request!, fn %ExAws.S3.Upload{
                                               src: %File.Stream{},
                                               bucket: ^expected_bucket,
                                               path: ^expected_path,
                                               opts: [acl: ^expected_acl],
                                               service: :s3
                                             } ->
    :ok
  end)
end

def s3_mocks_delete_object(expected_bucket, expected_path) do
Transport.ExAWS.Mock
|> expect(:request!, fn %ExAws.Operation.S3{
Expand All @@ -43,4 +56,18 @@ defmodule Transport.Test.S3TestUtils do
:ok
end)
end

# Expects exactly one server-side S3 copy from src to dest within the bucket,
# checking the x-amz-copy-source header; returns a minimal S3-like response.
def s3_mocks_remote_copy_file(expected_bucket, expected_src_path, expected_dest_path) do
  expect(Transport.ExAWS.Mock, :request!, fn %ExAws.Operation.S3{
                                               bucket: ^expected_bucket,
                                               path: ^expected_dest_path,
                                               http_method: :put,
                                               service: :s3,
                                               headers: headers
                                             } ->
    assert Map.get(headers, "x-amz-copy-source") =~ "/#{expected_bucket}/#{expected_src_path}"
    %{body: %{}}
  end)
end
end
26 changes: 26 additions & 0 deletions apps/transport/test/transport/S3/aggregates_uploader_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule Transport.S3.AggregatesUploaderTest do
  use ExUnit.Case, async: true

  alias Transport.S3.AggregatesUploader
  alias Transport.Test.S3TestUtils

  test "export to S3" do
    timestamped = "aggregate-20250127193035.csv"
    stable = "aggregate-latest.csv"

    bucket = Transport.S3.bucket_name(:aggregates)

    # One streamed upload per file: the aggregate and its checksum companion...
    for path <- [timestamped, "#{timestamped}.sha256sum"] do
      S3TestUtils.s3_mock_stream_file(path: path, bucket: bucket, acl: :private)
    end

    # ...then a server-side copy of each to its stable "latest" name.
    S3TestUtils.s3_mocks_remote_copy_file(bucket, timestamped, stable)
    S3TestUtils.s3_mocks_remote_copy_file(bucket, "#{timestamped}.sha256sum", "#{stable}.sha256sum")

    AggregatesUploader.with_tmp_file(fn file ->
      File.write(file, "some relevant data")

      :ok = AggregatesUploader.upload_aggregate!(file, timestamped, stable)
    end)
  end
end
3 changes: 2 additions & 1 deletion config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ config :transport,
history: "resource-history-dev",
on_demand_validation: "on-demand-validation-dev",
gtfs_diff: "gtfs-diff-dev",
logos: "logos-dev"
logos: "logos-dev",
aggregates: "aggregates-dev"
}

config :oauth2, Datagouvfr.Authentication,
Expand Down
3 changes: 2 additions & 1 deletion config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ config :transport,
history: "resource-history-prod",
on_demand_validation: "on-demand-validation-prod",
gtfs_diff: "gtfs-diff-prod",
logos: "logos-prod"
logos: "logos-prod",
aggregates: "aggregates-prod"
}

# Configure Sentry for production and staging.
Expand Down
3 changes: 2 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ if app_env == :staging do
history: "resource-history-staging",
on_demand_validation: "on-demand-validation-staging",
gtfs_diff: "gtfs-diff-staging",
logos: "logos-staging"
logos: "logos-staging",
aggregates: "aggregates-staging"
}
end

Expand Down
3 changes: 2 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ config :transport,
history: "resource-history-test",
on_demand_validation: "on-demand-validation-test",
gtfs_diff: "gtfs-diff-test",
logos: "logos-test"
logos: "logos-test",
aggregates: "aggregates-test"
},
workflow_notifier: Transport.Jobs.Workflow.ProcessNotifier,
export_secret_key: "fake_export_secret_key",
Expand Down

0 comments on commit bfffbfc

Please sign in to comment.