当FlumeChannel启动时,或者故障恢复时,会经历一次重播(replay)过程,重播的目的就是还原上一次的“现场”,当然,最主要的就是恢复FlumeEventQueue中的内存队列相关数据。重播的主要实现是有Log类来做的,Log类的replay实现了整个重播过程,简单来说,重播过程分为如下几个步骤:
步骤1:获取检查点文件的独占锁(checkpointWriterLock.lock();)。
步骤2:将数据文件ID的初始值设置成0(nextFileID.set(0);)。
步骤3:便利所有数据文件目录(dataDirs),将所有数据文件放入文件列表dataFiles中,将nextFileID设置成当前存在的数据文件ID的值,便于后期调用能生成正确的数据文件ID,将数据文件ID和数据文件的随机读取器之间的映射关系放入idLogFileMap之中。
步骤4:对数据文件列表dataFiles安装数据文件ID进行升序排序。
步骤5:如果use-fast-replay设置成ture(默认值为false)且检查点文件(checkpoint)不存在,则进行快速全播(fast full replay)流程,见步骤7。
步骤6:如果use-fast-replay设置成false,则通过检查点进行重播。见步骤8.
步骤7:类似于步骤8,只是不通过检查点的中最大的写顺序ID开始重播而已(因为当时还没有检查点)。
步骤8:记住检查点的当前写顺序ID(从checkpoint.meta中获得),从inflightputs文件反序列化得到未提交的put的事务ID和Event指针(FlumeEventPointer)的映射inflightPuts,从inflighttakes文件中反序列化得到未提交的take的事务ID和Event指针的映射inflightTakes。
步骤9:遍历数据文件(通过前面得到的数据文件列表dataFiles),找出有包含比检查点的写数据ID还大的写数据ID的数据文件,因为这些数据文件才对恢复File Channel内存队列有用,别忘了File Channel内存队列保存的是所有未被消费的FlumeEventPoint(这当然也需要把那些没完成事务最后一步commit动作的事务给继续完成),并把这些数据文件的该写顺序ID后面的下一个行记录对象(即LogRecord,LogRecord中不仅包含数据记录,还包含记录类型和操作类型,记录类型有put/take/commit/rollback,操作类型有put/take)保存到队列logRecordBuffer中。
步骤10:通过logRecordBuffer可以将所有需要读取的数据文件的剩余部分遍历一遍,这样,我们可以得到一个只包含put和take操作但却未commit和rollback的事务ID对应FlumeEventPoint的Map,即transactionMap,同时,还会更新步骤8中提到的inflightTakes,移除掉已经成功commit的take事务。如果发现有已经提交的事务,则需要进行提交处理,如果是commit的put事务,则将其FlumeEventPoint添加到内存队列队尾,如果是commit的take事务,则从内存队列中移除。 当然,还可以得到当前最大的事务ID(transactionIDSeed)和最大的写顺序ID(writeOrderIDSeed),这个是为了让后面生成的事务ID和写顺序ID可用(TransactionIDOracle.setSeed(transactionIDSeed);WriteOrderOracle.setSeed(writeOrderIDSeed);)。这一步做完,内存队列中已经包含了所有已经完成事务commit但并没有被Sink消费的所有FlumeEventPoint了。
步骤11:将所有没有commit的take事务所包含的数据(inflightTakes中的数据)重新插入到内存队列的头部。
从以上步骤可以看出,Flume中有两种重播方式,一种是不通过检查点(此时必须检查点不存在且配置的use-fast-replay为true)的“快速全播”,一种是普通的通过检查点重播,这也是默认的重播方式。重播的目的就是为了通过磁盘文件来恢复File Channel的内存队列,使File Channel能继续运行,重播需要的时间和当时内存队列中未被消费的FlumeEventPoint成正比
关于步骤1,自不必多说,因为重播过程中,是不能接收消息的,就像JVM GC真正执行时需要Stop World一样。步骤二,需要理解什么是数据文件ID,可以看我前面的Flume快速入门(三)博文(http://manzhizhen.iteye.com/blog/2298394),这里再次阐述一次,为了保证每个数据目录(dataDir)下数据文件的合理大小,当数据超过一个数据文件的最大容量时,Flume会自动在当前目录新建一个数据文件,为了区分同一个数据目录下的数据文件,Flume采用文件名加数字后缀的形式(比如log-1、log-2),其中的数字后缀就是数据文件ID,它由Log实例中的nextFileID属性来维护,它是原子整形,由于在Flume Agent实例中,一个File Channel会对应一个Log实例,所以数据文件ID是唯一的,即使你配置了多个数据目录。每个数据文件都有一个对应的元数据文件(MetaDataFile),它和数据文件在同一目录,命名则是在数据文件后面加上.meta,比如log-1对应的元数据文件是log-1.meta,其实就是将Checkpoint对象通过谷歌的Protos协议序列化到元数据文件中,Checkpoint存储了对应数据文件的诸多重要信息,比如版本、写顺序ID(logWriteOrderID)、队列大小(queueSize)和队列头索引(queueHead)等。元数据文件主要用于快速将数据文件的信息载入到内存中。注意,检查点也有自己的元数据文件(checkpoint.meta)。其实,步骤1-4理解起来都不难,我们知道,检查点目录(checkpointDir)下一共有四个文件(除去锁文件和检查点的元数据文件):检查点文件(checkpoint)、提取未提交文件(inflighttakes)、写入未提交文件(inflightputs)和队列集合文件(queueset,更准确来说这是目录),内存队列正式在这四个文件的相互补充下,得到完整的恢复。
其中上述的步骤10是关键,但也容易理解,比如最常用的通过检查点来恢复,检查点的元数据文件中保存的最大写顺序ID,说明在这个写顺序ID之前的数据要不就已经在检查点文件中了要不就已经被Sink消费掉了,所以通过检查点文件恢复的内存队列,还需要补充两种数据:第一种数据是已经commit的数据但还未来得及对检查点数据刷盘(默认每30秒将内存队列写入检查点文件,可通过checkpointInterval来设置)。第二种数据是飞行中的数据,即还未来得及commit的数据。步骤10中提到的transactionMap的类型是MultiValueMap,所以可以从一个事务ID中找到和其相关的所有操作记录,从上述的步骤10和步骤11可以看出,对于第一种数据,可以通过遍历包含了检查点最大写顺序ID之后数据的数据文件来将其加载到内存队列中,但这时候内存队列中的数据是有冗余的,包含了已经被消费的commit事务的数据,所以这时候未提交数据凭证文件(inflightputs文件和inflighttakes文件)中的数据就起到作用了,将检查点写顺序ID后的所有事务数据(transactionMap)和通过inflightPuts映射、inflightTakes经过运算得到的数据(put未commit或者put已经commit但take没commit的数据)取“交集”得到的FlumeEventPoint集合,就是内存队列需要补充的数据了。
相关推荐
Flume1.6.0入门:安装、部署、及flume的案例
Flume1.5.0入门:安装、部署、及flume的案例Flume1.5.0入门:安装、部署、及flume的案例
视频详细讲解,需要的小伙伴自行百度网盘下载,链接见附件,永久有效。 1、介绍: ...章节九:Channel选择器 章节十:Sink处理器 章节十一:导入数据到HDFS 章节十二:Flume SDK 章节十三:Flume监控
flume-ng安装
本文讲述了flume中channel和sink简单描述和linux配置 包括:Memory channel、File channel及其它测试阶段的Channel; 及channel通过sink的输出配置Logger Sink、File Roll Sink、HDFS Sink、Avro Sink(多级流动、...
flume介绍与原理: 1、flume的优势 2、Flume具有的特征:source->channel->sink 3、flume配置介绍 4、flume启动命令
尚硅谷大数据技术之Flume
apache-flume-1.9.0-bin.tar,kafka_2.11-0.10.1.0,zookeeper-3.3.6_.tar 压缩 到了一个logs.rar文件中,需要的请下载
hadoop12下载flume安装包wget http://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.7.0/ap
1.Flume Agent的编写: flume_push_streaming.conf 2.代码 4.本地测试总结 1.Flume Agent的编写: flum
04_Flume中配置使用file channel及HDFS sink 05_Flume中配置HDFS文件生成大小及时间分区 06_Flume中配置Spooling Dir的使用 07_Flume中配置Spooling Dir的文件过滤 08_Flume中配置扇入架构的介绍 09_Flume中...
flume_exporter 普罗米修斯水槽出口商。 要运行它: make build ./flume_exporter [flags] 标志帮助: ./flume_exporter --help 配置:config.yml agents: - name: "flume-agents" enabled: true # ...
$ cd flume-round-robin-channel-selector $ mvn clean package $ ls target flume-round-robin-channel-selector-1.0.jar 将JAR添加到Flume类路径 $ cp /etc/flume-ng/conf/flume-env.sh.template /etc/flume-...
第 2 章 Flume 快速入门2.1.1 安装地址2)文档查看地址3)下载地址2.1.2 安装部署2.2.1 监控端口数据官方案例1)案例需求:使用 Flum
flume的包。flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、...
flume入门介绍,简单介绍flume的背景和应用场景,flume的实现原理以及案例分享
Flume入门使用.pdf 学习资料 复习资料 教学资源
Flume 快速入门教程,文本数据采集