此文档源于在2018年2月做Apache NiFi大数据处理和分发系统培训整理的文档。培训版本基于Nifi1.5,同时附有Nifi安装配置和常用Processor使用的PDF文档。
1.NiFi功能概述、原理和架构
1.1 NiFi的功能概述
Apache NiFi 是一个易于使用、功能强大而且可靠的数据处理和分发系统。Apache NiFi 是为数据流设计。它支持高度可配置的指示图的数据路由、转换和系统中介逻辑。
> NiFi这个项目在2015年7月成功成为Apache开源社区的顶级项目,NiFi之前是在美国国家安全局使用了8年的一个可视化,可定制的数据集成产品,2014年贡献给了Apache开源社区。我们认为NiFi的出现填补的大数据集成领域的空白, 它不仅提供了可视化的工具实现大数据环境下不同系统之间进行数据移动和互联互通如SQL,HDFS,Kafka, HBASE, Solr,HTTP(s)、(s)FTP 而且还可以以可视化的方式进行各种数据格式的转换,json,xml,avro等。NiFi的出现使得大数据环境下的数据移动变得简单、高效、安全、可视化,大大方便了业务和开发人员。简单来说,NiFi就是一个致力于数据对接的集成框架。
NiFi的一些高级特性如下:
- 基于Web的UI:设计、控制、反馈和监控之间的无缝体验
- 高度可配置:丢失容忍度与保证交付、低延迟与高吞吐量、动态优先级、可以在运行时修改流、支持回压
- 数据溯源:支持从头到尾跟踪数据流
- 支持扩展设计:支持自定义Processor、支持快速开发和有效测试
- 安全特性:支持SSL, SSH, HTTPS,内容加密等等;支持多租户授权和内部授权管理
1.2 NiFi核心概念和架构
1.2.1 NiFi核心概念
NiFi的基本设计概念与基于流的编程思想(Flow Based Programming,简称FBP)密切相关,以下是一些NiFi的主要概念与基于流的编程思想的对照:
| NiFi | FBP | 描述 |
|---|---|---|
| FlowFile | Information Packet | FlowFile表示每个对象从一个系统移动到另一个系统的逻辑单元,NiFi以key/value对的保存属性(attribute),同时关联其内容(content),内容可以是0个字节或者更多字节。 |
| FlowFile Processor | Black Box | Processors实际上是数据处理工作的,利用Processor可以在系统之间实现数据路由、数据转换。Processor可以访问FlowFile的属性(attribute)和内容流。Processor可以在一个工作单元中一次性处理0个或者多个FlowFiles来进行提交或回滚。 |
| Connection | Bounded Buffer | Connection提供Processor之间的链接。它们充当消息队列,允许不同的进程以不同的速率进行交互。这些队列可以动态的改变排序优先级,同时还可以设置队列的上限(数量和空间大小两个维度),从而实现回压(back pressure)。 |
| Flow Controller | Scheduler | Flow Controller维护着进程之间的连接和管理者进程内的线程使用分配情况。Flow Controller在两个processor之间作为Broker来促进FlowFiles的交换。 |
| Process Group | subnet | Process Group是一组特定的进程及其连接,它可以通过Input Ports输入数据,并且向Output Ports发送数据。通过这种方式,Process Group可以通过其他组件的组合来创建全新的组件。 |
这种设计模式,提供了很多优秀特性,使得NiFi成为构建强大而可扩展的数据流的非常有效的平台。一些优点如下:
- 适用于基于“所见即所得”的方式来创建和管理处理器的有向图
- 本质上是异步的,即使在处理和流量波动时也允许非常高的吞吐量和自然缓冲
- 提供高度并发的模型,而开发人员不必担心并发性的典型复杂性
- 促进高内聚和松散耦合组件的开发,这些组件可以在其他上下文中重用,并促进可测试单元。
- 资源受限的连接使诸如回压(back-pressure)和压力释放(pressure release)等关键功能变得非常自然和直观。
- 错误处理变得与基本逻辑一样自然,而不是粗粒度的一网打尽
- 数据如何进出系统和流程易于理解并且易于跟踪
1.2.2 NiFi架构和NiFi基本模块介绍

