Project Name | Stars | Downloads | Repos Using This | Packages Using This | Most Recent Commit | Total Releases | Latest Release | Open Issues | License | Language |
---|---|---|---|---|---|---|---|---|---|---|
Bigdata Notes | 13,291 | | | | 4 months ago | | | 33 | | Java |
A getting-started guide to big data :star: | | | | | | | | | | |
Flink Learning | 13,198 | | | | 3 months ago | | | | apache-2.0 | Java |
flink learning blog. http://www.54tianzhisheng.cn/ Covers Flink basics, concepts, internals, hands-on practice, performance tuning, and source-code analysis. Includes learning examples for Flink Connectors, Metrics, Libraries, the DataStream API, and Table API & SQL, plus large production case studies (PV/UV, log storage, real-time deduplication of tens of billions of records, monitoring and alerting). The author also maintains the column 《大数据实时计算引擎 Flink 实战与性能优化》. | | | | | | | | | | |
God Of Bigdata | 7,992 | | | | 2 months ago | | | 2 | | |
Focused on big data learning and interview preparation; the road to big data mastery starts here. Flink/Spark/Hadoop/HBase/Hive... | | | | | | | | | | |
Alink | 3,343 | | | 1 | 2 months ago | 16 | September 08, 2022 | 48 | apache-2.0 | Java |
Alink is the Machine Learning algorithm platform based on Flink, developed by the PAI team of Alibaba computing platform. | | | | | | | | | | |
Bigdataguide | 1,994 | | | | 13 days ago | | | | | Java |
Big data learning from scratch, including study videos for every stage and interview materials. | | | | | | | | | | |
Flinkstreamsql | 1,873 | | | | 7 months ago | | | 86 | apache-2.0 | Java |
Extends real-time SQL on top of open-source Flink; mainly implements joins between streams and dimension tables, and supports all native Flink SQL syntax. | | | | | | | | | | |
Szt Bigdata | 1,702 | | | | 6 months ago | | | 15 | other | Scala |
A big data passenger-flow analysis system for the Shenzhen Metro 🚇🚄🌟 | | | | | | | | | | |
Flink Streaming Platform Web | 1,562 | | | | 2 months ago | | | 27 | mit | Java |
A web platform for real-time stream computing based on Flink. | | | | | | | | | | |
Bigdata Interview | 1,397 | | | | 2 years ago | | | | | |
:dart: :star2: [Big data interview questions] A collection of big-data interview questions gathered from around the web, together with my own answer summaries. Currently covers Hadoop/Hive/Spark/Flink/HBase/Kafka/Zookeeper. | | | | | | | | | | |
Meetup | 1,332 | | | | a month ago | | | 1 | mit | Go |
【❤️ The most complete collection of tech-talk slides from major Internet companies 👍🏻 continuously updated!】🍻 A compilation of materials from major technical conferences and events, such as 👉QCon 👉the Global Ops Technology Conference 👉GDG 👉the Global Technology Leadership Summit 👉front-end conferences 👉the Architects Summit 👉Agile/DevOps 👉OpenResty 👉Elastic. PRs / Issues are welcome. | | | | | | | | | | |
Real-time Data Warehouse using: Flink & Kafka | Flink & Hudi | Spark & Delta | Flink & Hudi & E-commerce
Build and start all the services, then check that every container is up:
docker compose build   # build the images defined in the Compose file
docker compose up -d   # start all services in detached mode
docker compose ps      # list the containers and their state
You should be able to access the Flink Web UI (http://localhost:8081), as well as Kibana (http://localhost:5601).
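If you want to confirm from the command line that both services are actually up before continuing, a minimal check against their standard status endpoints (assuming the ports are published on localhost as above) looks like this:
curl -s http://localhost:8081/overview    # Flink REST API: TaskManagers, slots and running jobs
curl -s http://localhost:5601/api/status  # Kibana status endpoint: overall state should be green once ready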
Start the Postgres client to have a look at the source tables and run some DML statements later:
docker compose exec postgres env PGOPTIONS="--search_path=claims" bash -c 'psql -U $POSTGRES_USER postgres'
SELECT * FROM information_schema.tables WHERE table_schema = 'claims';
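Inside psql, the \dt meta-command lists the tables of the claims schema more compactly (a sketch):
\dt claims.*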
Start the Debezium Postgres connectors using the configuration provided in the register-postgres.json and register-postgres-members.json files:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-members.json
Check that the connector is running:
curl http://localhost:8083/connectors/claims-connector/status # | jq
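To list every connector registered on the Connect worker, not just claims-connector, the standard Kafka Connect REST API can be queried as well (a small sketch; append | jq for pretty-printed output):
curl -s http://localhost:8083/connectors   # returns a JSON array of connector names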
The first time it connects to a Postgres server, Debezium takes a consistent snapshot of all database schemas, so you should see that the pre-existing records in the accident_claims table have already been pushed into your Kafka topic:
docker compose exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic pg_claims.claims.accident_claims
Have a quick read about the structure of these events in the Debezium documentation.
In the tab you used to start the Postgres client, you can now run some DML statements to see that the changes are propagated all the way to your Kafka topic:
INSERT INTO accident_claims (claim_total, claim_total_receipt, claim_currency, member_id, accident_date, accident_type, accident_detail, claim_date, claim_status) VALUES (500, 'PharetraMagnaVestibulum.tiff', 'AUD', 321, '2020-08-01 06:43:03', 'Collision', 'Blue Ringed Octopus', '2020-08-10 09:39:31', 'INITIAL');
UPDATE accident_claims
SET claim_total_receipt = 'CorrectReceipt.pdf'
WHERE claim_id = 1001;
DELETE
FROM accident_claims
WHERE claim_id = 1001;
In the output of your Kafka console consumer, you should now see three consecutive events with op values equal to c (an insert event), u (an update event) and d (a delete event).
Useful references: the Flink Table API connector overview (https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/), the connector listing on flink-packages.org (https://flink-packages.org/categories/connectors), and flink-faker (https://github.com/knaufk/flink-faker/).
Start the Flink SQL Client:
docker compose exec sql-client ./sql-client.sh
or:
docker compose exec sql-client ./sql-client-submit.sh
CREATE TABLE t1(
  uuid VARCHAR(20), -- you can use the 'PRIMARY KEY NOT ENFORCED' syntax to mark a field as the record key
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = '/data/t1',
  'write.tasks' = '1', -- default is 4; more tasks require more resources
  'compaction.tasks' = '1', -- default is 10; more tasks require more resources
  'table.type' = 'COPY_ON_WRITE', -- options: COPY_ON_WRITE (the default) or MERGE_ON_READ
  'read.tasks' = '1', -- default is 4; more tasks require more resources
  'read.streaming.enabled' = 'true', -- enables streaming reads
  'read.streaming.start-commit' = '20210712134429', -- specifies the start commit instant time
  'read.streaming.check-interval' = '4' -- check interval for new source commits, in seconds (default 60)
);
-- insert data using values
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
SELECT * FROM t1;
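Since the Hudi Flink connector defaults to upsert writes keyed on the uuid field (as hinted in the DDL comment above), re-inserting an existing key replaces the row instead of appending a duplicate. A minimal sketch, mirroring the Hudi quickstart:
-- upsert: same uuid 'id1', new age; the earlier version is superseded
INSERT INTO t1 VALUES ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
-- the query should now return age = 27 for id1
SELECT * FROM t1 WHERE uuid = 'id1';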
Register a Postgres catalog, so you can access the metadata of the external tables over JDBC:
CREATE CATALOG datasource WITH (
'type'='jdbc',
'property-version'='1',
'base-url'='jdbc:postgresql://postgres:5432/',
'default-database'='postgres',
'username'='postgres',
'password'='postgres'
);
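As a quick sanity check that the catalog is registered and can reach Postgres, you can browse it from the SQL client before creating anything else (a sketch; default_catalog is Flink's built-in catalog):
SHOW CATALOGS;
USE CATALOG datasource;
SHOW DATABASES;   -- should list the postgres database
SHOW TABLES;      -- Postgres tables appear as 'claims.accident_claims', 'claims.members', ...
USE CATALOG default_catalog;   -- switch back before running the DDL below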
CREATE DATABASE IF NOT EXISTS datasource;
CREATE TABLE datasource.accident_claims WITH (
'connector' = 'kafka',
'topic' = 'pg_claims.claims.accident_claims',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'accident_claims-consumer-group',
'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset'
) LIKE datasource.postgres.`claims.accident_claims` (EXCLUDING ALL);
Or generate data with the datagen connector instead:
CREATE TABLE datasource.accident_claims(
claim_id BIGINT,
claim_total DOUBLE,
claim_total_receipt VARCHAR(50),
claim_currency VARCHAR(3),
member_id INT,
accident_date DATE,
accident_type VARCHAR(20),
accident_detail VARCHAR(20),
claim_date DATE,
claim_status VARCHAR(10),
ts_created TIMESTAMP(3),
ts_updated TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100'
);
and the members table:
CREATE TABLE datasource.members WITH (
'connector' = 'kafka',
'topic' = 'pg_claims.claims.members',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'members-consumer-group',
'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset'
) LIKE datasource.postgres.`claims.members` (EXCLUDING OPTIONS);
Or generate data with the datagen connector instead:
CREATE TABLE datasource.members(
id BIGINT,
first_name VARCHAR(50),
last_name VARCHAR(50),
address VARCHAR(50),
address_city VARCHAR(10),
address_country VARCHAR(10),
insurance_company VARCHAR(25),
insurance_number VARCHAR(50),
ts_created TIMESTAMP(3),
ts_updated TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100'
);
Check data:
SELECT * FROM datasource.accident_claims;
SELECT * FROM datasource.members;
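Besides previewing rows, DESCRIBE is a quick way to verify that the tables ended up with the expected schema, in particular when it was inherited from Postgres via the LIKE clause. A small sketch:
DESCRIBE datasource.accident_claims;
DESCRIBE datasource.members;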
Create a database in the DWD layer:
CREATE DATABASE IF NOT EXISTS dwd;
CREATE TABLE dwd.accident_claims
(
claim_id BIGINT,
claim_total DOUBLE,
claim_total_receipt VARCHAR(50),
claim_currency VARCHAR(3),
member_id INT,
accident_date DATE,
accident_type VARCHAR(20),
accident_detail VARCHAR(20),
claim_date DATE,
claim_status VARCHAR(10),
ts_created TIMESTAMP(3),
ts_updated TIMESTAMP(3),
ds DATE,
PRIMARY KEY (claim_id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
'connector'='hudi',
'path' = '/data/dwd/accident_claims',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'write.batch.size' = '1',
'write.tasks' = '1',
'compaction.tasks' = '1',
'compaction.delta_seconds' = '60',
'write.precombine.field' = 'ts_updated',
'read.tasks' = '1',
'read.streaming.check-interval' = '5',
'read.streaming.start-commit' = '20210712134429',
'index.bootstrap.enabled' = 'true'
);
CREATE TABLE dwd.members
(
id BIGINT,
first_name VARCHAR(50),
last_name VARCHAR(50),
address VARCHAR(50),
address_city VARCHAR(10),
address_country VARCHAR(10),
insurance_company VARCHAR(25),
insurance_number VARCHAR(50),
ts_created TIMESTAMP(3),
ts_updated TIMESTAMP(3),
ds DATE,
PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
'connector'='hudi',
'path'='/data/dwd/members',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'write.batch.size' = '1',
'write.tasks' = '1',
'compaction.tasks' = '1',
'compaction.delta_seconds' = '60',
'write.precombine.field' = 'ts_updated',
'read.tasks' = '1',
'read.streaming.check-interval' = '5',
'read.streaming.start-commit' = '20210712134429',
'index.bootstrap.enabled' = 'true'
);
and submit continuous queries to the Flink cluster that write the data from the datasource tables into the dwd (Hudi) tables:
INSERT INTO dwd.accident_claims
SELECT claim_id,
claim_total,
claim_total_receipt,
claim_currency,
member_id,
CAST (accident_date as DATE),
accident_type,
accident_detail,
CAST (claim_date as DATE),
claim_status,
CAST (ts_created as TIMESTAMP),
CAST (ts_updated as TIMESTAMP),
claim_date
--CAST (SUBSTRING(claim_date, 0, 9) as DATE)
FROM datasource.accident_claims;
INSERT INTO dwd.members
SELECT id,
first_name,
last_name,
address,
address_city,
address_country,
insurance_company,
insurance_number,
CAST (ts_created as TIMESTAMP),
CAST (ts_updated as TIMESTAMP),
ts_created
--CAST (SUBSTRING(ts_created, 0, 9) as DATE)
FROM datasource.members;
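Each INSERT INTO above is submitted to the cluster as its own long-running streaming job. Besides the Flink Web UI, the jobs can be listed over the REST API (a sketch, assuming port 8081 is published as before):
curl -s http://localhost:8081/jobs/overview   # job names and their state (RUNNING, FINISHED, ...)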
Check data:
SELECT * FROM dwd.accident_claims;
SELECT * FROM dwd.members;
Create a database in the DWB layer:
CREATE DATABASE IF NOT EXISTS dwb;
CREATE TABLE dwb.accident_claims
(
claim_id BIGINT,
claim_total DOUBLE,
claim_total_receipt VARCHAR(50),
claim_currency VARCHAR(3),
member_id INT,
accident_date DATE,
accident_type VARCHAR(20),
accident_detail VARCHAR(20),
claim_date DATE,
claim_status VARCHAR(10),
ts_created TIMESTAMP(3),
ts_updated TIMESTAMP(3),
ds DATE,
PRIMARY KEY (claim_id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
'connector'='hudi',
'path' = '/data/dwb/accident_claims',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'write.batch.size' = '1',
'write.tasks' = '1',
'compaction.tasks' = '1',
'compaction.delta_seconds' = '60',
'write.precombine.field' = 'ts_updated',
'read.tasks' = '1',
'read.streaming.check-interval' = '5',
'read.streaming.start-commit' = '20210712134429',
'index.bootstrap.enabled' = 'true'
);
CREATE TABLE dwb.members
(
id BIGINT,
first_name VARCHAR(50),
last_name VARCHAR(50),
address VARCHAR(50),
address_city VARCHAR(10),
address_country VARCHAR(10),
insurance_company VARCHAR(25),
insurance_number VARCHAR(50),
ts_created TIMESTAMP(3),
ts_updated TIMESTAMP(3),
ds DATE,
PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
'connector'='hudi',
'path'='/data/dwb/members',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'write.batch.size' = '1',
'write.tasks' = '1',
'compaction.tasks' = '1',
'compaction.delta_seconds' = '60',
'write.precombine.field' = 'ts_updated',
'read.tasks' = '1',
'read.streaming.check-interval' = '5',
'read.streaming.start-commit' = '20210712134429',
'index.bootstrap.enabled' = 'true'
);
INSERT INTO dwb.accident_claims
SELECT claim_id,
claim_total,
claim_total_receipt,
claim_currency,
member_id,
accident_date,
accident_type,
accident_detail,
claim_date,
claim_status,
ts_created,
ts_updated,
ds
FROM dwd.accident_claims;
INSERT INTO dwb.members
SELECT id,
first_name,
last_name,
address,
address_city,
address_country,
insurance_company,
insurance_number,
ts_created,
ts_updated,
ds
FROM dwd.members;
Check data:
SELECT * FROM dwb.accident_claims;
SELECT * FROM dwb.members;
Create a database in the DWS layer:
CREATE DATABASE IF NOT EXISTS dws;
CREATE TABLE dws.insurance_costs
(
es_key STRING PRIMARY KEY NOT ENFORCED,
insurance_company STRING,
accident_detail STRING,
accident_agg_cost DOUBLE
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://elasticsearch:9200',
'index' = 'agg_insurance_costs'
);
and submit a continuous query to the Flink cluster that will write the aggregated insurance costs per insurance_company, bucketed by accident_detail (in other words: which animals are causing the most harm in terms of costs):
INSERT INTO dws.insurance_costs
SELECT UPPER(SUBSTRING(m.insurance_company, 0, 4) || '_' || SUBSTRING(ac.accident_detail, 0, 4)) es_key,
m.insurance_company,
ac.accident_detail,
SUM(ac.claim_total) AS accident_agg_cost
FROM dwb.accident_claims ac
JOIN dwb.members m
ON ac.member_id = m.id
WHERE ac.claim_status <> 'DENIED'
GROUP BY m.insurance_company, ac.accident_detail;
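To spot-check the aggregation from the SQL client before (or instead of) opening Kibana, the same query can be run as a plain SELECT; note that it starts a streaming job of its own, so cancel it when you are done. A sketch:
SELECT m.insurance_company,
       ac.accident_detail,
       SUM(ac.claim_total) AS accident_agg_cost
FROM dwb.accident_claims ac
JOIN dwb.members m ON ac.member_id = m.id
WHERE ac.claim_status <> 'DENIED'
GROUP BY m.insurance_company, ac.accident_detail;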
Finally, create a simple dashboard in Kibana.
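Before building the dashboard, it is worth confirming that documents are actually arriving in the target index. A sketch using the standard Elasticsearch APIs (this assumes the elasticsearch container publishes port 9200 on localhost):
curl -s 'http://localhost:9200/agg_insurance_costs/_count?pretty'          # number of aggregated documents
curl -s 'http://localhost:9200/agg_insurance_costs/_search?size=3&pretty'  # peek at a few records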