Merge pull request #506 from cloudsufi/oracle-case-issue-1.11
[🍒][PLUGIN-1793] Fixed case issue and added e2e test case for small case schema.
vikasrathee-cs authored Jul 5, 2024
2 parents b52d928 + be384f0 commit 5250465
Showing 8 changed files with 116 additions and 6 deletions.
50 changes: 50 additions & 0 deletions oracle-plugin/src/e2e-test/features/sink/OracleRunTime.feature
@@ -117,3 +117,53 @@ Feature: Oracle - Verify data transfer from BigQuery source to Oracle sink
Then Verify the pipeline status is "Succeeded"
Then Validate records transferred to target table with record counts of BigQuery table
Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table

@BQ_SOURCE_TEST_SMALL_CASE @ORACLE_TEST_TABLE
Scenario: To verify data is getting transferred from BigQuery source to Oracle sink successfully when schema is coming in small case
Given Open Datafusion Project to configure pipeline
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "BigQuery" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "Oracle" from the plugins list as: "Sink"
Then Connect plugins: "BigQuery" and "Oracle" to establish connection
Then Navigate to the properties page of plugin: "BigQuery"
Then Replace input plugin property: "project" with value: "projectId"
Then Enter input plugin property: "datasetProject" with value: "projectId"
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
Then Enter input plugin property: "dataset" with value: "dataset"
Then Enter input plugin property: "table" with value: "bqSourceTable"
Then Click on the Get Schema button
Then Verify the Output Schema matches the Expected Schema: "bqOutputDatatypesSchemaSmallCase"
Then Validate "BigQuery" plugin properties
Then Close the Plugin Properties page
Then Navigate to the properties page of plugin: "Oracle"
Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName"
Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields
Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
Then Select radio button plugin property: "connectionType" with value: "service"
Then Select radio button plugin property: "role" with value: "normal"
Then Enter input plugin property: "referenceName" with value: "sourceRef"
Then Replace input plugin property: "database" with value: "databaseName"
Then Replace input plugin property: "tableName" with value: "targetTable"
Then Replace input plugin property: "dbSchemaName" with value: "schema"
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
Then Enter input plugin property: "referenceName" with value: "targetRef"
Then Select radio button plugin property: "connectionType" with value: "service"
Then Select radio button plugin property: "role" with value: "normal"
Then Validate "Oracle" plugin properties
Then Close the Plugin Properties page
Then Save the pipeline
Then Preview and run the pipeline
Then Verify the preview of pipeline is "success"
Then Click on preview data for Oracle sink
Then Close the preview data
Then Deploy the pipeline
Then Run the Pipeline in Runtime
Then Wait till pipeline is in running state
Then Open and capture logs
Then Verify the pipeline status is "Succeeded"
Then Validate records transferred to target table with record counts of BigQuery table
Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table with case
13 changes: 8 additions & 5 deletions oracle-plugin/src/e2e-test/java/io.cdap.plugin/BQValidation.java
@@ -68,11 +68,12 @@ public static boolean validateDBToBQRecordValues(String schema, String sourceTab
ResultSet.HOLD_CURSORS_OVER_COMMIT);

ResultSet rsSource = statement1.executeQuery(getSourceQuery);
- return compareResultSetAndJsonData(rsSource, jsonResponse);
+ return compareResultSetAndJsonData(rsSource, jsonResponse, false);
}
}

- public static boolean validateBQToDBRecordValues(String schema, String sourceTable, String targetTable)
+ public static boolean validateBQToDBRecordValues(String schema, String sourceTable, String targetTable,
+                                                  boolean isSchemaSmallCase)
throws SQLException, ClassNotFoundException, ParseException, IOException, InterruptedException {
List<JsonObject> jsonResponse = new ArrayList<>();
List<Object> bigQueryRows = new ArrayList<>();
@@ -88,7 +89,7 @@ public static boolean validateBQToDBRecordValues(String schema, String sourceTab
ResultSet.HOLD_CURSORS_OVER_COMMIT);

ResultSet rsTarget = statement1.executeQuery(getTargetQuery);
- return compareResultSetAndJsonData(rsTarget, jsonResponse);
+ return compareResultSetAndJsonData(rsTarget, jsonResponse, isSchemaSmallCase);
}
}

@@ -119,7 +120,8 @@ private static void getBigQueryTableData(String table, List<Object> bigQueryRows
* @throws ParseException If an error occurs while parsing the data.
*/

- public static boolean compareResultSetAndJsonData(ResultSet rsSource, List<JsonObject> bigQueryData)
+ public static boolean compareResultSetAndJsonData(ResultSet rsSource, List<JsonObject> bigQueryData,
+                                                   boolean isSchemaSmallCase)
throws SQLException, ParseException {
ResultSetMetaData mdSource = rsSource.getMetaData();
boolean result = false;
@@ -146,7 +148,8 @@ public static boolean compareResultSetAndJsonData(ResultSet rsSource, List<JsonO
while (currentColumnCount <= columnCountSource) {
String columnTypeName = mdSource.getColumnTypeName(currentColumnCount);
int columnType = mdSource.getColumnType(currentColumnCount);
- String columnName = mdSource.getColumnName(currentColumnCount);
+ String columnName = isSchemaSmallCase ? mdSource.getColumnName(currentColumnCount).toLowerCase() :
+   mdSource.getColumnName(currentColumnCount);
// Perform different comparisons based on column type
switch (columnType) {
// Since we skip BFILE in Oracle Sink, we are not comparing the BFILE source and sink values
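For context, the isSchemaSmallCase flag only changes how the JDBC column name is normalized before it is matched against the BigQuery JSON keys. A minimal standalone sketch of the idea (hypothetical data, not the project's test harness):

import java.util.Map;

// Sketch only: Oracle's JDBC metadata reports unquoted identifiers in upper
// case ("LASTNAME"), while a BigQuery table created with lower-case column
// names serializes its JSON keys as "lastname". Lower-casing the JDBC column
// name makes the two sides line up.
public class CaseNormalizationSketch {
  public static void main(String[] args) {
    Map<String, String> bigQueryRow = Map.of("id", "1", "lastname", "Shelby");

    String jdbcColumnName = "LASTNAME";   // as reported by Oracle metadata
    boolean isSchemaSmallCase = true;     // true when the BQ schema is lower case

    String lookupKey = isSchemaSmallCase ? jdbcColumnName.toLowerCase() : jdbcColumnName;
    System.out.println(bigQueryRow.get(lookupKey)); // prints "Shelby"
  }
}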
14 changes: 14 additions & 0 deletions
@@ -372,4 +372,18 @@ private static void createSourceBQTableWithQueries(String bqCreateTableQueryFile
PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable);
BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully");
}

@Before(order = 1, value = "@BQ_SOURCE_TEST_SMALL_CASE")
public static void createTempSourceBQTableSmallCase() throws IOException, InterruptedException {
createSourceBQTableWithQueries(PluginPropertyUtils.pluginProp("CreateBQTableQueryFileSmallCase"),
PluginPropertyUtils.pluginProp("InsertBQDataQueryFileSmallCase"));
}

@After(order = 1, value = "@BQ_SOURCE_TEST_SMALL_CASE")
public static void deleteTempSourceBQTableSmallCase() throws IOException, InterruptedException {
String bqSourceTable = PluginPropertyUtils.pluginProp("bqSourceTable");
BigQueryClient.dropBqQuery(bqSourceTable);
BeforeActions.scenario.write("BQ source Table " + bqSourceTable + " deleted successfully");
PluginPropertyUtils.removePluginProp("bqSourceTable");
}
}
21 changes: 20 additions & 1 deletion
@@ -94,7 +94,26 @@ public void validateTheValuesOfRecordsTransferredToTargetOracleTableIsEqualToThe

boolean recordsMatched = BQValidation.validateBQToDBRecordValues(PluginPropertyUtils.pluginProp("schema"),
PluginPropertyUtils.pluginProp("bqSourceTable"),
- PluginPropertyUtils.pluginProp("targetTable"));
+ PluginPropertyUtils.pluginProp("targetTable"),
+ false);
Assert.assertTrue("Value of records transferred to the target table should be equal to the value " +
"of the records in the source table", recordsMatched);
}

@Then("Validate the values of records transferred to target Oracle table is equal to the values from source " +
"BigQuery table with case")
public void
validateTheValuesOfRecordsTransferredToTargetOracleTableIsEqualToTheValuesFromSourceBigQueryTableWithCase()
throws IOException, InterruptedException, SQLException, ClassNotFoundException, ParseException {
int sourceBQRecordsCount = BigQueryClient.countBqQuery(PluginPropertyUtils.pluginProp("bqSourceTable"));
BeforeActions.scenario.write("No of Records from source BigQuery table:" + sourceBQRecordsCount);
Assert.assertEquals("Out records should match with target Oracle table records count",
CdfPipelineRunAction.getCountDisplayedOnSourcePluginAsRecordsOut(), sourceBQRecordsCount);

boolean recordsMatched = BQValidation.validateBQToDBRecordValues(PluginPropertyUtils.pluginProp("schema"),
PluginPropertyUtils.pluginProp("bqSourceTable"),
PluginPropertyUtils.pluginProp("targetTable"),
true);
Assert.assertTrue("Value of records transferred to the target table should be equal to the value " +
"of the records in the source table", recordsMatched);
}
4 changes: 4 additions & 0 deletions
@@ -98,6 +98,7 @@ importQuery=where $CONDITIONS
projectId=cdf-athena
dataset=test_automation
bqOutputDatatypesSchema=[{"key":"ID","value":"decimal"},{"key":"LASTNAME","value":"string"}]
bqOutputDatatypesSchemaSmallCase=[{"key":"id","value":"decimal"},{"key":"lastname","value":"string"}]
jdbcUrl=jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=%s;OAuthType=3;

#bq macro properties
@@ -107,6 +108,9 @@ bqUpdateTableSchema=true
#bq queries file path
CreateBQTableQueryFile=testdata/BigQuery/BigQueryCreateTableQuery.txt
InsertBQDataQueryFile=testdata/BigQuery/BigQueryInsertDataQuery.txt
#bq queries file path for Small Case Schema
CreateBQTableQueryFileSmallCase=testdata/BigQuery/BigQueryCreateTableQuerySmallCase.txt
InsertBQDataQueryFileSmallCase=testdata/BigQuery/BigQueryInsertDataQuerySmallCase.txt

#ORACLE Datatypes
bigQueryColumns=(COL23 FLOAT(4), COL28 TIMESTAMP, COL29 TIMESTAMP(9), COL30 TIMESTAMP WITH TIME ZONE, \
1 change: 1 addition & 0 deletions testdata/BigQuery/BigQueryCreateTableQuerySmallCase.txt
@@ -0,0 +1 @@
create table `DATASET.TABLE_NAME` (id NUMERIC, lastname STRING)
6 changes: 6 additions & 0 deletions testdata/BigQuery/BigQueryInsertDataQuerySmallCase.txt
@@ -0,0 +1,6 @@
insert into `DATASET.TABLE_NAME` (id, lastname) values
(1,'Shelby'),
(2,'Simpson'),
(3,'Williams'),
(4,'Sherry'),
(5,'James');
13 changes: 13 additions & 0 deletions OracleSinkDBRecord.java (io.cdap.plugin.oracle)
@@ -17,9 +17,12 @@
package io.cdap.plugin.oracle;

import io.cdap.cdap.api.data.format.StructuredRecord;
+ import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.db.ColumnType;
import io.cdap.plugin.db.SchemaReader;

+ import java.sql.PreparedStatement;
+ import java.sql.SQLException;
import java.util.List;
/**
@@ -37,4 +40,14 @@ public OracleSinkDBRecord(StructuredRecord record, List<ColumnType> columnTypes)
protected SchemaReader getSchemaReader() {
return new OracleSinkSchemaReader();
}

@Override
protected void insertOperation(PreparedStatement stmt) throws SQLException {
for (int fieldIndex = 0; fieldIndex < columnTypes.size(); fieldIndex++) {
ColumnType columnType = columnTypes.get(fieldIndex);
// Get the field from the schema using the column name, ignoring case.
Schema.Field field = record.getSchema().getField(columnType.getName(), true);
writeToDB(stmt, field, fieldIndex);
}
}
}
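
The override above leans on CDAP's case-insensitive field lookup, Schema.getField(name, true), so a lower-case BigQuery field still matches the upper-case column name Oracle reports. A minimal sketch of that behavior (standalone, illustrative field names, not part of this commit):

import io.cdap.cdap.api.data.schema.Schema;

// Sketch only: demonstrates the ignoreCase lookup that insertOperation relies on.
public class IgnoreCaseLookupSketch {
  public static void main(String[] args) {
    Schema schema = Schema.recordOf("output",
      Schema.Field.of("id", Schema.of(Schema.Type.INT)),
      Schema.Field.of("lastname", Schema.of(Schema.Type.STRING)));

    // Oracle metadata reports "LASTNAME"; an exact-match lookup misses it...
    System.out.println(schema.getField("LASTNAME"));       // null
    // ...while the ignoreCase overload resolves to the "lastname" field.
    System.out.println(schema.getField("LASTNAME", true)); // non-null Field
  }
}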
