Requirements
Top 10 videos by view count
Top 10 video categories by popularity
Categories of the 20 most-viewed videos, and how many of those Top 20 videos each category contains
Category ranking of the videos related to the Top 50 most-viewed videos
Top 10 most popular videos within a category, using Music as an example
Top 10 videos by view count within each category
Top 10 uploaders by number of videos uploaded, and the Top 20 most-viewed videos among their uploads
Data structure
Video table
Field | Meaning | Details |
---|---|---|
videoId | Unique video id (String) | 11-character string |
uploader | Video uploader (String) | Username of the uploading user |
age | Video age (Int) | Number of days the video has been on the platform, as an integer |
category | Video categories (Array) | Categories assigned to the video at upload time |
length | Video length (Int) | Video length as an integer |
views | View count (Int) | Number of times the video has been viewed |
rate | Video rating (Double) | Out of 5 points |
ratings | Traffic (Int) | Video traffic, as an integer |
comments | Comment count (Int) | Number of comments on the video, as an integer |
relatedId | Related video ids (Array) | Ids of related videos, at most 20 |
User table
Field | Meaning | Type |
---|---|---|
uploader | Uploader username | string |
videos | Number of uploaded videos | int |
friends | Number of friends | int |
ETL
Utility class
package cn.lixuan.hive.etl;

public class ETLUtil {
    /**
     * Cleanses one raw record.
     */
    public static String etlData(String srcData){
        StringBuilder resultData = new StringBuilder();
        // 1. Split the line on \t
        String[] datas = srcData.split("\t");
        // 2. Reject records with fewer than 9 fields
        if (datas.length < 9){
            return null;
        }
        // 3. Remove the spaces inside the video category field
        datas[3] = datas[3].replaceAll(" ", "");
        // 4. Join the related video ids with &
        for (int i = 0; i < datas.length; i++) {
            if (i < 9){
                // 4.1 Record has no related videos: the 9th field is the last one
                if (i == datas.length - 1){
                    resultData.append(datas[i]);
                } else {
                    resultData.append(datas[i]).append("\t");
                }
            } else {
                // 4.2 Record has related videos: join their ids with &
                if (i == datas.length - 1){
                    resultData.append(datas[i]);
                } else {
                    resultData.append(datas[i]).append("&");
                }
            }
        }
        return resultData.toString();
    }
}
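As a quick sanity check, ETLUtil can be exercised from a plain main method before wiring it into MapReduce. This is a minimal sketch: the class ETLUtilDemo is ours, and the record below is a made-up sample in the raw layout (nine tab-separated base fields followed by tab-separated related video ids).
package cn.lixuan.hive.etl;

public class ETLUtilDemo {
    public static void main(String[] args) {
        // Made-up raw record: 9 base fields followed by two related video ids,
        // all tab-separated (the values are illustrative only)
        String raw = "LKh7zAJ4nwE\tTheReceptionist\t653\tPeople & Blogs\t424"
                + "\t13021\t4.34\t1305\t744\t2GtVLAgdxlc\tHVN9sbMEq1U";
        // Prints the cleansed record: the category becomes People&Blogs and the
        // related ids are joined as 2GtVLAgdxlc&HVN9sbMEq1U
        System.out.println(ETLUtil.etlData(raw));
        // Records with fewer than 9 fields come back as null
        System.out.println(ETLUtil.etlData("tooShort\tonlyTwoFields"));
    }
}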
Mapper
package cn.lixuan.hive.etl;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Cleanses the raw GuliVideo data.
 * Cleansing rules:
 * 1. Drop records with fewer than 9 fields.
 * 2. Remove the spaces inside the video category field, e.g. "People & Blogs".
 * 3. Join the related video ids with the & character.
 */
public class EtlMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line
        String line = value.toString();
        // Cleanse it
        String resultData = ETLUtil.etlData(line);
        if (resultData != null) {
            // Emit the cleansed record
            k.set(resultData);
            context.write(k, NullWritable.get());
        }
    }
}
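If you also want to know how many records the cleansing drops, a Hadoop counter is a cheap way to surface that in the job's counter summary. The variant below is a sketch of that idea; the class name and the counter labels "ETL"/"dirtyRecords" are our own choices, not part of the original program.
package cn.lixuan.hive.etl;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EtlMapperWithCounter extends Mapper<LongWritable, Text, Text, NullWritable> {
    private final Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String resultData = ETLUtil.etlData(value.toString());
        if (resultData == null) {
            // Increments a counter that appears in the job's final counter
            // summary; "ETL" and "dirtyRecords" are arbitrary labels
            context.getCounter("ETL", "dirtyRecords").increment(1);
            return;
        }
        k.set(resultData);
        context.write(k, NullWritable.get());
    }
}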
Driver
package cn.lixuan.hive.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EtlDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(EtlDriver.class);
        job.setMapperClass(EtlMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Map-only job: cleansing needs no reduce phase
        job.setNumReduceTasks(0);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
- Package the ETL program as etl.jar and upload it to /opt/module/hive/datas on the Linux host
Upload the raw data
cd /opt/module/hive/datas
hadoop fs -mkdir -p /gulivideo/video
hadoop fs -mkdir -p /gulivideo/user
hadoop fs -put gulivideo/user/user.txt /gulivideo/user
hadoop fs -put gulivideo/video/*.txt /gulivideo/video
Run the ETL job
hadoop jar etl.jar cn.lixuan.hive.etl.EtlDriver /gulivideo/video /gulivideo/video/output
Prepare the tables
Tables to prepare
- Create the raw data tables
- gulivideo_ori, gulivideo_user_ori
- Create the final tables
- gulivideo_orc, gulivideo_user_orc
gulivideo_ori
create table gulivideo_ori(
videoId string,
uploader string,
age int,
category array<string>,
length int,
views int,
rate float,
ratings int,
comments int,
relatedId array<string>)
row format delimited fields terminated by "\t"
collection items terminated by "&"
stored as textfile;
gulivideo_user_ori
create table gulivideo_user_ori(
uploader string,
videos int,
friends int)
row format delimited
fields terminated by "\t"
stored as textfile;
gulivideo_orc
create table gulivideo_orc(
videoId string,
uploader string,
age int,
category array<string>,
length int,
views int,
rate float,
ratings int,
comments int,
relatedId array<string>)
stored as orc
tblproperties("orc.compress"="SNAPPY");
gulivideo_user_orc
create table gulivideo_user_orc(
uploader string,
videos int,
friends int)
row format delimited
fields terminated by "\t"
stored as orc
tblproperties("orc.compress"="SNAPPY");
Load data into the ori tables
load data inpath "/gulivideo/video/output" into table gulivideo_ori;
load data inpath "/gulivideo/user" into table gulivideo_user_ori;
Insert data into the orc tables
insert into table gulivideo_orc select * from gulivideo_ori;
insert into table gulivideo_user_orc select * from gulivideo_user_ori;
Install the Tez engine
- Tez is an execution engine for Hive that performs better than MR
# Copy the Tez package to the cluster and extract the tarball
mkdir /opt/module/tez
tar -zxvf /opt/software/tez-0.10.1-SNAPSHOT-minimal.tar.gz -C /opt/module/tez
# Upload the Tez dependencies to HDFS
hadoop fs -mkdir /tez
hadoop fs -put /opt/software/tez-0.10.1-SNAPSHOT.tar.gz /tez
# Create tez-site.xml
vim $HADOOP_HOME/etc/hadoop/tez-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>tez.lib.uris</name>
        <value>${fs.defaultFS}/tez/tez-0.10.1-SNAPSHOT.tar.gz</value>
    </property>
    <property>
        <name>tez.use.cluster.hadoop-libs</name>
        <value>true</value>
    </property>
    <property>
        <name>tez.am.resource.memory.mb</name>
        <value>1024</value>
    </property>
    <property>
        <name>tez.am.resource.cpu.vcores</name>
        <value>1</value>
    </property>
    <property>
        <name>tez.container.max.java.heap.fraction</name>
        <value>0.4</value>
    </property>
    <property>
        <name>tez.task.resource.memory.mb</name>
        <value>1024</value>
    </property>
    <property>
        <name>tez.task.resource.cpu.vcores</name>
        <value>1</value>
    </property>
</configuration>
# Update the Hadoop environment configuration
vim $HADOOP_HOME/etc/hadoop/shellprofile.d/tez.sh
# Add the Tez jar information
hadoop_add_profile tez
function _tez_hadoop_classpath
{
    hadoop_add_classpath "$HADOOP_HOME/etc/hadoop" after
    hadoop_add_classpath "/opt/module/tez/*" after
    hadoop_add_classpath "/opt/module/tez/lib/*" after
}
# Switch Hive's execution engine to Tez
vim $HIVE_HOME/conf/hive-site.xml
<property>
    <name>hive.execution.engine</name>
    <value>tez</value>
</property>
<property>
    <name>hive.tez.container.size</name>
    <value>1024</value>
</property>
# Resolve the logging jar conflict
rm /opt/module/tez/lib/slf4j-log4j12-1.7.10.jar
Analysis
Top 10 videos by view count
SELECT
    videoId,
    views
FROM gulivideo_orc
ORDER BY views DESC
LIMIT 10;
Top 10 video categories by popularity
SELECT
    t1.category_name,
    COUNT(t1.videoId) hot
FROM
(
    SELECT
        videoId,
        category_name
    FROM gulivideo_orc
    lateral VIEW explode(category) gulivideo_orc_tmp AS category_name
) t1
GROUP BY t1.category_name
ORDER BY hot DESC
LIMIT 10;
Categories of the 20 most-viewed videos, and how many of those Top 20 videos each category contains
SELECT
    t2.category_name,
    COUNT(t2.videoId) video_sum
FROM
(
    SELECT
        t1.videoId,
        category_name
    FROM
    (
        SELECT
            videoId,
            views,
            category
        FROM gulivideo_orc
        ORDER BY views DESC
        LIMIT 20
    ) t1
    lateral VIEW explode(t1.category) t1_tmp AS category_name
) t2
GROUP BY t2.category_name;
Category ranking of the videos related to the Top 50 most-viewed videos
SELECT
    t6.category_name,
    t6.video_sum,
    rank() over(ORDER BY t6.video_sum DESC) rk
FROM
(
    SELECT
        t5.category_name,
        COUNT(t5.relatedid_id) video_sum
    FROM
    (
        SELECT
            t4.relatedid_id,
            category_name
        FROM
        (
            SELECT
                t2.relatedid_id,
                t3.category
            FROM
            (
                SELECT
                    relatedid_id
                FROM
                (
                    SELECT
                        videoId,
                        views,
                        relatedid
                    FROM gulivideo_orc
                    ORDER BY views DESC
                    LIMIT 50
                ) t1
                lateral VIEW explode(t1.relatedid) t1_tmp AS relatedid_id
            ) t2
            JOIN gulivideo_orc t3
            ON t2.relatedid_id = t3.videoId
        ) t4
        lateral VIEW explode(t4.category) t4_tmp AS category_name
    ) t5
    GROUP BY t5.category_name
    ORDER BY video_sum DESC
) t6;
Top 10 most popular videos within a category, using Music as an example
SELECT
    t1.videoId,
    t1.views,
    t1.category_name
FROM
(
    SELECT
        videoId,
        views,
        category_name
    FROM gulivideo_orc
    lateral VIEW explode(category) gulivideo_orc_tmp AS category_name
) t1
WHERE t1.category_name = "Music"
ORDER BY t1.views DESC
LIMIT 10;
Top 10 videos by view count within each category
SELECT
    t2.videoId,
    t2.views,
    t2.category_name,
    t2.rk
FROM
(
    SELECT
        t1.videoId,
        t1.views,
        t1.category_name,
        rank() over(PARTITION BY t1.category_name ORDER BY t1.views DESC) rk
    FROM
    (
        SELECT
            videoId,
            views,
            category_name
        FROM gulivideo_orc
        lateral VIEW explode(category) gulivideo_orc_tmp AS category_name
    ) t1
) t2
WHERE t2.rk <= 10;
Top 10 uploaders by number of videos uploaded, and the Top 20 most-viewed videos among their uploads
SELECT
    t2.videoId,
    t2.views,
    t2.uploader
FROM
(
    SELECT
        uploader,
        videos
    FROM gulivideo_user_orc
    ORDER BY videos DESC
    LIMIT 10
) t1
JOIN gulivideo_orc t2
ON t1.uploader = t2.uploader
ORDER BY t2.views DESC
LIMIT 20;
Common errors and solutions
If jobs hang after switching to the Tez engine, try adjusting the capacity scheduler's resource policy.
In $HADOOP_HOME/etc/hadoop/capacity-scheduler.xml, change
<property>
    <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
    <value>0.1</value>
    <description>
        Maximum percent of resources in the cluster which can be used to run
        application masters i.e. controls number of concurrent running
        applications.
    </description>
</property>
to
<property>
    <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
    <value>1</value>
    <description>
        Maximum percent of resources in the cluster which can be used to run
        application masters i.e. controls number of concurrent running
        applications.
    </description>
</property>
Hive's default input format handler is CombineHiveInputFormat, which merges small files:
set hive.input.format;
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
# Switch to HiveInputFormat to produce output files according to the number of partitions
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
JVM heap out of memory
java.lang.OutOfMemoryError: Java heap space
Add the following to yarn-site.xml
<property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>2048</value>
</property>
<property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>2048</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>2.1</value>
</property>
<property>
    <name>mapred.child.java.opts</name>
    <value>-Xmx1024m</value>
</property>
Virtual memory limit
Add the following to yarn-site.xml
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>