Skip to content

Commit

Permalink
Merge pull request data-integrations#43 from cloudsufi/bugfix/CSM-1056
Browse files Browse the repository at this point in the history
PLUGIN-1637 PLUGIN-1638 PLUGIN-1639 - SFMC changes
  • Loading branch information
sgarg-CS authored Jul 3, 2023
2 parents 17214c7 + 8ab066d commit 2c58029
Show file tree
Hide file tree
Showing 17 changed files with 154 additions and 48 deletions.
2 changes: 1 addition & 1 deletion docs/MarketingCloud-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ You also can use the macro function ${conn(connection-name)}.
For example, `https://instance.auth.marketingcloudapis.com/`

**SOAP API Endpoint**: The SOAP Endpoint URL associated for the Server-to-Server API integration. For example,
`https://instance.soap.marketingcloudapis.com/Service.asmx`
`https://instance.soap.marketingcloudapis.com/`

**Reference Name**: Name used to uniquely identify this source for lineage, annotating metadata, etc.

Expand Down
2 changes: 1 addition & 1 deletion docs/MarketingCloud-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Properties
For example, `https://instance.auth.marketingcloudapis.com/`

**SOAP API Endpoint**: The SOAP Endpoint URL associated for the Server-to-Server API integration. For example,
`https://instance.soap.marketingcloudapis.com/Service.asmx`
`https://instance.soap.marketingcloudapis.com/`

Path of the connection
----------------------
Expand Down
31 changes: 18 additions & 13 deletions src/main/java/com/custom/fuelsdk/PaginationETSoapObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.exacttarget.fuelsdk.internal.Soap;
import io.cdap.plugin.sfmc.source.util.MarketingCloudConstants;
import io.cdap.plugin.sfmc.source.util.SourceObject;

import org.apache.log4j.Logger;

