flink 介绍
Flink 是一个大规模数据处理引擎,适用于需要处理大量数据的场景,如大数据分析、实时数据流处理等。它是一个开源的分布式流处理框架,由 Apache 软件基金会开发,可以在多种平台上运行,如 Hadoop、YARN 等。Flink 提供了实时数据流处理、批处理和机器学习等功能,具有高吞吐量、低延迟和可扩展等优点。在 Flink 中,数据流被视为无界的,这意味着数据处理可以随时进行,而不需要等待数据集的完整。Flink 的核心是一个基于时间和事件驱动的执行引擎,可以高效地执行复杂的流处理和批处理程序。Flink 在大数据领域有着广泛的应用,是大数据处理领域的一个重要工具。
flink 简单使用
-
•
- 安装: 首先,你需要在你的机器上安装 Apache Flink。你可以从 Apache Flink 的官方网站下载最新的版本。下载完成后,解压缩文件,并设置环境变量。 2. Flink 程序结构: 一个基本的 Flink 程序包括以下部分:** **
- 获取执行环境** **
- 加载/创建初始数据** **
- 指定此数据的转换** **
- 指定计算结果的位置** **
- 触发程序执行** **
-
•
3. 创建一个 Flink 项目: 你可以使用任何支持 Java 的 IDE 创建一个新的 Flink 项目。创建新项目后,需要添加 Flink 的依赖项。** ** -
•
4. 编写一个简单的 Flink 程序: 以下是一个简单的 Flink 程序,它读取文本文件的内容,将文本分割成单词,并计算每个单词出现的次数。
创建一个 maven 项目
导入 Maven 依赖
--javascripttypescriptbashsqljsonhtmlcssccppjavarubypythongorustmarkdown
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.10.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
创建 Main 程序启动类
public class WordCount {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("path_to_your_text_file");
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.reduce((ReduceFunction<Tuple2<String, Integer>>) (value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1));
counts.print();
env.execute("WordCount");
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] words = value.toLowerCase().split("\\W+");
for (String word : words) {
if (word.length() > 0) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
}
在这个程序中,我们首先从一个文本文件中读取数据,然后使用 flatMap 函数将每一行文本分割成单词,并为每个单词生成一个(key, 1)的元组。然后我们按 key 分组,并将每组的值相加,得到每个单词的出现次数。
上述代码是通过文件模拟流处理,模拟有界流,当我们在生产环境当中,一般都是无界流,来一个流数据处理一个,一般通过 kafka,这里测试使用 nc 工具
public class Main {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host");
int port = parameterTool.getInt("port");
DataStreamSource<String> source = env.socketTextStream(host, port);
source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (s, collector) -> {
String[] words = s.split(" ");
for (String word : words) {
collector.collect(new Tuple2<>(word, 1));
}
}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(0).sum(1).print();
env.execute("Flink");
}
}
通过 args 参数传递 host 和 port,通过 idea 程序启动,将参数传递
nc -lk yourport
可以看到服务端来一个消息,flink 接收一个消息
flink 安装
官网下载 gz
压缩
环境装有 jdk1.8 以上
flink 安装 bin 目录启动 start-cluster.sh
访问 localhoost:8081
代表安装成功
在 Web ui 提交任务
- 将之前 idea 代码 packge 打成 jar 包
- webUI 将 jar 包上传
- 添加相关启动类入口,程序参数,并行度
- 点击 show plan 和 submit 按钮提交任务
最后在 task managers 监控可以看到结果输出
!!!如果当 submit 提示出现服务器错误,查看 job manager 日志发现
排查 maven 打包是否将启动类打包,如果打包了,还是出现该问题再 maven pom 文件添加
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
最后重新 packge 即可,提交
flink 还有许多高级应用,需要后续自己钻研,先初步记录在此