NiFi运行于JVM之上,其主要组件如下:
Web Server
Web服务器的目的是托管NiFi的基于HTTP的命令和控制API。
Flow Controller
Flow Controller是操作的大脑。它提供用于扩展程序运行的线程,并管理扩展程序接收资源以及执行的时间调度。
Extensions
Nifi同样提供各种各样的扩展,并且这些扩展也运行于JVM之上。
FlowFile Repository
FlowFile Repository是NiFi跟踪目前在流程中活动的给定FlowFile的状态。Repository的实现是可插拔的。默认方法是位于指定磁盘分区上的持久写入WAL。
Content Repository
Content Repository是给定FlowFile的实际内容字节。 Repository的实现是可插拔的。默认方法是一个相当简单的机制,它将数据块存储在文件系统中。可以指定多个文件系统存储位置,以便获得不同的物理分区,以减少任何单个卷上的争用。
Provenance Repository
Provenance Repository是存储所有来源的事件数据的地方。存储库构造是可插入的,默认实现是使用一个或多个物理磁盘卷。在每个位置内,事件数据被索引和可搜索。
NiFi还能够以集群方式运行:

从NiFi 1.0版本开始,开始采用Zero-Master Clustering范式实现集群方式运行。NiFi集群中的每个节点对数据执行相同的任务,但是每个节点都在不同的数据集上进行操作。Apache ZooKeeper选择单个节点作为群集协调器,故障转移由ZooKeeper自动处理。所有群集节点向群集协调器报告心跳和状态信息。集群协调器负责断开连接节点。此外,每个群集都有一个主节点,也由ZooKeeper选择。作为DataFlow管理器,您可以通过任何节点的用户界面(UI)与NiFi集群进行交互。您所做的任何更改都会复制到群集中的所有节点,也就是允许NiFi管理有多个入口点。
1.3 NiFi的性能特点
NiFi旨在充分利用其正在运行的底层主机系统的功能。对于CPU和磁盘,资源最大化特别强。详细可以参考官方文档(Administration Guide)
- 对于IO
可以看到的吞吐量或延迟可能会有很大的不同,这取决于系统的配置方式。考虑到大多数主要的NiFi子系统都有可插拔的方法,性能取决于实现。但是,对于具体和广泛适用的内容,请考虑开箱即用的默认实现。这些都是持续的保证交付,并使用本地磁盘。因此,保守的,假设典型服务器中适度的磁盘或RAID卷上的读/写速率大约为每秒50 MB。对于大量数据流,NiFi应该能够有效地达到每秒100 MB或更多的吞吐量。那是因为每个物理分区和内容存储库都预期添加到NiFi的线性增长。这将在FlowFile存储库和来源存储库的某个时间点出现瓶颈。我们计划提供一个基准测试和性能测试模板,以包含在构建中,从而允许用户轻松测试他们的系统,并确定瓶颈在哪里,以及哪些可能成为一个因素。此模板还可使系统管理员轻松进行更改并验证其影响。
- 对于CPU
流控制器用作引擎,指定特定处理器何时被执行线程。写处理器一经执行任务就会立即返回线程。流控制器可以给出一个配置值,指示其维护的各种线程池的可用线程。使用的理想线程数取决于主机系统资源的核心数量,无论该系统是否运行其他服务,以及流程中的处理特性。对于典型的IO大流量,可以使数十条线程可用。
- 对于RAM
NiFi存在于JVM中,因此限于由JVM提供的内存空间。JVM垃圾收集成为限制实际堆大小的一个非常重要的因素,同时优化应用程序运行时间。当定期阅读相同的内容时,NiFi工作可能是I / O密集型的。配置足够大的磁盘以优化性能。
1.4 Nifi的一些高级特性
NiFi的一些高级特性包括主要的功能类别包括流量管理、易用性、安全性、可扩展的体系结构和灵活的伸缩模型。
流量管理
- 保证交付
NiFi的核心理念是,即使在非常高的规模,保证交付是必须的。这是通过有效使用专用的持久预写日志和内容存储库来实现的。它们一起设计成允许非常高的事务速率,有效的负载传播,写时复制以及发挥传统磁盘读/写功能的优势。
-
数据缓冲带背压和压力释放
NiFi支持对所有排队的数据进行缓冲,以及当队列达到指定限制时提供背压的能力,或者在数据达到指定年龄时使其老化(其值已经消失)的能力。
-
优先排队
NiFi允许设置一个或多个优先级排序方案来了解如何从队列中检索数据。默认值是最早的,但有时候数据应该被拉到最新,最大的第一个或其他一些自定义方案。
-
流特定QoS(延迟v吞吐量,丢失容限等)
数据流的一些点数据绝对关键,并且是不容忍的。还有一段时间,它必须在几秒钟内被处理和交付成为任何价值。NiFi使得细粒度流特定配置这些问题。
使用方便
- 所见即所得的指挥与控制
数据流可能变得相当复杂。能够可视化这些流程并在视觉上表达它们可以大大减少复杂性并确定需要简化的领域。NiFi不仅可以直观地建立数据流,而且可以实时地实现。而不是设计和部署它更像是成型粘土。如果对更改的数据流进行更改立即生效。更改是细粒度的,并且与受影响的组件隔离。您不需要停止整个流程或流程只是为了进行一些具体的修改。
-
流模板
数据流往往是高度模式化的,而通常有许多不同的方式来解决问题,它可以大大地分享这些最佳实践。模板允许主题专家构建和发布他们的流程设计,并为其他人创造和合作。
-
来源追溯
NiFi自动记录,索引并提供可用的来源数据,因为对象即使在扇入,扇出,转换等过程中也可以流经系统。该信息在支持合规性,故障排除,优化和其他场景方面变得非常重要。
-
恢复/记录细粒历史的滚动缓冲区
NiFi的内容存储库旨在作为历史的滚动缓冲区。只有当数据从内容存储库中老化或者需要空间时才会被删除。这与数据来源功能相结合,使得在对象的生命周期中甚至跨越世代的特定点上实现点击内容,内容下载和重放非常有用的基础。
安全
- 系统到系统
数据流只是安全的一样好。数据流中每一点的NiFi都可以通过使用诸如双向SSL等加密协议提供安全交换。此外,NiFi使得流可以加密和解密内容,并使用发件人/收件人方程的任一侧上的共享密钥或其他机制。
-
用户到系统
NiFi支持双向SSL身份验证,并提供可插拔授权,从而可以正确控制用户的访问和特定级别(只读,数据流管理器,管理员)。如果用户在流程中输入密码等敏感属性,则立即加密服务器端,即使在加密形式下也不会再次暴露在客户端。
-
多租户授权
给定数据流的权限级别适用于每个组件,允许管理员用户具有细粒度的访问控制。这意味着每个NiFi集群都能够处理一个或多个组织的要求。与独立拓扑相比,多租户授权可实现数据流管理的自助服务模式,从而允许每个团队或组织对流程进行管理,同时充分了解流程的其他部分,无法访问。
可扩展架构
- 延期
NiFi的核心是扩展的核心,因此它是数据流处理可以以可预测和可重复的方式执行和交互的平台。扩展点包括:处理器,控制器服务,报告任务,优先级和客户用户界面。
-
分类器隔离
对于任何基于组件的系统,可能会迅速发生依赖问题。NiFi通过提供自定义类加载器模型来解决这个问题,确保每个扩展捆绑包都暴露在非常有限的依赖关系中。因此,可以构建扩展,而不用担心它们是否可能与另一个扩展冲突。这些扩展束的概念称为NiFi Archives,并在开发人员指南中有更详细的讨论。
-
站点到站点通信协议
NiFi实例之间的首选通信协议是NiFi站点到站点(S2S)协议。S2S可以方便,高效,安全地将数据从一个NiFi实例传输到另一个。NiFi客户端库可以轻松构建并捆绑到其他应用程序或设备中,以通过S2S与NiFi通信。S2S中都支持基于套接字的协议和HTTP(S)协议作为底层传输协议,从而可以将代理服务器嵌入到S2S通信中。
灵活的缩放模型
- 横向扩展(聚类)
NiFi旨在通过如上所述将多个节点聚类在一起使用来展开。如果单个节点被配置并配置为每秒处理数百MB,则可以配置适度的集群来处理每秒的GB数。这将带来NiFi与获取数据的系统之间的负载平衡和故障转移的有趣挑战。使用基于异步排队的协议(如消息传递服务,Kafka等)可以帮助您。使用NiFi的站点到站点功能也非常有效,因为它是允许NiFi和客户端(包括另一个NiFi集群)相互通话,共享关于加载的信息以及在特定授权端口上交换数据的协议。
-
放大和缩小
NiFi也被设计成以非常灵活的方式进行放大和缩小。在从NiFi框架的角度增加吞吐量方面,可以在配置时增加“计划”选项卡下的处理器上的并发任务数量。这允许更多的进程同时执行,提供更大的吞吐量。另一方面,您可以将NiFi完美地缩放到适合于在硬件资源有限的边缘设备上运行,因为需要较小的占用空间。为了专门解决第一个英里数据收集挑战和边缘用例。
NiFi也被设计成以非常灵活的方式进行放大和缩小。在从NiFi框架的角度增加吞吐量方面,可以在配置时增加“计划”选项卡下的处理器上的并发任务数量。这允许更多的进程同时执行,提供更大的吞吐量。另一方面,您可以将NiFi完美地缩放到适合于在硬件资源有限的边缘设备上运行,因为需要较小的占用空间。为了专门解决第一个英里数据收集挑战和边缘用例。
1.5 NiFi Groups、Processor、queue和connector模块介绍
1.6 NiFi界面演示
2. NiFi安装和基本配置
2.1 NiFi单机版安装
- 下载路径:http://nifi.apache.org/download.html
- 支持Windows和Linux下安装使用
- 提前安装JDK,并配置JAVA_HOME和PATH
2.1.1 解压NiFi到任意位置
目录结构如下:
- bin 可执行脚本
- conf 所有相关配置文件
- docs 使用文档,可以在Nifi界面右击点击Usage调用查看
- lib NiFi依赖的一些jar包,后缀为nar的为Nifi Processor包,我们可以自定义开发nar包扩展Processor
如果采用默认配置,启动后还会创建以下几个目录
- logs 存放运行过程中日志
- run pid和status文件
- work jetty服务相关目录
- content_repository flowfile content目录,建议配置多个磁盘
- database_repository h2数据库文件
- flowfile_repository 保存flowfile状态
- provenance_repository 保存flowfile来源追溯,保存时间可配置
- state
2.1.2 修改配置文件$NIFI_HOME/bin/conf/nifi.properties
启动前,最主要关注两个配置选项,其他选项可以默认
#如果不设置,启动会提示找不到主机名等问题
nifi.web.http.host=192.168.11.101
#防止端口冲突
nifi.web.http.port=8181
2.1.3 启动NiFi
#启动
bin/nifi.sh start
#查看status
bin/nifi.sh status
# 如果输出....Apache NiFi is currently running 则表示启动成功,启动不成功可以查看logs
2.1.4 将NiFi安装为Linux服务
#安装为服务
bin/nifi.sh install nifi
#添加到自动启动项中
chkconfig --add nifi
chkconfig --list nifi
2.1.5 查看UI
使用主机地址:端口/nifi,查看UI,如下:

