用Python构建基于Hadoop的MapReduce日志分析平台

[复制链接]
查看11 | 回复9 | 2014-2-19 11:55:14 | 显示全部楼层 |阅读模式
流量比较大的日志要是直接写入Hadoop对Namenode负载过大,所以入库前合并,可以把各个节点的日志凑并成一个文件写入HDFS。 根据情况定期合成,写入到hdfs里面。


图片1.jpg (22.27 KB, 下载次数: 12)
下载附件
2013-12-22 22:58 上传


回复

使用道具 举报

千问 | 2014-2-19 11:55:14 | 显示全部楼层
咱们看看日志的大小,200G的dns日志文件,我压缩到了18G,要是用awk perl当然也可以,但是处理速度肯定没有分布式那样的给力。


图片2.jpg (144.35 KB, 下载次数: 13)
下载附件
2013-12-22 22:59 上传

回复

使用道具 举报

千问 | 2014-2-19 11:55:14 | 显示全部楼层
Hadoop Streaming原理
mapper和reducer会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming工具会创建MapReduce作业,发送给各个tasktracker,同时监控整个作业的执行过程。
任何语言,只要是方便接收标准输入输出就可以做mapreduce~
再搞之前我们先简单测试下shell模拟mapreduce的性能速度~


图片3.jpg (112.87 KB, 下载次数: 9)
下载附件
2013-12-22 23:00 上传

回复

使用道具 举报

千问 | 2014-2-19 11:55:14 | 显示全部楼层
看下他的结果,350M的文件用时35秒左右。


图片4.jpg (48.58 KB, 下载次数: 8)
下载附件
2013-12-22 23:00 上传

回复

使用道具 举报

千问 | 2014-2-19 11:55:14 | 显示全部楼层
这是2G的日志文件,居然用了3分钟。 当然和我写的脚本也有问题,我们是模拟mapreduce的方式,而不是调用shell下牛逼的awk,gawk处理。


图片5.jpg (48.83 KB, 下载次数: 12)
下载附件
2013-12-22 23:02 上传

回复

使用道具 举报

千问 | 2014-2-19 11:55:14 | 显示全部楼层
awk的速度 !果然很霸道,处理日志的时候,我也很喜欢用awk,只是学习的难度有点大,不像别的shell组件那么灵活简单。


p1.jpg (83.67 KB, 下载次数: 7)
下载附件
2013-12-24 14:48 上传

回复

使用道具 举报

千问 | 2014-2-19 11:55:14 | 显示全部楼层
这是官方的提供的两个demo ~
map.py#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""
import sys
def read_input(file):
for line in file:
# split the line into words
yield line.split()
def main(separator='\t'):
# input comes from STDIN (standard input)
data = read_input(sys.stdin)
for words in data:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
for word in words:

print '%s%s%d' % (word, separator, 1)
if __name__ == "__main__":
main()
复制代码
回复

使用道具 举报

千问 | 2014-2-19 11:55:14 | 显示全部楼层
reduce.py的修改方式#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(file, separator='\t'):
for line in file:
yield line.rstrip().split(separator, 1)
def main(separator='\t'):
# input comes from STDIN (standard input)
data = read_mapper_output(sys.stdin, separator=separator)
# groupby groups multiple word-count pairs by word,
# and creates an iterator that returns consecutive keys and their group:
# current_word - string containing a word (the key)
# group - iterator yielding all ["", ""] items
for current_word, group in groupby(data, itemgetter(0)):
try:

total_count = sum(int(count) for current_word, count in group)

print "%s%s%d" % (current_word, separator, total_count)
except ValueError:

# count was not a number, so silently discard this item

pass
if __name__ == "__main__":
main()
复制代码
回复

使用道具 举报

千问 | 2014-2-19 11:55:14 | 显示全部楼层
顶下楼主
回复

使用道具 举报

千问 | 2014-2-19 11:55:14 | 显示全部楼层
咱们再简单点:#!/usr/bin/env python
import sys
for line in sys.stdin:
line = line.strip()
words = line.split()
for word in words:
print '%s\t%s' % (word, 1)
复制代码
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

主题

0

回帖

4882万

积分

论坛元老

Rank: 8Rank: 8

积分
48824836
热门排行