Published: 2025-06-24 18:01:44 | Author: 北方职教升学中心
When building application systems, we often run into scenarios that require processing data in real time. There are many frameworks for real-time data processing; this article uses one example to show how flink+kafka can be applied to stream data processing.
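Process the data with Flink's basic operators
map operator: applies a one-to-one transformation to each element of the data stream and returns a new object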
sou.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(s);
        jsonObject.put("source", "flink");
        return jsonObject.toString();
    }
}).print();
//output
//{"id":0,"value":5,"ts":1734832964534,"source":"flink"}
//{"id":1,"value":3,"ts":1734832965640,"source":"flink"}
//{"id":2,"value":7,"ts":1734832966643,"source":"flink"}
//{"id":3,"value":10,"ts":1734832967645,"source":"flink"}
//{"id":4,"value":2,"ts":1734832968648,"source":"flink"}
//{"id":5,"value":2,"ts":1734832969653,"source":"flink"}
//{"id":6,"value":1,"ts":1734832970654,"source":"flink"}
//{"id":7,"value":6,"ts":1734832971657,"source":"flink"}
//{"id":8,"value":6,"ts":1734832972660,"source":"flink"}
//{"id":9,"value":6,"ts":1734832973662,"source":"flink"}
filter operator: filters the data stream and keeps only the elements for which the function returns true
sou.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(s);
        jsonObject.put("source", "flink");
        return jsonObject.toString();
    }
}).filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(value);
        Integer id = jsonObject.getInteger("id");
        return id % 2 == 0;
    }
}).print();
//output
//{"id":0,"value":5,"ts":1734832964534,"source":"flink"}
//{"id":2,"value":7,"ts":1734832966643,"source":"flink"}
//{"id":4,"value":2,"ts":1734832968648,"source":"flink"}
//{"id":6,"value":1,"ts":1734832970654,"source":"flink"}
//{"id":8,"value":6,"ts":1734832972660,"source":"flink"}
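To make the semantics of the two operators concrete without a running cluster, here is a minimal plain-Java sketch that uses java.util.stream rather than Flink. It only mirrors the logic (one output per input for map, keep-if-true for filter). The class name OperatorSemanticsSketch, its helper methods, and the use of a Map in place of a JSON object are assumptions made for illustration; the ids and values mirror the sample output above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration class, not part of the article's project.
class OperatorSemanticsSketch {

    // map step: every input record yields exactly one output record with an extra field
    static Map<String, Object> addSource(Map<String, Object> in) {
        Map<String, Object> out = new LinkedHashMap<>(in); // copy, leave the input untouched
        out.put("source", "flink");
        return out;
    }

    // filter step: the record is kept only when this predicate returns true
    static boolean hasEvenId(Map<String, Object> record) {
        Object id = record.get("id");
        return id instanceof Integer && ((Integer) id) % 2 == 0;
    }

    // map followed by filter, in the same order as the Flink snippet
    static List<Map<String, Object>> process(List<Map<String, Object>> input) {
        return input.stream()
                .map(OperatorSemanticsSketch::addSource)
                .filter(OperatorSemanticsSketch::hasEvenId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int[] values = {5, 3, 7, 10, 2, 2, 1, 6, 6, 6}; // values from the sample output
        List<Map<String, Object>> input = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            Map<String, Object> record = new LinkedHashMap<>();
            record.put("id", i);
            record.put("value", values[i]);
            input.add(record);
        }
        process(input).forEach(System.out::println);
    }
}
```

Running main keeps the five records with ids 0, 2, 4, 6 and 8, the same ids that survive the Flink filter.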
Flink writes the processed data back to Kafka so that the data keeps flowing
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(kafka_server)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(sub_topic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
processResult.sinkTo(sink);
A Kafka consumer subscribes to the corresponding topic
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "study02-ubuntu:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "iot1");
// properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
// properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// both key and value are read with StringDeserializer, so the consumer is typed <String, String>
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// topic is the topic Flink writes to (sub_topic in the complete example)
TopicPartition p0 = new TopicPartition(topic, 0);
TopicPartition p1 = new TopicPartition(topic, 1);
kafkaConsumer.assign(Arrays.asList(p0, p1));
while (true) {
    // poll(Duration) replaces the deprecated poll(long); needs import java.time.Duration
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // TODO: process the message
        System.out.println(record.value());
    }
}
//output
//{"id":0,"value":5,"ts":1734832964534,"source":"flink"}
//{"id":2,"value":7,"ts":1734832966643,"source":"flink"}
//{"id":4,"value":2,"ts":1734832968648,"source":"flink"}
//{"id":6,"value":1,"ts":1734832970654,"source":"flink"}
//{"id":8,"value":6,"ts":1734832972660,"source":"flink"}
Complete example: Flink receives data from Kafka, processes it with operators, and forwards the result back to Kafka:
package com.yanboot.flink.connector;

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaStreamDataProcess {
    private final static String kafka_server = "study02-ubuntu:9092";
    private final static String pub_topic = "sunlei";
    private final static String sub_topic = "sub_sunlei";
    private final static String groupId = "kafka-demo";

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        env.setParallelism(1);
        // build the KafkaSource
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(kafka_server) // Kafka bootstrap servers
                .setTopics(pub_topic) // topic to read from
                .setGroupId(groupId) // consumer group ID
                // OffsetsInitializer.earliest(): always start from the earliest offset
                // OffsetsInitializer.latest(): always start from the latest offset
                // OffsetsInitializer.timestamp(long timestamp): start from the given timestamp
                // OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST): start from each partition's
                //   committed offset; if there is no committed offset, start from the earliest offset
                // OffsetsInitializer.committedOffsets(): start from each partition's committed offset;
                //   if there is no committed offset, an exception is thrown
                .setStartingOffsets(OffsetsInitializer.latest()) // starting offset
                .setValueOnlyDeserializer(new SimpleStringSchema()) // value deserializer
                .build();

        DataStreamSource<String> sou = env.fromSource(
                kafkaSource, // data source
                WatermarkStrategy.noWatermarks(), // watermark strategy
                "flink kafka source");

        SingleOutputStreamOperator<String> processResult = sou.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(s);
                jsonObject.put("source", "flink");
                return jsonObject.toString();
            }
        }).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(value);
                Integer id = jsonObject.getInteger("id");
                return id % 2 == 0;
            }
        });
        processResult.print();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(kafka_server)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(sub_topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        processResult.sinkTo(sink);

        // start the job
        env.execute();
    }
}
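1. Concepts
flink: a distributed, highly available, highly reliable big-data processing engine that provides an efficient, reliable, and scalable way to process and analyze real-time data.
2. Goals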
This article processes stream data in the following three steps:
Flink acts as a Kafka consumer, consumes data from Kafka, and converts the consumed data into a Flink data stream;
Flink performs computation, aggregation, and similar operations on that data stream;
Flink writes the processed data back to Kafka so that the data keeps flowing.
3. Implementation steps
Create a new Maven project and add the dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>21</java.version>
    <flink.version>1.20.0</flink.version>
    <flink-kafka.version>3.3.0-1.20</flink-kafka.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink-kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- JSON processing -->
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.53</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- Replace this with the main class of your job -->
                                <mainClass>com.yanboot.flink.connector.KafkaStreamDataProcess</mainClass>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
A Kafka producer simulates the incoming data stream
System.out.println("Kafka producer started, current time: " + LocalDateTime.now());
KafkaProducerStudy kafkaProducerStudy = new KafkaProducerStudy();
KafkaProducer<String, Object> kafkaProducer = kafkaProducerStudy.createKfkaProducer();
// all ten records are sent inside a single transaction
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (int i = 0; i < 10; i++) {
    ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, kafkaProducerStudy.setKafkaUserValue(i));
    kafkaProducer.send(record);
    Thread.sleep(1000); // one record per second
}
kafkaProducer.commitTransaction();
kafkaProducer.close();
System.out.println("Kafka producer closed, current time: " + LocalDateTime.now());
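The producer snippet calls kafkaProducerStudy.setKafkaUserValue(i), whose implementation is not shown in this article. Judging from the records printed elsewhere in the article (for example {"id":0,"value":5,"ts":1734832964534}), it presumably builds a small JSON payload with an id, a value, and a millisecond timestamp. The following is only a sketch under that assumption: the class SimulatedPayload, the method build, and the value range 1 to 10 are hypothetical and not taken from the author's code.

```java
import java.util.Random;

// Hypothetical stand-in for the payload builder that the article does not show.
class SimulatedPayload {

    // Builds one JSON record; the field names follow the sample output in the article.
    static String build(int id, int value, long tsMillis) {
        return "{\"id\":" + id + ",\"value\":" + value + ",\"ts\":" + tsMillis + "}";
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            int value = random.nextInt(10) + 1; // assumed range 1..10
            System.out.println(build(i, value, System.currentTimeMillis()));
        }
    }
}
```

Each generated string could then be used as the value of a ProducerRecord, in place of the call to setKafkaUserValue(i).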
Flink reads the data stream from Kafka
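// build the KafkaSource
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers(kafka_server) // Kafka bootstrap servers
        .setTopics(pub_topic) // topic to read from
        .setGroupId(groupId) // consumer group ID
        .setStartingOffsets(OffsetsInitializer.latest()) // where consumption starts
        .setValueOnlyDeserializer(new SimpleStringSchema()) // value deserializer
        .build();
// KafkaSource can start from different offsets depending on the chosen strategy:
// 1. OffsetsInitializer.earliest(): always start from the earliest offset
// 2. OffsetsInitializer.latest(): always start from the latest offset
// 3. OffsetsInitializer.timestamp(long timestamp): start from the given timestamp
// 4. OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST): start from each partition's
//    committed offset; if there is no committed offset, start from the earliest offset
// 5. OffsetsInitializer.committedOffsets(): start from each partition's committed offset;
//    if there is no committed offset, an exception is thrown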
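The five starting-offset strategies (earliest, latest, timestamp, committed offsets with an EARLIEST fallback, committed offsets without a fallback) are easier to compare in a toy model. The sketch below is plain Java and is not the connector's implementation. It assumes a single partition whose record at index i has offset i, and it assumes that the timestamp strategy falls back to the latest offset when no record is new enough. The names StartingOffsetSketch, Strategy, and resolve are hypothetical.

```java
import java.util.OptionalLong;

// Toy model of starting-offset resolution for one partition; not Flink code.
class StartingOffsetSketch {

    enum Strategy { EARLIEST, LATEST, TIMESTAMP, COMMITTED_OR_EARLIEST, COMMITTED }

    /**
     * @param recordTimestamps timestamps (ms) of the records in offset order; index i is offset i
     * @param committed        offset committed by the consumer group, if any
     * @param timestamp        only used by the TIMESTAMP strategy
     * @return the offset at which consumption would start in this model
     */
    static long resolve(Strategy strategy, long[] recordTimestamps,
                        OptionalLong committed, long timestamp) {
        long earliest = 0;
        long latest = recordTimestamps.length; // offset of the next record to be written
        switch (strategy) {
            case EARLIEST:
                return earliest;
            case LATEST:
                return latest;
            case TIMESTAMP:
                // first record whose timestamp is at or after the requested one
                for (int i = 0; i < recordTimestamps.length; i++) {
                    if (recordTimestamps[i] >= timestamp) {
                        return i;
                    }
                }
                return latest; // assumption of this model: nothing new enough, wait for new data
            case COMMITTED_OR_EARLIEST:
                return committed.orElse(earliest);
            case COMMITTED:
                return committed.orElseThrow(
                        () -> new IllegalStateException("no committed offset for this partition"));
            default:
                throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        long[] timestamps = {100L, 200L, 300L};
        for (Strategy s : Strategy.values()) {
            // a committed offset of 2 is supplied, so COMMITTED does not throw here
            long start = resolve(s, timestamps, OptionalLong.of(2), 150L);
            System.out.println(s + " -> " + start);
        }
    }
}
```

The practical difference between the last two strategies shows up only for a consumer group that has never committed: with the EARLIEST fallback the job starts from the beginning of the partition, while without a fallback it fails at startup.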
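flink-connector-kafka: Flink's built-in Kafka connector, which lets a Flink application easily read data streams from Kafka (Source) or write data streams to Kafka (Sink).
kafka: a message middleware for building real-time data pipelines and streaming applications; it is horizontally scalable, fault-tolerant, and wicked fast.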