2.2 NiFi分布式安装
NiFi的分布式安装需要提前安装zookeeper
2.3 NiFi slaves节点配置
2.4 数据丢失容错和保证交付
- 保证交付
NiFi的核心理念是,即使在非常高的规模,保证交付是必须的。这是通过有效使用专用的持久预写日志和内容存储库来实现的。它们一起设计成允许非常高的事务速率,有效的负载传播,写时复制以及发挥传统磁盘读/写功能的优势。
-
数据缓冲带背压和压力释放
NiFi支持对所有排队的数据进行缓冲,以及当队列达到指定限制时提供背压的能力,或者在数据达到指定年龄时使其老化(其值已经消失)的能力。
2.5 低延迟和高吞吐量
流特定QoS(延迟v吞吐量,丢失容限等)数据流的一些点数据绝对关键,并且是不容忍的。还有一段时间,它必须在几秒钟内被处理和交付成为任何价值。NiFi使得细粒度流特定配置这些问题。
2.6 动态优先级
NiFi允许设置一个或多个优先级排序方案来了解如何从队列中检索数据。默认值是最早的,但有时候数据应该被拉到最新,最大的第一个或其他一些自定义方案。
2.7 流可以在运行时修改
NiFi支持在线运行时编辑相关配置,能够保证消息的稳定交付。
3. NiFi GUI 操作介绍
NiFi使用的三部曲拖拽-配置-链接
3.1 拖拽

