Stream Interfaces
The SDKs expose an interface that has to be extended in order to build a connector. A sample is shown below.
Imports
package org.sunbird.obsrv.connector;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.sunbird.obsrv.connector.model.Models;
import org.sunbird.obsrv.connector.source.IConnectorSource;
import org.sunbird.obsrv.connector.source.SourceConnectorFunction;
import org.sunbird.obsrv.job.exception.UnsupportedDataFormatException;
import java.util.List;
SourceConnectorFunction
public class ExampleSourceFunction extends SourceConnectorFunction {
/**
 * Creates the example source function.
 *
 * @param connectorContexts connector contexts, handed unchanged to the
 *                          {@code SourceConnectorFunction} constructor
 */
public ExampleSourceFunction(List<Models.ConnectorContext> connectorContexts) {
super(connectorContexts);
}
/**
 * Template hook for handling a single incoming event. The body is left
 * empty on purpose; a connector author fills it in and reports the outcome
 * through the supplied callbacks.
 *
 * NOTE(review): Function1, Function2 and BoxedUnit are not among the imports
 * shown in this sample; presumably scala.Function1, scala.Function2 and
 * scala.runtime.BoxedUnit -- confirm and add the imports.
 *
 * @param event     the event to process, as a string
 * @param onSuccess callback taking one string; to be invoked when the event
 *                  is processed successfully
 * @param onFailure callback taking a string and an {@code ErrorData}; to be
 *                  invoked when processing the event fails
 * @param incMetric callback taking a string and a second value; to be
 *                  invoked to increment a metric
 */
@Override
public void processEvent(
String event,
Function1<String, BoxedUnit> onSuccess,
Function2<String, org.sunbird.obsrv.job.model.Models.ErrorData, BoxedUnit> onFailure,
Function2<String, Object, BoxedUnit> incMetric
){
// TODO: Implement this method to process the event.
// On success: call onSuccess.apply(event).
// On failure: call onFailure.apply(event, errorData).
// To increment a metric: call incMetric.apply(event, metricData).
// NOTE(review): the first incMetric argument is presumably a metric name
// (see getMetrics) rather than the event itself -- confirm against the SDK.
}
/**
 * Returns the list of metrics for this connector. The template returns an
 * empty list; replace it with the connector's own metrics.
 *
 * NOTE(review): this sample imports java.util.List, which declares no static
 * empty() method, so the call below cannot resolve against that import.
 * empty() does exist on Scala's immutable List, which is presumably the type
 * the Scala SDK base class expects -- confirm which List is intended and
 * align the import (or the call) accordingly.
 *
 * @return the list of metrics; empty in this template
 */
@Override
public List<String> getMetrics() {
// TODO: Return the list of metrics
return List.empty();
}
}
Ref: https://github.com/Sunbird-Obsrv/connector-sdk-scala/
IConnectorSource Class
Imports
SourceConnectorFunction
Ref: https://github.com/Sunbird-Obsrv/connector-sdk-scala/
