Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix conflicts with upstream #2

Merged
merged 3 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions athena-dynamodb/athena-dynamodb.yaml
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
Transform: 'AWS::Serverless-2016-10-31'
Metadata:
'AWS::ServerlessRepo::Application':
Name: AthenaDynamoDBConnector
Name: AthenaDynamoDBConnector-Porsche
Description: 'This connector enables Amazon Athena to communicate with DynamoDB, making your tables accessible via SQL.'
Author: 'default author'
Author: 'Porsche - Insights BI'
SpdxLicenseId: Apache-2.0
LicenseUrl: LICENSE.txt
ReadmeUrl: README.md
Labels:
- athena-federation
HomePageUrl: 'https://github.com/awslabs/aws-athena-query-federation'
HomePageUrl: 'https://github.com/porscheofficial/aws-athena-query-federation'
SemanticVersion: 2022.47.1
SourceCodeUrl: 'https://github.com/awslabs/aws-athena-query-federation'
SourceCodeUrl: 'https://github.com/porscheofficial/aws-athena-query-federation'
Parameters:
AthenaCatalogName:
Description: 'This is the name of the lambda function that will be created. This name must satisfy the pattern ^[a-z0-9-_]{1,64}$'
Expand All @@ -24,6 +24,28 @@ Parameters:
Description: 'The prefix within SpillBucket where this function can spill data.'
Type: String
Default: athena-spill
SpillPutRequestHeaders:
Description: '(Optional) A JSON encoded map of request headers and values for the Amazon S3 putObject request that is used for spilling. (for example, {"x-amz-server-side-encryption" : "aws:kms"}).'
Type: String
DisableGlue:
Description: "(Optional) If present and set to 'true', the connector does not attempt to retrieve supplemental metadata from AWS Glue."
Default: 'false'
Type: String
AllowedValues:
- true
- false
DisableProjectionAndCasing:
Description: "(Optional) Disables projection and casing. Use if you want to query DynamoDB tables that have casing in their column names and you do not want to specify a columnMapping property on your AWS Glue table. Can be set to 'auto' or 'always', set to 'auto' by default. See documentation for more information."
Default: 'auto'
Type: String
AllowedValues:
- auto
- always
AllowedTables:
Description: "(Optional) List of ';' delimited table names that should be fetched from DynamoDB, no other tables will be scanned if supplied."
Type: String
AllowedPattern: ^(?!.*;$)[^;]*(;[^;]*)*$
Default: ""
LambdaTimeout:
Description: 'Maximum Lambda invocation runtime in seconds. (min 1 - 900 max)'
Default: 900
Expand Down Expand Up @@ -51,6 +73,8 @@ Parameters:

