博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
实例讲解hadoop中的map/reduce查询(python语言实现)
阅读量:5788 次
发布时间:2019-06-18

本文共 4429 字,大约阅读时间需要 14 分钟。

条件,假设你已经装好了hadoop集群,配好了hdfs并可以正常运行。
$hadoop dfs -ls /data/dw/explorer
Found 1 items
drwxrwxrwx     - rsync supergroup                    0 2011-11-30 01:06 /data/dw/explorer/20111129
$ hadoop dfs -ls /data/dw/explorer/20111129
Found 4 items
-rw-r--r--     3 rsync supergroup     12294748 2011-11-29 21:10 /data/dw/explorer/20111129/explorer_20111129_19_part-00000.lzo
-rw-r--r--     3 rsync supergroup             1520 2011-11-29 21:11 /data/dw/explorer/20111129/explorer_20111129_19_part-00000.lzo.index
-rw-r--r--     3 rsync supergroup     12337366 2011-11-29 22:09 /data/dw/explorer/20111129/explorer_20111129_20_part-00000.lzo
-rw-r--r--     3 rsync supergroup             1536 2011-11-29 22:10 /data/dw/explorer/20111129/explorer_20111129_20_part-00000.lzo.index
数据格式如下
20111129/23:59:54 111.161.25.184 182.132.25.243 <Log_Explorer ProductVer="5.05.1026.1111" UUID="{C9B80A9B-704E-B106-9134-1ED3581D0123}"><UserDoubleClick FileExt="mp3" AssociateKey="Audio.mp3" Count="1"/></Log_Explorer>
1.map脚本取数据explorer_map.py
#!/usr/bin/python
#-*-coding:UTF-8 -*-
import sys
import cElementTree
debug = False#设置lzo文件偏移位
if debug:
        lzo = 0
else:
        lzo = 1
for line in sys.stdin:
        try:
                flags = line[:-1].split('\t')
#hadoop查询走标准输入,数据以\t分隔,去掉每行中的\n
                if len(flags) == 0:
                        break
                if len(flags) != 11+lzo:
#hadoop采用lzo则偏移位+1,lzo设置为False则+1
                        continue
                stat_date=flags[0+lzo]#日期
                stat_date_bar = stat_date[:4]+"-"+stat_date[4:6]+'-'+stat_date[6:8]#拼成2011-11-29格式
                version = flags[4+lzo]
                xmlstr = flags[10+lzo]
                #xmlstr=line
                dom = cElementTree.fromstring(xmlstr)
#xml字段对象,以下均为取值操作
                uuid = dom.attrib['UUID']
                node = dom.find('UserDoubleClick')
                associateKey=node.get('AssociateKey')
                associateKeys=associateKey.split('.')
                player = associateKeys[0]
                fileext=node.get('FileExt')
                count=node.get('Count')
                print stat_date_bar+','+version+','+fileext+','+player+','+associateKey+'\t'+count
#输出map后的数据,这里map不对数据做任何处理,只做取值,拼接操作
#将\t前的字符串作为key输入reduce,\t后的count作为reduce计算用的value
except Exception,e:
print e
#抛出异常        
2.reduce脚本计算结果并输出explorer_red.py
#!/usr/bin/python
#-*-coding:UTF-8 -*-
import sys
import cElementTree
import os
import string
res = {}
for line in sys.stdin:
        try:
                flags = line[:-1].split('\t')
#拆分\t以获得map传过来的key和value
                if len(flags) != 2:
#\t切割后,如果数据有问题,元素多于2或者少于2则认为数据不合法,跳出继续下一行
                        continue
                skey= flags[0]
#取出第一个元素作为key
                count=int(flags[1])
#取出第二个元素作为value
                if res.has_key(skey) == False:
                        res[skey]=0
                res[skey] += count
#计算count总和
        except Exception,e:
                pass
#不抛出,继续执行
for key in res.keys():
        print key+','+'%s' % res[key]