3.2 选择Processor

选择Processor可以使用左边的标签组或者右上角的filter实现tags的查询,如果没有processor,我们可以自己实现自定义的processor。
3.3 配置Processor
右击Processor可以看到很多选项,点击configure进行配置(也可以双击Canvas上的Processor UI进入配置)。

进入配置

最常用的是scheduling和properties,以GetFile为例,其Properties如下:

3.4 链接Processor
在链接Processor之前,我们再拖拽一个Processor,并且按住鼠标从第一个Processor拖拽到另一个Processor上,松开。

根据relationships创建链接,这里的relationship表示上一个processor处理的结果,success表示,处理成功之后的flowfile会进入到下一个processor,我们可以将处理成功和处理失败的文件分别发送到不同的processor中实现差异处理。
链接后如下:

此时两个Processor左上角都有警告标志,这些警告一般是用于提示配置选项有误或者配置不完善。
GetFile配置完成后如下:表示获取/tmp/nifi_in这个目录下的所有文件然后输入到下一个processor中。

PutFile配置如下:表示将上一个传过来的flowfile文件保存到/tmp/nifi_out目录下。

如果PutFile作为数据流的最后一环,我们需要设置PutFile的relationship如下:表示不管成功和失败都运行结束。
当然,我们也可以将failure指向自身,实现失败重试。如下:

