Apache Spark-HBase Connector

Sandip Nikale

Spark needs a connector library to access HBase.
Two connectors are available; which one should you use?

Configuration and setup

Client-side (Spark) configuration:

  • You need the HBase client configuration file hbase-site.xml

  • This points to the HBase cluster you want to connect to

  • Copy hbase-site.xml to SPARK_CONF_DIR (default is $SPARK_HOME/conf); a quick sanity check is sketched below
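
As a quick sanity check that Spark picks up the HBase client configuration, you can read a property back from the classpath. This is a minimal sketch, assuming spark-shell has been started with the connector packages shown in the examples below, so that the HBase client classes are available:

      // HBaseConfiguration.create() loads hbase-site.xml from the classpath
      // (i.e. from SPARK_CONF_DIR); this should print your cluster's ZooKeeper quorum
      import org.apache.hadoop.hbase.HBaseConfiguration

      val hbaseConf = HBaseConfiguration.create()
      println(hbaseConf.get("hbase.zookeeper.quorum"))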

Server-side (HBase region servers) configuration:

  • When using the Apache HBase-Spark connector there is also a server-side configuration

  • In particular, you need a few jars in the HBase region servers' CLASSPATH (for example, copy them to /usr/hdp/hbase-2.3/lib):

    • scala-library

    • hbase-spark

    • hbase-spark-protocol-shaded

  • Build the connector jars from GitHub as explained below (see the Spark 3.x section). In this example we use the pre-built jars JAR1 and JAR2.

      # Download connector jars to HBase region servers $HBASE_HOME/lib
    
      # From CERN network
      # wget http://artifactory.cern.ch/beco-thirdparty-local/org/apache/hbase/connectors/spark/hbase-spark/1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1/hbase-spark-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
      # wget http://artifactory.cern.ch/beco-thirdparty-local/org/apache/hbase/connectors/spark/hbase-spark-protocol-shaded/1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1/hbase-spark-protocol-shaded-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
    
      # stopgap copy
      wget http://canali.web.cern.ch/res/hbase-spark-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
      wget http://canali.web.cern.ch/res/hbase-spark-protocol-shaded-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
    
      # Scala library, match the Scala version used for building
      wget https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.12.15/scala-library-2.12.15.jar
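
The scala-library jar must match the Scala version that Spark and the connector were built with. A minimal sketch of how to check it from a running spark-shell (the version is also printed in the spark-shell startup banner):

      // Print the Scala version used by the running Spark session,
      // to pick a matching scala-library jar for the region servers
      println(scala.util.Properties.versionString)  // e.g. "version 2.12.15"
      println(spark.version)                        // Spark version, for cross-checking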
    

Spark 2.x

Spark 2.4 and the Spark-HBase Hortonworks connector

      # Example of how to use the connector with Spark (note: need hbase-site.xml in SPARK_CONF_DIR)
      bin/spark-shell --master yarn --num-executors 1 --executor-cores 2 \
      --repositories http://repo.hortonworks.com/content/groups/public/ \
      --packages com.hortonworks.shc:shc-core:1.1.0.3.1.2.2-1

      // Example of how to use the Hortonworks connector to read into a DataFrame
      import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

      val catalog = s"""{
        |"table":{"namespace":"default", "name":"testspark"},
        |"rowkey":"key",
        |"columns":{
        |"id":{"col":"key", "type":"int"},
        |"name":{"cf":"cf", "col":"name", "type":"string"}
        |}
        |}""".stripMargin

      val df = spark.read.options(Map(HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase").option("hbase.spark.use.hbasecontext", false).load()

      df.show()
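
For completeness, writing with the Hortonworks connector follows the same pattern. This is a minimal sketch, reusing the catalog value and spark-shell session from the read example above; HBaseTableCatalog.newTable is the shc option that, when set to a value larger than 3, creates the table with that number of regions:

      // Write a small test DataFrame through the Hortonworks connector,
      // reusing the catalog defined in the read example above
      val dfOut = sql("select id, 'myline_'||id name from range(10)")

      dfOut.write
        .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .save()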

Spark 2.x and the Apache Spark HBase connector

  • The connector for Spark 2.x is available on Maven Central

  • You need to perform the configuration steps described above in "Configuration and setup"

    • Note that you need to configure the server side too
      # Example of how to use the connector with Spark
      # Note: you need hbase-site.xml in SPARK_CONF_DIR
      # Note: you need to set up the server-side component too for filter pushdown
      bin/spark-shell --master yarn --num-executors 1 --executor-cores 2 \
      --repositories https://repository.cloudera.com/artifactory/libs-release-local \
      --packages org.apache.hbase.connectors.spark:hbase-spark-protocol-shaded:1.0.0.7.2.2.2-1,org.apache.hbase.connectors.spark:hbase-spark:1.0.0.7.2.2.2-1,org.apache.hbase:hbase-shaded-mapreduce:2.2.4

      // Example of how to use the Apache HBase-Spark connector to read into a DataFrame
      import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog

      val catalog = s"""{
        |"table":{"namespace":"default", "name":"testspark"},
        |"rowkey":"key",
        |"columns":{
        |"id":{"col":"key", "type":"int"},
        |"name":{"cf":"cf", "col":"name", "type":"string"}
        |}
        |}""".stripMargin

      val df = spark.read.options(Map(HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.hadoop.hbase.spark").option("hbase.spark.use.hbasecontext", false).load()
      // alternative, more compact, syntax for the HBase catalog
      // val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).load()

      df.show()

Spark 3.x

Use the Apache Spark HBase connector with Spark 3.x

  • Build the connector from the GitHub repo Apache HBase Connectors

  • Build as in this example (customize the HBase, Spark, and Hadoop versions as needed):

      mvn -Dspark.version=3.1.2 -Dscala.version=2.12.10 -Dscala.binary.version=2.12 -Dhbase.version=2.4.9 -Dhadoop-three.version=3.2.0 -DskipTests clean package
    
  • Deploy using Spark 3.x, as in this example:

      # Customize the JARs path to your filesystem location
      # For convenience I have also uploaded the jars on a web server
    
      # From CERN network
      #JAR1=http://artifactory.cern.ch/beco-thirdparty-local/org/apache/hbase/connectors/spark/hbase-spark/1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1/hbase-spark-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
      #JAR2=http://artifactory.cern.ch/beco-thirdparty-local/org/apache/hbase/connectors/spark/hbase-spark-protocol-shaded/1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1/hbase-spark-protocol-shaded-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
    
      # stopgap copy
      JAR1=http://canali.web.cern.ch/res/hbase-spark-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
      JAR2=http://canali.web.cern.ch/res/hbase-spark-protocol-shaded-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
    
      bin/spark-shell --master yarn --num-executors 1 --executor-cores 2 \
      --jars $JAR1,$JAR2 --packages org.apache.hbase:hbase-shaded-mapreduce:2.4.9
    
  • Option for CERN users: the pre-built jars are also available from the CERN Artifactory URLs shown (commented out) above

SQL Filter pushdown and server-side library configuration

  • This allows filter predicates to be pushed down to HBase

    • It is configured with .option("hbase.spark.pushdown.columnfilter", true), which is the default

    • This requires additional configuration on the HBase server side; in particular, a few jars need to be in the HBase region servers' CLASSPATH: scala-library, hbase-spark and hbase-spark-protocol-shaded

    • See the "Configuration and setup" section for details

    • If the filter pushdown jars are not configured on the region servers, you will get the error: java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder$

      • See: HBASE-22769

      • Example of how to use SQL filter pushdown

```scala
val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).option("hbase.spark.pushdown.columnfilter", true).load()
df.filter("id<10").show()
```
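
To check whether a predicate is actually pushed down to HBase, you can inspect the physical plan; with this data source the pushed predicates typically appear as "PushedFilters" in the scan node (the exact output depends on the Spark version):

      // Pushed predicates show up in the scan node of the physical plan
      df.filter("id<10").explain(true)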

Apache HBase-Spark connector tunables

  • There are several tunables in the Apache HBase-Spark connector, for example:

    • hbase.spark.query.batchsize - sets the maximum number of values to return for each call to next() in a scan

    • hbase.spark.query.cachedrows - the number of rows for caching that will be passed to the scan

    • Details of the available configuration options can be found in the Apache HBase Connectors documentation
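
For illustration, here is a sketch of how these tunables can be set when reading; the values below are arbitrary placeholders and should be adjusted for your workload:

      // Read with explicit scanner batching and caching settings (illustrative values)
      val dfTuned = spark.read.format("org.apache.hadoop.hbase.spark")
        .option("hbase.columns.mapping", "id INT :key, name STRING cf:name")
        .option("hbase.table", "testspark")
        .option("hbase.spark.query.batchsize", "100")
        .option("hbase.spark.query.cachedrows", "1000")
        .option("hbase.spark.use.hbasecontext", false)
        .load()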


Spark-HBase connector: write and read example

  • On HBase, create the test table and grant the related privileges to your user (using the hbase shell):

      create 'testspark', 'cf'
      grant '<your_username_here>', 'XRW', 'testspark'
    
    • Note: this may also be needed: grant '<your_username_here>', 'X', 'hbase:meta'
  • On Spark:

    • Start Spark 2.x or 3.x as detailed above

    • Write:

```scala
val df = sql("select id, 'myline_'||id  name from range(10)")
df.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.namespace", "default").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).save()
```
  • Read the HBase table from Spark

      val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).load()
      df.show()
    
  • Read back from Spark using a filter; this works without the server-side installation for SQL filter pushdown (HBASE-22769) because pushdown is explicitly disabled here

      val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).option("hbase.spark.pushdown.columnfilter", false).load()
      df.show()
    

Another option for writing and reading is to explicitly specify the catalog in JSON format

  • See also HBaseTableCatalog

  • On HBase, grant the create table privilege to your user:

      grant '<your_username_here>', 'C'
    
  • On Spark:

    • Start Spark 2.x or 3.x as detailed above

    • Write:

```scala
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog

val catalog = s"""{
  |"table":{"namespace":"default", "name":"testspark"},
  |"rowkey":"key",
  |"columns":{
  |"id":{"col":"key", "type":"int"},
  |"name":{"cf":"cf", "col":"name", "type":"string"}
  |}
  |}""".stripMargin

val df = sql("select id, 'myline_'||id  name from range(10)")

// HBaseTableCatalog.newTable -> if defined and larger than 3, a new table will be created with the specified number of regions.
df.write.options(Map(HBaseTableCatalog.tableCatalog->catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.hadoop.hbase.spark").option("hbase.spark.use.hbasecontext", false).save()
```
  • Read HBase from Spark

      import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
    
      val catalog = s"""{
        |"table":{"namespace":"default", "name":"testspark"},
        |"rowkey":"key",
        |"columns":{
        |"id":{"col":"key", "type":"int"},
        |"name":{"cf":"cf", "col":"name", "type":"string"}
        |}
        |}""".stripMargin
    
      val df = spark.read.options(Map(HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.hadoop.hbase.spark").option("hbase.spark.use.hbasecontext", false).load()
      df.show()
    

PySpark

  • Note: this is almost the same as the Scala examples above; the important change when running Spark is to use pyspark instead of spark-shell. See the "Configuration and setup" section above for how to configure the HBase client and server side.

  • Write to HBase with Spark

      df = spark.sql("select id, 'myline_'||id  name from range(10)")
      df.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.namespace", "default").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", False).save()
    
  • Read from HBase with Spark

      df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", False).load()
      df.show()
      df.filter("id<10").show() # see above note on SQL filter pushdown server-side configuration
    
  • Another option for reading: explicitly specify the catalog in JSON format:

      import json
    
      catalog = json.dumps(
        {
            "table":{"namespace":"default", "name":"testspark"},
            "rowkey":"key",
            "columns":{
                "id":{"col":"key", "type":"int"},
                "name":{"cf":"cf", "col":"name", "type":"string"}
            }
        })
    
      df = spark.read.options(catalog=catalog).format("org.apache.hadoop.hbase.spark").option("hbase.spark.use.hbasecontext", False).load()
    
      df.show()
    