Commit c27b95e

#000 fix: CSP Refactor

manjudr committed Jul 19, 2023
1 parent a6af6ec commit c27b95e
Showing 2 changed files with 53 additions and 35 deletions.
@@ -0,0 +1,50 @@
package org.ekstep.analytics.framework.util

import org.apache.spark.SparkContext
import org.sunbird.cloud.storage.conf.AppConf

trait ICloudStorageProvider {
  def setConf(sc: SparkContext): Unit
}

object CloudStorageProviders {
  implicit val className: String = "org.ekstep.analytics.framework.util.CloudStorageProviders"

  // Provider lookup keyed by lowercase CSP name, matching the lowercased lookup below.
  private val providerMap: Map[String, Class[_ <: ICloudStorageProvider]] =
    Map("s3" -> classOf[S3Provider], "azure" -> classOf[AzureProvider], "gcp" -> classOf[GcpProvider])

  // Applies the configured provider's Hadoop settings to the given SparkContext;
  // unknown provider names are silently ignored.
  def setSparkCSPConfigurations(sc: SparkContext, csp: String): Unit = {
    providerMap.get(csp.toLowerCase()).foreach { providerClass =>
      val providerConstructor = providerClass.getDeclaredConstructor()
      val providerInstance: ICloudStorageProvider = providerConstructor.newInstance()
      providerInstance.setConf(sc)
    }
  }
}
class S3Provider extends ICloudStorageProvider {
  override def setConf(sc: SparkContext): Unit = {
    implicit val className: String = "org.ekstep.analytics.framework.util.S3Provider"
    JobLogger.log("Configuring S3 AccessKey & SecretKey to SparkContext")
    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getAwsKey())
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AppConf.getAwsSecret())
    val storageEndpoint = AppConf.getConfig("cloud_storage_endpoint")
    if (storageEndpoint.nonEmpty) {
      sc.hadoopConfiguration.set("fs.s3n.endpoint", storageEndpoint)
    }
  }
}

class AzureProvider extends ICloudStorageProvider {
  override def setConf(sc: SparkContext): Unit = {
    val accName = AppConf.getStorageKey("azure")
    val accKey = AppConf.getStorageSecret("azure")
    sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    sc.hadoopConfiguration.set("fs.azure.account.key." + accName + ".blob.core.windows.net", accKey)
    sc.hadoopConfiguration.set("fs.azure.account.keyprovider." + accName + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider")
  }
}
class GcpProvider extends ICloudStorageProvider {
  override def setConf(sc: SparkContext): Unit = {
    sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", AppConf.getStorageKey("gcloud"))
    sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", AppConf.getStorageSecret("gcloud"))
    sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key.id", AppConf.getConfig("gcloud_private_secret_id"))
  }
}
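The trait gives each provider a single extension point. As a hypothetical sketch of how one more provider could be added in this same file (OCI is purely illustrative and not part of this commit; the Hadoop property names and AppConf keys are assumptions, and the class would also need a providerMap entry such as "oci" -> classOf[OciProvider]):

class OciProvider extends ICloudStorageProvider {
  override def setConf(sc: SparkContext): Unit = {
    // Assumed property and config names, shown only to illustrate the pattern.
    sc.hadoopConfiguration.set("fs.oci.client.auth.userId", AppConf.getStorageKey("oci"))
    sc.hadoopConfiguration.set("fs.oci.client.auth.passphrase", AppConf.getStorageSecret("oci"))
  }
}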
@@ -8,13 +8,13 @@ import java.security.MessageDigest
import java.sql.Timestamp
import java.util.zip.GZIPOutputStream
import java.util.{Date, Properties}

import com.ing.wbaa.druid.definitions.{Granularity, GranularityType}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.ekstep.analytics.framework.Level._
import org.ekstep.analytics.framework.Period._
import org.ekstep.analytics.framework.util.CloudStorageProviders.setSparkCSPConfigurations
import org.ekstep.analytics.framework.{DtRange, Event, JobConfig, _}

import scala.collection.mutable.ListBuffer
@@ -91,9 +91,7 @@ object CommonUtil {
    }

    val sc = new SparkContext(conf)
    setS3Conf(sc)
    setAzureConf(sc)
    setGcloudConf(sc)
    setSparkCSPConfigurations(sc, AppConf.getConfig("cloud_storage_type"))
    JobLogger.log("Spark Context initialized")
    sc
  }
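The three per-provider calls collapse into one dispatch driven entirely by the cloud_storage_type key. As a rough sketch, the relevant configuration entries might look like the following (key names are inferred from the AppConf calls in this diff, not confirmed by it; values are placeholders):

cloud_storage_type="azure"
azure_storage_key="<storage-account-name>"
azure_storage_secret="<storage-account-key>"
cloud_storage_endpoint=""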
@@ -144,40 +142,10 @@ object CommonUtil {
    }

    val sparkSession = SparkSession.builder().appName("sunbird-analytics").config(conf).getOrCreate()
    setS3Conf(sparkSession.sparkContext)
    setAzureConf(sparkSession.sparkContext)
    setGcloudConf(sparkSession.sparkContext)
    setSparkCSPConfigurations(sparkSession.sparkContext, AppConf.getConfig("cloud_storage_type"))
    JobLogger.log("SparkSession initialized")
    sparkSession
  }
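The SparkSession path above gets the same one-line swap. A minimal smoke-test sketch of the new entry point (not from this commit; the local master is an assumption, and gcloud credentials must exist in the application config for GcpProvider.setConf to resolve them):

import org.apache.spark.{SparkConf, SparkContext}
import org.ekstep.analytics.framework.util.CloudStorageProviders

object CSPSmokeTest {
  def main(args: Array[String]): Unit = {
    // Build a throwaway local context and let the dispatcher configure it for GCP.
    val sc = new SparkContext(new SparkConf().setAppName("csp-smoke").setMaster("local[*]"))
    CloudStorageProviders.setSparkCSPConfigurations(sc, "gcp")
    // Verify the provider actually wrote its Hadoop settings.
    assert(sc.hadoopConfiguration.get("fs.gs.impl") == "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    sc.stop()
  }
}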

  def setS3Conf(sc: SparkContext) = {
    JobLogger.log("Configuring S3 AccessKey & SecretKey to SparkContext")
    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getAwsKey());
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AppConf.getAwsSecret());

    val storageEndpoint = AppConf.getConfig("cloud_storage_endpoint")
    if (!"".equalsIgnoreCase(storageEndpoint)) {
      sc.hadoopConfiguration.set("fs.s3n.endpoint", storageEndpoint)
    }
  }

  def setAzureConf(sc: SparkContext) = {
    val accName = AppConf.getStorageKey("azure")
    val accKey = AppConf.getStorageSecret("azure")
    sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    sc.hadoopConfiguration.set("fs.azure.account.key." + accName + ".blob.core.windows.net", accKey)
    sc.hadoopConfiguration.set("fs.azure.account.keyprovider." + accName + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider")
  }

  def setGcloudConf(sc: SparkContext) = {
    sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", AppConf.getStorageKey("gcloud"))
    sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", AppConf.getStorageSecret("gcloud"))
    sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key.id", AppConf.getConfig("gcloud_private_secret_id"))
  }

  def closeSparkContext()(implicit sc: SparkContext) {
    JobLogger.log("Closing Spark Context", None, INFO)
    sc.stop();
