Awesome Open Source

This Spark module allows saving a DataFrame as a BigQuery table.

The project was inspired by spotify/spark-bigquery, but there are several differences and enhancements:

  • Use of the Structured Streaming API

  • Use within Pyspark

  • Saving via Decorators

  • Allow saving to partitioned tables

  • Easy integration with Databricks

  • Use of Standard SQL

  • Use of Time-Ingested Partition Columns

  • Run Data Manipulation Language (DML) Queries

  • Update schemas on writes using setSchemaUpdateOptions

  • JSON is used as an intermediate format instead of Avro. This allows fields at different levels to share the same name:

  "obj": {
    "data": {
      "data": {}
    }
  }

  • DataFrame's schema is automatically adapted to a legal one:

    1. Illegal characters are replaced with _
    2. Field names are converted to lower case to avoid ambiguity
    3. Duplicate field names are given a numeric suffix (_1, _2, etc.)
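The three renaming rules above can be sketched in plain Python. This is a minimal illustration, not the connector's code: it assumes a legal name contains only ASCII letters, digits and underscores, applies the rules in the order listed, and keeps trying suffixes until the name is unused. adapt_field_names is a hypothetical helper.

```python
import re
from typing import List

# Assumption: legal names use only ASCII letters, digits and underscores.
_ILLEGAL_CHARS = re.compile(r"[^A-Za-z0-9_]")


def adapt_field_names(names: List[str]) -> List[str]:
    """Hypothetical helper applying the three renaming rules to field names."""
    used = set()
    adapted = []
    for name in names:
        # 1. Replace every illegal character with "_".
        base = _ILLEGAL_CHARS.sub("_", name)
        # 2. Lower-case so that e.g. "Id" and "id" cannot coexist ambiguously.
        base = base.lower()
        # 3. Append _1, _2, ... until the name no longer clashes with an earlier one.
        candidate, suffix = base, 0
        while candidate in used:
            suffix += 1
            candidate = f"{base}_{suffix}"
        used.add(candidate)
        adapted.append(candidate)
    return adapted


print(adapt_field_names(["User Id", "user-id", "Amount$"]))
# ['user_id', 'user_id_1', 'amount_']
```

Lower-casing before de-duplication is what turns "User Id" and "user-id" into a clash that the numeric suffix then resolves.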


I created a container that launches Zeppelin with Spark and the connector for ease of use and quick startup. You can find it here.


Including spark-bigquery in your project

To use it in a local SBT console, first add the package as a dependency, then set up your project details:

resolvers += Opts.resolver.sonatypeReleases

libraryDependencies += "com.github.samelamin" %% "spark-bigquery" % "0.2.6"

import com.samelamin.spark.bigquery._

// Set up GCP credentials

// Set up BigQuery project and bucket

// Set up BigQuery dataset location, default is US

Structured Streaming from S3/HDFS to BigQuery

S3, Blob Storage and HDFS are the de facto storage technologies in the cloud. This package allows you to stream any data added to them into a BigQuery table of your choice.

import com.samelamin.spark.bigquery._

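// File stream sources need a schema unless spark.sql.streaming.schemaInference is enabled
val df = spark.readStream.json("s3a://bucket")

df.writeStream
      .option("checkpointLocation", "s3a://checkpoint/dir")
      .option("tableReferenceSink", "project-id:dataset-id.table-name")
      .format("com.samelamin.spark.bigquery")
      .start()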

Structured Streaming from BigQuery Table

You can use this connector to stream from a BigQuery table. The connector uses a timestamp column to track offsets.

import com.samelamin.spark.bigquery._

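val df = spark
      .readStream
      .option("tableReferenceSource", "project-id:dataset-id.table-name")
      .format("com.samelamin.spark.bigquery")
      .load()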
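Offset tracking through a timestamp column can be illustrated with an in-memory simulation. This is a sketch under assumptions, not the connector's implementation: it treats the newest timestamp seen as the committed offset and, on each trigger, returns only rows whose timestamp column is strictly newer. next_batch and the row layout are hypothetical; bq_load_timestamp is the default column name this README mentions.

```python
from datetime import datetime
from typing import Dict, List, Optional, Tuple

Row = Dict[str, object]


def next_batch(rows: List[Row], ts_column: str,
               last_offset: Optional[datetime]) -> Tuple[List[Row], Optional[datetime]]:
    """Return rows newer than last_offset and the offset to commit afterwards."""
    # Keep only rows strictly after the previously committed offset.
    batch = [r for r in rows
             if last_offset is None or r[ts_column] > last_offset]
    if not batch:
        # Nothing new: the committed offset stays where it was.
        return [], last_offset
    # The newest timestamp in this batch becomes the next offset.
    return batch, max(r[ts_column] for r in batch)


table = [
    {"id": 1, "bq_load_timestamp": datetime(2017, 1, 1, 9)},
    {"id": 2, "bq_load_timestamp": datetime(2017, 1, 1, 10)},
]
batch, offset = next_batch(table, "bq_load_timestamp", None)
print([r["id"] for r in batch], offset)  # [1, 2] 2017-01-01 10:00:00

table.append({"id": 3, "bq_load_timestamp": datetime(2017, 1, 1, 11)})
batch, offset = next_batch(table, "bq_load_timestamp", offset)
print([r["id"] for r in batch], offset)  # [3] 2017-01-01 11:00:00
```

In this sketch a row that arrived with a timestamp at or before the committed offset would never be returned, which is why a column stamped at load time suits this scheme better than an arbitrary event time.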

You can also specify a custom timestamp column:

import com.samelamin.spark.bigquery._


You can also specify a custom Time Ingested Partition column:

import com.samelamin.spark.bigquery._


Saving DataFrame using BigQuery Hadoop writer API

By default, any table created by this connector has a bq_load_timestamp column whose value is the current timestamp at load time.

import com.samelamin.spark.bigquery._

val df = ...
df.saveAsBigQueryTable("project-id:dataset-id.table-name")

You can also save to a single day partition through a table decorator, by saving to dataset-id.table-name$YYYYMMDD.
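Building the decorator for a given day is plain string formatting. A minimal sketch, assuming day partitions addressed by the $YYYYMMDD suffix shown above; partition_decorator and daily_decorators are hypothetical helpers, handy for instance when backfilling a date range one partition at a time.

```python
from datetime import date, timedelta
from typing import List


def partition_decorator(table: str, day: date) -> str:
    """Append a $YYYYMMDD day-partition decorator to a table reference."""
    return f"{table}${day:%Y%m%d}"


def daily_decorators(table: str, start: date, end: date) -> List[str]:
    """Decorators for every day from start to end, both inclusive."""
    days = (end - start).days
    return [partition_decorator(table, start + timedelta(days=i))
            for i in range(days + 1)]


print(partition_decorator("dataset-id.table-name", date(2017, 3, 1)))
# dataset-id.table-name$20170301
```

If a decorated name is passed on a shell command line, wrap it in single quotes so the shell does not expand the $ sequence.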

Saving DataFrame using Pyspark

from pyspark.sql import SparkSession

BQ_PROJECT_ID = "projectId"
DATASET_ID = "datasetId"
TABLE_NAME = "tableName"

KEY_FILE = "/path/to/service_account.json" # When not on GCP
STAGING_BUCKET = "gcs-bucket"              # Intermediate JSON files
DATASET_LOCATION = "US"                    # Location for dataset creation

# Start session and reference the JVM package via py4j for convenience
session = SparkSession.builder.getOrCreate()
bigquery = session._sc._jvm.com.samelamin.spark.bigquery

# Prepare the bigquery context
bq = bigquery.BigQuerySQLContext(session._wrapped._jsqlContext)

# Extract and Transform a dataframe
# df =

# Load into a table or table partition
bqDF = bigquery.BigQueryDataFrame(df._jdf)
bqDF.saveAsBigQueryTable(
    "{0}:{1}.{2}".format(BQ_PROJECT_ID, DATASET_ID, TABLE_NAME),
    False, # Day partitioned when created
    0,     # Partition expiration when created
    # ...
)

Submit with:

pyspark --packages com.github.samelamin:spark-bigquery_2.11:0.2.6

or

gcloud dataproc jobs submit pyspark --properties spark.jars.packages=com.github.samelamin:spark-bigquery_2.11:0.2.6
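The _2.11 in the coordinates above is the Scala binary version, which sbt's %% operator appends to the artifact name automatically in the SBT setup; outside sbt it has to be written out by hand. A small hypothetical helper makes the correspondence explicit:

```python
def spark_package(group: str, artifact: str, scala_binary: str, version: str) -> str:
    """Return a Maven coordinate the way sbt's %% resolves it: group:artifact_scala:version."""
    return f"{group}:{artifact}_{scala_binary}:{version}"


print(spark_package("com.github.samelamin", "spark-bigquery", "2.11", "0.2.6"))
# com.github.samelamin:spark-bigquery_2.11:0.2.6
```

The Scala suffix must match the Scala version your Spark distribution was built with, otherwise the package will not load.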

Reading DataFrame From BigQuery

import com.samelamin.spark.bigquery._
val sqlContext = spark.sqlContext

val df = spark.read.format("com.samelamin.spark.bigquery").option("tableReferenceSource", "bigquery-public-data:samples.shakespeare").load()

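Reading DataFrame From BigQuery in Pyspark

from pyspark.sql import DataFrame

bq = session._sc._jvm.com.samelamin.spark.bigquery.BigQuerySQLContext(session._wrapped._jsqlContext)
df = DataFrame(bq.bigQuerySelect("SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare]"), session._wrapped)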

Running DML Queries

import com.samelamin.spark.bigquery._

// Run a DML statement against an existing table
sqlContext.runDMLQuery("UPDATE dataset-id.table-name SET test_col = new_value WHERE test_col = old_value")

Please note that DML queries must be written in Standard SQL.
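One practical difference is table naming: legacy SQL, as in the bigQuerySelect example, writes [project:dataset.table], while Standard SQL expects project.dataset.table quoted in backticks. A minimal sketch of converting a reference before building a DML statement; to_standard_sql_ref is a hypothetical helper and assumes a well-formed legacy reference.

```python
def to_standard_sql_ref(legacy_ref: str) -> str:
    """Convert a legacy-SQL table reference to a backtick-quoted Standard SQL one."""
    # Drop the square brackets legacy SQL puts around table names, if present.
    ref = legacy_ref.strip().strip("[]")
    # Legacy SQL separates the project with ':'; Standard SQL uses '.'.
    project, sep, rest = ref.partition(":")
    if sep:
        ref = f"{project}.{rest}"
    return f"`{ref}`"


print(to_standard_sql_ref("[bigquery-public-data:samples.shakespeare]"))
# `bigquery-public-data.samples.shakespeare`
```

The result can then be interpolated into the UPDATE statement passed to runDMLQuery; literal values should still be quoted properly rather than pasted in from untrusted input.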

Update Schemas

You can also allow saving a DataFrame to update the table's schema:

import com.samelamin.spark.bigquery._
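BigQuery's schema update options for load jobs are ALLOW_FIELD_ADDITION (new columns, which must be NULLABLE or REPEATED) and ALLOW_FIELD_RELAXATION (REQUIRED columns may become NULLABLE). The sketch below checks a proposed flat schema against an existing one under those two rules. It is a simplification: it ignores nested RECORD fields and dropped columns, treats any type change as a conflict, and schema_update_problems is a hypothetical helper, not part of the connector.

```python
from typing import Dict, List, Tuple

# column name -> (type, mode), e.g. {"id": ("INTEGER", "REQUIRED")}
Schema = Dict[str, Tuple[str, str]]


def schema_update_problems(old: Schema, new: Schema,
                           allow_addition: bool, allow_relaxation: bool) -> List[str]:
    """List changes that the two schema update options would not permit."""
    problems = []
    for name, (new_type, new_mode) in new.items():
        if name not in old:
            # Field addition: needs the option, and the column cannot be REQUIRED.
            if not allow_addition:
                problems.append(f"{name}: adding columns is not allowed")
            elif new_mode == "REQUIRED":
                problems.append(f"{name}: added columns must be NULLABLE or REPEATED")
            continue
        old_type, old_mode = old[name]
        if new_type != old_type:
            problems.append(f"{name}: type change {old_type} -> {new_type}")
        if new_mode != old_mode:
            # Relaxation only covers REQUIRED -> NULLABLE.
            relaxed = old_mode == "REQUIRED" and new_mode == "NULLABLE"
            if not (relaxed and allow_relaxation):
                problems.append(f"{name}: mode change {old_mode} -> {new_mode}")
    return problems


old = {"id": ("INTEGER", "REQUIRED")}
new = {"id": ("INTEGER", "NULLABLE"), "note": ("STRING", "NULLABLE")}
print(schema_update_problems(old, new, allow_addition=True, allow_relaxation=True))  # []
print(schema_update_problems(old, new, allow_addition=False, allow_relaxation=False))
# ['id: mode change REQUIRED -> NULLABLE', 'note: adding columns is not allowed']
```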


Notes on using this API:

  • Structured Streaming requires a partitioned table, which is created by default when writing a stream.
  • Structured Streaming requires a timestamp column from which offsets are retrieved. By default, all tables are created with a bq_load_timestamp column whose default value is the current timestamp.
  • For use with Databricks, please follow this guide.


Copyright 2016 samelamin.

Licensed under the Apache License, Version 2.0.
