Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add the capacity to parse a JSON Object or JSON Array as a string in HTTP source plugins #65

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/HTTP-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ can be omitted as long as the field is present in schema.

**CSV Skip First Row:** Whether to skip the first row of the HTTP response. This is usually set if the first row is a header row.

**Authorize Parsing of Objects to String:** Whether to allow JSON objects and JSON arrays to be read as strings.
If set to true, every JSON object or array in the record is converted to its string representation when the corresponding field is defined as a string in the output schema.
This option can be used to handle JSON that contains fields with a dynamically changing schema.

### Basic Authentication

**Username:** Username for basic authentication.
Expand Down
4 changes: 4 additions & 0 deletions docs/HTTP-streamingsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ can be omitted as long as the field is present in schema.

**CSV Skip First Row:** Whether to skip the first row of the HTTP response. This is usually set if the first row is a header row.

**Authorize Parsing of Objects to String:** Whether to allow JSON objects and JSON arrays to be read as strings.
If set to true, every JSON object or array in the record is converted to its string representation when the corresponding field is defined as a string in the output schema.
This option can be used to handle JSON that contains fields with a dynamically changing schema.

### Basic Authentication

**Username:** Username for basic authentication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public abstract class BaseHttpSourceConfig extends ReferencePluginConfig {
public static final String PROPERTY_TRUSTSTORE_KEY_ALGORITHM = "trustStoreKeyAlgorithm";
public static final String PROPERTY_TRANSPORT_PROTOCOLS = "transportProtocols";
public static final String PROPERTY_CIPHER_SUITES = "cipherSuites";
public static final String PROPERTY_AUTHORIZE_OBJECT_PARSING_TO_STRING = "authorizeParsingOfObjectToString";
public static final String PROPERTY_SCHEMA = "schema";

public static final String PAGINATION_INDEX_PLACEHOLDER_REGEX = "\\{pagination.index\\}";
Expand Down Expand Up @@ -384,6 +385,12 @@ public abstract class BaseHttpSourceConfig extends ReferencePluginConfig {
@Macro
protected String cipherSuites;

// Marked @Nullable so the property is optional: configurations that do not set it keep validating.
// A missing value reaches the getter as null, which Boolean.parseBoolean treats as false.
@Nullable
@Name(PROPERTY_AUTHORIZE_OBJECT_PARSING_TO_STRING)
@Description("If set to true, the JSON Arrays and JSON Objects in data can be retrieved as strings field " +
  "(if set as string in the output schema). This can be used to handle JSONs with dynamic schema. ")
@Macro
protected String authorizeParsingOfObjectToString;

@Name(PROPERTY_SCHEMA)
@Macro
@Nullable
Expand Down Expand Up @@ -617,6 +624,10 @@ public String getCipherSuites() {
return cipherSuites;
}

/**
 * Tells whether JSON objects and arrays may be converted to strings while parsing.
 *
 * @return {@code true} only when the configured value equals "true" (ignoring case);
 *         {@code false} for any other value, including {@code null}
 */
public Boolean isParsingOfObjectToStringEnabled() {
  final String configuredValue = authorizeParsingOfObjectToString;
  return Boolean.valueOf(configuredValue);
}

@Nullable
public Schema getSchema() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@
*/
package io.cdap.plugin.http.source.common.pagination.page;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.InvalidEntry;
import io.cdap.cdap.format.StructuredRecordStringConverter;
import io.cdap.plugin.http.source.common.BaseHttpSourceConfig;
import io.cdap.plugin.http.source.common.http.HttpResponse;
import io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -35,6 +40,8 @@
* Returns elements from json one by one by given json path.
*/
class JsonPage extends BasePage {
// NOTE(review): this logger is created for BaseHttpPaginationIterator although it lives in JsonPage,
// so messages logged through it are attributed to that class - confirm whether this is intentional.
private static final Logger LOG = LoggerFactory.getLogger(BaseHttpPaginationIterator.class);

private final String insideElementJsonPathPart;
private final Iterator<JsonElement> iterator;
private final JsonElement json;
Expand Down Expand Up @@ -157,6 +164,19 @@ public PageEntry next() {
resultJson.add(schemaFieldName, queryResponse.get());
}

if (config.isParsingOfObjectToStringEnabled()) {
JsonElement newResultJson = stringifyJsonObjectsIfNeeded(resultJson, schema);

if (newResultJson.isJsonPrimitive()) {
InvalidEntry<StructuredRecord> error =
new InvalidEntry<>(1, "Resulting JSON was a primitive and not a json object",
null);
return new PageEntry(error, config.getErrorHandling());
}

resultJson = newResultJson.getAsJsonObject();
}

String jsonString = resultJson.toString();
try {
StructuredRecord record = StructuredRecordStringConverter.fromJsonString(jsonString, schema);
Expand All @@ -171,6 +191,141 @@ public PageEntry next() {
}
}

/**
 * Returns a version of the given JSON object in which nested JSON objects and JSON arrays are replaced
 * by their string representation wherever the schema declares the corresponding field as a string.
 *
 * <ul>
 *   <li>If {@code schema} is a string (or a union containing a string), the whole object is returned
 *       as a single string primitive.</li>
 *   <li>If {@code schema} is not a record (nor a union containing a record), no per-field schema can be
 *       looked up, so the object is returned unchanged instead of being emptied.</li>
 *   <li>Otherwise a new object is built; fields that are absent from the record schema are omitted,
 *       primitives and JSON nulls are copied as-is, nested objects and arrays are processed
 *       recursively.</li>
 * </ul>
 *
 * @param jsonObject the json object to process
 * @param schema the schema describing {@code jsonObject}
 * @return a {@link JsonPrimitive} holding the serialized object when the schema is a string,
 *         otherwise a {@link JsonObject}
 */
private JsonElement stringifyJsonObjectsIfNeeded(JsonObject jsonObject, Schema schema) {
  if (schemaTypeEquals(schema, Schema.Type.STRING)) {
    return new JsonPrimitive(jsonObject.toString());
  }

  // Without a record schema every field lookup below would return null and the result would be an
  // empty object; hand the input back untouched rather than silently dropping its content.
  if (!schemaTypeEquals(schema, Schema.Type.RECORD)) {
    return jsonObject;
  }

  JsonObject newJsonObject = new JsonObject();

  for (Map.Entry<String, JsonElement> e : jsonObject.entrySet()) {
    String fieldName = e.getKey();
    JsonElement field = e.getValue();

    Schema.Field fieldSchemaObj = getFieldSchema(schema, fieldName);
    if (fieldSchemaObj == null) {
      // Field is not part of the schema: it is left out of the result.
      continue;
    }

    Schema fieldSchema = fieldSchemaObj.getSchema();

    if (field.isJsonObject()) {
      newJsonObject.add(fieldName, stringifyJsonObjectsIfNeeded(field.getAsJsonObject(), fieldSchema));
    } else if (field.isJsonArray()) {
      newJsonObject.add(fieldName, stringifyJsonArraysIfNeeded(field.getAsJsonArray(), fieldSchema));
    } else {
      // Primitives and JSON nulls are copied unchanged. A JSON null must not be treated as an array:
      // getAsJsonArray() throws IllegalStateException on it.
      newJsonObject.add(fieldName, field);
    }
  }

  return newJsonObject;
}


/**
 * Returns a version of the given JSON array in which nested JSON objects and JSON arrays are replaced
 * by their string representation wherever the schema declares them as strings.
 *
 * <p>If {@code schema} is a string (or a union containing a string), the whole array is returned as a
 * single string primitive. Otherwise a new array is built: primitives and JSON nulls are copied as-is,
 * nested objects and arrays are processed recursively against the array's component schema. When no
 * component schema can be found, an error is logged and the nested element is left out.
 *
 * @param jsonArray the json array to process
 * @param schema the schema describing {@code jsonArray}
 * @return a {@link JsonPrimitive} holding the serialized array when the schema is a string,
 *         otherwise a {@link JsonArray}
 */
private JsonElement stringifyJsonArraysIfNeeded(JsonArray jsonArray, Schema schema) {
  if (schemaTypeEquals(schema, Schema.Type.STRING)) {
    return new JsonPrimitive(jsonArray.toString());
  }

  // The component schema depends only on the array schema, so resolve it once for all elements.
  Schema componentSchema = getArrayComponentSchema(schema);
  JsonArray newJsonArray = new JsonArray();

  for (JsonElement je : jsonArray) {
    if (je.isJsonPrimitive() || je.isJsonNull()) {
      // JSON nulls are copied like primitives; calling getAsJsonArray() on them would throw
      // IllegalStateException.
      newJsonArray.add(je);
    } else if (componentSchema == null) {
      LOG.error("Trying to retrieve sub-schema of array {} in schema but found {}", jsonArray, schema);
    } else if (je.isJsonObject()) {
      newJsonArray.add(stringifyJsonObjectsIfNeeded(je.getAsJsonObject(), componentSchema));
    } else { // json array
      newJsonArray.add(stringifyJsonArraysIfNeeded(je.getAsJsonArray(), componentSchema));
    }
  }

  return newJsonArray;
}

/**
 * Looks up the component schema of an array schema.
 *
 * @param schema a schema that is either an ARRAY or a UNION that may contain an ARRAY
 * @return the component schema of the array (for a union, of its first ARRAY member),
 *         or {@code null} when the schema holds no array
 */
private Schema getArrayComponentSchema(Schema schema) {
  final Schema.Type kind = schema.getType();

  if (kind == Schema.Type.ARRAY) {
    return schema.getComponentSchema();
  }
  if (kind != Schema.Type.UNION) {
    return null;
  }

  // Union: the first ARRAY member decides.
  for (Schema member : schema.getUnionSchemas()) {
    if (member.getType() == Schema.Type.ARRAY) {
      return member.getComponentSchema();
    }
  }
  return null;
}

/**
 * Looks up a named field in a record schema.
 *
 * @param schema a schema that is either a RECORD or a UNION that may contain a RECORD
 * @param fieldName the name of the field to look up
 * @return the field as returned by the record's {@code getField} (for a union, by its first RECORD
 *         member), or {@code null} when the schema holds no record
 */
private Schema.Field getFieldSchema(Schema schema, String fieldName) {
  final Schema.Type kind = schema.getType();

  if (kind == Schema.Type.RECORD) {
    return schema.getField(fieldName);
  }
  if (kind != Schema.Type.UNION) {
    return null;
  }

  // Union: the first RECORD member decides, even if it does not know the field.
  for (Schema member : schema.getUnionSchemas()) {
    if (member.getType() == Schema.Type.RECORD) {
      return member.getField(fieldName);
    }
  }
  return null;
}

/**
 * Checks whether a schema is of the given type, either directly or as a member of a union.
 *
 * @param schema the schema to inspect
 * @param type the type to look for
 * @return {@code true} if the schema's own type is {@code type}, or if the schema is a UNION and
 *         one of its members has that type; {@code false} otherwise
 */
private boolean schemaTypeEquals(Schema schema, Schema.Type type) {
  final Schema.Type own = schema.getType();

  if (own == type) {
    return true;
  }
  if (own != Schema.Type.UNION) {
    return false;
  }

  for (Schema member : schema.getUnionSchemas()) {
    if (member.getType() == type) {
      return true;
    }
  }
  return false;
}


private List<String> getOptionalFields() {
List<String> optionalFields = new ArrayList<>();
List<Schema.Field> allFields = schema.getFields();
Expand Down
30 changes: 30 additions & 0 deletions widgets/HTTP-batchsource.json
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,22 @@
}
]
}
},
{
"widget-type": "toggle",
"label": "Authorize Parsing of Objects to String",
"name": "authorizeParsingOfObjectToString",
"widget-attributes": {
"default": "false",
"on": {
"label": "True",
"value": "true"
},
"off": {
"label": "False",
"value": "false"
}
}
}
]
},
Expand Down Expand Up @@ -666,6 +682,20 @@
}
]
},
{
"name": "JSON Formatting",
"condition": {
"property": "format",
"operator": "equal to",
"value": "json"
},
"show": [
{
"name": "authorizeParsingOfObjectToString",
"type": "property"
}
]
},
{
"name": "CSV Formatting",
"condition": {
Expand Down
30 changes: 30 additions & 0 deletions widgets/HTTP-streamingsource.json
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@
}
]
}
},
{
"widget-type": "toggle",
"label": "Authorize Parsing of Objects to String",
"name": "authorizeParsingOfObjectToString",
"widget-attributes": {
"default": "false",
"on": {
"label": "True",
"value": "true"
},
"off": {
"label": "False",
"value": "false"
}
}
}
]
},
Expand Down Expand Up @@ -666,6 +682,20 @@
}
]
},
{
"name": "JSON Formatting",
"condition": {
"property": "format",
"operator": "equal to",
"value": "json"
},
"show": [
{
"name": "authorizeParsingOfObjectToString",
"type": "property"
}
]
},
{
"name": "CSV Formatting",
"condition": {
Expand Down