A Datasource on top of Spark Datasource V1 APIs, that provides Spark support for Hive ACID transactions.
This datasource provides the capability to work with Hive ACID V2 tables, both Full ACID tables as well as Insert-Only tables.
Functionality availability matrix
Functionality | Full ACID table | Insert Only Table |
---|---|---|
READ | >= v0.4.0 | >= v0.4.0 |
INSERT INTO / OVERWRITE | >= v0.4.3 | >= v0.4.4 |
CTAS | >= v0.4.3 | >= v0.4.4 |
UPDATE | >= v0.5.0 | Not Supported |
DELETE | >= v0.5.0 | Not Supported |
MERGE | > v0.5.0 | Not Supported |
STREAMING INSERT | >= v0.5.0 | >= v0.5.0 |
Note: For Insert Only tables, the compatibility check needs to be disabled for write operations to be supported.
These are the pre-requisites to using this library:
Change the configuration in $SPARK_HOME/conf/hive-site.xml to point to an already configured HMS server endpoint. If you meet the above pre-requisites, this is probably already configured.
<configuration>
<property>
<name>hive.metastore.uris</name>
<!-- hostname must point to the Hive metastore URI in your cluster -->
<value>thrift://hostname:10000</value>
<description>URI for spark to contact the hive metastore server</description>
</property>
</configuration>
There are a few ways to use the library while running spark-shell:
spark-shell --packages qubole:spark-acid:0.6.0-s_2.11
If you built the jar yourself, copy spark-acid-assembly-0.6.0.jar into $SPARK_HOME/assembly/target/scala-2.11/jars and run
spark-shell
To operate on a Hive ACID table from Scala / pySpark, the table can be accessed directly using this datasource. Note that the short name of this datasource is HiveAcid. Hive ACID tables are tables in the Hive Metastore, so any read and/or write operation needs format("HiveAcid").option("table", "<table name>"). Direct read and write from the files is not supported.
scala> val df = spark.read.format("HiveAcid").options(Map("table" -> "default.acidtbl")).load()
scala> df.collect()
To read an existing Hive acid table through pure SQL, there are two ways:
Use the SparkSession extensions framework to add a new Analyzer rule (HiveAcidAutoConvert) to the Spark Analyzer. This rule automatically converts a HiveTableRelation representing an acid table to a LogicalRelation backed by HiveAcidRelation.
To use this, initialize SparkSession with the extension builder as mentioned below:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("Hive-acid-test")
  .config("spark.sql.extensions", "com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension")
  .enableHiveSupport()
  // add any other builder options here
  .getOrCreate()
spark.sql("select * from default.acidtbl")
Create a dummy table that acts as a symlink to the original acid table. This symlink is required to instruct Spark to use this datasource against an existing table.
To create the symlink table:
spark.sql("create table symlinkacidtable using HiveAcid options ('table' 'default.acidtbl')")
spark.sql("select * from symlinkacidtable")
Note: This will produce a warning indicating that Hive does not understand this format:
WARN hive.HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider com.qubole.spark.hiveacid.datasource.HiveAcidDataSource. Persisting data source table `default`.`sparkacidtbl` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
Please ignore it, as this is a symlink table for Spark to operate with and it has no underlying storage.
This section describes the major functionality provided by the datasource, with example code snippets for each.
Same as CREATE and DROP supported by Spark SQL.
Drop Existing table
spark.sql("DROP TABLE if exists acid.acidtbl")
Create table
spark.sql("CREATE TABLE acid.acidtbl (status BOOLEAN, tweet ARRAY<STRING>, rank DOUBLE, username STRING) STORED AS ORC TBLPROPERTIES('TRANSACTIONAL' = 'true')")
Note: The table property 'TRANSACTIONAL' = 'true' is required to create an ACID table.
Check if it is transactional
spark.sql("DESCRIBE extended acid.acidtbl").show()
Read acid table
val df = spark.read.format("HiveAcid").options(Map("table" -> "acid.acidtbl")).load()
df.select("status", "rank").filter($"rank" > 20).show()
Read acid via implicit API
import com.qubole.spark.hiveacid._
val df = spark.read.hiveacid("acid.acidtbl")
df.select("status", "rank").filter($"rank" > 20).show()
Same as SELECT supported by Spark SQL.
spark.sql("SELECT status, rank from acid.acidtbl where rank > 20")
Note: com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension has to be added to spark.sql.extensions for the above.
Insert into
val df = spark.read.parquet("tbldata.parquet")
df.write.format("HiveAcid").option("table", "acid.acidtbl").mode("append").save()
Insert overwrite
val df = spark.read.parquet("tbldata.parquet")
df.write.format("HiveAcid").option("table", "acid.acidtbl").mode("overwrite").save()
Insert into using implicit API
import com.qubole.spark.hiveacid._
val df = spark.read.parquet("tbldata.parquet")
df.write.hiveacid("acid.acidtbl", "append")
Same as INSERT supported by Spark SQL
Insert into the table select as
spark.sql("INSERT INTO acid.acidtbl select * from sample_data")
Insert overwrite the table select as
spark.sql("INSERT OVERWRITE TABLE acid.acidtbl select * from sample_data")
Insert into
spark.sql("INSERT INTO acid.acidtbl VALUES(false, array('test'), 11.2, 'qubole')")
Note: com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension has to be added to spark.sql.extensions for the above SQL statements.
ACID tables support streaming writes and can also be used as a streaming sink.
Streaming writes happen under transactional guarantees, which allow other
concurrent writes to the same table, whether streaming or batch.
For exactly-once semantics, specify spark.acid.streaming.log.metadataDir, which
stores the latest batchId processed. Note that concurrent streaming writes
to the same table should specify different metadataDir values.
import org.apache.spark.sql.streaming.OutputMode
// newDf is assumed to be an existing streaming DataFrame
val query = newDf
  .writeStream
  .format("HiveAcid")
  .options(Map(
    "table" -> "acid.acidtbl",
    "spark.acid.streaming.log.metadataDir" -> "/tmp/metadataDir"))
  .outputMode(OutputMode.Append)
  .option("checkpointLocation", "/tmp/checkpointDir")
  .start()
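Since concurrent streaming writes to the same table should use different metadataDir values, one way to keep them apart is to derive the directory from a per-query name. The following is a minimal sketch under that assumption: `StreamingSinkOptions`, `forQuery`, `baseDir` and `queryName` are hypothetical names introduced for illustration and are not part of the library; only the two option keys come from the text above.

```scala
// Hypothetical helper, not part of spark-acid. It only builds the options map
// that would be passed to writeStream.format("HiveAcid").options(...).
object StreamingSinkOptions {
  // Option key taken from the documentation above.
  val MetadataDirKey = "spark.acid.streaming.log.metadataDir"

  def forQuery(table: String, baseDir: String, queryName: String): Map[String, String] = {
    require(queryName.nonEmpty, "queryName must not be empty")
    // One sub-directory per query keeps the batchId logs of concurrent writers apart.
    val dir = s"${baseDir.stripSuffix("/")}/$queryName"
    Map("table" -> table, MetadataDirKey -> dir)
  }
}
```

Two queries writing to acid.acidtbl would then call `forQuery` with different `queryName` values and pass the resulting maps to `.options(...)`.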
UPDATE tablename SET column = updateExp [, column = updateExp ...] [WHERE expression]
column must be a column of the table being updated.
updateExp is an expression that Spark supports in the SELECT clause. Subqueries are not supported.
WHERE clause specifies the rows to be updated.
spark.sql("UPDATE acid.acidtbl set rank = rank - 1, status = true where rank > 20 and rank < 25 and status = false")
Note: com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension has to be added to spark.sql.extensions for the above.
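When UPDATE statements are generated from application code, the grammar above can be assembled from its parts. This is a minimal sketch only: `UpdateSql` and `build` are hypothetical names, not part of the library, and the helper does no escaping or validation of column names or expressions.

```scala
// Hypothetical helper, not part of spark-acid: assembles
// UPDATE tablename SET column = updateExp [, ...] [WHERE expression].
object UpdateSql {
  def build(table: String,
            assignments: Seq[(String, String)],
            where: Option[String] = None): String = {
    require(assignments.nonEmpty, "at least one column = updateExp pair is required")
    // Keep the assignments in the order given by the caller.
    val setClause = assignments.map { case (col, expr) => s"$col = $expr" }.mkString(", ")
    // The WHERE clause is optional in the grammar.
    val whereClause = where.map(w => s" WHERE $w").getOrElse("")
    s"UPDATE $table SET $setClause$whereClause"
  }
}
```

The resulting string can be passed to spark.sql(...), provided the extension mentioned in the note is configured.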
DELETE FROM tablename [WHERE expression]
WHERE clause specifies the rows to be deleted from tablename.
DELETE from acid.acidtbl where rank = 1000
Note: com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension has to be added to spark.sql.extensions for the above.
MERGE INTO <target table> [AS T] USING <source table> [AS S]
ON <boolean merge expression>
WHEN MATCHED [AND <boolean expression1>] THEN <match_clause>
WHEN MATCHED [AND <boolean expression2>] THEN <match_clause>
WHEN NOT MATCHED [AND <boolean expression3>] THEN INSERT VALUES ( <insert value list> )
<match_clause> ::
    UPDATE SET <set clause list>
    DELETE
<insert value list> ::
    value 1 [, value 2, value 3, ...]
    [value 1, value 2, ...] * [, value n, value n+1, ...]
<set clause list> ::
    target_col1 = value 1 [, target_col2 = value 2 ...]
<target table> needs to be a Full ACID table. T is an optional placeholder for the target alias.
<source table> needs to be a defined table. You can use functions like createOrReplaceTempView to store source DataFrames as tables. S is an optional placeholder for the source alias.
<boolean merge expression> are the join expressions used as the merge condition.
<boolean expression1> and <boolean expression2> are optional match conditions.
* can be used anywhere in the value list, and it resolves into the source table columns.
After * resolution, the values in the value list must also match the corresponding data types.
MERGE INTO target as t USING source as s
ON t.id = s.id
WHEN MATCHED AND t.city = 'Bangalore' THEN UPDATE SET city = s.city
WHEN MATCHED AND t.dept = 'closed' THEN DELETE
WHEN NOT MATCHED AND s.city IN ('Bangalore', 'San Jose') THEN INSERT VALUES (*, '07', '2020')
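To make the clause semantics concrete, here is a minimal in-memory sketch of a simplified version of the MERGE example, written against plain Scala collections rather than Spark. `MergeSketch`, `Row` and `merge` are hypothetical names. The sketch assumes that the first applicable WHEN MATCHED clause wins and that source ids are unique; both are assumptions of this illustration, not statements about how the datasource is implemented.

```scala
// In-memory illustration only; it does not use Spark or the datasource.
object MergeSketch {
  final case class Row(id: Int, city: String, dept: String)

  // Models:
  //   MERGE INTO target t USING source s ON t.id = s.id
  //   WHEN MATCHED AND t.city = 'Bangalore' THEN UPDATE SET city = s.city
  //   WHEN MATCHED AND t.dept = 'closed'    THEN DELETE
  //   WHEN NOT MATCHED                      THEN INSERT VALUES (*)
  def merge(target: Seq[Row], source: Seq[Row]): Seq[Row] = {
    // Assumption: source ids are unique, so a Map lookup is enough.
    val sourceById = source.map(s => s.id -> s).toMap
    val targetIds = target.map(_.id).toSet

    val keptOrUpdated = target.flatMap { t =>
      sourceById.get(t.id) match {
        case Some(s) if t.city == "Bangalore" => Some(t.copy(city = s.city)) // UPDATE
        case Some(_) if t.dept == "closed"    => None                        // DELETE
        case _                                 => Some(t) // no match, or no clause applies
      }
    }
    // Source rows without a matching target row are inserted.
    val inserted = source.filterNot(s => targetIds.contains(s.id))
    keptOrUpdated ++ inserted
  }
}
```

Target rows with no matching source row are left untouched, which is why only the matched and not-matched cases need handling.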
MERGE performs a Right Outer Join between target and source. This can lead to a full table scan of the target table. Please consider partitioning the target table and mentioning only the required partitions in the merge condition for MERGE operations.
When only the INSERT clause is present, a Left Anti Join between source and target is performed. It is cheaper than a Right Outer Join between target and source.
A check is performed as part of the Right Outer Join done for the MERGE operation, to avoid an extra join for this check. When only the INSERT clause is present, this check is not done as it is not required.
ACID datasource has been tested to work with Apache Spark 2.4.3, but it should work with older versions as well. However, because of a Hive dependency, this datasource needs Hadoop version 2.8.2 or higher due to HADOOP-14683.
NB: Hive ACID V2 is supported from Hive 3.0.0 onwards, and for that the Hive Metastore DB needs to be upgraded to 3.0.0 or above.
ACID datasource does not control the data storage format and layout, which are managed by Hive. It works with data written by Hive version 3.0.0 and above. Please see Hive ACID storage layout.
ACID datasource works with data stored on local files, HDFS, and cloud blobstores (AWS S3, Azure Blob Storage, etc.).
First, build the dependencies and publish them locally. The shaded-dependencies sub-project is an sbt project that creates the shaded Hive metastore and Hive exec jars, combined into a fat jar named spark-acid-shaded-dependencies. This is required because we depend on Hive 3 for Hive ACID, and Spark currently only supports Hive 1.2. To compile and publish the shaded dependencies jar:
cd shaded-dependencies
sbt clean publishLocal
Next, build the main project:
sbt assembly
This will create the spark-acid-assembly-0.6.0.jar, which can now be used in your application.
Tests are run against a standalone docker setup. Please refer to [Docker setup](docker/README.md) to build and start a container.
NB: The container runs the HMS server, HS2 server and HDFS, which listen on ports 10000, 10001 and 9000 respectively. Stop any HMS or HDFS instance running on the same ports on the host machine first.
To run the full integration test:
sbt test
To release a new version use
sbt release
To publish a new version use
sbt spPublish
Read more about sbt release
When this datasource needs to read data, it talks to the Hive Metastore server to get the list of transactions that have been committed and, using that, the list of files it should read from the filesystem (using S3 listing). Because the snapshot of the file list is created by listing, S3Guard should be used on cloud object stores like S3 to avoid reading an inconsistent copy of the data.
This snapshot of the file list is created at the RDD level, so even when the same table is used twice in a single SQL statement, the query may be operating on two different snapshots:
spark.sql("select * from a join a")
The files in the snapshot need to be protected for as long as the RDD is in use. By design, concurrent reads and writes on Hive ACID tables work with the help of locks: every client (across multiple engines) operating on ACID tables is expected to acquire locks for the duration of its reads and writes. Because the lifetime of an RDD can be very long, this datasource DOES NOT acquire locks, to avoid blocking other operations like inserts, and uses an alternative mechanism to protect reads. The other way to protect the snapshot is to make sure the files in it are not deleted while in use. With the current datasource, Automatic Compaction should be disabled on any table on which Spark is operating. This makes sure that the cleaner does not clean any files. To disable automatic compaction on a table:
ALTER TABLE <> SET TBLPROPERTIES ("NO_AUTO_COMPACTION"="true")
When the table is not in use, the cleaner can be enabled, and all the files that need to be cleaned will get queued up for it. Disabling compaction does have performance implications for reads and writes, as many delta files may need to be merged when performing a read.
Note that even though reads are protected, admin operations like TRUNCATE, ALTER TABLE DROP COLUMN and DROP have no protection, as they clean files without intervention from the cleaner. These operations should be performed when Spark is not using the table.
You can join the group for queries and discussions by sending an email to: [email protected]. On subscribing, you will be sent an email to confirm joining the group.
We use Github Issues to track issues. Please feel free to open an issue for any queries, bugs and feature requests.
Pull requests can be raised against any open issue and are most welcome. There is currently no formal process or guideline for them.
Please use the github issues for the acid-ds project to report issues or raise feature requests. You can also join this group to discuss them: [email protected]
If transaction is successful
For every transaction on a partitioned table, a null entry now gets created in the COMPLETED_TXN_COMPONENTS table when the transaction is moved from TXN_COMPONENTS. This entry does not get removed after compaction. Note that there is only one such null entry in the COMPLETED_TXN_COMPONENTS table per transaction. For example, if your transaction has touched 100 partitions, only one entry with a null partition is created, so the null entries should not overwhelm the table.
To delete these null entries from the COMPLETED_TXN_COMPONENTS table, manually run this query once in a while on the metastore DB. Note that this is only applicable to partitioned tables.
DELETE FROM completed_txn_components WHERE ctc_partition IS NULL AND ctc_writeid IS NULL AND ctc_table = <TABLE_NAME>
If transaction is aborted
Delete the files in the object store that were written by the aborted transaction. To find the write ID, a simple SQL query is:
SELECT t.TXN_ID, T2W_WRITEID as WRITE_ID from TXNS as t JOIN TXN_COMPONENTS as tc ON tc.TC_TXNID = t.TXN_ID JOIN TXN_TO_WRITE_ID as tw on t.TXN_ID = tw.T2W_TXNID and t.TXN_STATE = 'a' and tc.TC_PARTITION is NULL
For example, if your write ID is 4, you will need to clean up all delta/delete_delta/base directories named delta_0000004_0000004, delete_delta_0000004_0000004 and base_0000004.
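The directory names for a given write ID follow directly from the example above, so they can be generated rather than typed by hand. This is a minimal sketch: `AcidDirs` and `forWriteId` are hypothetical names, and the 7-digit zero padding is inferred from the example for write ID 4. Directories in a real table may carry additional suffixes, so treat the output as a starting point for what to look for, not an exhaustive list.

```scala
// Hypothetical helper, not part of spark-acid or Hive.
object AcidDirs {
  // Write IDs are zero-padded to 7 digits, as in the example (4 -> 0000004).
  private def pad(writeId: Long): String = f"$writeId%07d"

  // Returns the delta, delete_delta and base directory names for one write ID.
  def forWriteId(writeId: Long): Seq[String] = {
    require(writeId >= 0, "writeId must be non-negative")
    val id = pad(writeId)
    Seq(s"delta_${id}_$id", s"delete_delta_${id}_$id", s"base_$id")
  }
}
```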
Delete the entry from the TXN_COMPONENTS table for the aborted transaction. The complete SQL query looks like:
WITH aborted_transactions AS ( SELECT t.txn_id, tw.t2w_writeid AS write_id FROM txns AS t JOIN txn_components AS tc ON t.txn_id = tc.tc_txnid JOIN txn_to_write_id AS tw ON t.txn_id = tw.t2w_txnid AND t.txn_state = 'a' AND tc.tc_partition IS NULL ) DELETE FROM txn_components WHERE tc_txnid IN (SELECT txn_id FROM aborted_transactions)