Skip to content

Commit

Permalink
[Feature] Support azure adls2 in shared-data mode
Browse files Browse the repository at this point in the history
Signed-off-by: xiangguangyxg <[email protected]>
  • Loading branch information
xiangguangyxg committed Dec 17, 2024
1 parent 70f7704 commit 47374e6
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 15 deletions.
4 changes: 3 additions & 1 deletion be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,9 @@ if ("${USE_STAROS}" STREQUAL "ON")
find_package(azure-storage-common-cpp CONFIG REQUIRED)
set(azure-storage-blobs-cpp_DIR "${STARLET_INSTALL_DIR}/third_party/share/azure-storage-blobs-cpp" CACHE PATH "azure storage blobs search path")
find_package(azure-storage-blobs-cpp CONFIG REQUIRED)
set(AZURE_SDK_LIB Azure::azure-identity Azure::azure-storage-blobs Azure::azure-core Azure::azure-storage-common)
set(azure-storage-files-datalake-cpp_DIR "${STARLET_INSTALL_DIR}/third_party/share/azure-storage-files-datalake-cpp" CACHE PATH "azure storage adls2 search path")
find_package(azure-storage-files-datalake-cpp CONFIG REQUIRED)
set(AZURE_SDK_LIB Azure::azure-identity Azure::azure-storage-blobs Azure::azure-storage-files-datalake Azure::azure-core Azure::azure-storage-common)

set(starlet_DIR "${STARLET_INSTALL_DIR}/starlet_install/lib/cmake" CACHE PATH "starlet search path")
find_package(starlet CONFIG REQUIRED)
Expand Down
3 changes: 3 additions & 0 deletions be/src/service/staros_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ absl::StatusOr<std::string> StarOSWorker::build_scheme_from_shard_info(const Sha
case staros::FileStoreType::AZBLOB:
scheme = "azblob://";
break;
case staros::FileStoreType::ADLS2:
scheme = "adls2://";
break;
default:
return absl::InvalidArgumentError("Unknown shard storage scheme!");
}
Expand Down
2 changes: 1 addition & 1 deletion fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ under the License.
<iceberg.version>1.6.0</iceberg.version>
<paimon.version>0.8.2</paimon.version>
<delta-kernel.version>4.0.0rc1</delta-kernel.version>
<staros.version>3.4-rc1</staros.version>
<staros.version>3.4-rc2-SNAPSHOT</staros.version>
<python>python</python>
</properties>

