support reading s3 config from config file
aykut-bozkurt committed Oct 2, 2024
1 parent 7446ed6 commit 598941e
Showing 6 changed files with 179 additions and 27 deletions.
5 changes: 5 additions & 0 deletions .env_sample
@@ -0,0 +1,5 @@
AWS_S3_TEST_BUCKET=testbucket
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=admin
AWS_SECRET_ACCESS_KEY=admin123
PG_PARQUET_TEST=true
50 changes: 45 additions & 5 deletions .github/workflows/ci.yml
@@ -8,6 +8,13 @@ concurrency:
group: ${{ github.ref }}
cancel-in-progress: true

env:
AWS_ACCESS_KEY_ID: test_secret_access_key
AWS_SECRET_ACCESS_KEY: test_access_key_id
AWS_REGION: us-east-1
AWS_S3_TEST_BUCKET: testbucket
PG_PARQUET_TEST: true

jobs:
build-and-test:
runs-on: ubuntu-latest
@@ -33,6 +40,25 @@ jobs:
sudo apt-get install build-essential libreadline-dev zlib1g-dev flex bison libxml2-dev libxslt-dev libssl-dev libxml2-utils xsltproc ccache pkg-config
sudo apt-get -y install postgresql-16-postgis-3 libpq-dev postgresql-server-dev-16 postgresql-client-16
- name: Install MinIO
run: |
# Download and install MinIO server and client
wget https://dl.min.io/server/minio/release/linux-amd64/$MINIO_VERSION
chmod +x $MINIO_VERSION
mv $MINIO_VERSION /usr/local/bin/minio
echo "$MINIO_SHA256 /usr/local/bin/minio" | sha256sum --check
# Download and install MinIO admin
wget https://dl.min.io/client/mc/release/linux-amd64/$MINIO_ADMIN_VERSION
chmod +x $MINIO_ADMIN_VERSION
mv $MINIO_ADMIN_VERSION /usr/local/bin/mc
echo "$MINIO_ADMIN_SHA256 /usr/local/bin/mc" | sha256sum --check
env:
MINIO_VERSION: "minio.RELEASE.2024-09-22T00-33-43Z"
MINIO_SHA256: "dea08573980057d84c14d5c55926e10b91fb2993a99696ff136fb0bddaa7c98f"
MINIO_ADMIN_VERSION: "mc.RELEASE.2024-09-16T17-43-14Z"
MINIO_ADMIN_SHA256: "9a9e7d32c175f2804d6880d5ad3623097ea439f0e0304aa6039874d0f0c493d8"

- name: Install and configure pgrx
run: |
cargo install --locked [email protected]
@@ -48,14 +74,28 @@ jobs:
- name: Create .env file
run: |
touch /tmp/.env
echo AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} >> /tmp/.env
echo AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} >> /tmp/.env
echo AWS_REGION=${{ secrets.AWS_REGION }} >> /tmp/.env
echo AWS_S3_TEST_BUCKET=${{ secrets.AWS_S3_TEST_BUCKET }} >> /tmp/.env
echo AWS_ACCESS_KEY_ID=${{ env.AWS_ACCESS_KEY_ID }} >> /tmp/.env
echo AWS_SECRET_ACCESS_KEY=${{ env.AWS_SECRET_ACCESS_KEY }} >> /tmp/.env
echo AWS_REGION=${{ env.AWS_REGION }} >> /tmp/.env
echo AWS_S3_TEST_BUCKET=${{ env.AWS_S3_TEST_BUCKET }} >> /tmp/.env
echo PG_PARQUET_TEST=${{ env.PG_PARQUET_TEST }} >> /tmp/.env
- name: Run tests
run: |
# Start MinIO server
export MINIO_ROOT_USER=${{ env.AWS_ACCESS_KEY_ID }}
export MINIO_ROOT_PASSWORD=${{ env.AWS_SECRET_ACCESS_KEY }}
minio server /tmp/minio-storage > /dev/null 2>&1 &
# Set access key and create test bucket
mc alias set local http://localhost:9000 ${{ env.AWS_ACCESS_KEY_ID }} ${{ env.AWS_SECRET_ACCESS_KEY }}
aws --endpoint-url http://localhost:9000 s3 mb s3://${{ env.AWS_S3_TEST_BUCKET }}
# Run tests with coverage tool
cargo llvm-cov test --lcov --output-path lcov.info
# Stop MinIO server
pkill -9 minio
env:
RUST_TEST_THREADS: 1
CARGO_PGRX_TEST_RUNAS: postgres
@@ -64,7 +104,7 @@
- name: Upload coverage report to Codecov
uses: codecov/codecov-action@v4
with:
fail_ci_if_error: true
fail_ci_if_error: false
files: ./lcov.info
flags: pgrxtests
token: ${{ secrets.CODECOV_TOKEN }}
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -18,6 +18,8 @@ pg_test = []
[dependencies]
arrow = {version = "53", default-features = false}
arrow-schema = {version = "53", default-features = false}
aws-config = { version = "1.5", default-features = false, features = ["rustls"]}
aws-credential-types = {version = "1.2", default-features = false}
dotenvy = "0.15"
futures = "0.3"
object_store = {version = "0.11", default-features = false, features = ["aws"]}
4 changes: 3 additions & 1 deletion README.md
@@ -100,11 +100,13 @@ You can call `SELECT * FROM parquet.kv_metadata(<uri>)` to query custom key-valu
## Object Store Support
`pg_parquet` supports reading and writing Parquet files from/to the `S3` object store. Only URIs with the `s3://` scheme are supported.

You can set the following `AWS S3` environment variables properly to access to the object store:
You can either set the following environment variables or use shared configuration files to access the object store:
- `AWS_ACCESS_KEY_ID`: the access key ID of the AWS account,
- `AWS_SECRET_ACCESS_KEY`: the secret access key of the AWS account,
- `AWS_REGION`: the default region of the AWS account.

You can set the config file path with the `AWS_CONFIG_FILE` environment variable; the default config file path is `~/.aws/config`. You can also set the profile name with the `AWS_PROFILE` environment variable; the default profile name is `default`.
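
For example, a minimal shared config file with a named profile could look like the following sketch (the profile name and credential values are placeholders, mirroring the format the new test below writes out):

```ini
# ~/.aws/config (or the path set in AWS_CONFIG_FILE)
[profile pg_parquet]
region = us-east-1
aws_access_key_id = <access key id>
aws_secret_access_key = <secret access key>
```

With `AWS_PROFILE=pg_parquet`, the extension falls back to this profile for the region and credentials whenever the corresponding `AWS_*` environment variables are not set.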

## Copy Options
`pg_parquet` supports the following options in the `COPY TO` command:
- `format parquet`: you need to specify this option to read or write Parquet files which do not end with the `.parquet[.<compression>]` extension. (This is the only option that the `COPY FROM` command supports.),
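
As an illustration of the object store support and the `format parquet` option described above, a hypothetical table could be exported to and imported from S3 as follows (bucket, key, and table names are placeholders):

```sql
-- credentials come from the AWS_* environment variables or the shared config file
-- the key does not end with .parquet, so the format option is required
COPY my_table TO 's3://my-bucket/exports/my_table' WITH (format parquet);
COPY my_table FROM 's3://my-bucket/exports/my_table' WITH (format parquet);
```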
98 changes: 78 additions & 20 deletions src/arrow_parquet/uri_utils.rs
@@ -1,7 +1,14 @@
use std::{str::FromStr, sync::Arc};

use arrow::datatypes::SchemaRef;
use object_store::aws::AmazonS3Builder;
use aws_config::environment::{
EnvironmentVariableCredentialsProvider, EnvironmentVariableRegionProvider,
};
use aws_config::meta::credentials::CredentialsProviderChain;
use aws_config::meta::region::RegionProviderChain;
use aws_config::profile::{ProfileFileCredentialsProvider, ProfileFileRegionProvider};
use aws_credential_types::provider::ProvideCredentials;
use object_store::aws::{AmazonS3, AmazonS3Builder};
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use parquet::arrow::arrow_to_parquet_schema;
use parquet::arrow::async_writer::ParquetObjectWriter;
@@ -55,19 +62,27 @@ async fn object_store_with_location(uri: &str) -> (Arc<dyn ObjectStore>, Path) {

match uri_format {
UriFormat::File => {
// create or overwrite the local file
std::fs::OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(uri)
.unwrap_or_else(|e| panic!("{}", e));

let storage_container = Arc::new(LocalFileSystem::new());

let location = Path::from_filesystem_path(uri).unwrap_or_else(|e| panic!("{}", e));

(storage_container, location)
}
UriFormat::S3 => {
let (bucket, key) = parse_bucket_and_key(uri);
let storage_container = Arc::new(
AmazonS3Builder::from_env()
.with_bucket_name(bucket)
.build()
.unwrap_or_else(|e| panic!("{}", e)),
);
let (bucket_name, key) = parse_bucket_and_key(uri);

let storage_container = Arc::new(get_s3_object_store(&bucket_name).await);

let location = Path::from(key);

(storage_container, location)
}
}
@@ -127,22 +142,65 @@ pub(crate) async fn parquet_writer_from_uri(
arrow_schema: SchemaRef,
writer_props: WriterProperties,
) -> AsyncArrowWriter<ParquetObjectWriter> {
let uri_format = UriFormat::from_str(uri).unwrap_or_else(|e| panic!("{}", e));

if uri_format == UriFormat::File {
// we overwrite the local file if it exists
std::fs::OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(uri)
.unwrap_or_else(|e| panic!("{}", e));
}

let (parquet_object_store, location) = object_store_with_location(uri).await;

let parquet_object_writer = ParquetObjectWriter::new(parquet_object_store, location);

AsyncArrowWriter::try_new(parquet_object_writer, arrow_schema, Some(writer_props))
.unwrap_or_else(|e| panic!("failed to create parquet writer for uri {}: {}", uri, e))
}

pub async fn get_s3_object_store(bucket_name: &str) -> AmazonS3 {
// try loading the .env file
dotenvy::from_path("/tmp/.env").ok();

let mut aws_s3_builder = AmazonS3Builder::new().with_bucket_name(bucket_name);

let is_test_running = std::env::var("PG_PARQUET_TEST").is_ok();

if is_test_running {
// use minio for testing
aws_s3_builder = aws_s3_builder.with_endpoint("http://localhost:9000");
aws_s3_builder = aws_s3_builder.with_allow_http(true);
}

let aws_profile_name = std::env::var("AWS_PROFILE").unwrap_or("default".to_string());

// load the region from the environment variables or the profile file
let region_provider = RegionProviderChain::first_try(EnvironmentVariableRegionProvider::new())
.or_else(
ProfileFileRegionProvider::builder()
.profile_name(aws_profile_name.clone())
.build(),
);

let region = region_provider.region().await;

if let Some(region) = region {
aws_s3_builder = aws_s3_builder.with_region(region.to_string());
}

// load the credentials from the environment variables or the profile file
let credential_provider = CredentialsProviderChain::first_try(
"Environment",
EnvironmentVariableCredentialsProvider::new(),
)
.or_else(
"Profile",
ProfileFileCredentialsProvider::builder()
.profile_name(aws_profile_name)
.build(),
);

if let Ok(credentials) = credential_provider.provide_credentials().await {
aws_s3_builder = aws_s3_builder.with_access_key_id(credentials.access_key_id());

aws_s3_builder = aws_s3_builder.with_secret_access_key(credentials.secret_access_key());

if let Some(token) = credentials.session_token() {
aws_s3_builder = aws_s3_builder.with_token(token);
}
}

aws_s3_builder.build().unwrap_or_else(|e| panic!("{}", e))
}
47 changes: 46 additions & 1 deletion src/lib.rs
@@ -37,6 +37,7 @@ pub extern "C" fn _PG_init() {
#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
use std::io::Write;
use std::marker::PhantomData;
use std::{collections::HashMap, fmt::Debug};

@@ -1116,7 +1117,7 @@ }
}

#[pg_test]
fn test_s3_object_store() {
fn test_s3_object_store_from_env() {
dotenvy::from_path("/tmp/.env").unwrap();

let test_bucket_name: String =
@@ -1130,6 +1131,50 @@ test_helper(test_table);
test_helper(test_table);
}

#[pg_test]
fn test_s3_object_store_from_config_file() {
dotenvy::from_path("/tmp/.env").unwrap();

let test_bucket_name: String =
std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found");

// remove these to make sure the config file is used
let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").unwrap();
std::env::remove_var("AWS_ACCESS_KEY_ID");
let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").unwrap();
std::env::remove_var("AWS_SECRET_ACCESS_KEY");
let region = std::env::var("AWS_REGION").unwrap();
std::env::remove_var("AWS_REGION");

// create a config file
let aws_config_file_content = format!(
"[profile pg_parquet_test]\nregion = {}\naws_access_key_id = {}\naws_secret_access_key = {}\n",
region, access_key_id, secret_access_key
);
std::env::set_var("AWS_PROFILE", "pg_parquet_test");

let aws_config_file = "/tmp/aws_config";
std::env::set_var("AWS_CONFIG_FILE", aws_config_file);

let mut aws_config_file = std::fs::OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(aws_config_file)
.unwrap();

aws_config_file
.write_all(aws_config_file_content.as_bytes())
.unwrap();

let s3_uri = format!("s3://{}/pg_parquet_test.parquet", test_bucket_name);

let test_table = TestTable::<i32>::new("int4".into()).with_uri(s3_uri);

test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);");
test_helper(test_table);
}

#[pg_test]
#[should_panic(expected = "404 Not Found")]
fn test_s3_object_store_write_invalid_uri() {
