Kudu integration with Spark
Kudu integrates with Spark through the Data Source API as of version 1.0.0. Include
the kudu-spark
dependency using the --packages
option.
Use the kudu-spark_2.10 artifact if using Spark with Scala 2.10. Note that Spark 1 is no longer supported in Kudu starting from version 1.6.0. So in order to use Spark 1 integrated with Kudu, version 1.5.0 is the latest to go to.
spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.5.0-cdh5.13.91 --repositories https://repository.cloudera.com/artifactory/cloudera-repos/
Use kudu-spark2_2.11 artifact if using Spark 2 with Scala 2.11.
spark2-shell --packages org.apache.kudu:kudu-spark2_2.11:1.9.0-cdh6.2.0 --repositories https://repository.cloudera.com/artifactory/cloudera-repos/
Below is a minimal Spark SQL "select" example for a Kudu table created with Impala in the "default" database. We first import the kudu spark package, then create a DataFrame, and then create a view from the DataFrame. After those steps, the table is accessible from Spark SQL.
import org.apache.kudu.spark.kudu._
// Create a DataFrame that points to the Kudu table we want to query.
val df = spark.read.options(Map("kudu.master" -> "master1.foo.com,master2.foo.com,master3.foo.com",
"kudu.table" -> "default.my_table")).kudu
// Create a view from the DataFrame to make it accessible from Spark SQL.
df.createOrReplaceTempView("my_table")
// Now we can run Spark SQL queries against our view of the Kudu table.
spark.sql("select * from my_table").show()
Below is a more sophisticated example that includes both reads and writes:
import org.apache.kudu.spark.kudu._
import org.apache.kudu.client._
import collection.JavaConverters._
// Read a table from Kudu
val df = spark.read
.options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "kudu_table"))
.format("kudu").load
// Query using the Spark API...
df.select("id").filter("id >= 5").show()
// ...or register a temporary table and use SQL
df.createOrReplaceTempView("kudu_table")
val filteredDF = spark.sql("select id from kudu_table where id >= 5").show()
// Use KuduContext to create, delete, or write to Kudu tables
val kuduContext = new KuduContext("kudu.master:7051", spark.sparkContext)
// Create a new Kudu table from a DataFrame schema
// NB: No rows from the DataFrame are inserted into the table
kuduContext.createTable(
"test_table", df.schema, Seq("key"),
new CreateTableOptions()
.setNumReplicas(1)
.addHashPartitions(List("key").asJava, 3))
// Insert data
kuduContext.insertRows(df, "test_table")
// Delete data
kuduContext.deleteRows(filteredDF, "test_table")
// Upsert data
kuduContext.upsertRows(df, "test_table")
// Update data
val alteredDF = df.select("id", $"count" + 1)
kuduContext.updateRows(filteredRows, "test_table")
// Data can also be inserted into the Kudu table using the data source, though the methods on
// KuduContext are preferred
// NB: The default is to upsert rows; to perform standard inserts instead, set operation = insert
// in the options map
// NB: Only mode Append is supported
df.write
.options(Map("kudu.master"-> "kudu.master:7051", "kudu.table"-> "test_table"))
.mode("append")
.format("kudu").save
// Check for the existence of a Kudu table
kuduContext.tableExists("another_table")
// Delete a Kudu table
kuduContext.deleteTable("unwanted_table")