diff --git a/docs/HTTP-batchsource.md b/docs/HTTP-batchsource.md index 37ed1e2..749a4f5 100644 --- a/docs/HTTP-batchsource.md +++ b/docs/HTTP-batchsource.md @@ -203,6 +203,10 @@ can be omitted as long as the field is present in schema. **CSV Skip First Row:** Whether to skip the first row of the HTTP response. This is usually set if the first row is a header row. +**Authorize Parsing of Objects to String:** Whether to allow the parsing of JSON Objects/Arrays to string. +If set to true, every JSON Objects/Arrays in the record will be parsed as a string field if defined as string in the output schema. +This option can be used to handle JSON that contain fields with dynamically changing schema + ### Basic Authentication **Username:** Username for basic authentication. diff --git a/docs/HTTP-streamingsource.md b/docs/HTTP-streamingsource.md index 4cad538..8145182 100644 --- a/docs/HTTP-streamingsource.md +++ b/docs/HTTP-streamingsource.md @@ -207,6 +207,10 @@ can be omitted as long as the field is present in schema. **CSV Skip First Row:** Whether to skip the first row of the HTTP response. This is usually set if the first row is a header row. +**Authorize Parsing of Objects to String:** Whether to allow the parsing of JSON Objects/Arrays to string. +If set to true, every JSON Objects/Arrays in the record will be parsed as a string field if defined as string in the output schema. +This option can be used to handle JSON that contain fields with dynamically changing schema + ### Basic Authentication **Username:** Username for basic authentication. diff --git a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java index a554dd6..1149834 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java +++ b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java @@ -98,6 +98,7 @@ public abstract class BaseHttpSourceConfig extends ReferencePluginConfig { public static final String PROPERTY_TRUSTSTORE_KEY_ALGORITHM = "trustStoreKeyAlgorithm"; public static final String PROPERTY_TRANSPORT_PROTOCOLS = "transportProtocols"; public static final String PROPERTY_CIPHER_SUITES = "cipherSuites"; + public static final String PROPERTY_AUTHORIZE_OBJECT_PARSING_TO_STRING = "authorizeParsingOfObjectToString"; public static final String PROPERTY_SCHEMA = "schema"; public static final String PAGINATION_INDEX_PLACEHOLDER_REGEX = "\\{pagination.index\\}"; @@ -384,6 +385,12 @@ public abstract class BaseHttpSourceConfig extends ReferencePluginConfig { @Macro protected String cipherSuites; + @Name(PROPERTY_AUTHORIZE_OBJECT_PARSING_TO_STRING) + @Description("If set to true, the JSON Arrays and JSON Objects in data can be retrieved as strings field " + + "(if set as string in the output schema). This can be used to handle JSONs with dynamic schema. ") + @Macro + protected String authorizeParsingOfObjectToString; + @Name(PROPERTY_SCHEMA) @Macro @Nullable @@ -617,6 +624,10 @@ public String getCipherSuites() { return cipherSuites; } + public Boolean isParsingOfObjectToStringEnabled() { + return Boolean.parseBoolean(authorizeParsingOfObjectToString); + } + @Nullable public Schema getSchema() { try { diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/page/JsonPage.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/page/JsonPage.java index b89d5af..51943a2 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/pagination/page/JsonPage.java +++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/page/JsonPage.java @@ -15,15 +15,20 @@ */ package io.cdap.plugin.http.source.common.pagination.page; +import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.InvalidEntry; import io.cdap.cdap.format.StructuredRecordStringConverter; import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; import io.cdap.plugin.http.source.common.http.HttpResponse; +import io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; @@ -35,6 +40,8 @@ * Returns elements from json one by one by given json path. */ class JsonPage extends BasePage { + private static final Logger LOG = LoggerFactory.getLogger(BaseHttpPaginationIterator.class); + private final String insideElementJsonPathPart; private final Iterator iterator; private final JsonElement json; @@ -157,6 +164,19 @@ public PageEntry next() { resultJson.add(schemaFieldName, queryResponse.get()); } + if (config.isParsingOfObjectToStringEnabled()) { + JsonElement newResultJson = stringifyJsonObjectsIfNeeded(resultJson, schema); + + if (newResultJson.isJsonPrimitive()) { + InvalidEntry error = + new InvalidEntry<>(1, "Resulting JSON was a primitive and not a json object", + null); + return new PageEntry(error, config.getErrorHandling()); + } + + resultJson = newResultJson.getAsJsonObject(); + } + String jsonString = resultJson.toString(); try { StructuredRecord record = StructuredRecordStringConverter.fromJsonString(jsonString, schema); @@ -171,6 +191,141 @@ public PageEntry next() { } } + /** + * In a JSONObject stringify all JSONArray and JSONObjects if they are defined as strings in the schema + * + * @param jsonObject the json object + * @param schema the json schema + * @return the processed object + */ + private JsonElement stringifyJsonObjectsIfNeeded(JsonObject jsonObject, Schema schema) { + if (!jsonObject.isJsonPrimitive()) { + if (schemaTypeEquals(schema, Schema.Type.STRING)) { + return new JsonPrimitive(jsonObject.toString()); + } + } else { + return jsonObject; + } + + JsonObject newJsonObject = new JsonObject(); + + for (Map.Entry e : jsonObject.entrySet()) { + String fieldName = e.getKey(); + JsonElement field = e.getValue(); + + Schema.Field fieldSchemaObj = getFieldSchema(schema, fieldName); + + if (fieldSchemaObj != null) { + Schema fieldSchema = fieldSchemaObj.getSchema(); + + if (field.isJsonPrimitive()) { + newJsonObject.add(fieldName, field); + } else if (field.isJsonObject()) { + newJsonObject.add(fieldName, stringifyJsonObjectsIfNeeded(field.getAsJsonObject(), fieldSchema)); + } else { // json array + newJsonObject.add(fieldName, stringifyJsonArraysIfNeeded(field.getAsJsonArray(), fieldSchema)); + } + } + } + + return newJsonObject; + } + + + /** + * In a JSONArray stringify all JSONArray and JSONObjects if they are defined as strings in the schema + * + * @param jsonArray the json array + * @param schema the json schema + * @return the processed object + */ + private JsonElement stringifyJsonArraysIfNeeded(JsonArray jsonArray, Schema schema) { + if (!jsonArray.isJsonPrimitive()) { + if (schemaTypeEquals(schema, Schema.Type.STRING)) { + return new JsonPrimitive(jsonArray.toString()); + } + } else { + return jsonArray; + } + + JsonArray newJsonArray = new JsonArray(); + + for (JsonElement je : jsonArray) { + if (je.isJsonPrimitive()) { + newJsonArray.add(je); + } else { + Schema componentSchema = getArrayComponentSchema(schema); + if (componentSchema == null) { + LOG.error("Trying to retrieve sub-schema of array " + jsonArray + " in schema but found " + schema); + } else { + if (je.isJsonObject()) { + newJsonArray.add(stringifyJsonObjectsIfNeeded(je.getAsJsonObject(), componentSchema)); + } else { // json array + newJsonArray.add(stringifyJsonArraysIfNeeded(je.getAsJsonArray(), componentSchema)); + } + } + } + } + + return newJsonArray; + } + + /** + * Return the ComponentSchema of an ARRAY or an UNION[NULL, ARRAY] + */ + private Schema getArrayComponentSchema(Schema schema) { + if (schema.getType().equals(Schema.Type.ARRAY)) { + return schema.getComponentSchema(); + } + + if (schema.getType().equals(Schema.Type.UNION)) { + for (Schema unionSchema : schema.getUnionSchemas()) { + if (unionSchema.getType().equals(Schema.Type.ARRAY)) { + return unionSchema.getComponentSchema(); + } + } + } + + return null; + } + + /** + * Return the Schema of a field of a RECORD or an UNION[NULL, RECORD] + */ + private Schema.Field getFieldSchema(Schema schema, String fieldName) { + + if (schema.getType().equals(Schema.Type.RECORD)) { + return schema.getField(fieldName); + } + + if (schema.getType().equals(Schema.Type.UNION)) { + for (Schema unionSchema : schema.getUnionSchemas()) { + if (unionSchema.getType().equals(Schema.Type.RECORD)) { + return unionSchema.getField(fieldName); + } + } + } + + return null; + } + + /** + * Return true if Schema is of type 'type' or of type UNION[NULL, 'type'] + */ + private boolean schemaTypeEquals(Schema schema, Schema.Type type) { + + if (schema.getType().equals(Schema.Type.UNION)) { + for (Schema unionSchema : schema.getUnionSchemas()) { + if (unionSchema.getType().equals(type)) { + return true; + } + } + } + + return schema.getType().equals(type); + } + + private List getOptionalFields() { List optionalFields = new ArrayList<>(); List allFields = schema.getFields(); diff --git a/widgets/HTTP-batchsource.json b/widgets/HTTP-batchsource.json index 6c05378..1a4f8f7 100644 --- a/widgets/HTTP-batchsource.json +++ b/widgets/HTTP-batchsource.json @@ -99,6 +99,22 @@ } ] } + }, + { + "widget-type": "toggle", + "label": "Authorize Parsing of Objects to String", + "name": "authorizeParsingOfObjectToString", + "widget-attributes": { + "default": "false", + "on": { + "label": "True", + "value": "true" + }, + "off": { + "label": "False", + "value": "false" + } + } } ] }, @@ -666,6 +682,20 @@ } ] }, + { + "name": "JSON Formatting", + "condition": { + "property": "format", + "operator": "equal to", + "value": "json" + }, + "show": [ + { + "name": "authorizeParsingOfObjectToString", + "type": "property" + } + ] + }, { "name": "CSV Formatting", "condition": { diff --git a/widgets/HTTP-streamingsource.json b/widgets/HTTP-streamingsource.json index e7abdb6..db3a5ac 100644 --- a/widgets/HTTP-streamingsource.json +++ b/widgets/HTTP-streamingsource.json @@ -104,6 +104,22 @@ } ] } + }, + { + "widget-type": "toggle", + "label": "Authorize Parsing of Objects to String", + "name": "authorizeParsingOfObjectToString", + "widget-attributes": { + "default": "false", + "on": { + "label": "True", + "value": "true" + }, + "off": { + "label": "False", + "value": "false" + } + } } ] }, @@ -666,6 +682,20 @@ } ] }, + { + "name": "JSON Formatting", + "condition": { + "property": "format", + "operator": "equal to", + "value": "json" + }, + "show": [ + { + "name": "authorizeParsingOfObjectToString", + "type": "property" + } + ] + }, { "name": "CSV Formatting", "condition": {