Batch indexing into Solr using the SparkApp framework with spark-submit
You may use spark-submit with your Spark job to batch index HDFS files into Solr. To do
this, you need to create a class which implements the SparkApp.RDDProcessor
interface. This enables ETL of large datasets into Solr, exploiting Spark's robust data
processing capabilities.
To use the SparkApp framework, you need to create a Maven project with the spark-solr dependency.
<dependencies>
  <dependency>
    <groupId>com.lucidworks.spark</groupId>
    <artifactId>spark-solr</artifactId>
    <version>{latest_version}</version>
  </dependency>
</dependencies>
This project needs to have at least one class which implements the
SparkApp.RDDProcessor interface. The class can be written in either Java or Scala;
this documentation uses a Java class to demonstrate the framework.
SparkApp.RDDProcessor has three functions which need to be overridden:
getName()
getOptions()
run()
getName()
The getName() function returns a string: the short name of the
application. When running your spark-submit job, this is the name you pass as a
parameter so that the job can find your class.
public String getName() { return "csv"; }
getOptions()
In the getOptions() function you may specify parameters that are specific
to your application. Common parameters such as zkHost,
collection, and batchSize are present by default; you do
not need to specify them here.
public Option[] getOptions() {
    return new Option[]{
        OptionBuilder
            .withArgName("PATH").hasArgs()
            .isRequired(true)
            .withDescription("Path to the CSV file to index")
            .create("csvPath")
    };
}
run
The run function is the core of the application. It returns an integer
and takes two parameters: a SparkConf instance and a
CommandLine instance.
You can create a JavaSparkContext from the
SparkConf instance, and use it to open the CSV file as a
JavaRDD<String>:
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> textFile = jsc.textFile(cli.getOptionValue("csvPath"));
You now have to convert these strings to
SolrInputDocument objects, and create a JavaRDD of them. To
achieve this, the script uses a custom map function which splits each line of the
CSV file on commas and adds the resulting fields to a SolrInputDocument. For this
step to work, you have to specify the schema of the CSV file in advance.
JavaRDD<SolrInputDocument> jrdd = textFile.map(new Function<String, SolrInputDocument>() {
    @Override
    public SolrInputDocument call(String line) throws Exception {
        SolrInputDocument doc = new SolrInputDocument();
        // schema is a String[] of field names declared elsewhere in the class,
        // for example {"id", "name", "price"}, matching the CSV column order
        String[] row = line.split(",");
        // Rows that do not match the schema yield null; such null entries
        // should be filtered out of the RDD before indexing.
        if (row.length != schema.length)
            return null;
        for (int i = 0; i < schema.length; i++) {
            doc.setField(schema[i], row[i]);
        }
        return doc;
    }
});
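One caveat with the line.split(",") call above: Java's String.split drops trailing empty fields by default, so a line that ends with an empty column produces fewer tokens than the schema expects and is skipped. A small stdlib-only sketch (the sample row and field count are illustrative):

```java
public class SplitCaveat {
    public static void main(String[] args) {
        // By default, split(",") drops trailing empty strings:
        // a row with an empty last column yields too few tokens.
        String[] row = "1,Alice,".split(",");
        System.out.println(row.length); // 2 -- fails a 3-field schema check

        // A negative limit keeps trailing empty fields.
        String[] full = "1,Alice,".split(",", -1);
        System.out.println(full.length); // 3 -- matches a 3-field schema
    }
}
```

If empty trailing columns are legitimate in your data, consider using split(",", -1) in the map function instead.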
After this, the script asks the CommandLine instance for the
options it needs to perform indexing:
String zkhost = cli.getOptionValue("zkHost", "localhost:9983");
String collection = cli.getOptionValue("collection", "collection1");
int batchSize = Integer.parseInt(cli.getOptionValue("batchSize", "100"));
Finally, it indexes data into the Solr cluster:
SolrSupport.indexDocs(zkhost, collection, batchSize, jrdd.rdd());
If indexing completes successfully, the function returns 0.
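Putting it all together, the job is launched with spark-submit through the spark-solr SparkApp main class, passing the short name returned by getName() as the first application argument. A sketch of the command (the jar name, master, and file paths are illustrative, not part of the framework):

```shell
spark-submit --master yarn \
  --class com.lucidworks.spark.SparkApp \
  my-csv-indexer-with-dependencies.jar \
  csv \
  -zkHost localhost:9983 \
  -collection collection1 \
  -csvPath hdfs:///path/to/data.csv
```

The arguments after the jar name are handed to your processor: "csv" selects the class whose getName() returns that string, and the remaining options are parsed into the CommandLine instance passed to run.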
