Skip to content

Commit

Permalink
fixing performance issue on array binding #217 and execute does not return statement #279
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-ext-simba-dl committed May 2, 2023
1 parent cbeac34 commit 116a798
Show file tree
Hide file tree
Showing 8 changed files with 350 additions and 198 deletions.
92 changes: 43 additions & 49 deletions lib/connection/bind_uploader.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* Copyright (c) 2015-2021 Snowflake Computing Inc. All rights reserved.
*/
var Logger = require('../logger');
var Logger = require('../logger');

const Readable = require('stream').Readable;
const fs = require('fs');
Expand Down Expand Up @@ -31,7 +31,7 @@ const CREATE_STAGE_STMT = "CREATE OR REPLACE TEMPORARY STAGE "
function BindUploader(options, services, connectionConfig, requestId)
{
const MAX_BUFFER_SIZE = 1024 * 1024 * 100;

Logger.getInstance().debug('BindUploaders');
this.options = options;
this.services = services;
Expand All @@ -40,30 +40,54 @@ function BindUploader(options, services, connectionConfig, requestId)
this.stagePath = '@' + STAGE_NAME + '/' + requestId;
Logger.getInstance().debug('token = %s', connectionConfig.getToken());

this.tempStageCreated = false;
this.bindData = null;
this.files = [];
this.datas = [];
this.puts = [];
/**
 * Creates the temporary bind stage by sending a CREATE STAGE statement.
 *
 * @throws {Error} when the server responds with a non-200 status.
 * @returns {Promise<void>}
 */
this.createStage = async function () {
  const createStageOptions = { sqlText: GetCreateStageStmt() };
  const newContext = Statement.createContext(createStageOptions, this.services, this.connectionConfig);
  const ret = await Statement.sendRequest(newContext);
  // assumes sendRequest reports a numeric HTTP status — TODO confirm
  if (ret['status'] !== 200) {
    throw new Error("Failed to create stage");
  }
}

/**
 * Uploads in-memory bind data to the bind stage via a PUT statement with a
 * file stream (no temporary file on disk).
 *
 * @param {String} fileName  name of the virtual file on the stage
 * @param {String} fileData  serialized bind data to upload
 *
 * @throws {Error} when the stage path or the file name is null.
 * @returns {Promise} resolves with the PUT statement's row stream.
 */
this.uploadFilestream = async function (fileName, fileData) {
  Logger.getInstance().debug('BindUploaders::uploadFilestream');
  var stageName = this.stagePath;
  if (stageName == null) {
    throw new Error("Stage name is null.");
  }
  if (fileName == null) {
    throw new Error("File name is null.");
  }

  await new Promise((resolve, reject) => {
    var putStmt = "PUT file://" + fileName + "'" + stageName + "' overwrite=true auto_compress=false source_compression=gzip";
    var uploadFileOptions = {
      sqlText: putStmt, fileStream: fileData,
      complete: function (err, stmt, rows) {
        if (err) {
          Logger.getInstance().debug('err ' + err);
          // Reject instead of throwing: a throw from this callback would not
          // settle the Promise, leaving the caller's await hanging forever.
          reject(err);
          return;
        }
        Logger.getInstance().debug('uploadFiles done ');
        resolve(stmt.streamRows());
      }
    };
    Statement.createStatementPreExec(uploadFileOptions, this.services, this.connectionConfig);
  });
}

this.Upload = function(bindings)
this.Upload = async function (bindings)
{
Logger.getInstance().debug('BindUploaders::Upload');

if(bindings == null)
return null;

await this.createStage();

var dataRows = new Array();
var startIndex = 0;
var rowNum = 0;
var fileCount = 0;
var strbuffer = "";
var tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'tmp'));
if (tmpDir.indexOf('~') != -1 && process.platform === "win32") {
var tmpFolderName = tmpDir.substring(tmpDir.lastIndexOf('\\'));
tmpDir = process.env.USERPROFILE + '\\AppData\\Local\\Temp\\' + tmpFolderName;
}


