Published: 2025-06-24 18:40:57 | Author: 北方职教升学中心
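Anyone who does a lot of development knows that connecting Flink to Redis is tedious, and parsing the Redis data is even more so. It would be far more convenient if Redis could be connected through Flink SQL like an ordinary database, with its data presented as a table.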
1. Usage examples and explanation
1. Reading data

    CREATE TABLE orders (
      `order_id` STRING,
      `price` STRING,
      `order_time` STRING,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'redis',
      'mode' = 'single',
      'single.host' = '192.168.10.101',
      'single.port' = '6379',
      'password' = 'xxxxxx',
      'command' = 'hgetall',
      'key' = 'orders'
    );

    SELECT * FROM orders;

    -- cluster mode
    CREATE TABLE redis_sink (
      site_id STRING,
      inverter_id STRING,
      start_time STRING,
      PRIMARY KEY(site_id) NOT ENFORCED
    ) WITH (
      'connector' = 'redis',
      'mode' = 'cluster',
      'cluster.nodes' = 'test3:7001,test3:7002,test3:7003,test3:8001,test3:8002,test3:8003',
      'password' = '123123',
      'command' = 'hgetall',
      'key' = 'site_inverter'
    );

cluster.nodes defines the hosts and ports of the cluster nodes, for example: host1:p1,host2:p2,host3:p3
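Note: a Redis table must define a primary key; it can be a single-column key or a composite key.
The SQL query parses the Redis data directly into the tabular form we need.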
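To make the read path more concrete, here is a minimal sketch in plain Java (no Redis or Flink dependency) of how one hash could be turned into a table row. It assumes the key layout `table:pk_column:pk_value` used in this post, that the primary key columns come first in the table definition, and that `:` never appears inside a value. The class and method names are hypothetical and are not part of the connector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the connector: shows how a Redis hash
// and its key could be flattened into one table row.
final class HashRowSketch {

    // redisKey: e.g. "orders:order_id:10"
    // hash:     field -> value pairs, as HGETALL would return them
    // columns:  table columns, primary key columns first (assumption)
    static List<String> toRow(String redisKey, Map<String, String> hash,
                              List<String> columns, int primaryKeyCount) {
        String[] segments = redisKey.split(":");
        List<String> row = new ArrayList<>();
        // Segment 0 is the table name; each primary key then contributes a
        // (column name, value) pair, so value i sits at index 2 * (i + 1).
        for (int i = 0; i < primaryKeyCount; i++) {
            row.add(segments[2 * (i + 1)]);
        }
        // The remaining columns are looked up by name in the hash;
        // a missing field yields null, like a NULL cell in SQL.
        for (int i = primaryKeyCount; i < columns.size(); i++) {
            row.add(hash.get(columns.get(i)));
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, String> hash =
                Map.of("price", "1001.50", "order_time", "2024-06-01 12:00:00");
        System.out.println(toRow("orders:order_id:10", hash,
                List.of("order_id", "price", "order_time"), 1));
    }
}
```

Every cell is a string here because, as in the table definitions above, the Redis table declares all of its columns as STRING.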
2. Writing data

    -- 1. generate source data
    CREATE TABLE order_source (
      `order_number` BIGINT,
      `price` DECIMAL(32,2),
      `order_time` TIMESTAMP(3),
      PRIMARY KEY(order_number) NOT ENFORCED
    ) WITH (
      'connector' = 'datagen',
      'number-of-rows' = '5',
      'fields.order_number.min' = '1',
      'fields.order_number.max' = '20',
      'fields.price.min' = '1001',
      'fields.price.max' = '1100'
    );

    -- 2. define redis sink table
    CREATE TABLE orders (
      `order_number` STRING,
      `price` STRING,
      `order_time` STRING,
      PRIMARY KEY(order_number) NOT ENFORCED
    ) WITH (
      'connector' = 'redis',
      'mode' = 'single',
      'single.host' = '192.168.10.101',
      'single.port' = '6379',
      'password' = 'xxxxxx',
      'command' = 'hmset',
      'key' = 'orders'
    );

    -- 3. insert data to redis sink table (cast data type to string)
    INSERT INTO orders
    SELECT
      CAST(order_number AS STRING) order_number,
      CAST(price AS STRING) price,
      CAST(order_time AS STRING) order_time
    FROM order_source;

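A Redis table does not store data types, so every value must be cast to a string before it is written to Redis. The Redis key of each record is built by concatenating the `key` option, the primary key column name and the primary key value, which guarantees that every record is unique. This is also why a Redis table must define a primary key.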
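The key layout described above can be sketched as follows. This is an illustration only: it assumes `:` as the separator (the same symbol that separates host and port in `cluster.nodes`), and the class and method names are hypothetical rather than taken from the connector.

```java
import java.util.List;

// Hypothetical helper, not part of the connector: illustrates the key layout
// table_name:pk_column:pk_value[:pk_column:pk_value...].
final class RedisKeySketch {

    static String buildKey(String table, List<String> pkColumns, List<String> pkValues) {
        if (pkColumns.isEmpty() || pkColumns.size() != pkValues.size()) {
            throw new IllegalArgumentException("need one value per primary key column");
        }
        StringBuilder key = new StringBuilder(table);
        for (int i = 0; i < pkColumns.size(); i++) {
            key.append(':').append(pkColumns.get(i))
               .append(':').append(pkValues.get(i));
        }
        return key.toString();
    }

    public static void main(String[] args) {
        // Single primary key: prints orders:order_number:7
        System.out.println(buildKey("orders", List.of("order_number"), List.of("7")));
        // Composite primary key: prints site_inverter:site_id:1:inverter_id:2
        System.out.println(buildKey("site_inverter",
                List.of("site_id", "inverter_id"), List.of("1", "2")));
    }
}
```

Because the primary key value is part of the key, two rows with the same primary key end up in the same Redis hash, so the later row overwrites the earlier one.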
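3. Currently supported features
1. The connector currently supports several write and read commands:
Read: get hget hgetall hscan lrange smembers zrange
Write: set hset hmset lpush rpush sadd
2. For the most commonly used hash type, fuzzy matching is supported: specifying only the table name as the key queries the data of the whole table.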
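As a rough illustration of the fuzzy matching rule, the sketch below classifies the value of the `key` option the way the `hgetall` read path treats it: several comma-separated keys, one full key, or a bare table name that is expanded to the pattern `table*`. The names are hypothetical, the separators `,` and `:` are assumptions based on the examples in this post, and the prefix match runs on an in-memory collection instead of a Redis server.

```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch, not part of the connector.
final class KeyLookupSketch {

    enum Kind { EXPLICIT_KEYS, SINGLE_ROW, WHOLE_TABLE }

    static Kind classify(String keyOption, int primaryKeyCount) {
        if (keyOption.split(",").length > 1) {
            return Kind.EXPLICIT_KEYS;   // several full keys, comma separated
        }
        if (keyOption.split(":").length == 2 * primaryKeyCount + 1) {
            return Kind.SINGLE_ROW;      // table:pk_column:pk_value
        }
        return Kind.WHOLE_TABLE;         // table name only, match "table*"
    }

    // In-memory stand-in for matching the Redis pattern "prefix*".
    static List<String> matchPrefix(String prefix, Collection<String> allKeys) {
        return allKeys.stream()
                .filter(k -> k.startsWith(prefix))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(classify("orders", 1));
        System.out.println(classify("orders:order_id:10", 1));
        System.out.println(matchPrefix("orders",
                List.of("orders:order_id:2", "orders:order_id:1", "users:id:1")));
    }
}
```

One thing to keep in mind with a plain prefix pattern: `orders*` also matches keys of other tables whose names merely start with `orders`, so table names that share a prefix can leak into each other's results.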
4. Connection parameters
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | no | String | connector name |
mode | required | no | String | redis cluster mode (single or cluster) |
single.host | optional | no | String | redis single mode machine host |
single.port | optional | no | Int | redis single mode running port |
cluster.nodes | optional | no | String | redis cluster nodes, e.g. host1:p1,host2:p2,host3:p3 |
password | optional | no | String | redis database password |
command | required | no | String | redis write data or read data command |
key | required | no | String | redis key |
expire | optional | no | Int | set key ttl |
field | optional | no | String | get a value with field when using hget command |
cursor | optional | no | Int | using hscan command(e.g:1,2) |
start | optional | 0 | Int | read data when using lrange command |
end | optional | 10 | Int | read data when using lrange command |
connection.max.wait-mills | optional | no | Int | redis connection parameter |
connection.timeout-ms | optional | no | Int | redis connection parameter |
connection.max-total | optional | no | Int | redis connection parameter |
connection.max-idle | optional | no | Int | redis connection parameter |
connection.test-on-borrow | optional | no | Boolean | redis connection parameter |
connection.test-on-return | optional | no | Boolean | redis connection parameter |
connection.test-while-idle | optional | no | Boolean | redis connection parameter |
so.timeout-ms | optional | no | Int | redis connection parameter |
max.attempts | optional | no | Int | redis connection parameter |
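As a small illustration of the `host1:p1,host2:p2,host3:p3` format used for cluster nodes, the following sketch splits such a string into host and port pairs. It is not the connector's parser; the class names are hypothetical and the strict validation is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not part of the connector.
final class ClusterNodesSketch {

    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    static List<HostPort> parse(String nodes) {
        List<HostPort> result = new ArrayList<>();
        for (String node : nodes.split(",")) {
            String[] parts = node.trim().split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException("expected host:port but got '" + node + "'");
            }
            // Integer.parseInt throws NumberFormatException for a non-numeric port.
            result.add(new HostPort(parts[0], Integer.parseInt(parts[1])));
        }
        return result;
    }

    public static void main(String[] args) {
        for (HostPort hp : parse("test3:7001,test3:7002,test3:7003")) {
            System.out.println(hp.host + " -> " + hp.port);
        }
    }
}
```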
2. Factory class for dynamic reading and writing

    import org.apache.flink.common.RedisOptions;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.sink.RedisDynamicTableSink;
    import org.apache.flink.source.RedisDynamicTableSource;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.connector.source.DynamicTableSource;
    import org.apache.flink.table.factories.DynamicTableSinkFactory;
    import org.apache.flink.table.factories.DynamicTableSourceFactory;
    import org.apache.flink.table.factories.FactoryUtil;

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class RedisSourceSinkFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {

        private ReadableConfig options;

        public RedisSourceSinkFactory() {}

        public RedisSourceSinkFactory(ReadableConfig options) {
            this.options = options;
        }

        // DynamicTableSourceFactory method; this interface must be implemented to read data with Flink SQL
        @Override
        public DynamicTableSource createDynamicTableSource(Context context) {
            FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
            helper.validate();
            options = helper.getOptions();
            ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
            List<Column> columns = schema.getColumns();
            ArrayList<String> columnNames = new ArrayList<>();
            columns.forEach(column -> columnNames.add(column.getName()));
            List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
            return new RedisDynamicTableSource(options, columnNames, primaryKey);
        }

        // DynamicTableSinkFactory method; this must also be implemented to write data to Redis with Flink SQL
        @Override
        public DynamicTableSink createDynamicTableSink(Context context) {
            FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
            helper.validate();
            ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
            List<Column> columns = schema.getColumns();
            ArrayList<String> columnNames = new ArrayList<>();
            columns.forEach(column -> columnNames.add(column.getName()));
            List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
            ReadableConfig options = helper.getOptions();
            return new RedisDynamicTableSink(options, columnNames, primaryKey);
        }

        @Override
        public String factoryIdentifier() {
            return "redis";
        }

        // required options of the SQL connector
        @Override
        public Set<ConfigOption<?>> requiredOptions() {
            HashSet<ConfigOption<?>> options = new HashSet<>();
            options.add(RedisOptions.PASSWORD);
            options.add(RedisOptions.KEY);
            options.add(RedisOptions.MODE);
            return options;
        }

        // optional options of the SQL connector
        @Override
        public Set<ConfigOption<?>> optionalOptions() {
            HashSet<ConfigOption<?>> options = new HashSet<>();
            options.add(RedisOptions.SINGLE_HOST);
            options.add(RedisOptions.SINGLE_PORT);
            options.add(RedisOptions.CLUSTER_NODES);
            options.add(RedisOptions.FIELD);
            options.add(RedisOptions.CURSOR);
            options.add(RedisOptions.EXPIRE);
            options.add(RedisOptions.COMMAND);
            options.add(RedisOptions.START);
            options.add(RedisOptions.END);
            options.add(RedisOptions.CONNECTION_MAX_TOTAL);
            options.add(RedisOptions.CONNECTION_MAX_IDLE);
            options.add(RedisOptions.CONNECTION_TEST_WHILE_IDLE);
            options.add(RedisOptions.CONNECTION_TEST_ON_BORROW);
            options.add(RedisOptions.CONNECTION_TEST_ON_RETURN);
            options.add(RedisOptions.CONNECTION_TIMEOUT_MS);
            options.add(RedisOptions.TTL_SEC);
            options.add(RedisOptions.LOOKUP_ADDITIONAL_KEY);
            options.add(RedisOptions.LOOKUP_CACHE_MAX_ROWS);
            options.add(RedisOptions.LOOKUP_CACHE_TTL_SEC);
            return options;
        }
    }

3. Redis source classes

    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.table.connector.source.DynamicTableSource;
    import org.apache.flink.table.connector.source.ScanTableSource;
    import org.apache.flink.table.connector.source.SourceFunctionProvider;
    import org.apache.flink.util.Preconditions;

    import java.util.List;

    public class RedisDynamicTableSource implements ScanTableSource {

        private ReadableConfig options;
        private List<String> primaryKey;
        private List<String> columns;

        public RedisDynamicTableSource(ReadableConfig options, List<String> columns, List<String> primaryKey) {
            this.options = Preconditions.checkNotNull(options);
            this.columns = Preconditions.checkNotNull(columns);
            this.primaryKey = Preconditions.checkNotNull(primaryKey);
        }

        @Override
        public DynamicTableSource copy() {
            return new RedisDynamicTableSource(this.options, this.columns, this.primaryKey);
        }

        @Override
        public String asSummaryString() {
            return "redis table source";
        }

        @Override
        public ChangelogMode getChangelogMode() {
            return ChangelogMode.all();
        }

        @Override
        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
            RedisSourceFunction redisSourceFunction = new RedisSourceFunction(this.options, this.columns, this.primaryKey);
            return SourceFunctionProvider.of(redisSourceFunction, false);
        }
    }

The source function below reads Redis string, set, zset and hash data and parses it into RowData that is passed to Flink:

    import org.apache.flink.common.RedisClusterMode;
    import org.apache.flink.common.RedisCommandOptions;
    import org.apache.flink.common.RedisOptions;
    import org.apache.flink.common.RedisSplitSymbol;
    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.binary.BinaryStringData;
    import org.apache.flink.util.Preconditions;
    import org.apache.flink.util.RedisUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisCluster;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.ScanResult;

    import java.util.*;

    public class RedisSourceFunction extends RichSourceFunction<RowData> {

        private static final Logger LOG = LoggerFactory.getLogger(RedisSourceFunction.class);

        private ReadableConfig options;
        private List<String> primaryKey;
        private List<String> columns;
        private Jedis jedis;
        private JedisCluster jedisCluster;
        private String value;
        private String field;
        private String cursor;
        private Integer start;
        private Integer end;
        private GenericRowData rowData;

        public RedisSourceFunction(ReadableConfig options, List<String> columns, List<String> primaryKey) {
            this.options = Preconditions.checkNotNull(options);
            this.columns = Preconditions.checkNotNull(columns);
            this.primaryKey = Preconditions.checkNotNull(primaryKey);
        }

        @Override
        public void run(SourceContext<RowData> ctx) throws Exception {
            String password = options.get(RedisOptions.PASSWORD);
            Preconditions.checkNotNull(password, "password is null,please set value for password");
            String key = options.get(RedisOptions.KEY);
            Preconditions.checkNotNull(key, "key is null,please set value for key");
            // the key option may list several redis keys, separated like the cluster nodes
            String[] keyArr = key.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String command = options.get(RedisOptions.COMMAND);
            Preconditions.checkNotNull(command, "command is null,please set value for command");
            // a write command cannot be used to read data, so stop here
            List<String> writeCommands = Arrays.asList(RedisCommandOptions.SET, RedisCommandOptions.HSET,
                    RedisCommandOptions.HMSET, RedisCommandOptions.LPUSH, RedisCommandOptions.RPUSH,
                    RedisCommandOptions.SADD);
            if (writeCommands.contains(command.toUpperCase())) {
                return;
            }
            String mode = options.get(RedisOptions.MODE);
            Preconditions.checkNotNull(mode, "mode is null,please set value for mode");
            Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
            Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
            Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);
            Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
            Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
            Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);

            if (mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())) {
                String host = options.get(RedisOptions.SINGLE_HOST);
                Integer port = options.get(RedisOptions.SINGLE_PORT);
                JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal, maxIdle,
                        maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
                jedis = jedisPool.getResource();
                jedis.auth(password);

                switch (command.toUpperCase()) {
                    case RedisCommandOptions.GET:
                        value = jedis.get(key);
                        rowData = new GenericRowData(2);
                        rowData.setField(0, BinaryStringData.fromString(key));
                        rowData.setField(1, BinaryStringData.fromString(value));
                        break;
                    case RedisCommandOptions.HGET:
                        field = options.get(RedisOptions.FIELD);
                        value = jedis.hget(key, field);
                        rowData = new GenericRowData(3);
                        fillPrimaryKey(rowData, key);
                        rowData.setField(primaryKey.size(), BinaryStringData.fromString(value));
                        break;
                    case RedisCommandOptions.HGETALL:
                        Collection<String> keys;
                        if (keyArr.length > 1 || isFullKey(key)) {
                            // one or more complete keys were given
                            keys = Arrays.asList(keyArr);
                        } else {
                            // fuzzy matching, gets the data of the entire table
                            keys = jedis.keys(key + "*");
                        }
                        for (String redisKey : keys) {
                            rowData = new GenericRowData(columns.size());
                            fillPrimaryKey(rowData, redisKey);
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedis.hget(redisKey, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                        break;
                    case RedisCommandOptions.HSCAN:
                        cursor = options.get(RedisOptions.CURSOR);
                        ScanResult<Map.Entry<String, String>> entries = jedis.hscan(key, cursor);
                        List<Map.Entry<String, String>> result = entries.getResult();
                        rowData = new GenericRowData(columns.size());
                        fillPrimaryKey(rowData, key);
                        int position = primaryKey.size();
                        for (Map.Entry<String, String> entry : result) {
                            rowData.setField(position++, BinaryStringData.fromString(entry.getValue()));
                        }
                        break;
                    case RedisCommandOptions.LRANGE:
                        start = options.get(RedisOptions.START);
                        end = options.get(RedisOptions.END);
                        rowData = collectionRow(key, jedis.lrange(key, start, end));
                        break;
                    case RedisCommandOptions.SMEMBERS:
                        rowData = collectionRow(key, jedis.smembers(key));
                        break;
                    case RedisCommandOptions.ZRANGE:
                        start = options.get(RedisOptions.START);
                        end = options.get(RedisOptions.END);
                        rowData = collectionRow(key, jedis.zrange(key, start, end));
                        break;
                    default:
                        LOG.error("Cannot process such data type: {}", command);
                        break;
                }
                // HGETALL already emitted its rows inside the loop
                if (!command.toUpperCase().equals(RedisCommandOptions.HGETALL) && rowData != null) {
                    ctx.collect(rowData);
                }
            } else if (mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())) {
                String nodes = options.get(RedisOptions.CLUSTER_NODES);
                String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
                String[] host = new String[hostAndPorts.length];
                int[] port = new int[hostAndPorts.length];
                for (int i = 0; i < hostAndPorts.length; i++) {
                    String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    host[i] = splits[0];
                    port[i] = Integer.parseInt(splits[1]);
                }
                Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
                Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
                Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);
                jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal, maxIdle,
                        maxWaitMills, connTimeOut, soTimeOut, maxAttempts, testOnBorrow, testOnReturn,
                        testWhileIdle);

                switch (command.toUpperCase()) {
                    case RedisCommandOptions.GET:
                        value = jedisCluster.get(key);
                        rowData = new GenericRowData(2);
                        rowData.setField(0, BinaryStringData.fromString(key));
                        rowData.setField(1, BinaryStringData.fromString(value));
                        break;
                    case RedisCommandOptions.HGET:
                        field = options.get(RedisOptions.FIELD);
                        value = jedisCluster.hget(key, field);
                        rowData = new GenericRowData(3);
                        fillPrimaryKey(rowData, key);
                        rowData.setField(primaryKey.size(), BinaryStringData.fromString(value));
                        break;
                    case RedisCommandOptions.HGETALL:
                        Collection<String> keys;
                        if (keyArr.length > 1 || isFullKey(key)) {
                            // one or more complete keys were given
                            keys = Arrays.asList(keyArr);
                        } else {
                            // fuzzy matching, gets the data of the entire table
                            keys = jedisCluster.keys(key + "*");
                        }
                        for (String redisKey : keys) {
                            rowData = new GenericRowData(columns.size());
                            fillPrimaryKey(rowData, redisKey);
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedisCluster.hget(redisKey, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                        break;
                    case RedisCommandOptions.HSCAN:
                        cursor = options.get(RedisOptions.CURSOR);
                        ScanResult<Map.Entry<String, String>> entries = jedisCluster.hscan(key, cursor);
                        List<Map.Entry<String, String>> result = entries.getResult();
                        rowData = new GenericRowData(columns.size());
                        fillPrimaryKey(rowData, key);
                        int position = primaryKey.size();
                        for (Map.Entry<String, String> entry : result) {
                            rowData.setField(position++, BinaryStringData.fromString(entry.getValue()));
                        }
                        break;
                    case RedisCommandOptions.LRANGE:
                        start = options.get(RedisOptions.START);
                        end = options.get(RedisOptions.END);
                        rowData = collectionRow(key, jedisCluster.lrange(key, start, end));
                        break;
                    case RedisCommandOptions.SMEMBERS:
                        rowData = collectionRow(key, jedisCluster.smembers(key));
                        break;
                    case RedisCommandOptions.ZRANGE:
                        start = options.get(RedisOptions.START);
                        end = options.get(RedisOptions.END);
                        rowData = collectionRow(key, jedisCluster.zrange(key, start, end));
                        break;
                    default:
                        LOG.error("Cannot process such data type: {}", command);
                        break;
                }
                // HGETALL already emitted its rows inside the loop
                if (!command.toUpperCase().equals(RedisCommandOptions.HGETALL) && rowData != null) {
                    ctx.collect(rowData);
                }
            } else {
                LOG.error("Unsupport such {} mode", mode);
            }
        }

        // A full redis key looks like table:pk_name:pk_value[:pk_name:pk_value...],
        // so the value of the i-th primary key column is segment 2 * (i + 1).
        private void fillPrimaryKey(GenericRowData row, String redisKey) {
            String[] keySplit = redisKey.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
            for (int i = 0; i < primaryKey.size(); i++) {
                row.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
            }
        }

        // true if the key contains the table name plus a name/value pair for every primary key column
        private boolean isFullKey(String redisKey) {
            return redisKey.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == primaryKey.size() * 2 + 1;
        }

        // row layout for list, set and zset reads: the key first, then one field per element
        private GenericRowData collectionRow(String key, Collection<String> values) {
            GenericRowData row = new GenericRowData(values.size() + 1);
            row.setField(0, BinaryStringData.fromString(key));
            int position = 1;
            for (String s : values) {
                row.setField(position++, BinaryStringData.fromString(s));
            }
            return row;
        }

        @Override
        public void cancel() {
            if (jedis != null) {
                jedis.close();
            }
            if (jedisCluster != null) {
                jedisCluster.close();
            }
        }
    }

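The source function stops early when the configured command is a write command. A minimal, dependency-free sketch of that check, using the read and write command lists from the feature overview in this post, might look like this. The class name is hypothetical, and whether to return silently or fail fast is a design choice the sketch leaves open.

```java
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch, not part of the connector.
final class CommandKindSketch {

    private static final Set<String> READ_COMMANDS =
            Set.of("GET", "HGET", "HGETALL", "HSCAN", "LRANGE", "SMEMBERS", "ZRANGE");
    private static final Set<String> WRITE_COMMANDS =
            Set.of("SET", "HSET", "HMSET", "LPUSH", "RPUSH", "SADD");

    // Commands are compared case-insensitively, as in the SQL examples ('hgetall').
    static boolean isReadCommand(String command) {
        return READ_COMMANDS.contains(command.toUpperCase(Locale.ROOT));
    }

    static boolean isWriteCommand(String command) {
        return WRITE_COMMANDS.contains(command.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println("hgetall readable: " + isReadCommand("hgetall"));
        System.out.println("hmset readable:   " + isReadCommand("hmset"));
    }
}
```

Throwing an exception for an unsupported command instead of returning quietly would probably make a misconfigured table easier to diagnose, at the cost of failing the whole job.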
4. Redis sink classes

    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.connector.sink.SinkFunctionProvider;
    import org.apache.flink.types.RowKind;
    import org.apache.flink.util.Preconditions;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.List;

    public class RedisDynamicTableSink implements DynamicTableSink {

        private static final long serialVersionUID = 1L;
        private static final Logger LOG = LoggerFactory.getLogger(RedisDynamicTableSink.class);

        private ReadableConfig options;
        private List<String> primaryKey;
        private List<String> columns;

        public RedisDynamicTableSink(ReadableConfig options, List<String> columns, List<String> primaryKey) {
            this.options = Preconditions.checkNotNull(options);
            this.columns = Preconditions.checkNotNull(columns);
            this.primaryKey = Preconditions.checkNotNull(primaryKey);
        }

        @Override
        public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
            return ChangelogMode.newBuilder()
                    .addContainedKind(RowKind.INSERT)
                    .addContainedKind(RowKind.DELETE)
                    .addContainedKind(RowKind.UPDATE_BEFORE)
                    .addContainedKind(RowKind.UPDATE_AFTER)
                    .build();
        }

        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
            RedisSinkFunction myRedisSinkFunction = new RedisSinkFunction(this.options, this.columns, this.primaryKey);
            return SinkFunctionProvider.of(myRedisSinkFunction);
        }

        @Override
        public DynamicTableSink copy() {
            return new RedisDynamicTableSink(this.options, this.columns, this.primaryKey);
        }

        @Override
        public String asSummaryString() {
            return "redis table sink";
        }
    }


    package org.apache.flink.sink;

    import org.apache.flink.common.RedisClusterMode;
    import org.apache.flink.common.RedisCommandOptions;
    import org.apache.flink.common.RedisOptions;
    import org.apache.flink.common.RedisSplitSymbol;
    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.util.Preconditions;
    import org.apache.flink.util.RedisUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisCluster;
    import redis.clients.jedis.JedisPool;

    import java.util.List;

    public class RedisSinkFunction extends RichSinkFunction<RowData> {

        private static final long serialVersionUID = 1L;
        private static final Logger LOG = LoggerFactory.getLogger(RedisSinkFunction.class);

        private ReadableConfig options;
        private List<String> primaryKey;
        private List<String> columns;
        private Jedis jedis;
        private JedisCluster jedisCluster;
        private String value;

        public RedisSinkFunction(ReadableConfig options, List<String> columns, List<String> primaryKey) {
            this.options = Preconditions.checkNotNull(options);
            this.columns = Preconditions.checkNotNull(columns);
            this.primaryKey = Preconditions.checkNotNull(primaryKey);
        }

        @Override
        public void invoke(RowData rowData, Context context) throws Exception {
            String password = options.get(RedisOptions.PASSWORD);
            Preconditions.checkNotNull(password, "password is null,please set value for password");
            String key = options.get(RedisOptions.KEY);
            Preconditions.checkNotNull(key, "key is null,please set value for key");
            String command = options.get(RedisOptions.COMMAND);
            Preconditions.checkNotNull(command, "command is null,please set value for command");
            String mode = options.get(RedisOptions.MODE);
            Preconditions.checkNotNull(mode, "mode is null,please set value for mode");
            Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
            Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
            Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);
            Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
            Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
            Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);

            if (mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())) {
                if (jedis == null) {
                    // connect once and reuse the connection for the following rows
                    String host = options.get(RedisOptions.SINGLE_HOST);
                    Integer port = options.get(RedisOptions.SINGLE_PORT);
                    JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal, maxIdle,
                            maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
                    jedis = jedisPool.getResource();
                    jedis.auth(password);
                }
                switch (command.toUpperCase()) {
                    case RedisCommandOptions.SET:
                        value = rowData.getString(0).toString();
                        jedis.set(key, value);
                        break;
                    case RedisCommandOptions.HSET:
                        // writes only the second column as a single hash field
                        value = rowData.getString(1).toString();
                        jedis.hset(buildRedisKey(key, rowData), columns.get(1), value);
                        break;
                    case RedisCommandOptions.HMSET:
                        String redisTableKey = buildRedisKey(key, rowData);
                        for (int i = 1; i < columns.size(); i++) {
                            // primary key columns are already part of the redis key
                            if (!primaryKey.contains(columns.get(i))) {
                                value = rowData.getString(i).toString();
                                jedis.hset(redisTableKey, columns.get(i), value);
                            }
                        }
                        break;
                    case RedisCommandOptions.LPUSH:
                        value = rowData.getString(0).toString();
                        jedis.lpush(key, value);
                        break;
                    case RedisCommandOptions.RPUSH:
                        value = rowData.getString(0).toString();
                        jedis.rpush(key, value);
                        break;
                    case RedisCommandOptions.SADD:
                        value = rowData.getString(0).toString();
                        jedis.sadd(key, value);
                        break;
                    default:
                        LOG.error("Cannot process such data type: {}", command);
                        break;
                }
            } else if (mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())) {
                if (jedisCluster == null) {
                    // connect once and reuse the connection for the following rows
                    String nodes = options.get(RedisOptions.CLUSTER_NODES);
                    String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
                    String[] host = new String[hostAndPorts.length];
                    int[] port = new int[hostAndPorts.length];
                    for (int i = 0; i < hostAndPorts.length; i++) {
                        String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        host[i] = splits[0];
                        port[i] = Integer.parseInt(splits[1]);
                    }
                    Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
                    Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
                    Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);
                    jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal, maxIdle,
                            maxWaitMills, connTimeOut, soTimeOut, maxAttempts, testOnBorrow, testOnReturn,
                            testWhileIdle);
                }
                switch (command.toUpperCase()) {
                    case RedisCommandOptions.SET:
                        value = rowData.getString(0).toString();
                        jedisCluster.set(key, value);
                        break;
                    case RedisCommandOptions.HSET:
                        // writes only the second column as a single hash field
                        value = rowData.getString(1).toString();
                        jedisCluster.hset(buildRedisKey(key, rowData), columns.get(1), value);
                        break;
                    case RedisCommandOptions.HMSET:
                        String redisTableKey = buildRedisKey(key, rowData);
                        for (int i = 1; i < columns.size(); i++) {
                            // primary key columns are already part of the redis key
                            if (!primaryKey.contains(columns.get(i))) {
                                value = rowData.getString(i).toString();
                                jedisCluster.hset(redisTableKey, columns.get(i), value);
                            }
                        }
                        break;
                    case RedisCommandOptions.LPUSH:
                        value = rowData.getString(0).toString();
                        jedisCluster.lpush(key, value);
                        break;
                    case RedisCommandOptions.RPUSH:
                        value = rowData.getString(0).toString();
                        jedisCluster.rpush(key, value);
                        break;
                    case RedisCommandOptions.SADD:
                        value = rowData.getString(0).toString();
                        jedisCluster.sadd(key, value);
                        break;
                    default:
                        LOG.error("Cannot process such data type: {}", command);
                        break;
                }
            } else {
                LOG.error("Unsupport such {} mode", mode);
            }
        }

        // construct redis key: table_name:primary key col name:primary key value[:...]
        // (assumes the primary key columns come first in the table definition)
        private String buildRedisKey(String key, RowData rowData) {
            StringBuilder redisTableKey = new StringBuilder(key);
            for (int i = 0; i < primaryKey.size(); i++) {
                redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT)
                        .append(primaryKey.get(i))
                        .append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT)
                        .append(rowData.getString(i).toString());
            }
            return redisTableKey.toString();
        }

        @Override
        public void close() throws Exception {
            if (jedis != null) {
                jedis.close();
            }
            if (jedisCluster != null) {
                jedisCluster.close();
            }
        }
    }

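To illustrate what the `hmset` write path stores, here is a small sketch that maps one table row to the hash fields written under its Redis key: every column that is not part of the primary key becomes one field. It runs without Redis or Flink, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not part of the connector.
final class SinkRowSketch {

    // columns and values are aligned by index; the primary key columns are
    // skipped because their values are already encoded in the Redis key.
    static Map<String, String> toHashFields(List<String> columns, List<String> pkColumns,
                                            List<String> values) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            if (!pkColumns.contains(columns.get(i))) {
                fields.put(columns.get(i), values.get(i));
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(toHashFields(
                List.of("order_number", "price", "order_time"),
                List.of("order_number"),
                List.of("7", "1001.50", "2024-06-01 12:00:00")));
    }
}
```

All values are plain strings, which matches the requirement to cast every column to STRING before inserting into the Redis table.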
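If you do not understand why the code above is written this way, see my previous post:
Flink Sql-用户自定义 Sources & Sinks_source表和sink表-CSDN博客 (Flink SQL: User-Defined Sources & Sinks, source and sink tables; CSDN blog)
Finally, I once again hope you will show your support on GitHub or in the community so that this connector can be officially open-sourced.
After many days of work, I have finally finished the Flink SQL Redis connector and tested that it can parse data with SQL. This post shows the code and how it is used; the complete code is available on my GitHub: https://github.com/niuhu3/flink_sql_redis_connector.git
The connector has been submitted to Flink; see: [FLINK-35588] flink sql redis connector - ASF JIRA (apache.org)
Forks and stars are much appreciated. I will keep updating this connector, and you are welcome to try it. If you run into any problems, you can send me feedback directly or raise them in the community, and feel free to contact me if you have any good ideas.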