通过上面我们可以看到两个Processor已经配置完成,分别右击两个Processor,选择start,启动Processor,我们可以往/tmp/nifi_in下创建一个文件,然后看会不会自动传输到/tmp/nifi_out目录下。
#创建临时文件
[root@hadoop1 nifi-1.5.0]# touch /tmp/nifi_in/test
#发现test文件已经不存在
[root@hadoop1 nifi-1.5.0]# ls /tmp/nifi_in/
#查看发现test文件被传输到这里
[root@hadoop1 nifi-1.5.0]# ls /tmp/nifi_out/
test
[root@hadoop1 nifi-1.5.0]#
以上就是一个简单的NiFi使用流程。在配置Processor过程中,我们可以单独stop任意一个processor来重新配置。中间产生的flowfile会积压在中间链接的queue中。
在配置Processor过程中,可以右击Processor点击Usage查看帮助文档。
3.5 配置链接
右击中间的链接,或者双击,可以配置链接

配置主要包括两部分,一个是设置queue的积压大小或者数量,类似于kafka的topic配置,另一个是配置flowfile的处理优先级选项。默认是按照时间从前到后来处理的。如下:

4. NiFi Processor介绍
4.1 Processor的原理和功能
为了创建有效的数据流,用户必须了解哪些类型的处理器可供它们使用。NiFi提供了很多开箱即用的Processor,这些处理器提供从许多不同的系统中摄取数据的能力,路由、转换、处理、拆分和聚合数据,并向许多系统分发数据。每个版本中,NiFi会新增或者更新一些Processor。
Processor从功能上讲大致可以分为以下几类:
- 数据转换:比如CompressContent、ConvertCharacterSet、EncryptContent、ReplaceText、TransformXml等等。
- 路由和调流:比如ControlRate、DetectDuplicate、DistributeLoad、RouteOnAttribute、ScanAttribute、RouteOnContent、ScanContent等等。
- 数据库操作:ConvertJSONToSQL、ExecuteSQL、PutSQL、SelectHiveQL、PutHiveQL
- 属性提取:EvaluateJsonPath、EvaluateXPath、EvaluateXQuery、ExtractText、HashAttribute、HashContent、UpdateAttribute等等。
- 系统交互:ExecuteProcess、ExecuteStreamCommand
- 数据提取:GetFile、GetFTP、GetSFTP、GetJMSQueue、GetJMSTopic、GetHTTP、ListenHTTP、ListenUDP、GetHDFS、ListHDFS / FetchHDFS、FetchS3Object、GetKafka、GetMongo、GetTwitter
- 输出数据:PutEmail、PutFile、PutFTP、PutSFTP、PutJMS、PutSQL、PutKafka、PutMongo
- 分割和聚合:SplitText、SplitJson、SplitXml、UnpackContent、MergeContent、SegmentContent、SplitContent
- HTTP相关:GetHTTP、ListenHTTP、InvokeHTTP、PostHTTP、HandleHttpRequest / HandleHttpResponse
- Amazon Web Services:FetchS3Object、PutS3Object、PutSNS、GetSQS、PutSQS、DeleteSQS
4.2 Kafka Processor介绍和实战
4.3 JMS Processor、AMQP Processor介绍和实战
4.4 Hbase、Cassandera、Redis Processor介绍和实战
PutHBaseJSON
FlowFile格式如下:
{
"id" : "1",
"name" : "adam",
"age" : "25"
}
# 其中id对应rowkey
其他的字段会按照列族名:字段名 存储到HBase中
比如f2:name value : adam
f2:age value : 25