for(var i=0; i<bindings.length; i++)
{
for(var j=0; j< bindings[i].length; j++)
Expand All @@ -77,44 +101,14 @@ function BindUploader(options, services, connectionConfig, requestId)

if ((strbuffer.length >= MAX_BUFFER_SIZE) || (i == bindings.length -1))
{
var fileName = path.join(tmpDir,(++fileCount).toString());
var fileName = (++fileCount).toString();
Logger.getInstance().debug('fileName=' + fileName);
this.UploadStream(strbuffer, fileName);
await this.uploadFilestream(fileName, strbuffer);
strbuffer = "";
}
}
this.bindData = {files: this.files, datas: this.datas, puts:this.puts};
return this.bindData;
};

/**
 * Persists bind data to a local temporary file and records the file path,
 * data, and matching PUT statement for a later upload to the bind stage.
 *
 * @param {String} data      serialized bind data to write out
 * @param {String} fileName  path of the temporary file to create
 *
 * @throws {Error} when the stage path or the file name is null, or the
 *                 file cannot be written.
 * @returns {undefined}
 */
this.UploadStream = function (data, fileName) {
  Logger.getInstance().debug('BindUploaders::UploadStream');
  const stageName = this.stagePath;
  if (stageName == null) {
    throw new Error("Stage name is null.");
  }
  if (fileName == null) {
    throw new Error("File name is null.");
  }

  const putStmt = "PUT file://" + fileName + "'" + stageName + "' overwrite=true auto_compress=false source_compression=gzip";
  try {
    fs.writeFileSync(fileName, data);
  } catch (e) {
    Logger.getInstance().debug('Failed to write file: %s', fileName);
    throw e;
  }
  this.files.push(fileName);
  this.datas.push(data);
  this.puts.push(putStmt);
};

