大数据之mapreduce小实战

news/2024/8/26 17:09:19 标签: 大数据, java, 开发工具

手写wordcount的程序

1、pom.xml

<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
</dependencies>

2、新建Mapper类

package com.hsiehchou.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 海量数据
*
* hello hsiehchou
* nihao
*
* 数据的输入与输出以Key value进行传输
* keyIN:LongWritable(Long) 数据的起始偏移量
* valuewIN:具体数据
*
* mapper需要把数据传递到reducer阶段(<hello,1>)
* keyOut:单词 Text
* valueOut:出现的次数IntWritable
*
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
//对数据进行打散 ctrl+o
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、接入数据 hello nihao
String line = value.toString();
//2、对数据进行切分
String[] words = line.split(" ");
//3、写出以<hello,1>
for (String w:words){
//写出reducer端
context.write(new Text(w), new IntWritable(1));
}
}
}

3、新建Reducer类

package com.hsiehchou.wordcount;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* reducer阶段接收的是Mapper输出的数据
* mapper的输出是reducer输入
*
* keyIn:mapper输出的key的类型
* valueIn:mapper输出的value的类型
*
* reducer端输出的数据类型,想要一个什么样的结果<hello,1888>
* keyOut:Text
* valueOut:IntWritalble
*
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
//key-->单词 value-->次数
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//1、记录出现的次数
int sum = 0;
for (IntWritable v:values){
sum += v.get();
}
//2、l累加求和输出
context.write(key, new IntWritable(sum));
}
}

4、新建驱动类

package com.hsiehchou.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1、创建job任务
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2、指定jar包位置
job.setJarByClass(WordCountDriver.class);
//3、关联使用的Mapper类
job.setMapperClass(WordCountMapper.class);
//4、关联使用的Reducer类
job.setReducerClass(WordCountReducer.class);
//5、设置mapper阶段输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//6、设置reducer阶段输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//7、设置数据输入的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//8设置数据输出的路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//9、提交任务
boolean rs = job.waitForCompletion(true);
System.exit(rs ? 0:1);
}
}
运行结果
[root@hsiehchou121 ~]# hadoop jar mapreduce-1.0-SNAPSHOT.jar com.hsiehchou.wordcount.WordCountDriver /wc/in /wc/out
[root@hsiehchou121 ~]# hdfs dfs -cat /wc/out/part-r-00000
fd 1
fdgs 1
fdsbv 1
gd 1
hello 3

5、IDEA的相关使用

Ctrl+O导入相关未实现的方法
Maven中的Lifecycle的package可以直接打包成jar

案例分析

需求:运营商流量日志
10086
计算每个用户当前使用的总流量
思路?总流量 = 上行流量+下行流量
三个字段:手机号 上行流量 下行流量
技术选型:PB+
数据分析:海量数据(存储hdfs)
海量数据计算(分布式计算框架MapReduce)

实现

