`

征服flume之三——使用log4j输出日志到flume

阅读更多
接下来的几篇文章,我们将逐步学习使用各种方式对日志进行采集。

本文讲述的是如何使用log4j直接输出日志到flume。
先上干货,再讲理论!

1、flume配置文件
agent.sources = so1
agent.channels = c1
agent.sinks = s1

# For each one of the sources, the type is defined
agent.sources.so1.type = avro
agent.sources.so1.bind = 0.0.0.0
agent.sources.so1.port = 44444
tier1.channels.channel1.keep-alive=30

# The channel can be defined as follows.
# agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined

agent.sinks.s1.type = logger

#Specify the channel the sink should use
# agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.so1.channels = c1
agent.sinks.s1.channel = c1


2、测试代码
public class FlumeLogTest {
	private Logger logger = LoggerFactory.getLogger(getClass());
	public static void main(String[] args) throws Exception {
		DOMConfigurator.configureAndWatch("config/log4j.xml");
		new FlumeLogTest().start();
	}

	public void start() {
		while(true){
			logger.debug("flume log test:{}",System.currentTimeMillis());
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
	}
}


3、log4j.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd" >
<log4j:configuration>
	<appender name="flume"
		class="org.apache.flume.clients.log4jappender.Log4jAppender">
		<param name="hostname" value="192.168.113.181" />
		<param name="port" value="44444" />
		<layout class="org.apache.log4j.PatternLayout">
			<param name="ConversionPattern" value="[%p] %d{dd MMM hh:mm:ss aa} %t [%l] %m%n" />
		</layout>
	</appender>
	<appender name="async" class="org.apache.log4j.AsyncAppender">
		<param name="Blocking" value="false" />
		<param name="BufferSize" value="500" />
		<appender-ref ref="flume" />
	</appender>

	<appender name="CONSOLE.OUT" class="org.apache.log4j.ConsoleAppender">
		<param name="target" value="System.out" />
		<layout class="org.apache.log4j.PatternLayout">
			<param name="ConversionPattern" value="[%d][%p, (%F:%L).%M] %m%n" />
		</layout>
		<filter class="org.apache.log4j.varia.LevelRangeFilter">
			<param name="LevelMin" value="debug" />
			<param name="LevelMax" value="info" />
			<param name="AcceptOnMatch" value="false" />
		</filter>
	</appender>

	<logger name="org.springframework">
		<level value="ERROR" />
	</logger>
	<logger name="com.cp.flume">
		<level value="debug" />
	</logger>
	<root>
		<priority value="info"></priority>
		<appender-ref ref="async" />
		<appender-ref ref="CONSOLE.OUT" />
	</root>
</log4j:configuration>

4、pom.xml
		<dependency>
			<groupId>org.apache.flume.flume-ng-clients</groupId>
			<artifactId>flume-ng-log4jappender</artifactId>
			<version>1.7.0-SNAPSHOT</version>
		</dependency>


这里要说明的几点:
1、flume的Log4j Appender必须使用Log4j的异步加载器,否则一旦日志服务器挂掉,将会导致应用服务器宕机;需要将异步加载器中的消息队列设置为非阻塞模式Blocking=false,并设置相应的buffersize,否则flume服务器宕机时,会导致应用服务器宕机。
2、当flume服务器宕机时,Log4jAppender类的append方法会抛出FlumeException,需要对该异常进行捕获,否则同样会引起应用服务器的宕机。这部分我是通过继承Log4jAppender并重写append接口实现的,参见下述代码。
3、正常状况下Log4jAppender本身有自动重连机制(已测试)

package org.apache;


import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.api.RpcClientFactory.ClientType;
import org.apache.flume.clients.log4jappender.Log4jAppender;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;

/** 
* @project:		flume-log4j-test
* @Title:		FailoverLog4jAppender.java
* @Package:		org.apache
  @author: 		chenpeng
* @email: 		46731706@qq.com
* @date:		2016年2月24日下午2:12:16
* @description:
* @version:
*/ 
public class FailoverLog4jAppender extends Log4jAppender {

	private String hosts;
	private String maxAttempts;
	private boolean configured = false;

	public void setHosts(String hostNames) {
		this.hosts = hostNames;
	}

	public void setMaxAttempts(String maxAttempts) {
		this.maxAttempts = maxAttempts;
	}

	@Override
	public synchronized void append(LoggingEvent event) {
		if (!configured) {
			String errorMsg = "Flume Log4jAppender not configured correctly! Cannot"
					+ " send events to Flume.";
			LogLog.error(errorMsg);
			if (getUnsafeMode()) {
				return;
			}
			throw new FlumeException(errorMsg);
		}
		try {
			super.append(event);
		} catch (FlumeException e) {
			e.printStackTrace();
		}
	}

	/**
	 *
	 * @throws FlumeException
	 *             if the FailoverRpcClient cannot be instantiated.
	 */
	@Override
	public void activateOptions() throws FlumeException {
		try {
			final Properties properties = getProperties(hosts, maxAttempts,
					getTimeout());
			rpcClient = RpcClientFactory.getInstance(properties);
			if (layout != null) {
				layout.activateOptions();
			}
			configured = true;
		} catch (Exception e) {
			String errormsg = "RPC client creation failed! " + e.getMessage();
			LogLog.error(errormsg);
			if (getUnsafeMode()) {
				return;
			}
			throw new FlumeException(e);
		}

	}

	/**
	*/
	private Properties getProperties(String hosts, String maxAttempts,
			long timeout) throws FlumeException {

		if (StringUtils.isEmpty(hosts)) {
			throw new FlumeException("hosts must not be null");
		}

		Properties props = new Properties();
		String[] hostsAndPorts = hosts.split("\\s+");
		StringBuilder names = new StringBuilder();
		for (int i = 0; i < hostsAndPorts.length; i++) {
			String hostAndPort = hostsAndPorts[i];
			String name = "h" + i;
			props.setProperty(
					RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + name,
					hostAndPort);
			names.append(name).append(" ");
		}
		props.put(RpcClientConfigurationConstants.CONFIG_HOSTS,
				names.toString());
		props.put(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
				ClientType.DEFAULT_FAILOVER.toString());

		if (StringUtils.isEmpty(maxAttempts)) {
			throw new FlumeException("hosts must not be null");
		}

		props.put(RpcClientConfigurationConstants.CONFIG_MAX_ATTEMPTS,
				maxAttempts);

		props.setProperty(
				RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
				String.valueOf(timeout));
		props.setProperty(
				RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
				String.valueOf(timeout));
		return props;
	}

}
分享到:
评论
1 楼 di1984HIT 2017-07-21  
不错,学习了~~

相关推荐

Global site tag (gtag.js) - Google Analytics