java實時監聽日志寫入kafka(轉)


原文鏈接:http://www.sjsjw.com/kf_cloud/article/020376ABA013802.asp

目的

實時監聽某目錄下的日志文件,如有新文件切換到新文件,並同步寫入kafka,同時記錄日志文件的行位置,以應對進程異常退出,能從上次的文件位置開始讀取(考慮到效率,這里是每100條記一次,可調整)

源碼:

復制代碼
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.net.NoRouteToHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;



import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


/*
* 自己在源服務器寫生產者往kafka插入數據,注意文件"producer.properties放在linux下該jar文件同一目錄
* 監聽某個目錄下的文件數據然后寫入kafka
* nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position >/home/sre/portalhandler/handler.log 2>&1 &
*
*
*/
/*
 * Tails the newest "access" log file in a directory and publishes each new line
 * to Kafka. The file "producer.properties" must sit in the same directory as the
 * jar on the Linux host.
 *
 * Usage:
 * nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position >/home/sre/portalhandler/handler.log 2>&1 &
 */
public class PortalLogTail_Line {

    /** Kafka producer shared by all send calls. */
    private Producer<String, String> inner;
    /** Random message keys spread messages across partitions via key hashing. */
    java.util.Random ran = new Random();

    /**
     * Loads producer.properties from the working directory and builds the producer.
     *
     * @throws FileNotFoundException if producer.properties is missing
     * @throws IOException if the properties file cannot be read
     */
    public PortalLogTail_Line() throws FileNotFoundException, IOException {
        Properties properties = new Properties();
        FileInputStream in = new FileInputStream("producer.properties");
        try {
            properties.load(in);
        } finally {
            in.close(); // fix: stream was never closed in the original
        }
        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    /**
     * Sends one message. A random key in [0, 9) is used so the broker's key hash
     * spreads messages over the topic's partitions. No-op when either argument is null.
     */
    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        KeyedMessage<String, String> km =
                new KeyedMessage<String, String>(topicName, String.valueOf(ran.nextInt(9)), message);
        inner.send(km);
    }

    /**
     * Sends a batch of messages without explicit keys.
     * No-op when the topic or collection is null, or the collection is empty.
     */
    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null || messages.isEmpty()) {
            return;
        }
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
        for (String entry : messages) {
            kms.add(new KeyedMessage<String, String>(topicName, entry));
        }
        inner.send(kms);
    }

    /** Releases the underlying Kafka producer. */
    public void close() {
        inner.close();
    }

    /**
     * Returns the absolute path of the most recently modified file whose name
     * contains "access" in the given directory, or "" when no such file exists.
     */
    public String getNewFile(File file) {
        File[] fs = file.listFiles();
        long maxtime = 0;
        String newfilename = "";
        if (fs == null) {
            // fix: listFiles() returns null for a non-directory or on an I/O error;
            // the original would throw NullPointerException here
            return newfilename;
        }
        for (int i = 0; i < fs.length; i++) {
            if (fs[i].lastModified() > maxtime && fs[i].getName().contains("access")) {
                maxtime = fs[i].lastModified();
                newfilename = fs[i].getAbsolutePath();
            }
        }
        return newfilename;
    }

    /**
     * Persists "path,lineNumber" to the position file so a restarted process
     * can resume from the last checkpoint.
     */
    public void writePosition(String path, int rn, String positionpath) {
        BufferedWriter out = null;
        try {
            out = new BufferedWriter(new FileWriter(positionpath));
            out.write(path + "," + rn);
        } catch (IOException e) {
            e.printStackTrace(); // fix: exception was silently swallowed in the original
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (IOException ignored) {
                    // best-effort close; the checkpoint will be rewritten shortly anyway
                }
            }
        }
    }

    LineNumberReader randomFile = null; // reader over the file currently being tailed
    String newfile = null;
    String thisfile = null;             // file currently being tailed
    String prefile = null;              // file tailed on the previous poll iteration
    int ln = 0;                         // lines read since the last position checkpoint
    int beginln = 0;                    // resume point: lines numbered <= beginln are skipped

    /**
     * Starts a background thread that polls the newest access log every 100 ms,
     * sends each new line to the topic, and checkpoints the file/line position
     * every 100 lines (per-line checkpointing would hurt throughput). If the
     * position file records "path,line", tailing resumes from that point;
     * otherwise the newest file is read from the beginning.
     */
    public void realtimeShowLog(final File file, final String topicname, final String positionpath) throws IOException {

        new Thread(new Runnable() {
            public void run() {
                thisfile = getNewFile(file);
                prefile = thisfile;
                // Read the checkpoint, if any, to decide where to resume.
                BufferedReader br = null;
                try {
                    br = new BufferedReader(new FileReader(positionpath));
                    String line = br.readLine();
                    if (line != null && line.contains(",")) {
                        thisfile = line.split(",")[0];
                        prefile = thisfile;
                        beginln = Integer.parseInt(line.split(",")[1]);
                    }
                } catch (FileNotFoundException e2) {
                    // No position file yet: first run, nothing to resume from.
                } catch (IOException e2) {
                    e2.printStackTrace();
                } finally {
                    if (br != null) {
                        try {
                            br.close(); // fix: reader was leaked in the original
                        } catch (IOException ignored) {
                        }
                    }
                }

                try {
                    randomFile = new LineNumberReader(new FileReader(thisfile));
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                }
                while (true) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e1) {
                        // fix: restore the interrupt flag and stop instead of looping on
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        // Drain everything appended since the last poll.
                        String tmp;
                        while ((tmp = randomFile.readLine()) != null) {
                            int currln = randomFile.getLineNumber();
                            // beginln defaults to 0; lines at or below it were sent before the restart
                            if (currln > beginln) {
                                // fix: original did new String(tmp.getBytes("utf8")), which
                                // re-decodes UTF-8 bytes with the platform default charset
                                send(topicname, tmp);
                            }
                            ln++;
                            // Checkpoint every 100 lines rather than per line for efficiency.
                            if (ln > 100) {
                                writePosition(thisfile, currln, positionpath);
                                ln = 0;
                            }
                        }
                        // Switch over when a newer access log file has appeared.
                        thisfile = getNewFile(file);
                        if (!thisfile.equals(prefile)) {
                            randomFile.close();
                            randomFile = new LineNumberReader(new FileReader(thisfile));
                            prefile = thisfile;
                            beginln = 0;
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }).start();
    }

    /**
     * Entry point.
     *
     * @param args topicname pathname positionpath
     * @throws Exception if the producer cannot be constructed
     */
    public static void main(String[] args) throws Exception {
        // fix: validate args before constructing the producer (which reads
        // producer.properties from disk)
        if (args.length != 3) {
            System.out.println("usage:topicname pathname positionpath");
            System.exit(1);
        }
        PortalLogTail_Line producer = new PortalLogTail_Line();
        String topicname = args[0];
        String pathname = args[1];
        String positionpath = args[2];
        final File tmpLogFile = new File(pathname);
        producer.realtimeShowLog(tmpLogFile, topicname, positionpath);
    }

}
復制代碼
復制代碼
producer.properties文件放在同級目錄下

metadata.broker.list=xxx:10909,xxx:10909

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
#producer.type=async

# specify the compression codec for all data generated: none, gzip, snappy.
# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectively
compression.codec=none
#compression.codec=gzip

# message encoder
serializer.class=kafka.serializer.StringEncoder
復制代碼

測試

最后執行: 

 nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position  >/home/sre/portalhandler/handler.log 2>&1 &
分類: Java,Kafka

注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com