FlowBean类
java">
java">package com.hsiehchou.logs;
java">
java">import org.apache.hadoop.io.Writable;
java">
java">import java.io.DataInput;
java">
java">import java.io.DataOutput;
java">
java">import java.io.IOException;
java">
java">/**
java">
java"> * 封装数据类型需要怎么做
java">
java"> * hadoop数据类型实现了序列化接口
java">
java"> * 如果自定义需要实现这个序列化接口
java">
java"> */
java">
java">public class FlowBean implements Writable {
java">
java"> //定义属性:上行流量 下行流量 总流量总和
java">
java"> private long upFlow;
java">
java"> private long dfFlow;
java">
java"> private long flowsum;
java">
java">
java">
java"> public FlowBean(){}
java">
java"> public FlowBean(long upFlow, long dfFlow){
java">
java"> this.upFlow = upFlow;
java">
java"> this.dfFlow = dfFlow;
java">
java"> this.flowsum = upFlow + dfFlow;
java">
java"> }
java">
java"> public long getUpFlow(){
java">
java"> return upFlow;
java">
java"> }
java">
java">
java">
java"> public void setUpFlow(long upFlow){
java">
java"> this.upFlow = upFlow;
java">
java"> }
java">
java"> public long getDfFlow(){
java">
java"> return dfFlow;
java">
java"> }
java">
java"> public void setDfFlow(long dfFlow){
java">
java"> this.dfFlow = dfFlow;
java">
java"> }
java">
java"> public long getFlowsum(){
java">
java"> return flowsum;
java">
java"> }
java">
java"> public void setFlowsum(long flowsum){
java">
java"> this.flowsum = flowsum;
java">
java"> }
java">
java"> //序列化
java">
java"> public void write(DataOutput out) throws IOException {
java">
java"> out.writeLong(upFlow);
java">
java"> out.writeLong(dfFlow);
java">
java"> out.writeLong(flowsum);
java">
java"> }
java">
java"> //反序列化
java">
java"> public void readFields(DataInput in) throws IOException {
java">
java"> upFlow = in.readLong();
java">
java"> dfFlow = in.readLong();
java">
java"> flowsum = in.readLong();
java">
java"> }
java">
java"> @Override
java">
java"> public String toString() {
java">
java"> return upFlow + "\t" + dfFlow + "\t" + flowsum;
java">
java"> }
java">
java">}
java">
FlowCountMapper类
package com.hsiehchou.logs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* keyIN:
* valueIN:
*
* 思路:根据想要的结果的kv类型 手机号 流量总和(上行+下行)自定义类
* keyOut:
* valueOut:
*/
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、接入数据源
String line = value.toString();
//2、切割 \t
String[] fields = line.split("\t");
//3、拿到关键字段
String phoneNr = fields[1];
long upFlow = Long.parseLong(fields[fields.length - 3]);
long dfFlow = Long.parseLong(fields[fields.length - 2]);
//4、写出到reducer
context.write(new Text(phoneNr), new FlowBean(upFlow,dfFlow));
}
}
FlowCountReducer类
package com.hsiehchou.logs;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long upFlow_sum = 0;
long dfFlow_sum = 0;
for (FlowBean v:values){
upFlow_sum += v.getUpFlow();
dfFlow_sum += v.getDfFlow();
}
FlowBean rsSum = new FlowBean(upFlow_sum, dfFlow_sum);
//输出结果
context.write(key, rsSum);
}
}
FlowCountDriver类
package com.hsiehchou.logs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1.创建job任务
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2.指定kjar包位置
job.setJarByClass(FlowCountDriver.class);
//3.关联使用的Mapper
job.setMapperClass(FlowCountMapper.class);
//4.关联使用的Reducer类
job.setReducerClass(FlowCountReducer.class);
//5.设置mapper阶段输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//6.设置reducer阶段输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//优化含有大量小文件的数据
//设置读取数据切片的类
job.setInputFormatClass(CombineTextInputFormat.class);
//最大切片大小8M
CombineTextInputFormat.setMaxInputSplitSize(job, 8388608);
//最小切片大小6M
CombineTextInputFormat.setMinInputSplitSize(job, 6291456);
//7.设置数据输入的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//8.设置数据输出的路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//9.提交任务
boolean rs = job.waitForCompletion(true);
System.exit(rs? 0:1);
}
}
运行结果
[root@hsiehchou121 ~]# hdfs dfs -mkdir -p /flow/in
[root@hsiehchou121 ~]# hdfs dfs -put HTTP_20180313143750.dat /flow/in
[root@hsiehchou121 ~]# hadoop jar mapreduce-1.0-SNAPSHOT.jar com.hsiehchou.logs.FlowCountDriver /flow/in /flow/out
[root@hsiehchou121 ~]# hdfs dfs -cat /flow/out/part-r-00000
13480253104 120 1320 1440
13502468823 735 11349 12084
13510439658 1116 954 2070
13560436326 1136 94 1230
13560436666 1136 94 1230
13560439658 918 4938 5856
13602846565 198 910 1108
13660577991 660 690 1350
13719199419 240 0 240
13726130503 299 681 980
13726238888 2481 24681 27162
13760778710 120 120 240
13822544101 264 0 264
13884138413 4116 1432 5548
13922314466 3008 3720 6728
13925057413 11058 4243 15301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 369 338 707
15889002119 938 380 1318
15920133257 316 296 612
18212575961 1527 2106 3633
18320173382 9531 212 9743

