Add BigQuery Error Details Provider
psainics committed Nov 14, 2024
1 parent 601f62b commit efeef28
Showing 12 changed files with 238 additions and 75 deletions.
2 changes: 1 addition & 1 deletion src/e2e-test/resources/errorMessage.properties
@@ -28,7 +28,7 @@ errorMessageInvalidBucketName=Invalid bucket name in path
errorMessageInvalidFormat=Input has multi-level structure that cannot be represented appropriately as csv. \
Consider using json, avro or parquet to write data.
errorMessageMultipleFileWithFirstRowAsHeaderDisabled=Spark program 'phase-1' failed with error: Found a row with 6 fields when the schema only contains 4 fields. Check that the schema contains the right number of fields.. Please check the system logs for more details.
errorMessageMultipleFileWithFirstRowAsHeaderEnabled=Spark program 'phase-1' failed with error: For input string:
errorMessageMultipleFileWithFirstRowAsHeaderEnabled=Spark program 'phase-1' failed with error:
errorMessageMultipleFileWithoutClearDefaultSchema=Spark program 'phase-1' failed with error: Found a row with 4 fields when the schema only contains 2 fields.
errorMessageInvalidSourcePath=Invalid bucket name in path 'abc@'. Bucket name should
errorMessageInvalidDestPath=Invalid bucket name in path 'abc@'. Bucket name should
@@ -0,0 +1,30 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.gcp.bigquery.common;

import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;

/**
* A custom ErrorDetailsProvider for BigQuery plugins.
*/
public class BigQueryErrorDetailsProvider extends GCPErrorDetailsProvider {

@Override
protected String getExternalDocumentationLink() {
return "https://cloud.google.com/bigquery/docs/error-messages";
}
}
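The new provider only overrides the external documentation link; the actual error parsing and categorization comes from the shared GCPErrorDetailsProvider base class. As a purely illustrative sketch (not part of this commit), another GCP plugin family could follow the same pattern — the package, class name, and link below are hypothetical:

/*
 * Hypothetical example only: a Spanner provider following the same pattern.
 * The package, class name, and documentation URL are illustrative assumptions.
 */
package io.cdap.plugin.gcp.spanner.common;

import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;

/**
 * A custom ErrorDetailsProvider for Spanner plugins (illustrative example).
 */
public class SpannerErrorDetailsProvider extends GCPErrorDetailsProvider {

  @Override
  protected String getExternalDocumentationLink() {
    return "https://cloud.google.com/spanner/docs/error-codes";
  }
}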
@@ -34,7 +34,9 @@
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize;
@@ -116,6 +118,8 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
storage, bucket, bucketName,
config.getLocation(), cmekKeyName);
}
// set error details provider
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigQueryErrorDetailsProvider.class.getName()));
prepareRunInternal(context, bigQuery, bucketName);
}
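The setErrorDetailsProvider call ties the BigQueryErrorDetailsProvider class name to this stage via an ErrorDetailsProviderSpec, so failures raised later in the run can be enriched with BigQuery-specific guidance. A minimal sketch of the same registration on the source side is shown below; it is not part of this hunk and assumes BatchSourceContext exposes the same setErrorDetailsProvider method:

// Illustrative fragment inside a hypothetical BigQuery source plugin class.
@Override
public void prepareRun(BatchSourceContext context) throws Exception {
  // ... existing validation and Hadoop job configuration ...
  // Register the same provider class by name (assumes the source context
  // offers setErrorDetailsProvider, mirroring the sink context above).
  context.setErrorDetailsProvider(
      new ErrorDetailsProviderSpec(BigQueryErrorDetailsProvider.class.getName()));
}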

@@ -124,9 +128,9 @@ public void onRunFinish(boolean succeeded, BatchSinkContext context) {
String gcsPath;
String bucket = getConfig().getBucket();
if (bucket == null) {
gcsPath = String.format("gs://%s", runUUID.toString());
gcsPath = String.format("gs://%s", runUUID);
} else {
gcsPath = String.format(gcsPathFormat, bucket, runUUID.toString());
gcsPath = String.format(gcsPathFormat, bucket, runUUID);
}
try {
BigQueryUtil.deleteTemporaryDirectory(baseConfiguration, gcsPath);
@@ -327,9 +331,8 @@ private void validateRecordDepth(@Nullable Schema schema, FailureCollector colle
*
* @return Hadoop configuration
*/
protected Configuration getOutputConfiguration() throws IOException {
Configuration configuration = new Configuration(baseConfiguration);
return configuration;
protected Configuration getOutputConfiguration() {
return new Configuration(baseConfiguration);
}

/**
@@ -61,6 +61,10 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryStrings;
import io.cdap.plugin.gcp.bigquery.source.BigQueryFactoryWithScopes;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
@@ -103,6 +107,7 @@
*/
public class BigQueryOutputFormat extends ForwardingBigQueryFileOutputFormat<StructuredRecord, NullWritable> {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryOutputFormat.class);
private static final String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";

@Override
public RecordWriter<StructuredRecord, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
@@ -165,19 +170,31 @@ public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException,
// Error if the output path already exists.
FileSystem outputFileSystem = outputPath.getFileSystem(conf);
if (outputFileSystem.exists(outputPath)) {
throw new IOException("The output path '" + outputPath + "' already exists.");
String errorMessage = String.format("The output path '%s' already exists.", outputPath);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true,
new IOException(errorMessage));
}

// Error if compression is set as there's mixed support in BigQuery.
if (FileOutputFormat.getCompressOutput(job)) {
throw new IOException("Compression isn't supported for this OutputFormat.");
String errorMessage = "Compression isn't supported for this OutputFormat.";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true,
new IOException(errorMessage));
}

// Error if unable to create a BigQuery helper.
try {
new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES).getBigQueryHelper(conf);
} catch (GeneralSecurityException gse) {
throw new IOException("Failed to create BigQuery client", gse);
String errorMessage = "Failed to create BigQuery client";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true,
new IOException(errorMessage, gse));
}

// Let delegate process its checks.
@@ -208,7 +225,11 @@ public static class BigQueryOutputCommitter extends ForwardingBigQueryFileOutput
BigQueryFactory bigQueryFactory = new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES);
this.bigQueryHelper = bigQueryFactory.getBigQueryHelper(context.getConfiguration());
} catch (GeneralSecurityException e) {
throw new IOException("Failed to create Bigquery client.", e);
String errorMessage = "Failed to create BigQuery client";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true,
new IOException(errorMessage, e));
}
}

@@ -266,7 +287,11 @@ public void commitJob(JobContext jobContext) throws IOException {
writeDisposition, sourceUris, partitionType, timePartitioningType, range, partitionByField,
requirePartitionFilter, clusteringOrderList, tableExists, jobLabelKeyValue, conf);
} catch (Exception e) {
throw new IOException("Failed to import GCS into BigQuery. ", e);
String errorMessage = "Failed to import GCS into BigQuery.";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true,
new IOException(errorMessage, e));
}

cleanup(jobContext);
@@ -566,26 +591,34 @@ private static void waitForJobCompletion(BigQueryHelper bigQueryHelper, String p
int numOfErrors;
String errorMessage;
if (errors == null || errors.isEmpty()) {
errorMessage = pollJob.getStatus().getErrorResult().getMessage();
errorMessage = String.format("reason: %s, %s", pollJob.getStatus().getErrorResult().getReason(),
pollJob.getStatus().getErrorResult().getMessage());
numOfErrors = 1;
} else {
errorMessage = errors.get(errors.size() - 1).getMessage();
errorMessage = String.format("reason: %s, %s", errors.get(errors.size() - 1).getReason(),
errors.get(errors.size() - 1).getMessage());
numOfErrors = errors.size();
}
// Only add the first error message to the exception. For other errors, the user should look at the BigQuery job logs.
throw new IOException(String.format("Error occurred while importing data to BigQuery '%s'." +
" There are total %s error(s) for BigQuery job %s. Please look at " +
"BigQuery job logs for more information.",
errorMessage, numOfErrors, jobReference.getJobId()));
String errorMessageException = String.format("Error occurred while importing data to BigQuery '%s'." +
" There are total %s error(s) for BigQuery job %s. Please look at " +
"BigQuery job logs for more information.",
errorMessage, numOfErrors, jobReference.getJobId());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessageException,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException), ErrorType.UNKNOWN, true,
new IOException(errorMessageException));

}
} else {
long millisToWait = pollBackOff.nextBackOffMillis();
if (millisToWait == BackOff.STOP) {
throw new IOException(
String.format(
"Job %s failed to complete after %s millis.",
jobReference.getJobId(),
elapsedTime));
String errorMessage = String.format("Job %s failed to complete after %s millis.", jobReference.getJobId()
, elapsedTime);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.UNKNOWN, true,
new IOException(errorMessage));
}
// Pause execution for the configured duration before polling job status again.
Thread.sleep(millisToWait);
@@ -621,8 +654,12 @@ private static Optional<TableSchema> getTableSchema(Configuration conf) throws I
TableSchema tableSchema = createTableSchemaFromFields(fieldsJson);
return Optional.of(tableSchema);
} catch (IOException e) {
throw new IOException(
"Unable to parse key '" + BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey() + "'.", e);
String errorMessage = String.format("Unable to parse key '%s'.",
BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true,
new IOException(errorMessage, e));
}
}
return Optional.empty();
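Each throw site in BigQueryOutputFormat now repeats the same ErrorUtils.getProgramFailureException boilerplate. A hypothetical refactoring sketch (not part of this commit) could factor that into a small private helper inside the class; it reuses the errorMessageFormat field from this diff and assumes the factory method returns a RuntimeException subclass, as the throw statements above imply:

// Hypothetical helper, shown only to illustrate how the repeated calls could be consolidated.
private static RuntimeException failureException(ErrorPhase phase, String errorMessage,
                                                 ErrorType errorType, Throwable cause) {
  return ErrorUtils.getProgramFailureException(
      new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
      String.format(errorMessageFormat, phase, errorMessage), errorType, true,
      cause == null ? new IOException(errorMessage) : new IOException(errorMessage, cause));
}

// Example call site, equivalent to the compression check in checkOutputSpecs above:
//   throw failureException(ErrorPhase.VALIDATING_OUTPUT_SPECS,
//       "Compression isn't supported for this OutputFormat.", ErrorType.SYSTEM, null);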
@@ -189,7 +189,7 @@ private static void writeSimpleTypes(JsonWriter writer, String name, boolean isA
} else if (jsonString.startsWith("[") && jsonString.endsWith("]")) {
writeJsonArrayToWriter(gson.fromJson(jsonString, JsonArray.class), writer);
} else {
throw new IllegalStateException(String.format("Expected value of Field '%s' to be a valid JSON " +
throw new IllegalArgumentException(String.format("Expected value of Field '%s' to be a valid JSON " +
"object or array.", name));
}
break;