Merge pull request #146 from Sunbird-Obsrv/release-5.0.0
Merge Release-5.0.0 to Release-5.1.0
manjudr authored Sep 20, 2022
2 parents 29bd6bd + b6488f3 commit a6af6ec
Showing 9 changed files with 142 additions and 2 deletions.
analytics-core/pom.xml (12 additions, 1 deletion)
@@ -212,7 +212,7 @@
<dependency>
<groupId>org.sunbird</groupId>
<artifactId>cloud-store-sdk_${scala.maj.version}</artifactId>
- <version>1.3.0</version>
+ <version>1.4.0</version>
<exclusions>
<exclusion>
<groupId>com.microsoft.azure</groupId>
@@ -247,6 +247,17 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-storage</artifactId>
+ <version>2.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.cloud.bigdataoss</groupId>
+ <artifactId>gcs-connector</artifactId>
+ <version>hadoop2-2.0.1</version>
+ <classifier>shaded</classifier>
+ </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
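The shaded gcs-connector artifact bundles the Hadoop filesystem implementation (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem) that the configuration changes further below register for the gs:// scheme.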
DataFetcher.scala
@@ -6,7 +6,7 @@
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.ekstep.analytics.framework.Level.INFO
import org.ekstep.analytics.framework.exception.DataFetcherException
- import org.ekstep.analytics.framework.fetcher.{AzureDataFetcher, DruidDataFetcher, S3DataFetcher, CephS3DataFetcher}
+ import org.ekstep.analytics.framework.fetcher.{AzureDataFetcher, DruidDataFetcher, S3DataFetcher, CephS3DataFetcher, GcloudDataFetcher}
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger}

/**
@@ -34,6 +34,9 @@ object DataFetcher {
case "azure" =>
JobLogger.log("Fetching the batch data from AZURE")
AzureDataFetcher.getObjectKeys(search.queries.get);
case "gcloud" =>
JobLogger.log("Fetching the batch data from Google Cloud")
GcloudDataFetcher.getObjectKeys(search.queries.get);
case "local" =>
JobLogger.log("Fetching the batch data from Local file")
search.queries.get.map { x => x.file.getOrElse(null) }.filterNot { x => x == null };
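A minimal usage sketch (not part of this commit) of how a job selects the new fetcher type, mirroring the test added at the end of this diff; it assumes implicit SparkContext and FrameworkContext values in scope, and the bucket and prefix names are placeholders:

// Sketch: fetch batch data from GCS via the new "gcloud" fetcher
val queries = Option(Array(
    Query(Option("my-telemetry-bucket"), Option("raw/"), Option("2022-08-01"), Option("2022-08-09"))
))
val events = DataFetcher.fetchBatchData[V3Event](Fetcher("gcloud", None, queries))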
GcloudDispatcher.scala
@@ -0,0 +1,35 @@
package org.ekstep.analytics.framework.dispatcher

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.ekstep.analytics.framework.{FrameworkContext, StorageConfig}
import org.ekstep.analytics.framework.exception.DispatcherException
import org.ekstep.analytics.framework.util.CommonUtil

object GcloudDispatcher extends HadoopDispatcher with IDispatcher {

    implicit val className = "org.ekstep.analytics.framework.dispatcher.GcloudDispatcher"

    override def dispatch(config: Map[String, AnyRef], events: RDD[String])(implicit sc: SparkContext, fc: FrameworkContext): Unit = {

        val bucket = config.getOrElse("bucket", null).asInstanceOf[String];
        val key = config.getOrElse("key", null).asInstanceOf[String];
        val isPublic = config.getOrElse("public", false).asInstanceOf[Boolean];

        if (null == bucket || null == key) {
            throw new DispatcherException("'bucket' & 'key' parameters are required to send output to GCloud")
        }

        val srcFile = CommonUtil.getGCloudFile(bucket, "_tmp/" + key);
        val destFile = CommonUtil.getGCloudFile(bucket, key);
        dispatchData(srcFile, destFile, sc.hadoopConfiguration, events)
    }

    override def dispatch(events: RDD[String], config: StorageConfig)(implicit sc: SparkContext, fc: FrameworkContext): Unit = {
        val bucket = config.container;
        val key = config.fileName;

        dispatch(Map[String, AnyRef]("bucket" -> bucket, "key" -> key), events);
    }

}
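The srcFile/destFile pair above suggests the inherited dispatchData writes output under a temporary "_tmp/" prefix before publishing it to the final key. A usage sketch (not part of this commit; bucket and file names are placeholders, implicit SparkContext and FrameworkContext assumed), matching the test change at the end of this diff:

// Sketch: route an RDD of serialized events to GCS
OutputDispatcher.dispatch(StorageConfig("gcloud", "my-bucket", "reports/output.json"), events)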
DispatcherFactory.scala
@@ -31,6 +31,8 @@ object DispatcherFactory {
AzureDispatcher;
case "elasticsearch" =>
ESDispatcher;
case "gcloud" =>
GcloudDispatcher;
case _ =>
throw new DispatcherException("Unknown output dispatcher destination found");
}
@@ -45,6 +47,8 @@
FileDispatcher;
case "azure" =>
AzureDispatcher;
case "gcloud" =>
GcloudDispatcher;
case _ =>
throw new DispatcherException("Unknown output dispatcher destination found");
}
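Sketch of resolving the new dispatcher by name (not part of this commit): this assumes the factory's getDispatcher(Dispatcher) entry point and a Dispatcher(to, params) config type; the params map values are placeholders:

// Assumption: DispatcherFactory.getDispatcher takes a Dispatcher(to, params) config
val dispatcher = DispatcherFactory.getDispatcher(Dispatcher("gcloud", Map[String, AnyRef]("bucket" -> "my-bucket", "key" -> "reports/output.json")))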
GcloudDataFetcher.scala
@@ -0,0 +1,40 @@
package org.ekstep.analytics.framework.fetcher

import org.ekstep.analytics.framework.{FrameworkContext, Query}
import org.ekstep.analytics.framework.exception.DataFetcherException
import org.sunbird.cloud.storage.conf.AppConf

object GcloudDataFetcher {

    @throws(classOf[DataFetcherException])
    def getObjectKeys(queries: Array[Query])(implicit fc: FrameworkContext): Array[String] = {

        val keys = for (query <- queries) yield {
            val paths = if (query.folder.isDefined && query.endDate.isDefined && query.folder.getOrElse("false").equals("true")) {
                Array("gs://" + getBucket(query.bucket) + "/" + getPrefix(query.prefix) + query.endDate.getOrElse(""))
            } else {
                getKeys(query);
            }
            if (query.excludePrefix.isDefined) {
                paths.filter { x => !x.contains(query.excludePrefix.get) }
            } else {
                paths
            }
        }
        keys.flatten
    }

    private def getKeys(query: Query)(implicit fc: FrameworkContext): Array[String] = {
        val storageService = fc.getStorageService("gcloud", "gcloud_client_key", "gcloud_private_secret");
        val keys = storageService.searchObjects(getBucket(query.bucket), getPrefix(query.prefix), query.startDate, query.endDate, query.delta, query.datePattern.getOrElse("yyyy-MM-dd"))
        storageService.getPaths(getBucket(query.bucket), keys).toArray
    }

    private def getBucket(bucket: Option[String]): String = {
        bucket.getOrElse("telemetry-data-store");
    }

    private def getPrefix(prefix: Option[String]): String = {
        prefix.getOrElse("raw/");
    }
}
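Sketch of listing object keys directly (not part of this commit; implicit FrameworkContext assumed, bucket/prefix/date values are placeholders). When bucket or prefix is omitted, the defaults above ("telemetry-data-store", "raw/") apply:

// Sketch: resolve gs:// paths for a date range of raw telemetry
val paths = GcloudDataFetcher.getObjectKeys(Array(
    Query(Option("my-telemetry-bucket"), Option("raw/"), Option("2022-08-01"), Option("2022-08-09"))
))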
CommonUtil.scala
@@ -93,6 +93,7 @@ object CommonUtil {
val sc = new SparkContext(conf)
setS3Conf(sc)
setAzureConf(sc)
+ setGcloudConf(sc)
JobLogger.log("Spark Context initialized")
sc
}
@@ -145,6 +146,7 @@
val sparkSession = SparkSession.builder().appName("sunbird-analytics").config(conf).getOrCreate()
setS3Conf(sparkSession.sparkContext)
setAzureConf(sparkSession.sparkContext)
+ setGcloudConf(sparkSession.sparkContext)
JobLogger.log("SparkSession initialized")
sparkSession
}
@@ -168,6 +170,14 @@
sc.hadoopConfiguration.set("fs.azure.account.keyprovider." + accName + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider")
}

+ def setGcloudConf(sc: SparkContext) = {
+ sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
+ sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
+ sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", AppConf.getStorageKey("gcloud"))
+ sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", AppConf.getStorageSecret("gcloud"))
+ sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key.id", AppConf.getConfig("gcloud_private_secret_id"))
+ }
+
def closeSparkContext()(implicit sc: SparkContext) {
JobLogger.log("Closing Spark Context", None, INFO)
sc.stop();
@@ -737,6 +747,14 @@
bucket + "@" + AppConf.getConfig(storageKey) + ".blob.core.windows.net/" + file;
}

+ def getGCloudFile(bucket: String, file: String): String = {
+ "gs://" + bucket + "/" + file;
+ }
+
+ def getGCloudFileWithoutPrefix(bucket: String, file: String): String = {
+ bucket + "/" + file;
+ }
+
def setStorageConf(store: String, accountKey: Option[String], accountSecret: Option[String])(implicit sc: SparkContext): Configuration = {
store.toLowerCase() match {
case "s3" =>
@@ -745,6 +763,12 @@
case "azure" =>
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.azure.account.key." + AppConf.getConfig(accountKey.getOrElse("azure_storage_key")) + ".blob.core.windows.net", AppConf.getConfig(accountSecret.getOrElse("azure_storage_secret")))
case "gcloud" =>
sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", AppConf.getStorageKey("gcloud"))
sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", AppConf.getStorageSecret("gcloud"))
sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key.id", AppConf.getConfig("gcloud_private_secret_id"))
case _ =>
// Do nothing
}
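Configuration assumed by the code above (key names are taken from the calls; values are deployment-specific): AppConf.getStorageKey("gcloud") must resolve to the GCP service-account email, AppConf.getStorageSecret("gcloud") to the service-account private key, and the "gcloud_private_secret_id" config entry to the private-key id. With those in place, a job can also wire the Hadoop configuration explicitly (a sketch, implicit SparkContext assumed):

// Sketch: register the gs:// filesystem and service-account credentials
val hadoopConf = CommonUtil.setStorageConf("gcloud", None, None)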
DatasetExt.scala
@@ -47,6 +47,8 @@ class DatasetExt(df: Dataset[Row]) {
CommonUtil.getS3FileWithoutPrefix(storageConfig.container, storageConfig.fileName);
case "azure" =>
CommonUtil.getAzureFileWithoutPrefix(storageConfig.container, storageConfig.fileName, storageConfig.accountKey.getOrElse("azure_storage_key"))
case "gcloud" =>
CommonUtil.getGCloudFileWithoutPrefix(storageConfig.container, storageConfig.fileName);
case _ =>
storageConfig.fileName
}
@@ -56,6 +58,8 @@
"s3n://"
case "azure" =>
"wasb://"
case "gcloud" =>
"gs://"
case _ =>
""
}
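Net effect of the two branches (a worked example with placeholder names): for StorageConfig("gcloud", "my-bucket", "report.csv") the write path is the "gs://" prefix plus the container-relative file name:

val path = "gs://" + CommonUtil.getGCloudFileWithoutPrefix("my-bucket", "report.csv")
// path == "gs://my-bucket/report.csv"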
TestDataFetcher.scala
@@ -162,4 +162,20 @@ class TestDataFetcher extends SparkSpec with Matchers with MockFactory {
keys5.head should be ("https://sunbirddevprivate.blob.core.windows.net/dev-data-store/raw/2020-06-10-0-1591845501666.json.gz")
}


it should "fetch data from Google Cloud" in {

implicit val mockFc = mock[FrameworkContext];
val mockStorageService = mock[BaseStorageService]
mockFc.inputEventsCount = sc.longAccumulator("Count");
(mockFc.getStorageService(_:String, _:String, _:String):BaseStorageService).expects("gcloud", "gcloud_client_key", "gcloud_private_secret").returns(mockStorageService);
(mockStorageService.searchObjects _).expects("test-obsrv-data-store", "unique/raw/", Option("2022-08-09"), Option("2022-08-09"), None, "yyyy-MM-dd").returns(null);
(mockStorageService.getPaths _).expects("test-obsrv-data-store", null).returns(List("src/test/resources/sample_telemetry_2.log"))
val queries = Option(Array(
Query(Option("test-obsrv-data-store"), Option("unique/raw/"), Option("2022-08-09"), Option("2022-08-09"))
));
val rdd = DataFetcher.fetchBatchData[V3Event](Fetcher("gcloud", None, queries));
rdd.count should be (19)
}

}
TestOutputDispatcher.scala
@@ -188,6 +188,9 @@ class TestOutputDispatcher extends SparkSpec("src/test/resources/sample_telemetr
S3Dispatcher.dispatch(Map[String, AnyRef]("key" -> "test_key", "bucket" -> "test_bucket"), events.map(f => JSONUtils.serialize(f)));
}

+ a[IOException] should be thrownBy {
+ OutputDispatcher.dispatch(StorageConfig("gcloud", "test-obsrv-data-store", "test_key/test_data.json"), events.map(f => JSONUtils.serialize(f)));
+ }
}

it should "dispatch output to elastic-search" in {
