ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

Flink SQL Hbase Demo

2022-06-29 10:03:51  阅读:198  来源: 互联网

标签:STATUS STRING Demo Flink SIGN DATE Hbase data row


  1. 依赖pom

           <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
                <version>1.12.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-sql-connector-hbase-2.2_2.11</artifactId>
                <version>1.12.1</version>
            </dependency>
    <
     <build>
         <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <artifactSet>
                                    <excludes>
                                        <exclude>com.google.code.findbugs:jsr305</exclude>
                                        <exclude>org.slf4j:*</exclude>
                                        <exclude>log4j:*</exclude>
                                    </excludes>
                                </artifactSet>
                                <filters>
                                    <filter>
                                        <!-- Do not copy the signatures in the META-INF folder.
                                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>module-info.class</exclude>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <!--这块很重要,采用追加的方式-->
                                <transformers combine.children="append">
                                    <transformer
                                      implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>com.juneyaoair.dataplatform.service.RealTimeLableApplication</mainClass>
                                    </transformer>
                                    <!-- The service transformer is needed to merge META-INF/services files -->
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
    
        </build>
    
  2. Flink SQL Source 连接Kafka

    CREATE TABLE KAFKA_SOURCE_TBL_MEMBER_INFO_HBASE (
      table_name STRING NOT NULL, 
      op_type STRING, 
      op_ts STRING, 
      op_ts_time as CAST(op_ts as timestamp), 
      current_ts STRING, 
      pos STRING, 
      before ROW < `ADDRESS_STATUS` STRING, 
      `AUDITOR_ID` STRING, 
      `BENEFIC_TYPE` STRING, 
      `CARD_STATUS_CODE` STRING, 
      `CHILD_SIGN` STRING, 
      `CLASS_PREFER` STRING, 
      `COMMENTS` STRING, 
      `DEGRADE_SIGN` STRING, 
      `EFFECTIVE_DATE` STRING, 
      `EXPIRE_DATE` STRING, 
      `ID` STRING, 
      `INVITER_CARD_NO` STRING, 
      `IS_ACCOUNT_CLOSED` STRING, 
      `IS_LEVEL_EXPIRE` STRING, 
      `IS_MILEAGE_EXPIRE` STRING, 
      `IS_SMALL_EXEMPT_PWD` STRING, 
      `IS_SMOKING` STRING, 
      `IS_TEST_MEMBER` STRING, 
      `MAIL_ADDRESS_TYPE` STRING, 
      `MAIL_LANGUAGE_CODE` STRING, 
      `MEAL_PREFER` STRING, 
      `MEMBER_LEVEL_CODE` STRING, 
      `MEMBER_STATUS_CODE` STRING, 
      `MEMBER_STATUS_DATE` STRING, 
      `MULTIPLIER_MILES_SIGN` STRING, 
      `OPERATE_DATE` STRING, 
      `OPERATE_USER_ID` STRING, 
      `QUALIFICATION_REASON_CODE` STRING, 
      `REDEEM_SIGN` STRING, 
      `REGIST_DATE` STRING, 
      `REGIST_SOURCE` STRING, 
      `SEAT_PREFER` STRING, 
      `SMS_STATUS` STRING, 
      `SPECIAL_ASSISTANCE` STRING, 
      `STATEMENT_SEND_SIGN` STRING, 
      `STATEMENT_SEND_TYPE` STRING, 
      `SUBMIT_DATE` STRING, 
      `UNITED_CARD_SIGN` STRING, 
      `UPDATE_DATE` STRING, 
      `UPDATE_STATUS_SIGN` STRING, 
      `UPDATE_USER_ID` STRING, 
      `UPGRADE_SIGN` STRING >, 
      after 
        ROW < `ADDRESS_STATUS` STRING, 
        `AUDITOR_ID` STRING, 
        `BENEFIC_TYPE` STRING, 
        `CARD_STATUS_CODE` STRING, 
        `CHILD_SIGN` STRING, 
        `CLASS_PREFER` STRING, 
        `COMMENTS` STRING, 
        `DEGRADE_SIGN` STRING, 
        `EFFECTIVE_DATE` STRING, 
        `EXPIRE_DATE` STRING, 
        `ID` STRING, 
        `INVITER_CARD_NO` STRING, 
        `IS_ACCOUNT_CLOSED` STRING, 
        `IS_LEVEL_EXPIRE` STRING, 
        `IS_MILEAGE_EXPIRE` STRING, 
        `IS_SMALL_EXEMPT_PWD` STRING, 
        `IS_SMOKING` STRING, 
        `IS_TEST_MEMBER` STRING, 
        `MAIL_ADDRESS_TYPE` STRING, 
        `MAIL_LANGUAGE_CODE` STRING, 
        `MEAL_PREFER` STRING, 
        `MEMBER_LEVEL_CODE` STRING, 
        `MEMBER_STATUS_CODE` STRING, 
        `MEMBER_STATUS_DATE` STRING, 
        `MULTIPLIER_MILES_SIGN` STRING, 
        `OPERATE_DATE` STRING, 
        `OPERATE_USER_ID` STRING, 
        `QUALIFICATION_REASON_CODE` STRING, 
        `REDEEM_SIGN` STRING, 
        `REGIST_DATE` STRING, 
        `REGIST_SOURCE` STRING, 
        `SEAT_PREFER` STRING, 
        `SMS_STATUS` STRING, 
        `SPECIAL_ASSISTANCE` STRING, 
        `STATEMENT_SEND_SIGN` STRING, 
        `STATEMENT_SEND_TYPE` STRING, 
        `SUBMIT_DATE` STRING, 
        `UNITED_CARD_SIGN` STRING, 
        `UPDATE_DATE` STRING, 
        `UPDATE_STATUS_SIGN` STRING, 
        `UPDATE_USER_ID` STRING, 
        `UPGRADE_SIGN` STRING >, 
        data_row AS case when op_type = 'D' then before else 
      after 
        end, 
        watermark for op_ts_time as op_ts_time - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'kafka', 'topic' = 'SEM.FFPTEST.HOFFPDEV.TBL_MEMBER_INFO', 
      'properties.bootstrap.servers' = '172.22.17.26:9092,172.22.17.27:9092,172.22.17.28:9092', 
      'scan.startup.mode' = 'earliest-offset', 
      'properties.group.id' = 'kafka-flink-sync-hbase-local', 
      'properties.fetch.max.bytes' = '5242880', 
      'format' = 'json'
    )
    
  3. Flink SQL Sink Hbase 需要提前在Hbase中建表

    CREATE TABLE SYNC_HBASE_SINK_TBL_MEMBER_INFO(
      rowkey STRING, 
      field ROW < `ADDRESS_STATUS` STRING, 
      `AUDITOR_ID` STRING, 
      `BENEFIC_TYPE` STRING, 
      `CARD_STATUS_CODE` STRING, 
      `CHILD_SIGN` STRING, 
      `CLASS_PREFER` STRING, 
      `COMMENTS` STRING, 
      `DEGRADE_SIGN` STRING, 
      `EFFECTIVE_DATE` STRING, 
      `EXPIRE_DATE` STRING, 
      `ID` STRING, 
      `INVITER_CARD_NO` STRING, 
      `IS_ACCOUNT_CLOSED` STRING, 
      `IS_LEVEL_EXPIRE` STRING, 
      `IS_MILEAGE_EXPIRE` STRING, 
      `IS_SMALL_EXEMPT_PWD` STRING, 
      `IS_SMOKING` STRING, 
      `IS_TEST_MEMBER` STRING, 
      `MAIL_ADDRESS_TYPE` STRING, 
      `MAIL_LANGUAGE_CODE` STRING, 
      `MEAL_PREFER` STRING, 
      `MEMBER_LEVEL_CODE` STRING, 
      `MEMBER_STATUS_CODE` STRING, 
      `MEMBER_STATUS_DATE` STRING, 
      `MULTIPLIER_MILES_SIGN` STRING, 
      `OPERATE_DATE` STRING, 
      `OPERATE_USER_ID` STRING, 
      `QUALIFICATION_REASON_CODE` STRING, 
      `REDEEM_SIGN` STRING, 
      `REGIST_DATE` STRING, 
      `REGIST_SOURCE` STRING, 
      `SEAT_PREFER` STRING, 
      `SMS_STATUS` STRING, 
      `SPECIAL_ASSISTANCE` STRING, 
      `STATEMENT_SEND_SIGN` STRING, 
      `STATEMENT_SEND_TYPE` STRING, 
      `SUBMIT_DATE` STRING, 
      `UNITED_CARD_SIGN` STRING, 
      `UPDATE_DATE` STRING, 
      `UPDATE_STATUS_SIGN` STRING, 
      `UPDATE_USER_ID` STRING, 
      `UPGRADE_SIGN` STRING >, 
      primary key (rowkey) NOT ENFORCED
    ) WITH (
      'connector' = 'hbase-2.2', 'table-name' = 'ods:tbl_member_info', 
      'zookeeper.quorum' = '172.22.31.53:2181'
    )
    
  4. Flink SQL Dml

    insert into SYNC_HBASE_SINK_TBL_MEMBER_INFO 
    select 
      ID AS rowkey, 
      ROW(
     `ADDRESS_STATUS`,
     `AUDITOR_ID`, 
     `BENEFIC_TYPE`, 
     `CARD_STATUS_CODE`, 
     `CHILD_SIGN`, 
     `CLASS_PREFER`, 
     `COMMENTS`, 
     `DEGRADE_SIGN`, 
     `EFFECTIVE_DATE`, 
     `EXPIRE_DATE`, 
     `ID`, 
     `INVITER_CARD_NO`, 
     `IS_ACCOUNT_CLOSED`, 
     `IS_LEVEL_EXPIRE`,
     `IS_MILEAGE_EXPIRE`, 
     `IS_SMALL_EXEMPT_PWD`, 
     `IS_SMOKING`, 
     `IS_TEST_MEMBER`, 
     `MAIL_ADDRESS_TYPE`, 
     `MAIL_LANGUAGE_CODE`, 
     `MEAL_PREFER`, 
     `MEMBER_LEVEL_CODE`, 
     `MEMBER_STATUS_CODE`, 
     `MEMBER_STATUS_DATE`, 
     `MULTIPLIER_MILES_SIGN`, 
     `OPERATE_DATE`, 
     `OPERATE_USER_ID`, 
     `QUALIFICATION_REASON_CODE`, 
     `REDEEM_SIGN`, 
     `REGIST_DATE`, 
     `REGIST_SOURCE`, 
     `SEAT_PREFER`, 
     `SMS_STATUS`, 
     `SPECIAL_ASSISTANCE`, 
     `STATEMENT_SEND_SIGN`, 
     `STATEMENT_SEND_TYPE`, 
     `SUBMIT_DATE`, 
     `UNITED_CARD_SIGN`, 
     `UPDATE_DATE`, 
     `UPDATE_STATUS_SIGN`, 
     `UPDATE_USER_ID`, 
     `UPGRADE_SIGN`
      ) 
    from 
      (
     select 
       data_row.`ADDRESS_STATUS` AS ADDRESS_STATUS, 
       data_row.`AUDITOR_ID` AS AUDITOR_ID, 
       data_row.`BENEFIC_TYPE` AS BENEFIC_TYPE, 
       data_row.`CARD_STATUS_CODE` AS CARD_STATUS_CODE, 
       data_row.`CHILD_SIGN` AS CHILD_SIGN, 
       data_row.`CLASS_PREFER` AS CLASS_PREFER, 
       data_row.`COMMENTS` AS COMMENTS, 
       data_row.`DEGRADE_SIGN` AS DEGRADE_SIGN, 
       data_row.`EFFECTIVE_DATE` AS EFFECTIVE_DATE, 
       data_row.`EXPIRE_DATE` AS EXPIRE_DATE, 
       data_row.`ID` AS ID, 
       data_row.`INVITER_CARD_NO` AS INVITER_CARD_NO, 
       data_row.`IS_ACCOUNT_CLOSED` AS IS_ACCOUNT_CLOSED, 
       data_row.`IS_LEVEL_EXPIRE` AS IS_LEVEL_EXPIRE, 
       data_row.`IS_MILEAGE_EXPIRE` AS IS_MILEAGE_EXPIRE, 
       data_row.`IS_SMALL_EXEMPT_PWD` AS IS_SMALL_EXEMPT_PWD, 
       data_row.`IS_SMOKING` AS IS_SMOKING, 
       data_row.`IS_TEST_MEMBER` AS IS_TEST_MEMBER, 
       data_row.`MAIL_ADDRESS_TYPE` AS MAIL_ADDRESS_TYPE, 
       data_row.`MAIL_LANGUAGE_CODE` AS MAIL_LANGUAGE_CODE, 
       data_row.`MEAL_PREFER` AS MEAL_PREFER, 
       data_row.`MEMBER_LEVEL_CODE` AS MEMBER_LEVEL_CODE, 
       data_row.`MEMBER_STATUS_CODE` AS MEMBER_STATUS_CODE, 
       data_row.`MEMBER_STATUS_DATE` AS MEMBER_STATUS_DATE, 
       data_row.`MULTIPLIER_MILES_SIGN` AS MULTIPLIER_MILES_SIGN, 
       data_row.`OPERATE_DATE` AS OPERATE_DATE, 
       data_row.`OPERATE_USER_ID` AS OPERATE_USER_ID, 
       data_row.`QUALIFICATION_REASON_CODE` AS QUALIFICATION_REASON_CODE, 
       data_row.`REDEEM_SIGN` AS REDEEM_SIGN, 
       data_row.`REGIST_DATE` AS REGIST_DATE, 
       data_row.`REGIST_SOURCE` AS REGIST_SOURCE, 
       data_row.`SEAT_PREFER` AS SEAT_PREFER, 
       data_row.`SMS_STATUS` AS SMS_STATUS, 
       data_row.`SPECIAL_ASSISTANCE` AS SPECIAL_ASSISTANCE, 
       data_row.`STATEMENT_SEND_SIGN` AS STATEMENT_SEND_SIGN, 
       data_row.`STATEMENT_SEND_TYPE` AS STATEMENT_SEND_TYPE, 
       data_row.`SUBMIT_DATE` AS SUBMIT_DATE, 
       data_row.`UNITED_CARD_SIGN` AS UNITED_CARD_SIGN, 
       data_row.`UPDATE_DATE` AS UPDATE_DATE, 
       data_row.`UPDATE_STATUS_SIGN` AS UPDATE_STATUS_SIGN, 
       data_row.`UPDATE_USER_ID` AS UPDATE_USER_ID, 
       data_row.`UPGRADE_SIGN` AS UPGRADE_SIGN 
     FROM 
       default_catalog.default_database.KAFKA_SOURCE_TBL_MEMBER_INFO_HBASE
      ) t
    

标签:STATUS,STRING,Demo,Flink,SIGN,DATE,Hbase,data,row
来源: https://www.cnblogs.com/qiangsky/p/16422151.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有