Hadoop兮,杀鸡别用牛刀,python+shell实现一般日志文件的查询、统计

Stella981
• 阅读 843

简单的日志统计是不需要使用重量级的Hadoop,我用python实现了日志的统计。原理是用fabric登录到远程linux,组合使用grep、uniq、sort、awk对日志进行操作,可以根据正则表达式指定规则抽取符合规则的日志,做查询,计数,分类统计。

注意:要安装fabric

主文件:LogQuery.py

#encoding=utf-8

from fabric.api import run,env,local,cd
from fabric.tasks import execute,abort
from fabric.contrib.console import confirm
import logging

logging.basicConfig(format='[%(levelname)s]: %(message)s', level=logging.DEBUG)
logger = logging.getLogger(__name__)
logging.getLogger('paramiko.transport').setLevel(logging.ERROR)
logger.setLevel(logging.DEBUG)

EXECUTE_RESULT = {}

def hosts(hostarr):
    '''
    set hosts
    hostarr:[(hostname,password),(hostname,password)...]
    '''
    env.hosts =  [x[0] for x in hostarr]
    env.passwords = dict(x for x in hostarr)

def query(expression,hostname,logfile,unique=True,sort=None,output=None,pattern=None,path=None):
    '''
    expression: regex rule
    hostname: hostname as specified hosts()
    logfile: log file name, wildcard supported, eg:*.log
    unique: whether result is unique
    sort: 1(ASC) or -1(DESC) ,default None
    output:None or file name, default None imply print stream
    pattern: group pattern , default None imply '1'
    path: cd to path before execution
    '''

    if not path:
        path = r'.'
    cmd_str = generate_cmd(expression,logfile,unique,sort,output,pattern)
    execute(executor,hostname,cmd_str,path,host=hostname)
    result = EXECUTE_RESULT[hostname]
    return result

def aggregate(expression,hostname,logfile,output=None,pattern=None,path=None):
    '''
    expression: regex rule
    hostname: hostname as specified hosts()
    logfile: log file name, wildcard supported, eg:*.log
    output:None or file name, default None imply print stream
    pattern: group pattern , default None imply '1'
    path: cd to path before execution
    '''
    if not path:
        path = r'.'
    cmd_str = generate_cmd(expression,logfile,False,None,output,pattern,True,True)
    execute(executor,hostname,cmd_str,path,host=hostname)
    result = EXECUTE_RESULT[hostname]
    return result

def count(expression,hostname,logfile,unique=True,sort=None,output=None,pattern=None,path=None):
    '''
    expression: regex rule
    hostname: hostname as specified hosts()
    logfile: log file name, wildcard supported, eg:*.log
    unique: whether result is unique
    sort: 1(ASC) or -1(DESC) ,default None
    output:None or file name, default None imply print stream
    pattern: group pattern , default None imply '1'
    path: cd to path before execution
    '''

    if not path:
        path = r'.'
    cmd_str = generate_cmd(expression,logfile,unique,sort,output,pattern,True)
    execute(executor,hostname,cmd_str,path,host=hostname)
    result = EXECUTE_RESULT[hostname]
    if result:
        result = int(result[0])
    return result


def executor(hostname,cmd_str,path=None):
    '''
    executor , called by execute
    '''
    if not path:
        path = r'.'
    with cd(path):
        res = run(cmd_str,quiet=True)
        logger.debug('Command: %s:%s > %s'%(hostname,path,cmd_str))
        logger.debug('Command Execute Successful:%s, Failure:%s'%(res.succeeded,res.failed))
        EXECUTE_RESULT[hostname] = res.splitlines()

