

Version notes

Flink and Kafka version numbers must be compatible with each other. The combination verified in this walkthrough:

  • Flink 1.17.1
  • kafka_2.12-3.3.1

Add the Kafka connector dependency

Upload flink-connector-kafka-1.17.1.jar to Flink's lib directory.

Download the flink-connector-kafka connector jar

https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1

Copy it into Flink's lib directory

[hadoop@node2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib

Distribute flink-connector-kafka-1.17.1.jar to the other nodes

xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar
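xsync here is the cluster's own distribution script. If you do not have it, a plain scp loop does the same job; a minimal sketch, assuming the same $FLINK_HOME layout on node3 and node4:

# copy the connector jar to every other Flink node
for host in node3 node4; do
  scp $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar $host:$FLINK_HOME/lib/
done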

Start a yarn-session

[hadoop@node2 ~]$ myhadoop.sh start
[hadoop@node2 ~]$ yarn-session.sh -d

Start the Kafka cluster

[hadoop@node2 ~]$ zk.sh start
[hadoop@node2 ~]$ kf.sh start

Create the Kafka topic

List existing topics:

[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list

If ws1 is not in the list, create it:

[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1

Plain Kafka table

'connector' = 'kafka'

Enter the Flink SQL client

[hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session
... (some log output omitted) ...
Flink SQL>

Create the Kafka mapping table

CREATE TABLE t1(
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  -- FROM 'xxxx' can be omitted when the column name matches the metadata key; VIRTUAL marks the column read-only
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  id int,
  ts bigint,
  vc int
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',
  'properties.group.id' = 'test',
  -- valid values: 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
  'scan.startup.mode' = 'earliest-offset',
  -- 'fixed' is Flink's built-in partitioner: each parallel subtask writes to exactly one Kafka partition
  'sink.partitioner' = 'fixed',
  'topic' = 'ws1',
  'format' = 'json'
);

This table can be used both to read from and to write to Kafka.
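As a quick smoke test (a sketch; the row values here are made up for illustration), write one row and read it back together with the metadata columns:

-- write a single hand-crafted row into topic ws1
insert into t1(id, ts, vc) values (1, 1, 100);

-- read it back, including the Kafka metadata columns
select `event_time`, `partition`, `offset`, id, ts, vc from t1;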

Create an upsert-kafka mapping table (a primary key must be defined)

CREATE TABLE t2(
  id int,
  sumVC int,
  primary key (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node2:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
);

If Kafka has no topic named ws2, it is created automatically.
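An optional check, using the same brokers as above: after the first write, confirm the topic exists.

[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092 --describe --topic ws2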

Insert into the upsert-kafka table

insert into t2 select id, sum(vc) sumVC from source group by id;

Query the upsert-kafka table

upsert-kafka cannot start reading from a specified offset; it always reads the topic from the beginning.

Set the display mode

SET sql-client.execution.result-mode=tableau;

Query the data in t2

select * from t2;

If no data comes out, it is because the earlier source table has already generated rows up to its configured end (1000000) and stopped. Recreate it, as shown below, so the whole update process can be observed; the changes are displayed with changelog markers such as -U, +U and +I.

CREATE TABLE source (
    id INT,
    ts BIGINT,
    vc INT
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='1',
    'fields.id.kind'='random',
    'fields.id.min'='1',
    'fields.id.max'='10',
    'fields.ts.kind'='sequence',
    'fields.ts.start'='1',
    'fields.ts.end'='1000000',
    'fields.vc.kind'='random',
    'fields.vc.min'='1',
    'fields.vc.max'='100'
);

Insert the source table into t1

insert into t1(id,ts,vc) select * from source;

If an error occurs

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer

If the same error still occurs, copy kafka-clients-3.3.1.jar from Kafka's libs directory into Flink's lib directory (the thin flink-connector-kafka jar does not bundle the Kafka client classes), and remember to restart the sql-client.

Insert data into the Kafka table

If the source table does not exist yet, create it first; if it already exists, skip that step. The yarn-session must also be restarted (important).

cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib

Check that the copy succeeded

$ ls $FLINK_HOME/lib
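If Flink's lib directory must be identical on every node, this jar presumably needs distributing as well; a sketch, assuming the same xsync helper used earlier:

xsync $FLINK_HOME/lib/kafka-clients-3.3.1.jar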

Restart the sql-client and repeat the steps; a successful run looks like this:

Flink SQL> CREATE TABLE t1(
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   -- FROM 'xxxx' can be omitted when the column name matches the metadata key; VIRTUAL marks the column read-only
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
>   id int,
>   ts bigint,
>   vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',
>   'properties.group.id' = 'test',
>   -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- 'fixed' partitioner: each parallel subtask writes to exactly one Kafka partition
>   'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE source (
>     id INT,
>     ts BIGINT,
>     vc INT
> ) WITH (
>     'connector' = 'datagen',
>     'rows-per-second'='1',
>     'fields.id.kind'='random',
>     'fields.id.min'='1',
>     'fields.id.max'='10',
>     'fields.ts.kind'='sequence',
>     'fields.ts.start'='1',
>     'fields.ts.end'='1000000',
>     'fields.vc.kind'='random',
>     'fields.vc.min'='1',
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.

Flink SQL> insert into t1(id,ts,vc) select * from source;
2024-06-14 10:45:30,125 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 10:45:30,673 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 10:45:31,027 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 10:45:31,227 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node3:41749 of application 'application_1718331886020_0001'.
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b1765f969c3ae637bd4c8100efbb0c4e

Query the Kafka table

select * from t1;

Error:

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord

Restart the yarn-session and repeat the steps; a successful run looks like this:

Flink SQL> CREATE TABLE t1(
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   -- FROM 'xxxx' can be omitted when the column name matches the metadata key; VIRTUAL marks the column read-only
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
>   id int,
>   ts bigint,
>   vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',
>   'properties.group.id' = 'test',
>   -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- 'fixed' partitioner: each parallel subtask writes to exactly one Kafka partition
>   'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE source (
>     id INT,
>     ts BIGINT,
>     vc INT
> ) WITH (
>     'connector' = 'datagen',
>     'rows-per-second'='1',
>     'fields.id.kind'='random',
>     'fields.id.min'='1',
>     'fields.id.max'='10',
>     'fields.ts.kind'='sequence',
>     'fields.ts.start'='1',
>     'fields.ts.end'='1000000',
>     'fields.vc.kind'='random',
>     'fields.vc.min'='1',
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.

Flink SQL> insert into t1(id,ts,vc) select * from source;
2024-06-14 11:22:17,971 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:22:18,422 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:22:18,895 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:22:19,052 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 84292f84d1fce4756ccd8ae294b6163a

Flink SQL> select * from t1;
2024-06-14 11:23:38,338 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:23:38,606 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:23:38,617 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:23:38,649 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
[INFO] Result retrieval cancelled.

Flink SQL>

 

upsert-kafka table

'connector' = 'upsert-kafka'

If the table is subject to update operations, the plain kafka connector cannot handle them; in that case, use the Upsert Kafka connector.
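For instance, the grouped aggregation used in this walkthrough keeps emitting updated sums for the same id, so its sink must accept upserts. A sketch of the distinction: this statement would be rejected against the plain kafka table t1 (an append-only sink), but works against the upsert-kafka table t2, which applies changes keyed by id.

-- emits +I for a new id, then -U/+U pairs as sum(vc) changes; only an upsert sink can apply these
insert into t2 select id, sum(vc) sumVC from source group by id;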

Open the Flink Web UI, cancel all running jobs, then repeat the steps. A successful run is shown below.

Drop the tables

Flink SQL> show tables;
+------------+
| table name |
+------------+
|     source |
|         t1 |
|         t2 |
+------------+
3 rows in set

Flink SQL> drop table source;
Flink SQL> drop table t1;
Flink SQL> drop table t2;

Create the tables

CREATE TABLE source (
    id INT,
    ts BIGINT,
    vc INT
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='1',
    'fields.id.kind'='random',
    'fields.id.min'='1',
    'fields.id.max'='10',
    'fields.ts.kind'='sequence',
    'fields.ts.start'='1',
    'fields.ts.end'='1000000',
    'fields.vc.kind'='random',
    'fields.vc.min'='1',
    'fields.vc.max'='100'
);

CREATE TABLE t2(
  id int,
  sumVC int,
  primary key (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node2:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
);
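Before querying, resubmit the aggregation from earlier so that t2 actually receives data (this step is implied by the flow above, reusing the same statement as before):

insert into t2 select id, sum(vc) sumVC from source group by id;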

Set the display mode

SET sql-client.execution.result-mode=tableau;

Query the table

select * from t2;
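In tableau mode the result is rendered as a changelog. An illustrative excerpt (not verbatim output; the values are made up): a first sum for id 1 arrives as +I, and each later update retracts the old row (-U) and emits the new one (+U).

| op |          id |       sumVC |
+----+-------------+-------------+
| +I |           1 |          27 |
| -U |           1 |          27 |
| +U |           1 |          82 |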

 

Done! Enjoy it!