Skip to content

Commit

Permalink
Adds "copy to parquet" api
Browse files Browse the repository at this point in the history
We internally execute COPY TO command via parquet dest receiver to write parquet file from the table or query.
  • Loading branch information
aykut-bozkurt committed Oct 2, 2024
1 parent 3627d3d commit b71a602
Show file tree
Hide file tree
Showing 3 changed files with 479 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/parquet_copy_hook.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pub(crate) mod copy_to;
pub(crate) mod copy_to_dest_receiver;
pub(crate) mod copy_utils;
184 changes: 184 additions & 0 deletions src/parquet_copy_hook/copy_to.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
use pgrx::{
is_a,
pg_sys::{
self, makeRangeVar, pg_analyze_and_rewrite_fixedparams, pg_plan_query, CommandTag,
CopyStmt, CreateNewPortal, DestReceiver, GetActiveSnapshot, NodeTag::T_CopyStmt,
PlannedStmt, PortalDefineQuery, PortalDrop, PortalRun, PortalStart, QueryCompletion,
RawStmt, CURSOR_OPT_PARALLEL_OK,
},
AllocatedByRust, PgBox, PgList, PgRelation,
};

use crate::parquet_copy_hook::copy_utils::{
copy_stmt_has_relation, copy_stmt_lock_mode, copy_stmt_relation_oid,
};

// execute_copy_to_with_dest_receiver executes a COPY TO statement with our custom DestReceiver
// for writing to Parquet files.
// - converts the table relation to a SELECT statement if necessary
// - analyzes and rewrites the raw query
// - plans the rewritten query
// - creates a portal for the planned query by using the custom DestReceiver
// - executes the query with the portal
pub(crate) fn execute_copy_to_with_dest_receiver(
pstmt: &PgBox<pg_sys::PlannedStmt>,
query_string: &core::ffi::CStr,
params: PgBox<pg_sys::ParamListInfoData>,
query_env: PgBox<pg_sys::QueryEnvironment>,
parquet_dest: PgBox<DestReceiver>,
) -> u64 {
unsafe {
debug_assert!(is_a(pstmt.utilityStmt, T_CopyStmt));
let copy_stmt = PgBox::<CopyStmt>::from_pg(pstmt.utilityStmt as _);

let mut relation = PgRelation::from_pg(std::ptr::null_mut());

if copy_stmt_has_relation(pstmt) {
let rel_oid = copy_stmt_relation_oid(pstmt);
let lock_mode = copy_stmt_lock_mode(pstmt);
relation = PgRelation::with_lock(rel_oid, lock_mode);
}

let raw_query = prepare_copy_to_raw_stmt(pstmt, &copy_stmt, &relation);

let rewritten_queries = pg_analyze_and_rewrite_fixedparams(
raw_query.as_ptr(),
query_string.as_ptr(),
std::ptr::null_mut(),
0,
query_env.as_ptr(),
);

let query = PgList::from_pg(rewritten_queries)
.pop()
.expect("rewritten query is empty");

let plan = pg_plan_query(
query,
std::ptr::null(),
CURSOR_OPT_PARALLEL_OK as _,
params.as_ptr(),
);

let portal = CreateNewPortal();
let mut portal = PgBox::from_pg(portal);
portal.visible = false;

let mut plans = PgList::<PlannedStmt>::new();
plans.push(plan);

PortalDefineQuery(
portal.as_ptr(),
std::ptr::null(),
query_string.as_ptr(),
CommandTag::CMDTAG_COPY,
plans.as_ptr(),
std::ptr::null_mut(),
);

PortalStart(portal.as_ptr(), params.as_ptr(), 0, GetActiveSnapshot());

let mut completion_tag = QueryCompletion {
commandTag: CommandTag::CMDTAG_COPY,
nprocessed: 0,
};

PortalRun(
portal.as_ptr(),
i64::MAX,
false,
true,
parquet_dest.as_ptr(),
parquet_dest.as_ptr(),
&mut completion_tag as _,
);

PortalDrop(portal.as_ptr(), false);

completion_tag.nprocessed
}
}

fn prepare_copy_to_raw_stmt(
pstmt: &PgBox<pg_sys::PlannedStmt>,
copy_stmt: &PgBox<CopyStmt>,
relation: &PgRelation,
) -> PgBox<RawStmt, AllocatedByRust> {
let mut raw_query = unsafe { PgBox::<pg_sys::RawStmt>::alloc_node(pg_sys::NodeTag::T_RawStmt) };
raw_query.stmt_location = pstmt.stmt_location;
raw_query.stmt_len = pstmt.stmt_len;

if relation.is_null() {
raw_query.stmt = copy_stmt.query;
} else {
// convert relation to query
let mut target_list = PgList::new();

if copy_stmt.attlist.is_null() {
// SELECT * FROM relation
let mut col_ref =
unsafe { PgBox::<pg_sys::ColumnRef>::alloc_node(pg_sys::NodeTag::T_ColumnRef) };
let a_star = unsafe { PgBox::<pg_sys::A_Star>::alloc_node(pg_sys::NodeTag::T_A_Star) };

let mut field_list = PgList::new();
field_list.push(a_star.into_pg());

col_ref.fields = field_list.into_pg();
col_ref.location = -1;

let mut target =
unsafe { PgBox::<pg_sys::ResTarget>::alloc_node(pg_sys::NodeTag::T_ResTarget) };
target.name = std::ptr::null_mut();
target.indirection = std::ptr::null_mut();
target.val = col_ref.into_pg() as _;
target.location = -1;

target_list.push(target.into_pg());
} else {
// SELECT a,b,... FROM relation
let attlist = unsafe { PgList::<*mut i8>::from_pg(copy_stmt.attlist) };
for attname in attlist.iter_ptr() {
let mut col_ref =
unsafe { PgBox::<pg_sys::ColumnRef>::alloc_node(pg_sys::NodeTag::T_ColumnRef) };

let mut field_list = PgList::new();
field_list.push(unsafe { *attname });

col_ref.fields = field_list.into_pg();
col_ref.location = -1;

let mut target =
unsafe { PgBox::<pg_sys::ResTarget>::alloc_node(pg_sys::NodeTag::T_ResTarget) };
target.name = std::ptr::null_mut();
target.indirection = std::ptr::null_mut();
target.val = col_ref.into_pg() as _;
target.location = -1;

target_list.push(target.into_pg());
}
}

let from = unsafe {
makeRangeVar(
relation.namespace().as_ptr() as _,
relation.name().as_ptr() as _,
-1,
)
};
let mut from = unsafe { PgBox::from_pg(from) };
from.inh = false;

let mut select_stmt =
unsafe { PgBox::<pg_sys::SelectStmt>::alloc_node(pg_sys::NodeTag::T_SelectStmt) };

select_stmt.targetList = target_list.into_pg();

let mut from_list = PgList::new();
from_list.push(from.into_pg());
select_stmt.fromClause = from_list.into_pg();

raw_query.stmt = select_stmt.into_pg() as _;
}

raw_query
}
Loading

0 comments on commit b71a602

Please sign in to comment.