#格式化输出,以放入临时文件
3.放入crontab执行的脚本
#!/bin/sh
[ $1 ] && day=$1 DATE=`date -d "$1" +%Y%m%d`
[ $1 ] || day=`date -d "1 day ago" +%Y%m%d`     DATE=`date -d "1 day ago" +%Y%m%d`
#取昨天日期
cd /opt/modules/hadoop/hadoop-0.20.203.0/
#进入hadoop工作目录
bin/hadoop jar contrib/streaming/hadoop-streaming-0.20.203.0.jar -file /home/rsync/explorer/explorer_map.py -file /home/rsync/explorer/explorer_red.py -mapper /home/rsync/explorer/explorer_map.py -reducer /home/rsync/explorer/explorer_red.py -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat -input /data/dw/explorer/$DATE -output /tmp/explorer_$DATE
#执行map/reduce,并将排序完结果放入hdfs:///tmp/explorer
bin/hadoop fs -copyToLocal /tmp/explorer_$DATE /tmp
#将m/r结果从hdfs://tmp/explorer_$DATE 保存到本地/tmp下
bin/hadoop dfs -rmr /tmp/explorer_$DATE
#删除hdfs下临时文件夹
cd
#返回自身目录
cd explorer
#进入explorer文件夹
./rm.py $DATE
执行入库和删除临时文件夹脚本
4.将/tmp生成的结果入库并删除临时文件夹
#!/usr/bin/python
import os
import sys
import string
if len(sys.argv) == 2:
                date = sys.argv[1:][0] #取脚本参数
                os.system ("mysql -h192.168.1.229 -ujobs -p223238 -P3306    bf5_data    -e \"load data local infile '/tmp/explorer_"+date+"/part-00000' into table explorer FIELDS TERMINATED
BY '\,' (stat_date,ver,FileExt,player,AssociateKey,count)\"")#执行入库sql语句,并用load方式将数据加载到统计表中
                os.system ("rm -rf /tmp/explorer_"+date)#删除map/reduce过的数据
else:
                print "Argv error"
#因为没有安装MySQLdb包,所以用运行脚本的方式加载数据。
原始数据和最后完成的输出数据对比,红色为原数据,绿色为输出数据
20111129/23:59:54 111.161.25.184 182.132.25.243 <Log_Explorer ProductVer="5.05.1026.1111" UUID="{C9B80A9B-704E-B106-9134-1ED3581D0123}"><UserDoubleClick FileExt="mp3" AssociateKey="Audio.mp3" Count="1"/></Log_Explorer>
-----------------------------------------------------------
2011-11-29,5.05.1026.1111,mp3,Audio,Audio.mp3,1
5.调试技巧
因为这种方式比较抽象,所以你很难得到一个直观的调试过程。建议调试如下
#将hadoop中的数据文本copy出来一个,lzo需要解压缩,然后将map中的debug模式置为True,也就是不加hadoop中的lzo偏移量。
#用head输入hadoop里的文件,通过管道操作放入map/reduce中执行,看输出结果
$head explorer_20111129 | explorer_map.py | explorer_red.py
一天的数据大概几十个G,以前用awk和perl脚本跑需要至少半小时以上,改用map/reduce方式后,大概20几秒跑完,效率还是提高了很多的。

转载地址:http://ymlyx.baihongyu.com/

你可能感兴趣的文章
延伸产业链 中国产粮大省向“精深”问发展
查看>>
消费贷用户70%月收入低于5000元 80、90后是主要人群
查看>>
2018年内蒙古外贸首次突破1000亿元
查看>>
CTOR有助于BCH石墨烯技术更上一层楼
查看>>
被遗忘的CSS
查看>>
Webpack中的sourcemap以及如何在生产和开发环境中合理的设置sourcemap的类型
查看>>
做完小程序项目、老板给我加了6k薪资~
查看>>
java工程师linux命令,这篇文章就够了
查看>>
关于React生命周期的学习
查看>>
webpack雪碧图生成
查看>>
搭建智能合约开发环境Remix IDE及使用
查看>>
Spring Cloud构建微服务架构—服务消费基础
查看>>
RAC实践采坑指北
查看>>
runtime运行时 isa指针 SEL方法选择器 IMP函数指针 Method方法 runtime消息机制 runtime的使用...
查看>>
LeetCode36.有效的数独 JavaScript
查看>>
Scrapy基本用法
查看>>
PAT A1030 动态规划
查看>>
自制一个 elasticsearch-spring-boot-starter
查看>>
【人物志】美团前端通道主席洪磊:一位产品出身、爱焊电路板的工程师
查看>>
一份关于数据科学家应该具备的技能清单
查看>>