At the same time, I will also wire the Redis and Elasticsearch operations into this module and wrap them there.
3.2.1 pom dependencies
<dependencies>
    <dependency>
        <groupId>org.yaml</groupId>
        <artifactId>snakeyaml</artifactId>
        <version>2.3</version>
    </dependency>
    <dependency>
        <groupId>co.elastic.clients</groupId>
        <artifactId>elasticsearch-java</artifactId>
        <version>8.17.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-x-content</artifactId>
        <version>8.17.0</version>
    </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-core</artifactId>
        <version>5.8.32</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-redis</artifactId>
        <version>3.4.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2-extension-spring6</artifactId>
        <version>2.0.54</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.12.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.54</version>
    </dependency>
    <dependency>
        <groupId>io.lettuce</groupId>
        <artifactId>lettuce-core</artifactId>
        <version>6.4.2.RELEASE</version>
    </dependency>
    <!-- Flink Redis Connector -->
    <!-- <dependency> -->
    <!--     <groupId>org.apache.bahir</groupId> -->
    <!--     <artifactId>flink-connector-redis_2.12</artifactId> -->
    <!--     <version>1.1.0</version> -->
    <!-- </dependency> -->
</dependencies>
3.2.2 Some basic entity classes
@Data
public class GenItemEntity {
    Long id;
    String name;
    Long price;
    String brand;
    String specification;
    Integer version;
}
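The Flink job shown elsewhere in this article parses the Debezium JSON into a BinlogInfo wrapper and calls getOp(), getAfter() and getSource().getTable() on it. The article does not list that class, so here is a minimal sketch of what it might look like, assuming the standard Debezium change-event envelope; any field beyond the three actually used is an assumption.

// Minimal sketch (assumption): wrapper for the Debezium change-event JSON
// produced by JsonDebeziumDeserializationSchema.
@Data
public class BinlogInfo<T> {
    T before;          // row image before the change (null for inserts) -- assumed field
    T after;           // row image after the change (null for deletes)
    String op;         // "c" = create, "u" = update, "d" = delete, "r" = snapshot read
    Source source;     // metadata about the originating table

    @Data
    public static class Source {
        String db;     // database name -- assumed field
        String table;  // table name
    }
}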
七、Code showcase
Fellow readers, please head over to Gitee (码云) for the full code.
If you tried to add a Redis cache to the original code directly, the change surface would be very large and a lot of regression testing would be needed. Also, when the same piece of data is accessed concurrently, the cache may end up being set repeatedly; this phenomenon is known as cache breakdown (缓存击穿).

5.1.1 RedisSinkCommand
public class RedisSinkCommand<T> {
    @Setter @Getter
    protected ERedisCommand command;
    @Setter @Getter
    protected long dua;
    @Setter @Getter
    protected String key;
    @Setter @Getter
    protected T value;

    public void initSet(String pKey, T pValue) {
        command = ERedisCommand.SET;
        dua = 300;
        key = pKey;
        value = pValue;
    }

    public void initDel(String pKey) {
        command = ERedisCommand.DEL;
        key = pKey;
    }
}
public enum ERedisCommand {
    SET,
    DEL
}
5.1.2 SpringDataRedisSink
@Slf4j
public class SpringDataRedisSink<T> implements Sink<RedisSinkCommand<T>> {

    @Override
    public SinkWriter<RedisSinkCommand<T>> createWriter(InitContext context) throws IOException {
        // deprecated overload, not used
        return null;
    }

    @Override
    public SinkWriter<RedisSinkCommand<T>> createWriter(WriterInitContext context) {
        return new LettuceRedisSinkWriter();
    }

    class LettuceRedisSinkWriter implements SinkWriter<RedisSinkCommand<T>> {

        @Override
        public void write(RedisSinkCommand<T> pCmd, Context context) throws IOException, InterruptedException {
            RedisTemplate<String, Object> redisTemplate = RedisConfig.redisTemplate();
            switch (pCmd.getCommand()) {
                case SET -> {
                    // dua is the TTL in seconds
                    redisTemplate.opsForValue().set(pCmd.getKey(), pCmd.getValue(), pCmd.getDua(), TimeUnit.SECONDS);
                }
                case DEL -> {
                    redisTemplate.delete(pCmd.getKey());
                }
            }
        }

        @Override
        public void flush(boolean endOfInput) throws IOException, InterruptedException {
        }

        @Override
        public void close() throws Exception {
        }
    }
}
5.2 Writing the Elasticsearch sink
The Elasticsearch sink follows the same rule as the Redis one: the sink itself does not care about any business logic.
5.2.1 ElCommand
@Data
public class ElCommand<T> {
    protected EElCommand command;
    protected String index;
    protected T entity;
    protected String id;
}
public enum EElCommand {
    CREATE,
    UPDATE,
    DELETE
}
5.2.2 ElSearchSink
public class ElSearchSink<T> implements Sink<ElCommand<T>> {

    @Override
    public SinkWriter<ElCommand<T>> createWriter(InitContext context) throws IOException {
        // deprecated overload, not used
        return null;
    }

    @Override
    public SinkWriter<ElCommand<T>> createWriter(WriterInitContext context) {
        return new ElSearchSink.ElSearchSinkWriter();
    }

    class ElSearchSinkWriter implements SinkWriter<ElCommand<T>> {

        @Override
        public void write(ElCommand<T> pCmd, Context context) throws IOException, InterruptedException {
            ElasticSearchClientProvider elasticSearchClientProvider = ElasticSearchClientProvider.getInstance();
            ElasticsearchClient elClient = elasticSearchClientProvider.get();
            String index = pCmd.getIndex();
            String id = pCmd.getId();
            T entity = pCmd.getEntity();
            switch (pCmd.getCommand()) {
                case CREATE, UPDATE -> {
                    elClient.index(i -> i.index(index).id(id).document(entity));
                }
                case DELETE -> {
                    elClient.delete(d -> d.index(index).id(id));
                }
            }
        }

        @Override
        public void flush(boolean endOfInput) throws IOException, InterruptedException {
        }

        @Override
        public void close() throws Exception {
        }
    }
}
六、
In the IDEA environment, the sub-module's resources may sometimes not be found. In that case, when the main module imports the sub-module, the following configuration is enough:
<dependency>
    <groupId>indi.zhifa.study2025</groupId>
    <artifactId>common-data</artifactId>
    <version>${project.version}</version>
    <scope>compile</scope>
</dependency>
Note that the key part is <scope>compile</scope>.
public class EsClientConfig {
    @Setter @Getter
    private String host;
    @Setter @Getter
    private Integer port;
    @Setter @Getter
    private String apiKey;
}
public class ElasticSearchClientProvider {

    private EsClientConfig esClientConfig;
    private RestClientBuilder builder;

    public ElasticSearchClientProvider() {
        try {
            init();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void init() throws IOException {
        Yaml yaml = new Yaml();
        try (InputStream inputStream = FileUtil.class.getClassLoader().getResourceAsStream("el-config.yml")) {
            if (inputStream == null) {
                throw new IllegalArgumentException("File not found: el-config.yml");
            }
            esClientConfig = yaml.loadAs(inputStream, EsClientConfig.class);
        } catch (Exception e) {
            throw new RuntimeException("Failed to load YAML file", e);
        }
        SSLContext sslContext;
        try (InputStream inputStream = FileUtil.class.getClassLoader().getResourceAsStream("http_ca.crt")) {
            sslContext = TransportUtils.sslContextFromHttpCaCrt(inputStream);
        } catch (Exception e) {
            throw new RuntimeException("Failed to load http_ca.crt", e);
        }
        builder = RestClient.builder(
                        new HttpHost(esClientConfig.getHost(), esClientConfig.getPort(), "https") // replace with your Elasticsearch address
                )
                .setDefaultHeaders(new Header[]{
                        new BasicHeader("Authorization", "ApiKey " + esClientConfig.getApiKey())
                })
                .setFailureListener(new RestClient.FailureListener() {
                    @Override
                    public void onFailure(Node node) {
                        super.onFailure(node);
                    }
                })
                .setHttpClientConfigCallback(hc -> hc.setSSLContext(sslContext));
    }

    public ElasticsearchClient get() {
        RestClient restClient = builder.build();
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        ElasticsearchClient esClient = new ElasticsearchClient(transport);
        return esClient;
    }

    public static ElasticSearchClientProvider getInstance() {
        return Holder.INSTANCE;
    }

    private static class Holder {
        private static final ElasticSearchClientProvider INSTANCE = new ElasticSearchClientProvider();
    }
}
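A quick usage sketch of the provider. The index name item_npc matches the one used by the Flink job later; the entity values are made up, and the demo class itself is hypothetical. Note that every call to get() builds a fresh RestClient and transport, so long-running code should reuse the returned client.

public class ElasticSearchDemo {
    public static void main(String[] args) throws IOException {
        // obtain a client from the singleton provider
        ElasticsearchClient elClient = ElasticSearchClientProvider.getInstance().get();

        GenItemEntity item = new GenItemEntity();
        item.setId(1L);
        item.setName("demo item");

        // create-or-overwrite the document, using the entity id as the document id
        elClient.index(i -> i.index("item_npc").id(item.getId().toString()).document(item));

        // read it back, deserializing into the same entity type
        GetResponse<GenItemEntity> resp = elClient.get(g -> g.index("item_npc").id("1"), GenItemEntity.class);
        System.out.println(resp.source());
    }
}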
四、Wrapping the Redis and Elasticsearch operations
4.1 Wrapping the Redis operations
In the pom, add spring-data-redis.
Then we can use the familiar RedisTemplate to operate on Redis.
public class RedisConfig {

    public RedisConfig() {
        init();
    }

    protected FastJsonConfig redisFastJson() {
        FastJsonConfig config = new FastJsonConfig();
        config.setWriterFeatures(
                JSONWriter.Feature.WriteNullListAsEmpty,
                // write the class name
                JSONWriter.Feature.WriteClassName,
                // turn a null Boolean into false
                JSONWriter.Feature.WriteNullBooleanAsFalse,
                JSONWriter.Feature.WriteEnumsUsingName);
        config.setReaderFeatures(
                JSONReader.Feature.SupportClassForName,
                // support autoType
                JSONReader.Feature.SupportAutoType);
        return config;
    }

    protected FastJsonRedisSerializer fastJsonRedisSerializer(FastJsonConfig pFastJsonConfig) {
        FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer(Object.class);
        fastJsonRedisSerializer.setFastJsonConfig(pFastJsonConfig);
        return fastJsonRedisSerializer;
    }

    protected RedisConnectionFactory redisConnectionFactory() {
        // this should really come from configuration; hard-coded here for brevity
        RedisStandaloneConfiguration redisConfiguration = new RedisStandaloneConfiguration("192.168.0.64", 6379);
        redisConfiguration.setPassword("study@2025");
        GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(2);                       // maximum number of connections
        poolConfig.setMaxIdle(2);                        // maximum number of idle connections
        poolConfig.setMinIdle(2);                        // minimum number of idle connections
        poolConfig.setMaxWait(Duration.ofMillis(3000));  // maximum wait time for a connection
        ClientResources clientResources = DefaultClientResources.create();
        LettucePoolingClientConfiguration lettucePoolingClientConfiguration = LettucePoolingClientConfiguration.builder()
                .poolConfig(poolConfig)
                .build();
        LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
                .clientResources(clientResources)
                .commandTimeout(Duration.ofSeconds(5))
                .poolConfig(poolConfig)
                .build();
        LettuceConnectionFactory redisConnectionFactory = new LettuceConnectionFactory(redisConfiguration, lettucePoolingClientConfiguration);
        redisConnectionFactory.afterPropertiesSet(); // initialize the connection factory
        return redisConnectionFactory;
    }

    protected RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory, FastJsonRedisSerializer pFastJsonRedisSerializer) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(factory);
        redisTemplate.setEnableTransactionSupport(true);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(pFastJsonRedisSerializer);
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(pFastJsonRedisSerializer);
        return redisTemplate;
    }

    protected void init() {
        mFastJsonConfig = redisFastJson();
        mFastJsonRedisSerializer = fastJsonRedisSerializer(mFastJsonConfig);
        mRedisConnectionFactory = redisConnectionFactory();
        mRedisTemplate = redisTemplate(mRedisConnectionFactory, mFastJsonRedisSerializer);
        mRedisTemplate.afterPropertiesSet();
    }

    private FastJsonConfig mFastJsonConfig;
    private FastJsonRedisSerializer mFastJsonRedisSerializer;
    private RedisConnectionFactory mRedisConnectionFactory;
    private RedisTemplate<String, Object> mRedisTemplate;

    public static RedisTemplate<String, Object> redisTemplate() {
        return Holder.INSTANCE.mRedisTemplate;
    }

    public static <T> String serialize(T entity) {
        return JSON.toJSONString(entity, Holder.INSTANCE.mFastJsonConfig.getWriterFeatures());
    }

    private static class Holder {
        private static final RedisConfig INSTANCE = new RedisConfig();
    }
}
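A quick usage sketch of the singleton template exposed above. The key format item:{id} is the one the Flink job uses later; the demo class and the entity values are made up for illustration.

public class RedisDemo {
    public static void main(String[] args) {
        // the singleton template built by RedisConfig, usable outside of Spring
        RedisTemplate<String, Object> redisTemplate = RedisConfig.redisTemplate();

        GenItemEntity item = new GenItemEntity();
        item.setId(1L);
        item.setName("demo item");

        // write with a 300-second TTL, then read back
        redisTemplate.opsForValue().set("item:" + item.getId(), item, 300, TimeUnit.SECONDS);
        Object cached = redisTemplate.opsForValue().get("item:" + item.getId());
        System.out.println(cached);
    }
}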
4.2 Wrapping the Elasticsearch operations
Since the Elasticsearch connector needs an API key and HTTPS, it is best to keep the settings in a yml file and to put http_ca.crt into this module's resources directory.
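A minimal example of what that el-config.yml might look like. The values are placeholders; the field names must match EsClientConfig so that snakeyaml can bind them.

# src/main/resources/el-config.yml -- placeholder values, field names match EsClientConfig
host: 192.168.0.64
port: 9200
apiKey: <your-api-key>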
Since the MySQL I have installed locally is 8.4, for convenience we install MySQL 8.0 with Docker.
2.1 docker-compose.yml
services:
  master:
    image: mysql:8.0.41
    container_name: mysql-8
    restart: always
    #mem_limit: 512M
    environment:
      MYSQL_ROOT_PASSWORD: study@2025
      TZ: Asia/Shanghai
    ports:
      - "3307:3306"
    volumes:
      - ./cfg/my.cnf:/etc/my.cnf
      - ./data:/var/lib/mysql
      - ./initdb:/docker-entrypoint-initdb.d
      - ./dump:/var/dump
      - ./log:/var/log
    networks:
      - mysql-cluster
networks:
  mysql-cluster:
2.2 Initialization SQL
-- create the replication user
CREATE ROLE role_app;
GRANT SELECT, UPDATE, INSERT, DELETE ON *.* TO role_app;
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO role_app;
CREATE USER 'app'@'%' IDENTIFIED WITH caching_sha2_password BY 'study@2025' DEFAULT ROLE role_app COMMENT 'app user';
FLUSH PRIVILEGES;

-- create a database for testing
CREATE SCHEMA `shop-center`;
FLUSH TABLES WITH READ LOCK;
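Flink CDC reads the binlog, so it is worth double-checking that binary logging is enabled and in ROW format (both are the defaults in MySQL 8.0). A quick check from any client:

-- sanity checks for the settings Flink CDC relies on (defaults in MySQL 8.0)
SHOW VARIABLES LIKE 'log_bin';        -- expected: ON
SHOW VARIABLES LIKE 'binlog_format';  -- expected: ROW
SHOW VARIABLES LIKE 'server_id';      -- expected: a non-zero value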
2.3 Caveats
First comment out the volume line - ./cfg/my.cnf:/etc/my.cnf and start the service.
Then use the command below to copy the config file out of the container:
docker cp <id>:/etc/my.cnf ./cfg/my.cnf
Then uncomment the line and restart.
五、Writing custom sinks for Redis and Elasticsearch
5.1 Writing the Redis sink
We want the data to already be fully processed by the time it reaches Redis: the Redis sink should not contain any business logic, it only updates or deletes cache entries. (Elasticsearch, by contrast, needs a denormalized design, so a single table's data can never serve directly as the ES query document.)
public class CacheService {

    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private DataRepository dataRepository;

    public Object getData(String key) {
        // first cache check
        Object value = redisTemplate.opsForValue().get(key);
        if (value != null) {
            return value;
        }
        RLock lock = redissonClient.getLock(key + ":LOCK");
        try {
            // try to acquire the lock with a timeout to avoid deadlocks
            if (lock.tryLock(5, 30, TimeUnit.SECONDS)) {
                try {
                    // double-check the cache
                    value = redisTemplate.opsForValue().get(key);
                    if (value != null) {
                        return value;
                    }
                    // query the database
                    Object dbData = dataRepository.findById(key);
                    // update the cache with a reasonable expiration
                    redisTemplate.opsForValue().set(key, dbData, 1, TimeUnit.HOURS);
                    return dbData;
                } finally {
                    lock.unlock();
                }
            } else {
                // lock not acquired: wait briefly, then retry from the cache
                Thread.sleep(100);
                return redisTemplate.opsForValue().get(key);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("failed to acquire lock", e);
        }
    }
}
8.2 Notes on ES
For ES, this way of updating the data is actually not recommended. And this kind of software tends to have complex business logic.

二、MySQL environment setup
Note that the current flink-cdc only supports MySQL 8.0; 8.4 is not supported at all.
一、
public class FlinkMain {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.0.64")
                .port(3307)
                .databaseList("shop-center")   // set captured database
                .tableList("shop-center.item") // set captured table
                .username("app")
                .password("study@2025")
                .serverTimeZone("Asia/Shanghai")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .startupOptions(StartupOptions.latest())
                .includeSchemaChanges(true)
                .build();

//        FlinkJedisPoolConfig jedisConfig = new FlinkJedisPoolConfig.Builder()
//                .setHost("192.168.0.64")      // replace with your Redis host
//                .setPort(6379)                // Redis port
//                .setPassword("ilv0404@1314")  // password, if any
//                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

//        DataStream<BinlogInfo> mysqlStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Mysql source")
//                .map(str -> {
//                    BinlogInfo res = JSONObject.parseObject(str, BinlogInfo.class);
//                    return res;
//                })
//                .filter(bi -> bi.getOp().equals("c") || bi.getOp().equals("u") || bi.getOp().equals("d"));
//
//        mysqlStream.addSink(new RedisSink(jedisConfig, new RedisItemMapper()));

        DataStream<RedisSinkCommand<GenItemEntity>> newMysqlStream = env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Mysql source to redis")
                .map(str -> JSONObject.parseObject(str, new TypeReference<BinlogInfo<GenItemEntity>>() {}),
                        TypeInformation.of(new TypeHint<BinlogInfo<GenItemEntity>>() {}))
                .filter(bi -> bi.getSource().getTable().equals("item")
                        && (bi.getOp().equals("c") || bi.getOp().equals("u") || bi.getOp().equals("d")))
                .map(bi -> {
                    String op = bi.getOp();
                    GenItemEntity itemEntity = bi.getAfter();
                    String key = "item:" + itemEntity.getId();
                    switch (op) {
                        case "c", "u" -> {
                            RedisSinkCommand<GenItemEntity> redisSinkCommand = new RedisSinkCommand();
                            redisSinkCommand.initSet(key, itemEntity);
                            return redisSinkCommand;
                        }
                        case "d" -> {
                            RedisSinkCommand<GenItemEntity> redisSinkCommand = new RedisSinkCommand();
                            redisSinkCommand.initDel(key);
                            return redisSinkCommand;
                        }
                        default -> {
                            RedisSinkCommand<GenItemEntity> redisSinkCommand = new RedisSinkCommand();
                            redisSinkCommand.initDel(key);
                            return redisSinkCommand;
                        }
                    }
                }, TypeInformation.of(new TypeHint<RedisSinkCommand<GenItemEntity>>() {}));
        newMysqlStream.sinkTo(new SpringDataRedisSink<GenItemEntity>());

        DataStream<ElCommand<GenItemEntity>> mySqlToElStream = env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Mysql source to el")
                .map(str -> JSONObject.parseObject(str, new TypeReference<BinlogInfo<GenItemEntity>>() {}),
                        TypeInformation.of(new TypeHint<BinlogInfo<GenItemEntity>>() {}))
                .filter(bi -> bi.getSource().getTable().equals("item")
                        && (bi.getOp().equals("c") || bi.getOp().equals("u") || bi.getOp().equals("d")))
                .map(bi -> {
                    ElCommand elCommand = new ElCommand();
                    GenItemEntity itemEntity = bi.getAfter();
                    elCommand.setId(itemEntity.getId().toString());
                    elCommand.setEntity(itemEntity);
                    elCommand.setIndex("item_npc");
                    String op = bi.getOp();
                    switch (op) {
                        case "c" -> elCommand.setCommand(EElCommand.CREATE);
                        case "u" -> elCommand.setCommand(EElCommand.UPDATE);
                        case "d" -> elCommand.setCommand(EElCommand.DELETE);
                    }
                    return elCommand;
                }, TypeInformation.of(new TypeHint<ElCommand<GenItemEntity>>() {}));
        mySqlToElStream.sinkTo(new ElSearchSink());

        env.execute();
    }
}
三、Project setup and pom dependencies
3.1 Main module pom dependencies
A Flink program does not need to integrate with the Spring framework; a single main method is enough to run it.

八、Reflections on this in practice
8.1 Notes on Redis
The code here is purely for learning purposes.
This naturally raises the question: could we listen to MySQL's binlog, the way master-slave replication does, and update the data from it? That is exactly where Flink CDC comes in.
For item queries in an e-commerce system, we can update ES when an item goes on sale. In real projects, the Redis cache is usually refreshed at query time: if the data is not found in the cache, query MySQL and put the result into Redis. Before updating the cache, a Redisson lock can be taken to avoid reading MySQL and updating Redis repeatedly.
一、Background and requirements
In some projects, especially inventory-management (进销存) SaaS software, caching was not considered at first in order to ship the product quickly. Moreover, while an item is on sale it is not allowed to be modified.
But we would still like to use the interfaces we are familiar with to operate Redis and ES.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>3.3.0</version>
</dependency>
3.2 The common-data module
Entity classes are best split out into a separate common module so they can be shared across modules.