ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

FlinkSQL自定义函数(UDF)维表转换

2021-09-07 14:02:02  阅读:677  来源: 互联网

标签:String 自定义 FlinkSQL UDF 函数 new class jedisCluster


前言

Table和SQL的关系:SQL是Table的继承封装(这点在Flink的概念有所体现),比如说:StreamTableEnvironment继承自TableEnvironment便可体现出来。故官文中Table的使用均可在SQL中体现出来,比如说自定义函数Table API & SQL下的自定义函数中只给出了Table方式的TableEnvironment 创建自定义函数,我们可以修改为ste对象实现在SQL中。


在这里插入图片描述

应用场景

利用FlinkSQL进行Redis维表信息转换。redis获取维表信息后存储在函数中。

代码

producer代码引用:FlinkSQL使用DDL创建Kafka生产和消费者其中的生产数据类型由json改为csv(此文中补充有)。

或者使用Table的方式:Flink SQL & Table简单实例

模拟生产数据

生产者DDL:

	        String ddl = "CREATE TABLE CbryProduce(\n" +
	                "phoneNum STRING,\n" +
	                "rechargeNum STRING,\n" +
	                "provinceCode STRING,\n" +
	                "cityCode STRING,\n" +
	                "rechargeChannelCode STRING\n" +
	                ") WITH(\n" +
	                "'connector.type'='kafka',\n" +
	                "'connector.version'='universal',\n" +
	                "'connector.properties.bootstrap.servers'='KafkaClusterURL:ip:port,ip2:port',\n" +
	                //"'connector.properties.bootstrap.servers'='localhost:9092',\n" +
	                "'connector.topic'='event_topic_1',\n" +
	                "'format.type'='csv',\n" +
	                "'format.field-delimiter'='|'\n" +
	                ")\n"
	                ;

DML:
String insert2 = "insert into CbryProduce(phoneNum,rechargeNum,provinceCode,cityCode,rechargeChannelCode)" +
	                        "values('1024','100','051','0750','2')";

生成Redis维表信息

如何生成JedisCluster对象插入数据:

Redis(一) Jedis单机和集群连接

Redis(三)redisTemplate实操和五种基础数据类型

//		模拟数据创建
//		Map<String, String> cityDimensionMap = new HashedMap();
//		cityDimensionMap.put("0020", "广州");
//		cityDimensionMap.put("0750", "深圳");
//		
//		Map<String, String> rechargeChannelsMap = new HashedMap();
//		rechargeChannelsMap.put("1", "手机app充值");
//		rechargeChannelsMap.put("2", "营业厅充值");
//		
//		jedisCluster.hmset("CityCode", cityDimensionMap);
//		jedisCluster.hmset("RechargeChannels", rechargeChannelsMap);
//		System.out.println(jedisCluster.hgetAll("CityCode"));
//		System.out.println(jedisCluster.hgetAll("RechargeChannels"));
//		System.out.println(jedisCluster.get("testttt"));  //空值返回null

自定义SQL函数

如何使用FlinkSQL:FlinkSQL使用DDL创建Kafka生产和消费者或者使用Table的方式:Flink SQL & Table简单实例

这里在ScalarFunction我们只要通过自定义/重载一个eval方法即可:

如下:我们对自定义函数传入一个cityNum返回cityCodeMap对应的值。

	// define function logic
	// 自定义SQL函数
	public static class cityCodeTranslateFunction extends ScalarFunction{
		
		Map<String, String> cityCodeMap = jedisCluster.hgetAll("CityCode");
		
		  public String eval(String cityNum) {
			 String res = cityCodeMap.get(cityNum);
			return res == null ? "Error" : res;
		  }
	}

引入自定义函数

将我们的自定义函数引入SQL的StreamTableEnvironment执行环境中

//StreamTableEnvironment继承自TableEnvironment
	ste.createTemporarySystemFunction("cityTranslate", cityCodeTranslateFunction.class);
	ste.createTemporarySystemFunction("rechargeChannelTranslate", rechargeChannelTranslateFunction.class);

执行打印

	Table queryTable = ste.sqlQuery("select phoneNum,rechargeNum,cityCode,cityTranslate(cityCode), provinceCode,rechargeChannelCode, rechargeChannelTranslate(rechargeChannelCode)"
			+ " from CbryConsumer");
	
	DataStream<Row> result = ste.toAppendStream(queryTable, Row.class);
	result.printToErr();

输出结果

在这里插入图片描述

1> 1024,100,0750,深圳,051,2,营业厅充值

1> 1024,100,0020,广州,051,1,手机app充值


整体代码

public class UserDefinedFuctions {
	
	static JedisCluster jedisCluster;
	
