Over the next few articles, we will work through various ways of collecting logs, step by step.
This article covers how to send Log4j output directly to Flume.
Hands-on material first, theory later!
1. Flume configuration file
agent.sources = so1
agent.channels = c1
agent.sinks = s1
# For each one of the sources, the type is defined
agent.sources.so1.type = avro
agent.sources.so1.bind = 0.0.0.0
agent.sources.so1.port = 44444
agent.channels.c1.keep-alive = 30
# The channel can be defined as follows.
# agent.sources.seqGenSrc.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.s1.type = logger
#Specify the channel the sink should use
# agent.sinks.loggerSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
agent.sources.so1.channels = c1
agent.sinks.s1.channel = c1
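With this configuration in place, the agent can be started from the Flume installation directory with a command along the following lines. The file name conf/log4j-avro.conf is just a placeholder for wherever you saved the configuration above; the --name argument must match the agent prefix used in the file, here "agent":
bin/flume-ng agent --conf conf --conf-file conf/log4j-avro.conf --name agent -Dflume.root.logger=INFO,console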
2. Test code
import org.apache.log4j.xml.DOMConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlumeLogTest {

    // Note: log4j.xml below enables DEBUG for the com.cp.flume package, so this class is
    // presumably declared in that package (the package statement was omitted in the original).
    private Logger logger = LoggerFactory.getLogger(getClass());

    public static void main(String[] args) throws Exception {
        // Load the Log4j configuration and watch it for changes.
        DOMConfigurator.configureAndWatch("config/log4j.xml");
        new FlumeLogTest().start();
    }

    public void start() {
        // Emit one DEBUG message per second so the Flume logger sink output is easy to observe.
        while (true) {
            logger.debug("flume log test:{}", System.currentTimeMillis());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
3. log4j.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd" >
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
    <appender name="flume"
        class="org.apache.flume.clients.log4jappender.Log4jAppender">
        <param name="hostname" value="192.168.113.181" />
        <param name="port" value="44444" />
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="[%p] %d{dd MMM hh:mm:ss aa} %t [%l] %m%n" />
        </layout>
    </appender>
    <appender name="async" class="org.apache.log4j.AsyncAppender">
        <param name="Blocking" value="false" />
        <param name="BufferSize" value="500" />
        <appender-ref ref="flume" />
    </appender>
    <appender name="CONSOLE.OUT" class="org.apache.log4j.ConsoleAppender">
        <param name="target" value="System.out" />
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="[%d][%p, (%F:%L).%M] %m%n" />
        </layout>
        <filter class="org.apache.log4j.varia.LevelRangeFilter">
            <param name="LevelMin" value="debug" />
            <param name="LevelMax" value="info" />
            <param name="AcceptOnMatch" value="false" />
        </filter>
    </appender>
    <logger name="org.springframework">
        <level value="ERROR" />
    </logger>
    <logger name="com.cp.flume">
        <level value="debug" />
    </logger>
    <root>
        <priority value="info" />
        <appender-ref ref="async" />
        <appender-ref ref="CONSOLE.OUT" />
    </root>
</log4j:configuration>
4. pom.xml
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.7.0-SNAPSHOT</version>
</dependency>
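Note that 1.7.0-SNAPSHOT was the development version at the time of writing; a released flume-ng-log4jappender version (e.g. 1.6.0 or 1.7.0) can be used instead. The test class also logs through the SLF4J API and loads an XML config with Log4j 1.x, so SLF4J and Log4j must be on the classpath if they are not already pulled in transitively; a typical set of additional dependencies (the versions shown are only examples) looks like this:
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.12</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.12</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>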
A few points worth noting:
1. The Flume Log4j appender must be wrapped in Log4j's AsyncAppender; otherwise, once the log server goes down it will take the application server down with it. The async appender's queue must also be set to non-blocking mode (Blocking=false) and given an appropriate BufferSize; without this, a Flume server outage will still bring the application server down.
2. When the Flume server is down, the append method of Log4jAppender throws a FlumeException, which must be caught, or it will likewise bring the application server down. I handle this by subclassing Log4jAppender and overriding append; see the code below.
3. Under normal conditions, Log4jAppender has its own automatic reconnection mechanism (verified by testing).
package org.apache;
import java.util.Properties;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.api.RpcClientFactory.ClientType;
import org.apache.flume.clients.log4jappender.Log4jAppender;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
/**
 * @project: flume-log4j-test
 * @Title: FailoverLog4jAppender.java
 * @Package: org.apache
 * @author: chenpeng
 * @email: 46731706@qq.com
 * @date: 2016-02-24 14:12:16
 * @description:
 * @version:
 */
public class FailoverLog4jAppender extends Log4jAppender {

    private String hosts;
    private String maxAttempts;
    private boolean configured = false;

    public void setHosts(String hostNames) {
        this.hosts = hostNames;
    }

    public void setMaxAttempts(String maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    @Override
    public synchronized void append(LoggingEvent event) {
        if (!configured) {
            String errorMsg = "Flume Log4jAppender not configured correctly! Cannot"
                    + " send events to Flume.";
            LogLog.error(errorMsg);
            if (getUnsafeMode()) {
                return;
            }
            throw new FlumeException(errorMsg);
        }
        try {
            super.append(event);
        } catch (FlumeException e) {
            // Swallow the exception so a Flume outage does not propagate into the application.
            e.printStackTrace();
        }
    }

    /**
     * @throws FlumeException
     *             if the FailoverRpcClient cannot be instantiated.
     */
    @Override
    public void activateOptions() throws FlumeException {
        try {
            final Properties properties = getProperties(hosts, maxAttempts,
                    getTimeout());
            rpcClient = RpcClientFactory.getInstance(properties);
            if (layout != null) {
                layout.activateOptions();
            }
            configured = true;
        } catch (Exception e) {
            String errormsg = "RPC client creation failed! " + e.getMessage();
            LogLog.error(errormsg);
            if (getUnsafeMode()) {
                return;
            }
            throw new FlumeException(e);
        }
    }

    /**
     * Builds the RPC client properties for a failover client from the
     * space-separated list of host:port pairs.
     */
    private Properties getProperties(String hosts, String maxAttempts,
            long timeout) throws FlumeException {
        if (StringUtils.isEmpty(hosts)) {
            throw new FlumeException("hosts must not be null");
        }
        Properties props = new Properties();
        String[] hostsAndPorts = hosts.split("\\s+");
        StringBuilder names = new StringBuilder();
        for (int i = 0; i < hostsAndPorts.length; i++) {
            String hostAndPort = hostsAndPorts[i];
            String name = "h" + i;
            props.setProperty(
                    RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + name,
                    hostAndPort);
            names.append(name).append(" ");
        }
        props.put(RpcClientConfigurationConstants.CONFIG_HOSTS,
                names.toString());
        props.put(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
                ClientType.DEFAULT_FAILOVER.toString());
        if (StringUtils.isEmpty(maxAttempts)) {
            throw new FlumeException("maxAttempts must not be null");
        }
        props.put(RpcClientConfigurationConstants.CONFIG_MAX_ATTEMPTS,
                maxAttempts);
        props.setProperty(
                RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
                String.valueOf(timeout));
        props.setProperty(
                RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
                String.valueOf(timeout));
        return props;
    }
}
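To wire this appender in, the flume appender definition in log4j.xml is replaced with the failover variant; keeping the appender name "flume" means the existing async wrapper still references it. A sketch of what that could look like follows — the second host:port and the maxAttempts value are placeholders, and hosts is the space-separated host:port list that getProperties above expects:
<appender name="flume" class="org.apache.FailoverLog4jAppender">
    <param name="hosts" value="192.168.113.181:44444 192.168.113.182:44444" />
    <param name="maxAttempts" value="3" />
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="[%p] %d{dd MMM hh:mm:ss aa} %t [%l] %m%n" />
    </layout>
</appender>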