


When building application systems, you frequently run into scenarios that require processing data in real time. Many frameworks can handle real-time data; this article uses a worked example to show how Flink and Kafka fit together for stream data processing. The first part below walks through the processing pipeline itself: transforming the stream with basic operators, writing the results back to Kafka, and consuming them, followed by the complete code. The underlying concepts and the project setup are covered afterwards.

  • Process the data with Flink's basic operators

The map operator applies a one-to-one transformation to every element of the stream and returns a new object.

sou.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(s);
        jsonObject.put("source", "flink");
        return jsonObject.toString();
    }
}).print();
// output
// {"id":0,"value":5,"ts":1734832964534,"source":"flink"}
// {"id":1,"value":3,"ts":1734832965640,"source":"flink"}
// {"id":2,"value":7,"ts":1734832966643,"source":"flink"}
// {"id":3,"value":10,"ts":1734832967645,"source":"flink"}
// {"id":4,"value":2,"ts":1734832968648,"source":"flink"}
// {"id":5,"value":2,"ts":1734832969653,"source":"flink"}
// {"id":6,"value":1,"ts":1734832970654,"source":"flink"}
// {"id":7,"value":6,"ts":1734832971657,"source":"flink"}
// {"id":8,"value":6,"ts":1734832972660,"source":"flink"}
// {"id":9,"value":6,"ts":1734832973662,"source":"flink"}

The filter operator filters the stream, keeping only the elements for which the predicate returns true.

sou.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(s);
        jsonObject.put("source", "flink");
        return jsonObject.toString();
    }
}).filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(value);
        Integer id = jsonObject.getInteger("id");
        return id % 2 == 0;   // keep only records with an even id
    }
}).print();
// output
// {"id":0,"value":5,"ts":1734832964534,"source":"flink"}
// {"id":2,"value":7,"ts":1734832966643,"source":"flink"}
// {"id":4,"value":2,"ts":1734832968648,"source":"flink"}
// {"id":6,"value":1,"ts":1734832970654,"source":"flink"}
// {"id":8,"value":6,"ts":1734832972660,"source":"flink"}
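The stated goals also mention aggregation, which the example above does not show. As a minimal sketch (not part of the original job), the same JSON records could be aggregated with keyBy plus reduce; this assumes the id and value fields seen in the sample output and only produces interesting results when several records share the same id:

// Sketch only: keyed running sum of "value" per "id".
// KeySelector is org.apache.flink.api.java.functions.KeySelector,
// ReduceFunction is org.apache.flink.api.common.functions.ReduceFunction.
sou.map(new MapFunction<String, JSONObject>() {
    @Override
    public JSONObject map(String s) {
        return JSONObject.parseObject(s);   // parse each JSON string once
    }
}).keyBy(new KeySelector<JSONObject, Integer>() {
    @Override
    public Integer getKey(JSONObject json) {
        return json.getInteger("id");       // partition the stream by id
    }
}).reduce(new ReduceFunction<JSONObject>() {
    @Override
    public JSONObject reduce(JSONObject a, JSONObject b) {
        // keep a running sum of "value" for each key
        a.put("value", a.getInteger("value") + b.getInteger("value"));
        return a;
    }
}).print();

JSONObject falls back to Flink's generic serializer here, which is acceptable for a demo but worth replacing with a POJO in a real job.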

Flink then writes the processed data back to Kafka, keeping the data flowing downstream.

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(kafka_server)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(sub_topic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
processResult.sinkTo(sink);
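AT_LEAST_ONCE means records may be duplicated after a failure and restart. If exactly-once delivery into sub_topic is required, the sink can be switched to EXACTLY_ONCE. The sketch below reuses kafka_server and sub_topic from the example and assumes checkpointing is enabled; the transactional id prefix is an arbitrary example value and must be unique per job, and the broker's transaction timeout has to accommodate the checkpoint interval:

// Sketch only: exactly-once requires Flink checkpointing plus Kafka transactions.
env.enableCheckpointing(5000);   // Kafka transactions are committed on each checkpoint (interval is an example)

KafkaSink<String> exactlyOnceSink = KafkaSink.<String>builder()
        .setBootstrapServers(kafka_server)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(sub_topic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("flink-kafka-demo")   // assumed prefix, must be unique per job
        .build();

Downstream consumers should then set isolation.level=read_committed so that records from aborted transactions are not read.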
  • A Kafka consumer subscribes to the output topic

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "study02-ubuntu:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "iot1");
// properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, Object> kafkaConsumer = new KafkaConsumer<>(properties);
// assign() pins the consumer to explicit partitions instead of using group rebalancing
TopicPartition p0 = new TopicPartition(topic, 0);
TopicPartition p1 = new TopicPartition(topic, 1);
kafkaConsumer.assign(Arrays.asList(p0, p1));
while (true) {
    // newer kafka-clients prefer poll(Duration.ofMillis(100)); poll(long) is deprecated
    ConsumerRecords<String, Object> records = kafkaConsumer.poll(100);
    for (ConsumerRecord<String, Object> record : records) {
        // todo: process the message
        System.out.println(record.value());
    }
}
// output
// {"id":0,"value":5,"ts":1734832964534,"source":"flink"}
// {"id":2,"value":7,"ts":1734832966643,"source":"flink"}
// {"id":4,"value":2,"ts":1734832968648,"source":"flink"}
// {"id":6,"value":1,"ts":1734832970654,"source":"flink"}
// {"id":8,"value":6,"ts":1734832972660,"source":"flink"}
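The consumer above pins itself to partitions 0 and 1 with assign(), so Kafka's consumer-group rebalancing never kicks in. A subscribe()-based variant, sketched below with the same properties (java.util.Collections and java.time.Duration would need to be imported), lets the group coordinator assign partitions instead:

// Sketch: group-managed subscription instead of manual assign()
KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));   // partitions are assigned by the group coordinator
try {
    while (true) {
        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, Object> record : records) {
            System.out.println(record.value());
        }
    }
} finally {
    consumer.close();   // leave the group cleanly
}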
Complete example: Flink reads data from Kafka, processes it with operators, and forwards the results back to Kafka:
package com.yanboot.flink.connector;

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaStreamDataProcess {

    private final static String kafka_server = "study02-ubuntu:9092";
    private final static String pub_topic = "sunlei";
    private final static String sub_topic = "sub_sunlei";
    private final static String groupId = "kafka-demo";

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        env.setParallelism(1);

        // build the KafkaSource
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(kafka_server)                      // Kafka brokers
                .setTopics(pub_topic)                                   // topic to read from
                .setGroupId(groupId)                                    // consumer group id
                // OffsetsInitializer.earliest(): always start from the earliest offset
                // OffsetsInitializer.latest(): always start from the latest offset
                // OffsetsInitializer.timestamp(long timestamp): start from the given timestamp
                // OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST): start from each partition's
                //   committed offset, falling back to the earliest offset when none has been committed
                // OffsetsInitializer.committedOffsets(): start from each partition's committed offset and throw
                //   an exception when none has been committed
                .setStartingOffsets(OffsetsInitializer.latest())        // starting offsets
                .setValueOnlyDeserializer(new SimpleStringSchema())     // value deserializer
                .build();

        DataStreamSource<String> sou = env.fromSource(
                kafkaSource,                        // the source
                WatermarkStrategy.noWatermarks(),   // watermark strategy
                "flink kafka source");

        SingleOutputStreamOperator<String> processResult = sou.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(s);
                jsonObject.put("source", "flink");
                return jsonObject.toString();
            }
        }).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(value);
                Integer id = jsonObject.getInteger("id");
                return id % 2 == 0;
            }
        });
        processResult.print();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(kafka_server)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(sub_topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        processResult.sinkTo(sink);

        // submit the job
        env.execute();
    }
}
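The complete example passes WatermarkStrategy.noWatermarks(), which is fine because no event-time logic is used. If event-time windows or aggregations were added later, the ts field already present in the records could drive watermarks. A rough sketch, reusing env and kafkaSource from the example, with an assumed five-second out-of-orderness bound and java.time.Duration imported, would be:

// Sketch: derive event time from the "ts" field instead of using noWatermarks()
WatermarkStrategy<String> watermarks = WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))   // tolerate 5s of out-of-order data (example value)
        .withTimestampAssigner((record, previousTimestamp) ->
                JSONObject.parseObject(record).getLong("ts"));      // event time comes from the record itself

DataStreamSource<String> sourceWithEventTime =
        env.fromSource(kafkaSource, watermarks, "flink kafka source");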

1. Concepts

  • Flink: a distributed, highly available, and highly reliable big data processing engine that provides an efficient, dependable, and scalable way to process and analyze real-time data.

  • Kafka: message middleware used to build real-time data pipelines and streaming applications, known for horizontal scalability, fault tolerance, and being "wicked fast".

  • flink-connector-kafka: Flink's built-in Kafka connector; it lets a Flink application easily read data streams from Kafka (as a Source) and write data streams back to Kafka (as a Sink).

2. Goals

This article completes the stream data processing in the following three steps:

  • Flink acts as a Kafka consumer, reading data from Kafka and converting it into a Flink data stream;

  • Flink applies computations, aggregations, and other operations to the stream;

  • Flink writes the processed data back to Kafka, keeping the data flowing.

3. Implementation steps

The project setup, the simulated producer, and the Flink source are described below; the operator, sink, and consumer steps, together with the complete example, appear in the sections above.

  • Create a Maven project and add the dependencies to the build

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>21</java.version>
    <flink.version>1.20.0</flink.version>
    <flink-kafka.version>3.3.0-1.20</flink-kafka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink-kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- JSON handling -->
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.53</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- Replace this with the main class of your job -->
                                <mainClass>com.yanboot.flink.connector.KafkaStreamDataProcess</mainClass>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
  • A Kafka producer simulates the incoming data stream

      System.out.println("kafka生产者启动....当前时间为:" + LocalDateTime.now());KafkaProducerStudy kafkaProducerStudy = new KafkaProducerStudy();KafkaProducer<String, Object> kafkaProducer = kafkaProducerStudy.createKfkaProducer();kafkaProducer.initTransactions();kafkaProducer.beginTransaction();for (int i = 0; i < 10; i++) {ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, kafkaProducerStudy.setKafkaUserValue(i));kafkaProducer.send(record);Thread.sleep(1000);}kafkaProducer.commitTransaction();kafkaProducer.close();System.out.println("kafkaProducer关闭当前时间为:" + LocalDateTime.now());
  • Flink reads the data stream from Kafka

// build the KafkaSource
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers(kafka_server)                      // Kafka brokers
        .setTopics(pub_topic)                                   // topic to read from
        .setGroupId(groupId)                                    // consumer group id
        .setStartingOffsets(OffsetsInitializer.latest())        // where to start consuming
        .setValueOnlyDeserializer(new SimpleStringSchema())     // value deserializer
        .build();

// KafkaSource supports several starting-offset strategies:
// 1. OffsetsInitializer.earliest(): always start from the earliest offset
// 2. OffsetsInitializer.latest(): always start from the latest offset
// 3. OffsetsInitializer.timestamp(long timestamp): start from the given timestamp
// 4. OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST): start from each partition's committed
//    offset, falling back to the earliest offset when none has been committed
// 5. OffsetsInitializer.committedOffsets(): start from each partition's committed offset and throw an
//    exception when none has been committed