Expand Down
15 changes: 13 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2568,7 +2568,8 @@ public class Config extends ConfigBase {
@ConfField
public static boolean enable_load_volume_from_conf = true;
// remote storage related configuration
@ConfField(comment = "storage type for cloud native table. Available options: \"S3\", \"HDFS\", \"AZBLOB\". case-insensitive")
@ConfField(comment = "storage type for cloud native table. Available options: " +
"\"S3\", \"HDFS\", \"AZBLOB\", \"ADLS2\". case-insensitive")
public static String cloud_native_storage_type = "S3";

// HDFS storage configuration
Expand Down Expand Up @@ -2611,11 +2612,21 @@ public class Config extends ConfigBase {
public static String azure_blob_endpoint = "";
@ConfField
public static String azure_blob_path = "";

@ConfField
public static String azure_blob_shared_key = "";
@ConfField
public static String azure_blob_sas_token = "";

// azure adls2
@ConfField
public static String azure_adls2_endpoint = "";
@ConfField
public static String azure_adls2_path = "";
@ConfField
public static String azure_adls2_shared_key = "";
@ConfField
public static String azure_adls2_sas_token = "";

@ConfField(mutable = true)
public static int starmgr_grpc_timeout_seconds = 5;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AZURE_ADLS1_OAUTH2_CREDENTIAL;
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AZURE_ADLS1_OAUTH2_ENDPOINT;
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AZURE_ADLS1_USE_MANAGED_SERVICE_IDENTITY;
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AZURE_ADLS2_ENDPOINT;
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AZURE_ADLS2_OAUTH2_CLIENT_ENDPOINT;
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AZURE_ADLS2_OAUTH2_CLIENT_ID;
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AZURE_ADLS2_OAUTH2_CLIENT_SECRET;
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AZURE_ADLS2_OAUTH2_TENANT_ID;
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AZURE_ADLS2_OAUTH2_USE_MANAGED_IDENTITY;
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AZURE_ADLS2_SAS_TOKEN;
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AZURE_ADLS2_SHARED_KEY;
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AZURE_ADLS2_STORAGE_ACCOUNT;
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AZURE_BLOB_CONTAINER;
Expand Down Expand Up @@ -77,11 +79,13 @@ public CloudConfiguration build(Map<String, String> properties) {

// Try to build azure data lake gen2
AzureADLS2CloudCredential adls2 = new AzureADLS2CloudCredential(
properties.getOrDefault(AZURE_ADLS2_ENDPOINT, ""),
Boolean.parseBoolean(properties.getOrDefault(AZURE_ADLS2_OAUTH2_USE_MANAGED_IDENTITY, "false")),
properties.getOrDefault(AZURE_ADLS2_OAUTH2_TENANT_ID, ""),
properties.getOrDefault(AZURE_ADLS2_OAUTH2_CLIENT_ID, ""),
properties.getOrDefault(AZURE_ADLS2_STORAGE_ACCOUNT, storageAccount),
properties.getOrDefault(AZURE_ADLS2_SHARED_KEY, ""),
properties.getOrDefault(AZURE_ADLS2_SAS_TOKEN, ""),
properties.getOrDefault(AZURE_ADLS2_OAUTH2_CLIENT_SECRET, ""),
properties.getOrDefault(AZURE_ADLS2_OAUTH2_CLIENT_ENDPOINT, "")
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package com.starrocks.credential.azure;

import com.google.common.base.Preconditions;
import com.staros.proto.ADLS2CredentialInfo;
import com.staros.proto.ADLS2FileStoreInfo;
import com.staros.proto.AzBlobCredentialInfo;
import com.staros.proto.AzBlobFileStoreInfo;
import com.staros.proto.FileStoreInfo;
Expand Down Expand Up @@ -183,29 +185,35 @@ public FileStoreInfo toFileStoreInfo() {
}

class AzureADLS2CloudCredential extends AzureStorageCloudCredential {
private final String endpoint;
private final boolean oauth2ManagedIdentity;
private final String oauth2TenantId;
private final String oauth2ClientId;
private final String storageAccount;
private final String sharedKey;
private final String sasToken;
private final String oauth2ClientSecret;
private final String oauth2ClientEndpoint;

public AzureADLS2CloudCredential(boolean oauth2ManagedIdentity, String oauth2TenantId, String oauth2ClientId,
String storageAccount, String sharedKey, String oauth2ClientSecret,
public AzureADLS2CloudCredential(String endpoint, boolean oauth2ManagedIdentity, String oauth2TenantId, String oauth2ClientId,
String storageAccount, String sharedKey, String sasToken, String oauth2ClientSecret,
String oauth2ClientEndpoint) {
Preconditions.checkNotNull(endpoint);
Preconditions.checkNotNull(oauth2TenantId);
Preconditions.checkNotNull(oauth2ClientId);
Preconditions.checkNotNull(storageAccount);
Preconditions.checkNotNull(sharedKey);
Preconditions.checkNotNull(sasToken);
Preconditions.checkNotNull(oauth2ClientSecret);
Preconditions.checkNotNull(oauth2ClientEndpoint);

this.endpoint = endpoint;
this.oauth2ManagedIdentity = oauth2ManagedIdentity;
this.oauth2TenantId = oauth2TenantId;
this.oauth2ClientId = oauth2ClientId;
this.storageAccount = storageAccount;
this.sharedKey = sharedKey;
this.sasToken = sasToken;
this.oauth2ClientSecret = oauth2ClientSecret;
this.oauth2ClientEndpoint = oauth2ClientEndpoint;

Expand All @@ -214,6 +222,27 @@ public AzureADLS2CloudCredential(boolean oauth2ManagedIdentity, String oauth2Ten

@Override
void tryGenerateConfigurationMap() {
if (!endpoint.isEmpty()) {
// If user specific endpoint, they don't need to specific storage account anymore
// Like if user is using Azurite, they need to specific endpoint
if (!sharedKey.isEmpty()) {
generatedConfigurationMap.put(
String.format("fs.azure.account.auth.type.%s", endpoint),
"SharedKey");
generatedConfigurationMap.put(
String.format("fs.azure.account.key.%s", endpoint),
sharedKey);
} else if (!sasToken.isEmpty()) {
generatedConfigurationMap.put(
String.format("fs.azure.account.auth.type.%s", endpoint),
"SAS");
generatedConfigurationMap.put(
String.format("fs.azure.sas.fixed.token.%s", endpoint),
sasToken);
}
return;
}

if (oauth2ManagedIdentity && !oauth2TenantId.isEmpty() && !oauth2ClientId.isEmpty()) {
generatedConfigurationMap.put(createConfigKey(ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME),
"OAuth");
Expand All @@ -232,6 +261,13 @@ void tryGenerateConfigurationMap() {
generatedConfigurationMap.put(
String.format("fs.azure.account.key.%s.dfs.core.windows.net", storageAccount),
sharedKey);
} else if (!storageAccount.isEmpty() && !sasToken.isEmpty()) {
generatedConfigurationMap.put(
String.format("fs.azure.account.auth.type.%s.dfs.core.windows.net", storageAccount),
"SAS");
generatedConfigurationMap.put(
String.format("fs.azure.sas.fixed.token.%s.dfs.core.windows.net", storageAccount),
sasToken);
} else if (!oauth2ClientId.isEmpty() && !oauth2ClientSecret.isEmpty() &&
!oauth2ClientEndpoint.isEmpty()) {
generatedConfigurationMap.put(createConfigKey(ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME),
Expand Down Expand Up @@ -263,8 +299,16 @@ public String toCredString() {

@Override
public FileStoreInfo toFileStoreInfo() {
// TODO: Support azure credential
return null;
FileStoreInfo.Builder fileStore = FileStoreInfo.newBuilder();
fileStore.setFsType(FileStoreType.ADLS2);
ADLS2FileStoreInfo.Builder adls2FileStoreInfo = ADLS2FileStoreInfo.newBuilder();
adls2FileStoreInfo.setEndpoint(endpoint);
ADLS2CredentialInfo.Builder adls2CredentialInfo = ADLS2CredentialInfo.newBuilder();
adls2CredentialInfo.setSharedKey(sharedKey);
adls2CredentialInfo.setSasToken(sasToken);
adls2FileStoreInfo.setCredential(adls2CredentialInfo.build());
fileStore.setAdls2FsInfo(adls2FileStoreInfo.build());
return fileStore.build();
}

// Create Hadoop configuration key for specific storage account, if storage account is not set, means this property
Expand Down
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.staros.proto.QuitMetaGroupInfo;
import com.staros.proto.ReplicaInfo;
import com.staros.proto.ReplicaRole;
import com.staros.proto.ReplicationType;
import com.staros.proto.ServiceInfo;
import com.staros.proto.ShardGroupInfo;
import com.staros.proto.ShardInfo;
Expand Down Expand Up @@ -744,7 +745,7 @@ public long createWorkerGroup(String size, int replicaNumber) throws DdlExceptio
WorkerGroupDetailInfo result = null;
try {
result = client.createWorkerGroup(serviceId, owner, spec, Collections.emptyMap(),
Collections.emptyMap(), replicaNumber);
Collections.emptyMap(), replicaNumber, ReplicationType.NO_REPLICATION);
} catch (StarClientException e) {
LOG.warn("Failed to create worker group. error: {}", e.getMessage());
throw new DdlException("Failed to create worker group. error: " + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,16 @@ public void validateStorageVolumeConfig() throws InvalidConfException {
// validate azure_blob_path configuration
normalizeConfigPath(Config.azure_blob_path, "azblob", "Config.azure_blob_path", true);
break;
case "adls2":
if (Config.azure_adls2_endpoint.isEmpty()) {
throw new InvalidConfException("The configuration item \"azure_adls2_endpoint\" is empty.");
}
// validate azure_adls2_path configuration
normalizeConfigPath(Config.azure_adls2_path, "adls2", "Config.azure_adls2_path", true);
break;
default:
throw new InvalidConfException(String.format(
"The configuration item \"cloud_native_storage_type = %s\" is invalid, must be HDFS or S3 or AZBLOB.",
"The configuration item \"cloud_native_storage_type = %s\" is invalid, must be HDFS S3 AZBLOB or ADLS2.",
Config.cloud_native_storage_type));
}
}
Expand Down Expand Up @@ -430,6 +437,10 @@ public static List<String> parseLocationsFromConfig() throws InvalidConfExceptio
uri = normalizeConfigPath(Config.azure_blob_path, "azblob", "Config.azure_blob_path", true);
locations.add(uri.toString());
break;
case "adls2":
uri = normalizeConfigPath(Config.azure_adls2_path, "adls2", "Config.azure_adls2_path", true);
locations.add(uri.toString());
break;
default:
return locations;
}
Expand Down Expand Up @@ -459,6 +470,11 @@ private Map<String, String> parseParamsFromConfig() {
params.put(CloudConfigurationConstants.AZURE_BLOB_SAS_TOKEN, Config.azure_blob_sas_token);
params.put(CloudConfigurationConstants.AZURE_BLOB_ENDPOINT, Config.azure_blob_endpoint);
break;
case "adls2":
params.put(CloudConfigurationConstants.AZURE_ADLS2_SHARED_KEY, Config.azure_adls2_shared_key);
params.put(CloudConfigurationConstants.AZURE_ADLS2_SAS_TOKEN, Config.azure_adls2_sas_token);
params.put(CloudConfigurationConstants.AZURE_ADLS2_ENDPOINT, Config.azure_adls2_endpoint);
break;
default:
return params;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public abstract class StorageVolumeMgr implements Writable, GsonPostProcessable

private static final String AZBLOB = "azblob";

private static final String ADLS2 = "adls2";

private static final String HDFS = "hdfs";

@SerializedName("defaultSVId")
Expand Down Expand Up @@ -340,6 +342,7 @@ private void validateLocations(String svType, List<String> locations) throws Ddl
switch (svType.toLowerCase()) {
case S3:
case AZBLOB:
case ADLS2:
if (!scheme.equalsIgnoreCase(svType)) {
throw new DdlException("Invalid location " + location);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import com.staros.proto.ADLS2CredentialInfo;
import com.staros.proto.ADLS2FileStoreInfo;
import com.staros.proto.AwsCredentialInfo;
import com.staros.proto.AzBlobCredentialInfo;
import com.staros.proto.AzBlobFileStoreInfo;
Expand Down Expand Up @@ -52,7 +54,8 @@ public enum StorageVolumeType {
UNKNOWN,
S3,
HDFS,
AZBLOB
AZBLOB,
ADLS2
}

// Without id, the scenario like "create storage volume 'a', drop storage volume 'a', create storage volume 'a'"
Expand Down Expand Up @@ -190,6 +193,8 @@ private StorageVolumeType toStorageVolumeType(String svt) {
return StorageVolumeType.HDFS;
case "azblob":
return StorageVolumeType.AZBLOB;
case "adls2":
return StorageVolumeType.ADLS2;
default:
return StorageVolumeType.UNKNOWN;
}
Expand All @@ -203,6 +208,8 @@ private boolean isValidCloudConfiguration() {
return cloudConfiguration.getCloudType() == CloudType.HDFS;
case AZBLOB:
return cloudConfiguration.getCloudType() == CloudType.AZURE;
case ADLS2:
return cloudConfiguration.getCloudType() == CloudType.AZURE;
default:
return false;
}
Expand All @@ -213,6 +220,8 @@ public static void addMaskForCredential(Map<String, String> params) {
params.computeIfPresent(CloudConfigurationConstants.AWS_S3_SECRET_KEY, (key, value) -> CREDENTIAL_MASK);
params.computeIfPresent(CloudConfigurationConstants.AZURE_BLOB_SHARED_KEY, (key, value) -> CREDENTIAL_MASK);
params.computeIfPresent(CloudConfigurationConstants.AZURE_BLOB_SAS_TOKEN, (key, value) -> CREDENTIAL_MASK);
params.computeIfPresent(CloudConfigurationConstants.AZURE_ADLS2_SHARED_KEY, (key, value) -> CREDENTIAL_MASK);
params.computeIfPresent(CloudConfigurationConstants.AZURE_ADLS2_SAS_TOKEN, (key, value) -> CREDENTIAL_MASK);
}

public void getProcNodeData(BaseProcResult result) {
Expand Down Expand Up @@ -294,7 +303,7 @@ public static Map<String, String> getParamsFromFileStoreInfo(FileStoreInfo fsInf
params.put(CloudConfigurationConstants.HDFS_USERNAME_DEPRECATED, userName);
}
return params;
case AZBLOB:
case AZBLOB: {
AzBlobFileStoreInfo azBlobFileStoreInfo = fsInfo.getAzblobFsInfo();
params.put(CloudConfigurationConstants.AZURE_BLOB_ENDPOINT, azBlobFileStoreInfo.getEndpoint());
AzBlobCredentialInfo azBlobcredentialInfo = azBlobFileStoreInfo.getCredential();
Expand All @@ -307,6 +316,21 @@ public static Map<String, String> getParamsFromFileStoreInfo(FileStoreInfo fsInf
params.put(CloudConfigurationConstants.AZURE_BLOB_SAS_TOKEN, sasToken);
}
return params;
}
case ADLS2: {
ADLS2FileStoreInfo adls2FileStoreInfo = fsInfo.getAdls2FsInfo();
params.put(CloudConfigurationConstants.AZURE_ADLS2_ENDPOINT, adls2FileStoreInfo.getEndpoint());
ADLS2CredentialInfo adls2credentialInfo = adls2FileStoreInfo.getCredential();
String sharedKey = adls2credentialInfo.getSharedKey();
if (!Strings.isNullOrEmpty(sharedKey)) {
params.put(CloudConfigurationConstants.AZURE_ADLS2_SHARED_KEY, sharedKey);
}
String sasToken = adls2credentialInfo.getSasToken();
if (!Strings.isNullOrEmpty(sasToken)) {
params.put(CloudConfigurationConstants.AZURE_ADLS2_SAS_TOKEN, sasToken);
}
return params;
}
default:
return params;
}
Expand Down
Loading

0 comments on commit 47374e6

Please sign in to comment.