	private static  GenericObjectPoolConfig getGenericObjectPoolConfig() {
		GenericObjectPoolConfig genericObjectPool = new GenericObjectPoolConfig();
		genericObjectPool.setMaxIdle(10);
		genericObjectPool.setMaxTotal(100);
		genericObjectPool.setMinEvictableIdleTimeMillis(30000); // 逐出连接的最小空闲时间 30s
		genericObjectPool.setSoftMinEvictableIdleTimeMillis(60000); // 空闲逐出时间1分钟
		return genericObjectPool;
	}

	
	static {
		HostAndPort hostAndPort = new HostAndPort("ip", 7000);
		HostAndPort hostAndPort2 = new HostAndPort("ip", 7001);
		HostAndPort hostAndPort3 = new HostAndPort("ip", 7000);
		HostAndPort hostAndPort4 = new HostAndPort("ip", 7001);
		HostAndPort hostAndPort5 = new HostAndPort("ip", 7000);
		HostAndPort hostAndPort6 = new HostAndPort("ip", 7001);
		Set<HostAndPort> hostAndPortSet = new HashSet<>();
		hostAndPortSet.add(hostAndPort);
		hostAndPortSet.add(hostAndPort2);
		hostAndPortSet.add(hostAndPort3);hostAndPortSet.add(hostAndPort4);hostAndPortSet.add(hostAndPort5);hostAndPortSet.add(hostAndPort6);
		jedisCluster = new JedisCluster(hostAndPortSet, 6000, 6000, 10, password,UserDefinedFuctions.getGenericObjectPoolConfig());
		
//		模拟数据创建
//		Map<String, String> cityDimensionMap = new HashedMap();
//		cityDimensionMap.put("0020", "广州");
//		cityDimensionMap.put("0750", "深圳");
//		
//		Map<String, String> rechargeChannelsMap = new HashedMap();
//		rechargeChannelsMap.put("1", "手机app充值");
//		rechargeChannelsMap.put("2", "营业厅充值");
//		
//		jedisCluster.hmset("CityCode", cityDimensionMap);
//		jedisCluster.hmset("RechargeChannels", rechargeChannelsMap);
//		System.out.println(jedisCluster.hgetAll("CityCode"));
//		System.out.println(jedisCluster.hgetAll("RechargeChannels"));
//		System.out.println(jedisCluster.get("testttt"));  //空值返回null
	}
	
	// define function logic
	// 自定义SQL函数
	public static class cityCodeTranslateFunction extends ScalarFunction{
		
		Map<String, String> cityCodeMap = jedisCluster.hgetAll("CityCode");
		
		  public String eval(String cityNum) {
			 String res = cityCodeMap.get(cityNum);
			return res == null ? "Error" : res;
		  }
	}
	
	public static class rechargeChannelTranslateFunction extends ScalarFunction{
		
		Map<String, String> rechargeChannelsMap = jedisCluster.hgetAll("RechargeChannels");
		
		  public String eval(String rechargeChannel) {
			 
			String res = rechargeChannelsMap.get(rechargeChannel);
			return res == null ? "Error" : res;
		  }
	}
	
public static void main(String[] args) {
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode()
			// .useOldPlanner() // flink
			.useBlinkPlanner() // blink
			.build();
	
	StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings);
	

	String ddl = "CREATE TABLE CbryConsumer(\n" + 
            "phoneNum String,\n" +
            "rechargeNum String,\n" +
            "provinceCode String,\n" +
            "cityCode String,\n" +
            "rechargeChannelCode String\n" +
			") WITH(\n" + "'connector.type'='kafka',\n"
			+ "'connector.version'='universal',\n" + "'connector.properties.group.id'='g2_group',\n"
			+ "'connector.properties.bootstrap.servers'='KafkaClusterURL:ip:port,ip2:port',\n"
			+ "'connector.topic'='event_topic_1',\n" + "'connector.startup-mode' = 'latest-offset',\n"
			+ "'format.type'='csv',\n" 
			+ "'format.field-delimiter'='|'\n" +
            ")\n"
            ;
	ste.executeSql(ddl);

	//StreamTableEnvironment继承自TableEnvironment
	ste.createTemporarySystemFunction("cityTranslate", cityCodeTranslateFunction.class);
	ste.createTemporarySystemFunction("rechargeChannelTranslate", rechargeChannelTranslateFunction.class);
	
	Table queryTable = ste.sqlQuery("select phoneNum,rechargeNum,cityCode,cityTranslate(cityCode), provinceCode,rechargeChannelCode, rechargeChannelTranslate(rechargeChannelCode)"
			+ " from CbryConsumer");
	
	DataStream<Row> result = ste.toAppendStream(queryTable, Row.class);
	result.printToErr();

	try {
		env.execute();
	} catch (Exception e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	}

}
}

带交互的实现

在实际使用下,我们不可能说实现一个函数写一次代码,如何实现”交互“形态的自定义函数呢? 答曰:使用java的多态进行重载构造函数:

    // define function logic
    // 自定义SQL函数
    public static class AutoAdaptaMapDefineFunction extends ScalarFunction {

        Map<String, String> redisMap;

        public AutoAdaptaMapDefineFunction(String dimensionName) {
            redisMap =  jedisCluster.hgetAll(dimensionName);
        }

        public String eval(String dimensionKey) {

            String res = redisMap.get(dimensionKey);
            return res == null ? "Error" : res;
        }

    }

//ste.createTemporarySystemFunction("cityTranslate", new AutoAdaptaMapDefineFunction("CityCodeDimensionMapKey"));

对于交互式会话,还可以在使用或注册函数之前对其进行参数化。在这种情况下,可以将函数实例而不是函数用作临时函数。

它要求参数是可序列化的,以便将函数实例传送到集群。



PS:有两种注入函数的方式:一个是传对象,一个是传class对象

    void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);
    void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance);

这也就给我们提供了自定义类加载器,指定特定class对象进行函数注入Flink作业的可能。

标签:String,自定义,FlinkSQL,UDF,函数,new,class,jedisCluster
来源: https://blog.csdn.net/qq_37334150/article/details/120156357

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有