MapReduceIndexerTool
MapReduceIndexerTool is a MapReduce batch job driver that takes a morphline and creates a set of Solr index shards from a set of input files and writes the indexes into HDFS in a flexible, scalable, and fault-tolerant manner. It also supports merging the output shards into a set of live customer-facing Solr servers, typically a SolrCloud.
More details are available through the command line help:
$ hadoop jar target/search-mr-*-job.jar \
org.apache.solr.hadoop.MapReduceIndexerTool --help
usage: hadoop [GenericOptions]... jar search-mr-*-job.jar \
org.apache.solr.hadoop.MapReduceIndexerTool
[--help] --output-dir HDFS_URI [--input-list URI]
--morphline-file FILE [--morphline-id STRING]
[--update-conflict-resolver FQCN] [--mappers INTEGER]
[--reducers INTEGER] [--max-segments INTEGER]
[--fair-scheduler-pool STRING] [--dry-run] [--log4j FILE]
[--verbose] [--show-non-solr-cloud] [--zk-host STRING] [--go-live]
[--collection STRING] [--go-live-threads INTEGER]
[HDFS_URI [HDFS_URI ...]]
The MapReduce batch job is a driver that takes a morphline and creates a
set of Solr index shards from a set of input files and writes the indexes
into HDFS, in a flexible, scalable and fault-tolerant manner. It also
supports merging the output shards into a set of live customer facing Solr
servers, typically a SolrCloud. The program proceeds in several
consecutive MapReduce based phases, as follows:
1) Randomization phase: This (parallel) phase randomizes the list of
input files in order to spread indexing load more evenly among the
mappers of the subsequent phase.
2) Mapper phase: This (parallel) phase takes the input files, extracts
the relevant content, transforms it and hands SolrInputDocuments to a set
of reducers. The ETL functionality is flexible and customizable using
chains of arbitrary morphline commands that pipe records from one
transformation command to another. Commands to parse and transform a set
of standard data formats such as Avro, CSV, Text, HTML, XML, PDF, Word,
or Excel are provided out of the box, and additional custom commands
and parsers for additional file or data formats can be added as morphline
plug-ins. This is done by implementing a simple Java interface that
consumes a record (for example a file in the form of an InputStream plus
some headers plus contextual metadata) and generates as output zero or
more records. Any kind of data format can be indexed and any Solr
documents for any kind of Solr schema can be generated, and any custom
ETL logic can be registered and executed.
Record fields, including MIME types, can also explicitly be passed by
force from the CLI to the morphline, for example: hadoop ... -D
morphlineField._attachment_mimetype=text/csv
3) Reducer phase: This (parallel) phase loads the mapper's
SolrInputDocuments into one EmbeddedSolrServer per reducer. Each such
reducer and Solr server can be seen as a (micro) shard. The Solr servers
store their data in HDFS.
4) Mapper-only merge phase: This (parallel) phase merges the set of
reducer shards into the number of Solr shards expected by the user, using
a mapper-only job. This phase is omitted if the number of shards is
already equal to the number of shards expected by the user.
5) Go-live phase: This optional (parallel) phase merges the output shards
of the previous phase into a set of live customer facing Solr servers,
typically a SolrCloud. If this phase is omitted you can explicitly point
each Solr server to one of the HDFS output shard directories.
Fault Tolerance: Mapper and reducer task attempts are retried on failure
per the standard MapReduce semantics. On program startup all data in the
--output-dir is deleted if that output directory already exists. If the
whole job fails you can retry simply by rerunning the program again using
the same arguments.
positional arguments:
HDFS_URI HDFS URI of file or directory tree to index.
(default: [])
optional arguments:
--help, -help, -h Show this help message and exit
--input-list URI Local URI or HDFS URI of a UTF-8 encoded file
containing a list of HDFS URIs to index, one URI
per line in the file. If '-' is specified, URIs
are read from the standard input. Multiple --
input-list arguments can be specified.
--morphline-id STRING The identifier of the morphline that shall be
executed within the morphline config file
specified by --morphline-file. If the --
morphline-id option is omitted the first (meaning
the top-most) morphline within the config file is
used. Example: morphline1
--update-conflict-resolver FQCN
Fully qualified class name of a Java class that
implements the UpdateConflictResolver interface.
This enables deduplication and ordering of a
series of document updates for the same unique
document key. For example, a MapReduce batch job
might index multiple files in the same job where
some of the files contain old and new versions
of the very same document, using the same unique
document key.
Typically, implementations of this interface
forbid collisions by throwing an exception, or
ignore all but the most recent document version,
or, in the general case, order colliding updates
ascending from least recent to most recent
(partial) update. The caller of this interface
(i.e. the Hadoop Reducer) will then apply the
updates to Solr in the order returned by the
orderUpdates() method.
The default
RetainMostRecentUpdateConflictResolver
implementation ignores all but the most recent
document version, based on a configurable
numeric Solr field, which defaults to the
file_last_modified timestamp (default: org.
apache.solr.hadoop.dedup.
RetainMostRecentUpdateConflictResolver)
--mappers INTEGER Tuning knob that indicates the maximum number of
MR mapper tasks to use. -1 indicates use all map
slots available on the cluster. (default: -1)
--reducers INTEGER Tuning knob that indicates the number of reducers
into which to index. To use one reducer per
output shard, use 0 for Search 1.x and use -2 for
Search for CDH 5. Using one reducer per output
shard disables the mtree merge MR algorithm. The
mtree merge MR algorithm improves scalability by
distributing CPU load among a set of parallel
reducers that can be more numerous than the
number of Solr shards expected by the user. It
can be seen as an extension of concurrent lucene
merges and tiered lucene merges to the clustered
case. -1 indicates use all reduce slots available
on the cluster. The subsequent mapper-only phase
merges the reducer output to the number of shards
expected by the user, again by utilizing a
cluster's parallelism. (default: -1)
--max-segments INTEGER
Tuning knob that indicates the maximum number of
segments to be contained on output in the index
of each reducer shard. After a reducer has built
its output index it applies a merge policy to
merge segments until there are <= maxSegments
lucene segments left in this index. Merging
segments involves reading and rewriting all data
in all these segment files, potentially multiple
times, which is very I/O intensive and time
consuming. However, an index with fewer segments
can later be merged faster, and it can later be
queried faster once deployed to a live Solr
serving shard. Set maxSegments to 1 to optimize
the index for low query latency. In a nutshell,
a small maxSegments value trades indexing
latency for subsequently improved query latency.
This can be a reasonable trade-off for batch
indexing systems. (default: 1)
--fair-scheduler-pool STRING
Optional tuning knob that indicates the name of
the fair scheduler pool to submit jobs to. The
Fair Scheduler is a pluggable MapReduce
scheduler that provides a way to share large
clusters. Fair scheduling is a method of
assigning resources to jobs such that all jobs
get, on average, an equal share of resources
over time. When there is a single job running,
that job uses the entire cluster. When other
jobs are submitted, tasks slots that free up are
assigned to the new jobs, so that each job gets
roughly the same amount of CPU time. Unlike the
default Hadoop scheduler, which forms a queue of
jobs, this lets short jobs finish in reasonable
time while not starving long jobs. It is also an
easy way to share a cluster between multiple of
users. Fair sharing can also work with job
priorities - the priorities are used as weights
to determine the fraction of total compute time
that each job gets.
--dry-run Run in local mode and print documents to stdout
instead of loading them into Solr. This executes
the morphline in the client process (without
submitting a job to MR) for quicker turnaround
during early trial and debug sessions. (default:
false)
--log4j FILE Relative or absolute path to a log4j.properties
config file on the local file system. This file
will be uploaded to each MR task. Example:
/path/to/log4j.properties
--verbose, -v Turn on verbose output. (default: false)
--show-non-solr-cloud Also show options for Non-SolrCloud mode as part
of --help. (default: false)
Required arguments:
--output-dir HDFS_URI HDFS directory to write Solr indexes to. Inside
there one output directory per shard will be
generated. Example: hdfs://c2202.mycompany.
com/user/$USER/test
--morphline-file FILE Relative or absolute path to a local config file
that contains one or more morphlines. The file
must be UTF-8 encoded. Example:
/path/to/morphline.conf
Cluster arguments:
Arguments that provide information about your Solr cluster.
--zk-host STRING The address of a ZooKeeper ensemble being used
by a SolrCloud cluster. This ZooKeeper ensemble
will be examined to determine the number of
output shards to create as well as the Solr URLs
to merge the output shards into when using the --
go-live option. Requires that you also pass the
--collection to merge the shards into.
The --zk-host option implements the same
partitioning semantics as the standard SolrCloud
Near-Real-Time (NRT) API. This enables to mix
batch updates from MapReduce ingestion with
updates from standard Solr NRT ingestion on the
same SolrCloud cluster, using identical unique
document keys.
Format is: a list of comma separated host:port
pairs, each corresponding to a zk server.
Example: '127.0.0.1:2181,127.0.0.1:
2182,127.0.0.1:2183' If the optional chroot
suffix is used the example would look like:
'127.0.0.1:2181/solr,127.0.0.1:2182/solr,
127.0.0.1:2183/solr' where the client would be
rooted at '/solr' and all paths would be
relative to this root - i.e.
getting/setting/etc... '/foo/bar' would result
in operations being run on '/solr/foo/bar' (from
the server perspective).
Go live arguments:
Arguments for merging the shards that are built into a live Solr
cluster. Also see the Cluster arguments.
--go-live Allows you to optionally merge the final index
shards into a live Solr cluster after they are
built. You can pass the ZooKeeper address with --
zk-host and the relevant cluster information
will be auto detected. (default: false)
--collection STRING The SolrCloud collection to merge shards into
when using --go-live and --zk-host. Example:
collection1
--go-live-threads INTEGER
Tuning knob that indicates the maximum number of
live merges to run in parallel at one time.
(default: 1000)
Generic options supported are
--conf <configuration FILE>
specify an application configuration file
-D <property=value> use value for given property
--fs <local|namenode:port>
specify a namenode
--jt <local|jobtracker:port>
specify a job tracker
--files <comma separated list of files>
specify comma separated files to be copied to
the map reduce cluster
--libjars <comma separated list of jars>
specify comma separated jar files to include in
the classpath.
--archives <comma separated list of archives>
specify comma separated archives to be
unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
Examples:
# Index an Avro based Twitter tweet file into a live SolrCloud cluster:
sudo -u hdfs hadoop \
--config /etc/hadoop/conf.cloudera.mapreduce1 \
jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
-D 'mapred.child.java.opts=-Xmx500m' \
--log4j src/test/resources/log4j.properties \
--morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
--output-dir hdfs://c2202.mycompany.com/user/$USER/test \
--zk-host zk01.mycompany.com:2181/solr \
--collection collection1 \
--go-live \
hdfs:///user/foo/indir
# Index all files that match all of the following conditions:
# 1) File is contained in dir tree hdfs:///user/$USER/solrloadtest/twitter/tweets
# 2) file name matches the glob pattern 'sample-statuses*.gz'
# 3) file was last modified less than 100000 minutes ago
# 4) file size is between 1 MB and 1 GB
# Also include extra library jar file containing JSON tweet Java parser:
hadoop jar target/search-mr-*-job.jar org.apache.solr.hadoop.HdfsFindTool \
-find hdfs:///user/$USER/solrloadtest/twitter/tweets \
-type f \
-name 'sample-statuses*.gz' \
-mmin -1000000 \
-size -100000000c \
-size +1000000c \
| sudo -u hdfs hadoop \
--config /etc/hadoop/conf.cloudera.mapreduce1 \
jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
--libjars /path/to/cdk-morphlines-twitter-0.9.2.jar \
-D 'mapred.child.java.opts=-Xmx500m' \
--log4j src/test/resources/log4j.properties \
--morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadJsonTestTweets.conf \
--output-dir hdfs://c2202.mycompany.com/user/$USER/test \
--zk-host zk01.mycompany.com:2181/solr \
--collection collection1 \
--input-list -