本文共 4766 字,大约阅读时间需要 15 分钟。
Apache Flink 是一个开源的分布式流处理和批处理系统。Flink 的核心是在数据流上提供数据分发、通信、具备容错的分布式计算。同时,Flink 在流处理引擎上构建了批处理引擎,原生支持了迭代计算、内存管理和程序优化。
操作系统环境:
flink支持Linux, Mac OS X, 和 Windows环境部署,本次部署选择Linux环境部署。
JDK:
要求Java 7或者更高1、JDK安装步骤此处省略,安装后验证下JDK环境
$ java -versionopenjdk version "1.8.0_144"OpenJDK Runtime Environment (build 1.8.0_144-b01)OpenJDK 64-Bit Server VM (build 25.144-b01, mixed mode)
2、安装部署flink
本文介绍flink部署分为两种模式:local,standalone。下面依次介绍这两种模式的部署方式。找到下载的flink压缩包,进行解压
$ tar -zxvf flink-1.4.2-bin-hadoop26-scala_2.11.tgz
首先是local模式,最为简单。
$ cd flink-1.4.2$ bin/start-local.shStarting job manager
我们可以通过查看日志确认是否启动成功
$ tailf flink-csap-taskmanager-0-XXXX.log2018-05-03 10:07:53,718 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-4c371de9-0f85-4889-b4d9-4a522641549c2018-05-03 10:07:53,725 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor at akka://flink/user/taskmanager#-524742300.2018-05-03 10:07:53,725 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager data connection information: 2c358d6f38949f9aae31c5bddb0cc1dc @ LY1F-R021707-VM14.local (dataPort=55234)2018-05-03 10:07:53,726 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 1 task slot(s).2018-05-03 10:07:53,727 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 111/1024/1024 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]2018-05-03 10:07:53,730 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@localhost:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds)2018-05-03 10:07:53,848 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka.tcp://flink@localhost:6123/user/jobmanager), starting network stack and library cache.2018-05-03 10:07:53,851 INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be localhost/127.0.0.1:52382. Starting BLOB cache.2018-05-03 10:07:53,858 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Created BLOB cache storage directory /tmp/blobStore-c07b9e80-41f0-490f-8126-7008144c4b0b2018-05-03 10:07:53,861 INFO org.apache.flink.runtime.blob.TransientBlobCache - Created BLOB cache storage directory /tmp/blobStore-e0d1b687-1c47-41c4-b5bc-10ceaa39e778
JobManager进程将会在8081端口上启动一个WEB页面,我们可以通过浏览器到hostname:8081中查看相关的信息。
可以打开页面查看到相关信息,说明local模式部署是没问题的。下面来看一下standlone部署方式。
安装JDK,解压压缩包,都是一样的。不一样的是我们要修改解压后的flink配置文件。然后在集群主机间做免密,。
修改conf/flink-conf.yaml,我们将jobmanager.rpc.address的值设置成你master节点的IP地址。此外,我们通过jobmanager.heap.mb和taskmanager.heap.mb配置参数来设置每个节点的JVM能够分配的最大内存。从配置参数名字可以看出,这个参数的单位是MB,如果某些节点拥有比你之前设置的值更多的内存时,我们可以在那个节通过FLINK_TM_HEAP参数类覆盖值钱的设置。
我们需要把所有将要作为worker节点的IP地址存放在conf/slaves文件中,在conf/slaves文件中,每个IP地址必须放在一行,如下:
192.168.0.100192.168.0.101...192.168.0.150
然后将修改好的flink包整理复制到集群各个节点。每个节点flink路径保持一致。然后启动集群
$ bin/start-cluster.sh
查看日志是否成功。
以上是部署方法,部署成功后,我们来跑一个demo程序,验证一下Flink的流处理功能,对其有个初步的了解。
flink为了更好的让大家理解,已经给大家提供了一些demo代码,demo的jar包可以在/examples/streaming首先看一下demo代码:
object SocketWindowWordCount { def main(args: Array[String]) : Unit = { // the port to connect to val port: Int = try { ParameterTool.fromArgs(args).getInt("port") } catch { case e: Exception => { System.err.println("No port specified. Please run 'SocketWindowWordCount --port'") return } } // get the execution environment val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // get input data by connecting to the socket val text = env.socketTextStream("localhost", port, '\n') // parse the data, group it, window it, and aggregate the counts val windowCounts = text .flatMap { w => w.split("\\s") } .map { w => WordWithCount(w, 1) } .keyBy("word") .timeWindow(Time.seconds(5), Time.seconds(1)) .sum("count") // print the results with a single thread, rather than in parallel windowCounts.print().setParallelism(1) env.execute("Socket Window WordCount") } // Data type for words with count case class WordWithCount(word: String, count: Long)}
这个demo是监控端口,然后对端口输入单子进行wordcount的程序。
运行demo,首先打开一个窗口进行端口数据输入:
$ nc -l 9001hellohellowordworld
然后运行demo监控端口单词输入统计:
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9001
运行后可以看到结果统计:
$ more flink-csap-taskmanager-0-XXX.out.1hello : 1hello : 1word : 1world : 1
以上就是flink的hello world,大家初步对flink有个了解。
转载地址:http://hnucf.baihongyu.com/