import java.util.ArrayList;
Expand Down Expand Up @@ -67,35 +66,41 @@ public class PaginationETSoapObject extends ETSoapObject {
private static Logger logger = Logger.getLogger(PaginationETSoapObject.class);

/**
* @param client
* @param dataExtension
* @param filter
* @return
* Filter and retrieve the records from requested data extension
*
* @param client The ETClient object
* @param dataExtension The data extension key. For example, `54068FF1-E1A6-4207-8184-09C61D420DC7`
* @param filter The filter string to filter the records
* @return The list of ETDataExtensionRow representing the records from requested data extension
* @throws ETSdkException This method is from ETDataExtension with fix for pagination
* of Data Extension object.
*/
public static ETResponse<ETDataExtensionRow> select(ETClient client,
String dataExtension,
ETFilter filter)
throws ETSdkException {
String name = null;

//
// The data extension can be specified using key or name:
//

ETExpression e = ETExpression.parse(dataExtension);
ETExpression e = ETExpression.parse(String.format("key=%s", dataExtension));
if (e.getProperty().toLowerCase().equals("key")
&& e.getOperator() == ETExpression.Operator.EQUALS) {
name = e.getValue();

// Discussion pending with the Salesforce team regarding their SOAP API.
// name = e.getValue();

// if no columns are explicitly requested
// retrieve all columns
if (filter.getProperties().isEmpty()) {
filter.setProperties(retrieveColumnNames(client, name));
filter.setProperties(retrieveColumnNames(client, dataExtension));
}
} else if (e.getProperty().toLowerCase().equals("name")
&& e.getOperator() == ETExpression.Operator.EQUALS) {
name = e.getValue();

// Discussion pending with the Salesforce team regarding their SOAP API.
// name = e.getValue();

// if no columns are explicitly requested
// throw an exception
// because we need the key
Expand Down Expand Up @@ -153,8 +158,8 @@ public static ETResponse<ETDataExtensionRow> select(ETClient client,
*/

/*Start BUG FIX*/
ETResponse<ETDataExtensionRow> response = customRetrieve(client, "DataExtensionObject[" + name + "]",
filter, null, ETDataExtensionRow.class);
ETResponse<ETDataExtensionRow> response = customRetrieve(client,
"DataExtensionObject[" + dataExtension + "]", filter, null, ETDataExtensionRow.class);
/*End BUG FIX*/
return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class MarketingConnectorConfig extends PluginConfig {
@Name(MarketingCloudConstants.PROPERTY_SOAP_API_ENDPOINT)
@Macro
@Description("The SOAP Endpoint URL associated for the Server-to-Server API integration. " +
"For example, https://instance.soap.marketingcloudapis.com/Service.asmx")
"For example, https://instance.soap.marketingcloudapis.com/")
private final String soapEndpoint;

public MarketingConnectorConfig(String clientId, String clientSecret, String authEndpoint, String soapEndpoint) {
Expand Down
15 changes: 7 additions & 8 deletions src/main/java/io/cdap/plugin/sfmc/sink/DataExtensionClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,15 @@ public void validateSchemaCompatibility(Schema schema, FailureCollector collecto
collector.addFailure(
String.format("Column '%s' is a boolean in data extension '%s', but is a '%s' in the input schema.",
originalName, dataExtensionKey, schemaTypeStr),
"Change the field schema to ensure it is a boolean or string type.")
.withInputSchemaField(fieldName);
"Change the field schema to ensure it is a boolean or string type.");
}
break;
case DECIMAL:
if (fieldSchema.getLogicalType() != Schema.LogicalType.DECIMAL) {
collector.addFailure(
String.format("Column '%s' is a decimal in data extension '%s', but is a '%s' in the input schema.",
originalName, dataExtensionKey, schemaTypeStr),
"Change the field schema to ensure it is a decimal or string type.").withInputSchemaField(fieldName);
"Change the field schema to ensure it is a decimal or string type.");
}
break;
case PHONE:
Expand All @@ -149,33 +148,33 @@ public void validateSchemaCompatibility(Schema schema, FailureCollector collecto
collector.addFailure(
String.format("Column '%s' is a %s in data extension '%s', but is a '%s' in the input schema.",
originalName, column.getType().name().toLowerCase(), dataExtensionKey, schemaTypeStr),
"Change the field schema to ensure it is a string type.").withInputSchemaField(fieldName);
"Change the field schema to ensure it is a string type.");
break;
case DATE:
if (fieldSchema.getLogicalType() != Schema.LogicalType.DATE) {
collector.addFailure(
String.format("Column '%s' is a date in data extension '%s', but is a '%s' in the input schema.",
originalName, dataExtensionKey, schemaTypeStr),
"Change the field schema to ensure it is a date or string type.").withInputSchemaField(fieldName);
"Change the field schema to ensure it is a date or string type.");
}
break;
case NUMBER:
if (schemaType != Schema.Type.INT) {
collector.addFailure(
String.format("Column '%s' is a number in data extension '%s', but is a '%s' in the input schema.",
originalName, dataExtensionKey, schemaTypeStr),
"Change the field schema to ensure it is an integer or string type.").withInputSchemaField(fieldName);
"Change the field schema to ensure it is an integer or string type.");
}
break;
default:
collector.addFailure(
String.format("Unknown type '%s' for column '%s' in data extension '%s'.",
column.getType(), column.getName(), dataExtensionKey),
"Supported types are: boolean, decimal, phone, text, email_address, locale, date and number.")
.withInputSchemaField(fieldName);
"Supported types are: boolean, decimal, phone, text, email_address, locale, date and number.");
}
}
}
collector.getOrThrowException();
return null;
});
}
Expand Down
26 changes: 25 additions & 1 deletion src/main/java/io/cdap/plugin/sfmc/sink/MarketingCloudConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.plugin.sfmc.sink;

