ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink入门-WordCount

2022-06-11 00:01:40  阅读:149  来源: 互联网

标签:Flink java 入门 flink WordCount api import apache org


 以一个简单的入门例子,统计每个单词出现的次数开始。

1. pom配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>study-flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <!-- 引入Flink相关依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- 引入日志管理相关依赖-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>

  Flink 底层的架构使用了Akka 来实现分布式通信,Akka是用Scala 开发的。因此指定了Scala 版本。

2. 编写程序进行测试

1. 项目下新建文件  file/words.txt  内容如下:

hello world
hello flink
hello java
java nb
java pl

2. 批处理程序:

  思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。

package cn.qz;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * 批处理逻辑
 */
public class BatchWordCount {

    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        // 2. 读取文件
        DataSource<String> txtDataSource = executionEnvironment.readTextFile("file/words.txt");
        // 3. 转换数据格式
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = txtDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] strs = line.split(" ");
            for (String str : strs) {
                out.collect(Tuple2.of(str, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG)); // lambda 使用泛型,由于泛型擦除,需要显示的声明类型信息
        // 4. 按照word 进行分组(按照第一个字段分组。 也就是按照String 类型的词分组). 有下面两种方式
        // 第一种,指定属性名称。 f0 是 org.apache.flink.api.java.tuple.Tuple2.f0
//        UnsortedGrouping<Tuple2<String, Long>> tuple2UnsortedGrouping = wordAndOne.groupBy("f0");
        UnsortedGrouping<Tuple2<String, Long>> tuple2UnsortedGrouping = wordAndOne.groupBy(0);
        // 5. 分组聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = tuple2UnsortedGrouping.sum(1);
        // 6. 打印结果
        sum.print();
    }
}

结果:

(nb,1)
(flink,1)
(world,1)
(hello,3)
(java,3)
(pl,1)

  可以看到将文档中的所有单词的频次,全部统计打出来。

  需要注意,这种批处理是基于DataSetAPI的,也就是数据集API。

3. 流处理-有界数据

  DataStreamAPI用于处理流处理。

package cn.qz;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class BoundedStreamWordCount {

    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境(流处理执行环境)
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 读取文件
        DataStreamSource<String> txtDataSource = executionEnvironment.readTextFile("file/words.txt");
        // 3. 转换数据格式
        SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));// lambda 使用泛型,由于泛型擦除,需要显示的声明类型信息
        // 4. 分组
        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(t -> t.f0);
        // 5. 求和 (sum、min、max 可以用字段名称,也可以用字段顺序)
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
        // 6. 打印
        sum.print();

        // 7. 执行
        executionEnvironment.execute();
    }
}

与批处理程序不同的是:

1》创建的执行环境不同

2》每一步处理转换之后,得到的数据对象类型不同

3》分组操作调用的是keyBy 方法,可以传入一个匿名函数作为键选择器,指定当前分组的key 是什么。

4》代码末尾需要调用execute 方法开始执行任务。 

结果:

5> (world,1)
3> (hello,1)
2> (java,1)
4> (nb,1)
2> (java,2)
2> (pl,1)
7> (flink,1)
3> (hello,2)
2> (java,3)
3> (hello,3)

  从结果可以看出。批处理针对每个单词,只会输出一个最终的统计个数。而在流处理的打印结果中,"hello"这个单词每出现一次就会统计一次。这也是流处理的特点,数据逐个处理。

  另外,Flink是一个分布式处理引擎。在上面开发环境中,execute 之后实际会用多线程来模拟一个Flink 集群。前面的数字5>、3> 等指示了本地执行的不同线程,对应着Flink 运行时不同的并行资源。可以理解为是线程的资源信息,默认为CPU核数。

4. 流处理-无界数据流

  模拟监听socket并且处理接收的数据。

(1) linux 用nc 监听端口 (nc 是linux 自带的一个netcat 工具)

nc -l 7777

(2) 编写代码监听7777 socket 端口并处理数据

package cn.qz;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class SocketStreamWordCount {

    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境(流处理执行环境)
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 读取文件
        DataStreamSource<String> txtDataSource = executionEnvironment.socketTextStream("localhost", 7777);
        // 3. 转换数据格式
        SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));// lambda 使用泛型,由于泛型擦除,需要显示的声明类型信息
        // 4. 分组
        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(t -> t.f0);
        // 5. 求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
        // 6. 打印
        sum.print();
        // 7. 执行
        executionEnvironment.execute();
    }
}

(3)测试

1》nc 连接窗口输入如下信息

hello china hello java

2》控制台输出

3> (hello,1)
2> (java,1)
3> (china,1)
3> (hello,2)

 5. 本地环境求最大最小

package cn.qz;

import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;

import java.io.Serializable;
import java.util.List;

public class LocalEnvTest {


    public static void main(String[] args) throws Exception {
        // 1. 开启本地环境,默认开启一个线程
        LocalEnvironment localEnvironment = ExecutionEnvironment.createLocalEnvironment();
        // 2. 构造元素
        final User user1 = new User().setName("user1").setAge(12);
        User user2 = new User().setName("user2").setAge(139);
        User user3 = new User().setName("user3").setAge(14);
        DataSource<User> userDataSource = localEnvironment.fromElements(user1, user2, user3);
        // 2. 返回二元数组对象
        MapOperator<User, Tuple2<Integer, User>> map = userDataSource.map(new MapFunction<User, Tuple2<Integer, User>>() {
            public Tuple2 map(User user) throws Exception {
                return new Tuple2(user.getAge(), user);
            }
        });
        // 3. 求最大最小
        /**
         * max与maxBy都能用来排序,他们都只能对Tuple类型数据源生效。
         * max用来对指定那一列/多列进行排序,其它列不保证,因此返回结果中指定的一列/多列是最大值,其它列为数据源迭代中的最后一条记录。
         * maxBy可以根据指定的一列/多列进行排序,最终返回的是最大的那列对应记录。maxBy本质上就是对Tuple类型数据某个位置元素进行比较排序,类似于索引,最终返回最大的那个元素
         */
        List<Tuple2<Integer, User>> collect = map.max(0).collect();
        collect.forEach(tmp -> {
            System.out.println(tmp);
        });
        System.out.println("======");
        List<Tuple2<Integer, User>> collect2 = map.maxBy(0).collect();
        collect2.forEach(tmp -> {
            System.out.println(tmp);
        });

        System.out.println("******");

        List<Tuple2<Integer, User>> collect3= map.min(0).collect();
        collect3.forEach(tmp -> {
            System.out.println(tmp);
        });
        System.out.println("======");
        List<Tuple2<Integer, User>> collect4 = map.minBy(0).collect();
        collect4.forEach(tmp -> {
            System.out.println(tmp);
        });
    }

    @Data
    @Accessors(chain = true)
    public static class User implements Serializable {

        private String name;

        private Integer age;
    }
}

结果:(可以看出,max/min 只是针对指定的字段求最大最小,后面的元素是数据源迭代最后一条记录; maxBy/minBy 是找到指定的最大最小,且后面的元素也是与之对应的元素。)

(139,LocalEnvTest.User(name=user3, age=14))
======
(139,LocalEnvTest.User(name=user2, age=139))
******
(12,LocalEnvTest.User(name=user3, age=14))
======
(12,LocalEnvTest.User(name=user1, age=12))

 

标签:Flink,java,入门,flink,WordCount,api,import,apache,org
来源: https://www.cnblogs.com/qlqwjy/p/16342919.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有