4.5 Hdfs Processor、spark Streaming实战
4.6 JDBC Processor介绍和实战
4.7 FTP、Http、XML、Json Processor介绍和实战
5. 使用Flowfile Attribute
每一个FlowFile在创建都会附带一些Attributes,这些Attributes在FlowFile整个生命周期会不断变化。FlowFile的Attribute是非常强大的,提供了三个主要的好处。
第一,它可以让用户根据Attribute来实现不同flowfile进行路由,实现不同flowfile的差异化处理。通常这个可以使用RouteOnAttribute或者相似的Processor来实现。
第二,Attribute可以被用来配置Processor,这样Processor的配置就依赖于数据本身。例如,PutFile Processor能够根据attribute来存储每个flowfile,而每个flowfile的目录和文件名attribute可以为配置为不同的。
第三,Attribute为数据提供了非常有价值的上下文信息。这在获取一个flowfile的起源时非常有用。这允许用户搜索与特定标准相匹配的出处数据,也允许用户在检查出处事件的细节时查看此上下文。通过这样做,用户就可以获得有价值的见解,以了解为什么数据以这样或那样的方式处理,只需浏览与内容一起进行的上下文即可。
5.1 通用Attribute
每一个FlowFille都有一些Attributes
- filename : 可以用存储数据到本地或者远程文件系统中
-
path:可以用来指定数据存储在本地或者远程文件的目录
-
uuid:一个唯一的ID来表示FlowFiles中的flowfile
-
entryDate: 表示flowfile创建的时间,这个时间是从1970.1.1(UTC)起算。
-
lineageStartDate: 记录FlowFile的溯源时间
-
fileSize:表示FlowFile的Content内容大小。
其中
uuid,entryDate,lineageStartDate, andfileSize是系统生成,无法改变。
5.2 提取Attribute
5.3 添加自定义Attribute
5.4 根据Attribute实现路由
5.5 在Attribute中使用Expression Language
6. NiFi数据流的高级操作
6.1 自定义文件名
UpdateAttribute
6.2 添加自定义属性
6.3 添加正则表达式
6.4 实现Path过滤规则
6.5 Template的使用
7. NiFi安全
7.1 创建用户、用户组、角色
7.2 SSL,SSH,HTTPS加密认证
7.3 可插拔的基于角色的验证/授权
7.4 基于webUI的SLA认证授权
8. NiFi集群配置和管理
8.1 NiFi集群添加节点
8.2 删除节点
8.3 NiFi性能优化
8.4 Slaves节点的负载均衡配置
8.5 制定特定的Processor到特定的node节点
8.6 集群动态负载均衡配置
9. 用户自定义Processor
9.1 eclipse工程创建
9.2 实现一个FilterFileProcessor功能
9.3 编译&添加到NiFi的lib下面
9.4 创建用户自定义的FilterFile处理器
9.5 使用动态脚本处理flowfile(groovy,python等)
10. NiFi的监控
10.1 状态栏
10.2 Processor组件统计信息
10.3 通知显示
11. 数据溯源

12 版本控制
12.1 NiFi Registry安装
同NiFi安装类似
以Group级别的版本控制。
http://192.168.11.102:18080/nifi-registry/
NiFi端配置