小文件优化

如果企业中存在海量的小文件数据
TextInputFormat按照文件规划切片,文件不管多小都是一个单独的切片,启动mapt
ask任务去执行,这样会产生大量的maptask,浪费资源。
优化手段:
小文件合并大文件,如果不动这个小文件内容。

转载于:https://www.cnblogs.com/hsiehchou/p/10403452.html


http://www.niftyadmin.cn/n/706636.html

相关文章

TCP 相关参数解释

为什么80%的码农都做不了架构师&#xff1f;>>> tcp_syn_retries &#xff1a;INTEGER 默认值是5 对于一个新建连接&#xff0c;内核要发送多少个 SYN 连接请求才决定放弃。不应该大于255&#xff0c;默认值是5&#xff0c;对应于180秒左右时间。(对于大负载而物理…

目录文件管理

双tab查看所有命令 反斜杠\强制换行 ctrl加u删除到行位 ctrl加k删除行末 ctrl加L清屏 ctrl加y &#xff01;34调用34号命令前面加&#xff01;可以调用 man手册 >重定向输出加两个>>不覆盖原有文件&#xff0c;直接添加文件 echo回显的意思echo“123”输入到屏幕当…

系统学习Devops的30天

D1: 灰度更新 在有关微服务、DevOps、Cloud-native、系统部署等的讨论中&#xff0c;蓝绿部署、A/B 测试、灰度发布、滚动发布、红黑部署等概念经常被提到&#xff0c;它们有什么区别呢&#xff1f;通过搜索相关资料&#xff0c;做一个简单的辨析&#xff0c;如下&#xff1a;…

Linux 线程浅析

进程和线程的区别与联系 在许多经典的操作系统教科书中&#xff0c;总是把进程定义为程序的执行实例&#xff0c;它并不执行什么, 只是维护应用程序所需的各种资源&#xff0c;而线程则是真正的执行实体。 为了让进程完成一定的工作&#xff0c;进程必须至少包含一个线程。 进程…

windows下 安装 rabbitMQ 及操作常用命令

2019独角兽企业重金招聘Python工程师标准>>> rabbitMQ是一个在AMQP协议标准基础上完整的&#xff0c;可服用的企业消息系统。它遵循Mozilla Public License开源协议&#xff0c;采用 Erlang 实现的工业级的消息队列(MQ)服务器&#xff0c;Rabbit MQ 是建立在Erlang …

Yii安装时会出现的问题

今天打算学习一下Yii,但是在安装过程中出现了很多问题。通过composer安装&#xff1a;composer global require "fxp/composer-asset-plugin:~1.1.1" composer create-project --prefer-dist yiisoft/yii2-app-basic basic第一条命令安装 Composer asset plugin&…

BigDecimal的加减乘除

Java在java.math包中提供的API类BigDecimal&#xff0c;用来对超过16位有效位的数进行精确的运算。双精度浮点型变量double可以处理16位有效数。在实际应用中&#xff0c;需要对更大或者更小的数进行运算和处理。float和double只能用来做科学计算或者是工程计算&#xff0c;在商…

从“野路子”到“正规军”的软件开发之路

大家好&#xff0c;我是宝玉。我的专栏《软件工程之美》刚刚上线&#xff0c;很开心看到了很多同学对软件工程的理解和期待。 有同学说是从自学编程出身的&#xff0c;碰到过很多的问题&#xff0c;和很多人一样&#xff0c;我也是野路子出身的&#xff0c;2000年自学Asp编程&a…