Skip to content

Commit

Permalink
Convert JSON data to string when target attribute is varchar
Browse files Browse the repository at this point in the history
  • Loading branch information
nassibnassar committed Oct 4, 2022
1 parent 14a2ede commit 161c150
Showing 1 changed file with 36 additions and 2 deletions.
38 changes: 36 additions & 2 deletions src/stage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,27 @@ static void end_copy_batch(const ldp_options& opt, ldp_log* lg,
buffer->clear();
}

static void json_value_to_string(const json::Value& val, string* strval)
{
if (val.IsNull()) {
*strval = "";
return;
}
if (val.IsBool()) {
if (val.GetBool()) {
*strval = "true";
} else {
*strval = "false";
}
return;
}
if (val.IsString()) {
*strval = val.GetString();
return;
}
*strval = "INVALID";
}

static void writeTuple(const ldp_options& opt, ldp_log* lg, const dbtype& dbt,
const table_schema& table, const json::Document& doc,
size_t* record_count, size_t* total_record_count, string* copy_buffer)
Expand Down Expand Up @@ -325,7 +346,10 @@ static void writeTuple(const ldp_options& opt, ldp_log* lg, const dbtype& dbt,
case column_type::id:
case column_type::timestamptz:
case column_type::varchar:
dbt.encode_copy(jsonValue.GetString(), &s);
string strval;
json_value_to_string(jsonValue, &strval);
dbt.encode_copy(strval.data(), &s);

// Check if varchar exceeds maximum string length.
if (s.length() >= varchar_size - 1) {
lg->write(log_level::warning, "", "",
Expand Down Expand Up @@ -710,6 +734,16 @@ void index_loaded_table(ldp_log* lg, const table_schema& table, etymon::pgconn*
}
}

const unsigned int minimum_varchar_size = 8;

static unsigned int min_varchar_size(unsigned int varchar_size)
{
if (varchar_size < minimum_varchar_size) {
return minimum_varchar_size;
}
return varchar_size;
}

static void create_loading_table(const ldp_options& opt, ldp_log* lg,
const table_schema& table,
etymon::pgconn* conn, const dbtype& dbt, vector<string>* users)
Expand All @@ -734,7 +768,7 @@ static void create_loading_table(const ldp_options& opt, ldp_log* lg,
sql += column.name;
sql += "\" ";
if (column.type == column_type::varchar)
column_type = "VARCHAR(" + to_string(column.length) + ")";
column_type = "VARCHAR(" + to_string(min_varchar_size(column.length)) + ")";
else
column_schema::type_to_string(column.type, &column_type);
sql += column_type;
Expand Down

0 comments on commit 161c150

Please sign in to comment.