def generate_cmd(expression,logfile,unique=True,sort=None,output=None,pattern=None,count=False,aggregate=False):
    '''
    generate command
    '''
    if not pattern:
        pattern = r'\1'

    if aggregate:
        aggregate = '''| awk  '{a[$1]++}END{for (j in a) print j","a[j]}' '''
        unique = False
        sort = False
        count = False
    else:
        aggregate = ''

    if not unique:
        unique = ''
    else:
        unique = '| uniq'

    if sort:
        if sort==1:
            sort = '| sort'
        elif sort==-1:
            sort = '| sort -r'
        else:
            sort = ''
    else:
        sort = ''

    if count:
        count = '| wc -l'
    else:
        count = ''

    if output:
        output = '>%s'%output
    else:
        output = ''

    cmd_str = '''cat %s | grep "%s" | sed 's/%s/%s/g' %s %s %s %s %s'''%(logfile,expression,expression,pattern,unique,sort,count,output,aggregate)
    return cmd_str

假设你的日志是这样的:

spider.A crawled http://www.163.com/abc.html
spider.A crawled http://www.yahoo.com/xyz.html
spider.B crawled http://www.baidu.com/mnq.html
other log no crawing infomation involved
spider.C crawled http://www.sina.com.cn/yyy.html

使用案例:test.py

#encoding=utf-8

import LogQuery

#定义多个主机,用户名@主机,登录密码
myhosts = [('rootman@192.168.2.228','123'),('rootman@192.168.2.229','123'),('rootman@192.168.2.219','123')]
LogQuery.hosts(myhosts)

'''
案例1:
查询有哪些域名被抓取过,使用query方法,会返回所有符合规则的数据
预期返回:
www.163.com
www.yahoo.com
...
'''
res = LogQuery.query('\(.*crawled http:\/\/\)\([^\/]*\)\(\/.*\)',myhosts[0][0],'gcrawler.*.log',unique=True,sort=None,output=None,pattern=r'\2',path='/home/workspace/Case/trunk/src/gcrawler/log')
'''
上一行代码解读:
第一个参数指定了表示抓取的日志正则表达式,并且将其分组(为了提取域名),分组的括号用\(,\)表示,第二组是域名的提取。
第二个参数指定了要查询那一台主机上的日志
第三个参数指定了要分析的日志文件名,*表示任何字符
第四个参数unique,是否对返回的条目进行排重,例如:日志中发现多个www.163.com,只算一个
第五个参数sort,是否需要对抽取的条目进行排序,1:正序、-1:倒序,这里为None,即不需要排序
第六个参数output,可以指定运行结果输出到某个文件,在这里不需要输出,为None
第七个参数pattern,是指从正则表达式中抽取哪个分组,默认是第一组,这里用r'\2'指定第二组
第八个参数path指定了日志在操作系统上所在的目录
以下的count、aggregate方法使用的参数和query都是一样的意义
'''

'''
案例2:
统计被抓取过的域名有几个,使用count方法,会返回所有符合规则的统计总数
预期返回:4
...
'''
res = LogQuery.count('\(.*crawled http:\/\/\)\([^\/]*\)\(\/.*\)',myhosts[1][0],'gcrawler.*.log',unique=True,sort=None,output=None,pattern=r'\2',path='/home/workspace/Case/trunk/src/gcrawler/log')

'''
案例3:
分别统计每个域名被抓取的数量
返回的结果:
域名1,统计数字
域名2,统计数字
...
'''
res = LogQuery.aggregate('\(.*crawled http:\/\/\)\([^\/]*\)\(\/.*\)',myhosts[2][0],'gcrawler.*.log',output=None,pattern=r'\2',path='/home/workspace/Case/trunk/src/gcrawler/log')#这里是分类统计就没必要指定unique和sort了。
#打印分组统计的情况
for i in res:
    domain,count = i.split(',')
    total += int(count)
    print domain,'=>',count
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Shell 中常见的日志统计方法
前面我发布过"Hadoop兮,杀鸡别用牛刀,pythonshell实现一般日志文件的查询、统计(http://my.oschina.net/waterbear/blog/149881)",需要结合python,可能还是有一定的门槛,现将shell部分剥离出来.举例一些最基本的日志统计方法.(1)查看文件more craw
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
3年前
Spring Boot日志集成
!(https://oscimg.oschina.net/oscnet/1bde8e8d00e848be8b84e9d1d44c9e5c.jpg)SpringBoot日志框架SpringBoot支持JavaUtilLogging,Log4j2,Lockback作为日志框架,如果你使用star
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这