this.cvsData = function(data)
{
if(data == null || data.toString() == "")
Expand Down
97 changes: 15 additions & 82 deletions lib/connection/statement.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ exports.createStatementPreExec = function (

if (options.sqlText && (Util.isPutCommand(options.sqlText) || Util.isGetCommand(options.sqlText)))
{
if (options.fileStream) {
context.fileStream = options.fileStream;
options.fileStream = null;
}
return createFileStatementPreExec(
options, context, services, connectionConfig);
}
Expand All @@ -93,26 +97,19 @@ exports.createStatementPreExec = function (

// check array binding,
if(numBinds > threshold)
{
var bindUploaderRequestId = uuidv4();
var bind = new Bind.BindUploader(options, services, connectionConfig, bindUploaderRequestId);
var bindData;
{
try
{
bindData = bind.Upload(context.binds);
return new Promise((resolve, reject) => {
resolve(handleArrayBinding(options, services, connectionConfig, context));
});
}
catch(e)
{
Logger.getInstance().debug('bind upload error, use normal binding');
return createRowStatementPreExec(
options, context, services, connectionConfig);
}
if(bindData != null)
{
context.bindStage = Bind.GetStageName(bindUploaderRequestId);
Logger.getInstance().debug('context.bindStage = %s', context.bindStage);
return createStage(services, connectionConfig, bindData, options, context);
}
}
else
{
Expand All @@ -121,79 +118,15 @@ exports.createStatementPreExec = function (
}
};

// Creates the temporary bind stage with a CREATE STAGE statement, then — from
// the statement's completion callback — starts uploading the serialized bind
// files. If stage creation fails, falls back to normal row-by-row binding.
// NOTE(review): the 'data' handler can fire once per result row, so
// uploadFiles may be invoked more than once — presumably the CREATE STAGE
// result has a single row; verify against the server response.
function createStage(services, connectionConfig, bindData, options, context)
{
  Logger.getInstance().debug('createStage');
  var createStageOpt = {sqlText:Bind.GetCreateStageStmt(),
    complete: function (err, stmt, rows)
    {
      Logger.getInstance().debug('stream');
      Logger.getInstance().debug('err '+err);
      if(err)
      {
        // Stage creation failed: clear the stage marker so downstream code
        // does not try to use it, and fall back to row binding.
        context.bindStage = null;
        return createRowStatementPreExec(
          options, context, services, connectionConfig);
      }
      else
      {
        var stream = stmt.streamRows();
        stream.on('data', function (rows)
        {
          Logger.getInstance().debug('stream on data');
          // Stage now exists; upload the recorded bind files to it.
          return uploadFiles(services, connectionConfig, bindData, options, context);
        });
      }
    }
  }
  Logger.getInstance().debug('CREATE_STAGE_STMT = %s', Bind.GetCreateStageStmt());
  exports.createStatementPreExec(createStageOpt, services, connectionConfig);
}

/**
 * Recursively uploads the recorded bind files (one PUT per file) to the bind
 * stage; when all files are uploaded it cleans the temporary files and runs
 * the original statement with createRowStatementPreExec. On any PUT error it
 * falls back to normal row binding.
 *
 * @param {Object} services
 * @param {Object} connectionConfig
 * @param {Object} bindData  { files, datas, puts } recorded by BindUploader
 * @param {Object} options
 * @param {Object} context
 * @param {Number} curIndex  index of the file currently being uploaded
 */
function uploadFiles(services, connectionConfig, bindData, options, context, curIndex = 0)
{
  Logger.getInstance().debug('uploadFiles %d, %s', curIndex, context.bindStage);
  if (curIndex < bindData.files.length) {
    Logger.getInstance().debug('Put=' + bindData.puts[curIndex]);
    var fileOpt = {
      sqlText: bindData.puts[curIndex],
      complete: function (err, stmt, rows) {
        if (err) {
          // PUT failed: give up on stage binding and bind row by row.
          return createRowStatementPreExec(
            options, context, services, connectionConfig);
        }
        Logger.getInstance().debug('uploadFiles done ');
        var stream = stmt.streamRows();
        stream.on('data', function (rows) {
          Logger.getInstance().debug('stream on data');
        });
        stream.on('end', function (rows) {
          Logger.getInstance().debug('stream on end ');
          curIndex++;
          if (curIndex < bindData.files.length) {
            // More files to go: upload the next one.
            uploadFiles(services, connectionConfig, bindData, options, context, curIndex);
          }
          else {
            Logger.getInstance().debug('all completed');
            cleanTempFiles(bindData);
            return createRowStatementPreExec(
              options, context, services, connectionConfig);
          }
        });
      }
    }
    // Execute the PUT inside the guard: fileOpt only exists when there is a
    // file left to upload (calling with an undefined option object would
    // fail inside createStatementPreExec).
    exports.createStatementPreExec(fileOpt, services, connectionConfig);
  }
}
/**
 * Handles large array binding by uploading the binds to a temporary stage,
 * then executing the original statement against that stage.
 *
 * @param {Object} options
 * @param {Object} services
 * @param {Object} connectionConfig
 * @param {Object} context
 *
 * @returns {Object} the statement created by createRowStatementPreExec.
 */
async function handleArrayBinding(options, services, connectionConfig, context) {
  const requestId = uuidv4();
  const uploader = new Bind.BindUploader(options, services, connectionConfig, requestId);
  // Record the stage name so the statement execution references the
  // uploaded binds instead of inlining them.
  context.bindStage = Bind.GetStageName(requestId);
  await uploader.Upload(context.binds);
  return createRowStatementPreExec(options, context, services, connectionConfig);
}

/**
 * Deletes every temporary bind file recorded in bindData.
 *
 * @param {Object} bindData  holds the list of file paths under `files`
 */
function cleanTempFiles(bindData)
{
  for (const tempFile of bindData.files)
  {
    Logger.getInstance().debug('Clean File=' + tempFile);
    Bind.CleanFile(tempFile);
  }
}
/**
* Executes a statement and returns a statement object that can be used to fetch
* its result.
Expand Down
20 changes: 16 additions & 4 deletions lib/file_transfer_agent/azure_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,22 @@ function azure_util(azure, filestream)
*
* @returns {null}
*/
this.uploadFile = async function (dataFile, meta, encryptionMetadata, maxConcurrency)
{
/**
 * Reads a local file and uploads its contents to the Azure stage location.
 *
 * @param {String} dataFile  path of the local file to upload
 * @param {Object} meta
 * @param {Object} encryptionMetadata
 * @param {Number} maxConcurrency
 *
 * @returns {null}
 */
this.uploadFile = async function (dataFile, meta, encryptionMetadata, maxConcurrency) {
  var fileStream = fs.readFileSync(dataFile);
  // Await the stream upload so errors propagate to the caller and the
  // returned promise does not resolve before the upload has finished.
  await this.uploadFileStream(fileStream, meta, encryptionMetadata, maxConcurrency);
}

/**
* Create the file metadata then upload the file stream.
*
* @param {String} fileStream
* @param {Object} meta
* @param {Object} encryptionMetadata
* @param {Number} maxConcurrency
*
* @returns {null}
*/
this.uploadFileStream = async function (fileStream, meta, encryptionMetadata, maxConcurrency) {
var azureMetadata = {
'sfcdigest': meta['SHA256_DIGEST']
};
Expand Down Expand Up @@ -203,8 +217,6 @@ function azure_util(azure, filestream)
var containerClient = client.getContainerClient(azureLocation.containerName);
var blockBlobClient = containerClient.getBlockBlobClient(blobName);

var fileStream = fs.readFileSync(dataFile);

try
{
await blockBlobClient.upload(fileStream, fileStream.length, {
Expand Down
52 changes: 52 additions & 0 deletions lib/file_transfer_agent/encrypt_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,58 @@ function encrypt_util(encrypt, filestream, temp)
return newMatDesc;
}

/**
* Encrypt file stream using AES algorithm.
*
* @param {Object} encryptionMaterial
* @param {String} fileStream
* @param {String} tmpDir
* @param {Number} chunkSize
*
* @returns {Object}
*/
/**
 * Encrypt an in-memory file stream using AES.
 *
 * The payload is encrypted with a freshly generated per-file key (AES-CBC
 * with a random IV); the file key itself is then wrapped with the decoded
 * stage master key (AES-ECB) so the server can recover it.
 *
 * @param {Object} encryptionMaterial  supplies the base64 master key, smkId and queryId
 * @param {String} fileStream          raw data to encrypt
 * @param {String} tmpDir              unused for stream encryption; kept for interface parity
 * @param {Number} chunkSize           unused for stream encryption; kept for interface parity
 *
 * @returns {Object} { encryptionMetadata, dataStream } — metadata for the
 *                   upload plus the encrypted bytes.
 */
this.encryptFileStream = async function (encryptionMaterial, fileStream,
  tmpDir = null, chunkSize = blockSize * 4 * 1024)
{
  // Get decoded key from base64 encoded value
  const decodedKey = Buffer.from(encryptionMaterial[QUERY_STAGE_MASTER_KEY], BASE64);
  const keySize = decodedKey.length;

  // Random IV and per-file data key, each one cipher block long
  const ivData = getSecureRandom(blockSize);
  const fileKey = getSecureRandom(blockSize);

  // Encrypt the payload with the file key (AES-CBC + IV)
  let cipher = crypto.createCipheriv(AES_CBC, fileKey, ivData);
  const encryptedData = Buffer.concat([
    cipher.update(fileStream),
    cipher.final()
  ]);

  // Wrap the file key with the decoded master key (AES-ECB, no IV)
  cipher = crypto.createCipheriv(AES_ECB, decodedKey, null);
  const encKek = Buffer.concat([
    cipher.update(fileKey),
    cipher.final()
  ]);

  const matDesc = MaterialDescriptor(
    encryptionMaterial.smkId,
    encryptionMaterial.queryId,
    keySize * 8 // key length in bits
  );

  const metadata = EncryptionMetadata(
    encKek.toString(BASE64),
    ivData.toString(BASE64),
    matDescToUnicode(matDesc)
  );
  return {
    encryptionMetadata: metadata,
    dataStream: encryptedData
  }
}
/**
* Encrypt file using AES algorithm.
*
Expand Down
Loading

0 comments on commit 116a798

Please sign in to comment.