Batch Interfaces
The SDK exposes an interface that must be extended in order to build a connector.
Java
Imports
Section titled “Imports”package org.sunbird.obsrv.connector;
import com.typesafe.config.Config;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;import org.sunbird.obsrv.connector.model.Models.ConnectorContext;import org.sunbird.obsrv.connector.source.ISourceConnector;
import java.util.Collections;import java.util.Map;ISourceConnector
Section titled “ISourceConnector”public class ExampleSourceConnector implements ISourceConnector {
@Override public Map<String, String> getSparkConf(Config config) { // TODO: Return the SparkConf related to your connector return Collections.emptyMap(); }
@Override public Dataset<Row> process(SparkSession spark, ConnectorContext ctx, Config config, BiConsumer<String, Long> metricFn) { // TODO: Add logic to read the data and return a dataframe return spark.emptyDataFrame(); }}Reference
Section titled “Reference”Scala
Imports
Section titled “Imports”package org.sunbird.obsrv.connector
import com.typesafe.config.Configimport org.apache.spark.sql.functions.{col, max}import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import org.sunbird.obsrv.connector.model.Models.ConnectorContextimport org.sunbird.obsrv.connector.source.{ISourceConnector, SourceConnector}ISourceConnector
/** Skeleton batch source connector built on the Obsrv connector SDK.
  *
  * Replace the TODO bodies with logic that reads from your source system.
  */
class ExampleSourceConnector extends ISourceConnector {

  /** Returns connector-specific Spark configuration entries.
    *
    * @param config the connector configuration
    * @return Spark configuration key/value pairs; empty in this template
    */
  override def getSparkConf(config: Config): Map[String, String] = {
    // TODO: Return the SparkConf related to your connector
    Map.empty[String, String]
  }

  /** Reads data from the source and returns it as a DataFrame.
    *
    * @param spark    the active Spark session
    * @param ctx      the connector context supplied by the framework
    * @param config   the connector configuration
    * @param metricFn metric callback taking a String and a Long
    *                 (presumably metric name and value — confirm against the SDK)
    * @return the data read from the source; an empty DataFrame in this template
    */
  override def process(
      spark: SparkSession,
      ctx: ConnectorContext,
      config: Config,
      metricFn: (String, Long) => Unit
  ): Dataset[Row] = {
    // TODO: Add logic to read the data and return a dataframe
    spark.emptyDataFrame
  }
}
Section titled “Reference”Python
Imports
Section titled “Imports”
from typing import Any, Dict, Iterator

from obsrv.common import ObsrvException
from obsrv.connector import ConnectorContext, MetricsCollector
from obsrv.connector.batch import ISourceConnector
from obsrv.job.batch import get_base_conf
from obsrv.models import ErrorData, StatusCode
from obsrv.utils import LoggerController
from pyspark.conf import SparkConf
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import lit
ISourceConnector
class ExampleSource(ISourceConnector):
    """Skeleton batch source connector built on the Obsrv Python SDK.

    Replace the TODO bodies with logic that reads from your source system.
    """

    def process(
        self,
        sc: SparkSession,
        ctx: ConnectorContext,
        connector_config: Dict[Any, Any],
        metrics_collector: MetricsCollector,
    ) -> Iterator[DataFrame]:
        """Read data from the source and hand it to the framework as DataFrame(s).

        Args:
            sc: The active SparkSession (a session, despite the ``sc`` name).
            ctx: Connector context supplied by the framework.
            connector_config: Connector configuration as a dict.
            metrics_collector: Metrics collector supplied by the framework.

        Returns:
            A DataFrame. Per the TODO below, an implementation may instead
            ``yield`` DataFrames, turning this method into a generator that
            matches the ``Iterator[DataFrame]`` annotation.
        """
        # Local import keeps this template self-contained.
        from pyspark.sql.types import StructType

        # TODO: return or yield dataframe
        # yield sc.createDataFrame([], schema=StructType([]))
        # PySpark cannot infer a schema from an empty list (schema=None raises
        # ValueError), so an explicit, empty StructType is passed instead.
        return sc.createDataFrame([], schema=StructType([]))

    def get_spark_conf(self, connector_config) -> SparkConf:
        """Build the SparkConf for this connector.

        Starts from the SDK's ``get_base_conf()`` and returns it; extend it with
        connector-specific settings as needed.
        """
        conf = get_base_conf()
        # TODO: Extend or Add the SparkConf related to your connector
        return conf