import com.exacttarget.fuelsdk.ETClient;
import com.exacttarget.fuelsdk.ETSdkException;
import com.google.common.annotations.VisibleForTesting;
import io.cdap.cdap.api.annotation.Description;
Expand Down Expand Up @@ -146,7 +147,30 @@ Map<String, String> getColumnMapping(@Nullable Schema originalSchema, FailureCol
throw collector.getOrThrowException();
}
if (fieldNames.contains(parts[0])) {
mapping.put(parts[0], parts[1]);
if (getConnection().shouldConnect()) {
try {
DataExtensionClient client = DataExtensionClient.create(dataExtension, getConnection().getClientId(),
getConnection().getClientSecret(),
getConnection().getAuthEndpoint(),
getConnection().getSoapEndpoint());
if (client.getDataExtensionInfo().getColumn(parts[1]) != null) {
mapping.put(parts[0], parts[1]);
} else {
collector.addFailure(String.format("Invalid data extension column name: %s", parts[1]),
"Make sure column name exists in the data extension")
.withConfigProperty(COLUMN_MAPPING);
throw collector.getOrThrowException();
}
} catch (ETSdkException e) {
collector.addFailure("Error while validating Marketing Cloud client: " + e.getMessage(), null)
.withStacktrace(e.getStackTrace());
}
}
} else {
collector.addFailure(String.format("Invalid column name: %s", parts[0]),
"Make sure column name exists in the input schema")
.withConfigProperty(COLUMN_MAPPING);
throw collector.getOrThrowException();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public ETResponse<ETDataExtensionRow> fetchDataExtensionRecords(String dataExten
ETResponse<ETDataExtensionRow> response = null;

if (requestId == null) {
response = PaginationETSoapObject.select(client, "key=" + dataExtensionKey,
response = PaginationETSoapObject.select(client, dataExtensionKey,
filter);
} else {
response = PaginationETSoapObject.continueRequest(client, null, requestId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void close() {
private void fetchData() throws Exception {
object = SourceObject.valueOf(split.getObjectName());
tableName = split.getTableName();
formattedTableName = tableName.replaceAll("-", "_");
formattedTableName = tableName.replaceAll("-|\\s", "_");

if (object == SourceObject.DATA_EXTENSION) {
dataExtensionKey = tableName.replaceAll(MarketingCloudConstants.DATA_EXTENSION_PREFIX, "");
Expand Down Expand Up @@ -180,6 +180,6 @@ private void fetchSchema(MarketingCloudClient client) {
} catch (Exception e) {
schemaFields = Collections.emptyList();
}
schema = Schema.recordOf(tableName.replaceAll("-", "_"), schemaFields);
schema = Schema.recordOf(tableName.replaceAll("-|\\s", "_"), schemaFields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private Schema getSchema(SourceQueryMode mode) {
}

private void recordLineage(BatchSourceContext context, MarketingCloudObjectInfo tableInfo) {
String tableName = tableInfo.getFormattedTableName();
String tableName = tableInfo.getFormattedTableName().replaceAll("-|\\s", "_");
String outputName = String.format("%s-%s", conf.getReferenceName(), tableName);
Schema schema = tableInfo.getSchema();
LineageRecorder lineageRecorder = new LineageRecorder(context, outputName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.cdap.plugin.sfmc.source;

import com.exacttarget.fuelsdk.ETDataExtension;
import com.exacttarget.fuelsdk.ETResponse;
import com.exacttarget.fuelsdk.ETSdkException;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
Expand All @@ -30,7 +32,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import java.util.stream.Collectors;
import javax.annotation.Nullable;


Expand Down Expand Up @@ -97,9 +99,9 @@ public class MarketingCloudSourceConfig extends SalesforceMarketingCloudBaseConf
@Name(MarketingCloudConstants.PROPERTY_TABLE_NAME_FIELD)
@Macro
@Nullable
@Description("The name of the field that holds the object name to which the data belongs to. Must not be the name" +
@Description("The name of the field that holds the object name to which the data belongs to. Must not be the name " +
"of any column for any of the objects that will be read. For the Data Extension object, this field will have a " +
"value in dataextension_[Data Extension Key] format. Note, the Table Name property value is ignored if the mode" +
"value in dataextension_[Data Extension Key] format. Note, the Table Name property value is ignored if the mode " +
"is set to Single Object.")
private String tableNameField;

Expand Down Expand Up @@ -319,6 +321,14 @@ private void validateMultiObjectQueryMode(FailureCollector collector) {
if (dataExtensionKeyList.isEmpty()) {
collector.addFailure("At least 1 Data Extension Key must be specified.", null)
.withConfigProperty(MarketingCloudConstants.PROPERTY_DATA_EXTENSION_KEY_LIST);
} else {
if (getConnection().shouldConnect()) {
if (!getActualDataExtensionKeys(collector).containsAll(dataExtensionKeyList)) {
collector.addFailure("Data Extension Keys provided, are invalid.",
"Please specify valid data extension keys.")
.withConfigProperty(MarketingCloudConstants.PROPERTY_DATA_EXTENSION_KEY_LIST);
}
}
}
}

Expand All @@ -336,9 +346,19 @@ private void validateSingleObjectQueryMode(FailureCollector collector) {

SourceObject object = getObject(collector);

if (object == SourceObject.DATA_EXTENSION && Util.isNullOrEmpty(dataExtensionKey)) {
collector.addFailure("Data Extension Key must be specified.", null)
.withConfigProperty(MarketingCloudConstants.PROPERTY_DATA_EXTENSION_KEY);
if (object == SourceObject.DATA_EXTENSION) {
if (Util.isNullOrEmpty(dataExtensionKey)) {
collector.addFailure("Data Extension Key must be specified.", null)
.withConfigProperty(MarketingCloudConstants.PROPERTY_DATA_EXTENSION_KEY);
} else {
if (getConnection().shouldConnect()) {
if (!getActualDataExtensionKeys(collector).contains(dataExtensionKey)) {
collector.addFailure("Data Extension Key provided, is invalid.",
"Please specify a valid data extension key.")
.withConfigProperty(MarketingCloudConstants.PROPERTY_DATA_EXTENSION_KEY);
}
}
}
}
}

Expand All @@ -355,4 +375,30 @@ private void validateFilter(FailureCollector collector) {
.withStacktrace(e.getStackTrace());
}
}

public List<String> getActualDataExtensionKeys(FailureCollector collector) {
List<String> actualDataExtensionKeys = new ArrayList<>();
try {
MarketingCloudClient marketingCloudClient = MarketingCloudClient.getOrCreate(getConnection().getClientId(),
getConnection().getClientSecret(),
getConnection().getAuthEndpoint(),
getConnection().getSoapEndpoint());

ETResponse<ETDataExtension> response = marketingCloudClient.retrieveDataExtensionKeys();
actualDataExtensionKeys = response.getObjects().stream().map(e -> e.getKey())
.collect(Collectors.toList());
} catch (ETSdkException e) {
collector.addFailure("Unable to connect to Salesforce Instance.",
"Ensure properties like Client ID, Client Secret, API Endpoint, " +
"Soap Endpoint, Auth Endpoint are correct.")
.withConfigProperty(MarketingCloudConstants.PROPERTY_CLIENT_ID)
.withConfigProperty(MarketingCloudConstants.PROPERTY_CLIENT_SECRET)
.withConfigProperty(MarketingCloudConstants.PROPERTY_AUTH_API_ENDPOINT)
.withConfigProperty(MarketingCloudConstants.PROPERTY_SOAP_API_ENDPOINT)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}
return actualDataExtensionKeys;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ public String getTableName() {
}

/**
* Replaces all hyphen (-) characters in column name with single underscore
* Replaces all hyphen (-) and space characters in column name with single underscore
* @return table name with all characters replaced.
*/
public String getFormattedTableName() {
return getTableName().replaceAll("-", "_");
return getTableName().replaceAll("-|\\s", "_");
}

public Schema getSchema() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void testEmptyColumnMapping() {
MarketingCloudConf config = new MarketingCloudConf("referenceName", CLIENT_ID,
CLIENT_SECRET, "DE", AUTH_ENDPOINT,
SOAP_ENDPOINT, 500, null,
"column=mapping", null,
null, null,
null, null);
config.shouldFailOnError();
config.shouldTruncateText();
Expand Down
Loading

0 comments on commit 2c58029

Please sign in to comment.