Conditions:
HasKMSKeyId: !Not [!Equals [!Ref KMSKeyId, ""]]
HasAllowedTables: !Not [!Equals [!Ref AllowedTables, ""]]
HasSpillPutRequestHeaders: !Not [!Equals [!Ref SpillPutRequestHeaders, ""]]
NotHasLambdaRole: !Equals [!Ref LambdaRole, ""]
HasPermissionsBoundary: !Not [ !Equals [ !Ref PermissionsBoundaryARN, "" ] ]
CreateKMSPolicy: !And [!Condition HasKMSKeyId, !Condition NotHasLambdaRole]
Expand All @@ -62,6 +86,10 @@ Resources:
Environment:
Variables:
disable_spill_encryption: !Ref DisableSpillEncryption
disable_projection_and_casing: !Ref DisableProjectionAndCasing
disable_glue: !Ref DisableGlue
spill_put_request_headers: !If [HasSpillPutRequestHeaders, !Ref SpillPutRequestHeaders, !Ref "AWS::NoValue"]
allowed_tables: !If [HasAllowedTables, !Ref AllowedTables, !Ref "AWS::NoValue"]
spill_bucket: !Ref SpillBucket
spill_prefix: !Ref SpillPrefix
kms_key_id: !If [HasKMSKeyId, !Ref KMSKeyId, !Ref "AWS::NoValue"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import software.amazon.awssdk.services.dynamodb.model.ExecuteStatementResponse;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -126,6 +127,7 @@ public class DynamoDBMetadataHandler
@VisibleForTesting
static final int MAX_SPLITS_PER_REQUEST = 1000;
private static final Logger logger = LoggerFactory.getLogger(DynamoDBMetadataHandler.class);
private static final String ALLOWED_TABLES_ENV = "allowed_tables";
static final String DYNAMODB = "dynamodb";
private static final String SOURCE_TYPE = "ddb";
// defines the value that should be present in the Glue Database URI to enable the DB for DynamoDB.
Expand All @@ -147,12 +149,19 @@ public class DynamoDBMetadataHandler
public DynamoDBMetadataHandler(java.util.Map<String, String> configOptions)
{
super(SOURCE_TYPE, configOptions);
this.ddbClient = DynamoDbClient.builder()
this.ddbClient = DynamoDbClient.builder()
.credentialsProvider(CrossAccountCredentialsProviderV2.getCrossAccountCredentialsIfPresent(configOptions, "DynamoDBMetadataHandler_CrossAccountRoleSession"))
.build();
this.glueClient = getAwsGlue();
this.invoker = ThrottlingInvoker.newDefaultBuilder(EXCEPTION_FILTER, configOptions).build();
this.tableResolver = new DynamoDBTableResolver(invoker, ddbClient);

String allowedTablesEnvStr = configOptions.getOrDefault(ALLOWED_TABLES_ENV, "");
List<String> allowedTables = new ArrayList<>();
if (!allowedTablesEnvStr.isEmpty()) {
allowedTables = Arrays.asList(allowedTablesEnvStr.split(";", -1));
}

this.tableResolver = new DynamoDBTableResolver(invoker, ddbClient, allowedTables);
this.queryPassthrough = new DDBQueryPassthrough();
}

Expand All @@ -171,7 +180,7 @@ public DynamoDBMetadataHandler(java.util.Map<String, String> configOptions)
this.glueClient = glueClient;
this.ddbClient = ddbClient;
this.invoker = ThrottlingInvoker.newDefaultBuilder(EXCEPTION_FILTER, configOptions).build();
this.tableResolver = new DynamoDBTableResolver(invoker, ddbClient);
this.tableResolver = new DynamoDBTableResolver(invoker, ddbClient, new ArrayList<>());
this.queryPassthrough = new DDBQueryPassthrough();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ public class DynamoDBTableResolver
private DynamoDbClient ddbClient;
// used to handle Throttling events using an AIMD strategy for congestion control.
private ThrottlingInvoker invoker;
private List<String> allowedTables;

public DynamoDBTableResolver(ThrottlingInvoker invoker, DynamoDbClient ddbClient)
public DynamoDBTableResolver(ThrottlingInvoker invoker, DynamoDbClient ddbClient, List<String> allowedTables)
{
this.invoker = invoker;
this.ddbClient = ddbClient;
this.allowedTables = allowedTables;
}

/**
Expand Down Expand Up @@ -90,7 +92,22 @@ private DynamoDBPaginatedTables listPaginatedTables(String token, int pageSize)
.limit(limit)
.build();
ListTablesResponse response = invoker.invoke(() -> ddbClient.listTables(ddbRequest));
tables.addAll(response.tableNames());
List<String> responseTableNames = response.tableNames();

if (!allowedTables.isEmpty()) {
responseTableNames.forEach(tableName -> {
if (allowedTables.contains(tableName)) {
tables.add(tableName);
}
else {
logger.warn("{} excluded from tables-to-scan", tableName);
}
});
}
else {
tables.addAll(responseTableNames);
}

nextToken = response.lastEvaluatedTableName();
}
while (nextToken != null && pageSize == UNLIMITED_PAGE_SIZE_VALUE);
Expand Down
Loading