技术博客 技术博客
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • mysql

    • MySQL 问题汇总
    • MySQL 索引介绍
    • MySQL 锁介绍
    • MySQL 索引优化工具 explain
    • MySQL 主从复制(GTID)
    • MySQL 8安装
    • MySQL 8.x新特性总结
    • MySQL UDF以及新类型JSON
    • MySQL 高可用MGR(一) 理论
    • MySQL 高可用MGR(二) 搭建
    • MySQL 高可用MGR(三) 测试
  • Elasticsearch

    • ES 7.8.0(一) 入门介绍
    • ES 7.8.0(二) 读、写和写索引流程以及文档分析过程
    • ES 7.8.0(三) 文档冲突
  • mongodb

    • mongodb
  • hadoop

    • Hadoop 伪分布式及集群
    • Hadoop 指令
    • Hadoop 读写流程详解
    • Hadoop SpringBoot集成
    • Hadoop MapReduce机制
    • Hadoop YARN
    • Hadoop MapReduce配置和编写job及数据倾斜的解决
    • Hadoop MapReduce自定义格式输入输出
      • 自定义格式输入组件
        • 定义RecordReader
        • 定义FileInputFormat
        • 在job中设置环境
      • 自定义格式输出组件
        • 定义RecordWriter
        • 定义FileOutputFormat
        • 在job中设置环境
      • 多输入源
      • 多输出源
        • Reduce
        • job
  • clickhouse

    • ClickHouse 介绍及安装
    • ClickHouse 数据类型
    • ClickHouse 表引擎
    • ClickHouse SQL操作
    • ClickHouse 副本配置
    • ClickHouse 分片与集群部署
    • ClickHouse Explain及建表优化
    • ClickHouse 语法优化规则
    • ClickHouse 查询优化
    • ClickHouse 数据一致性
    • ClickHouse 物化视图
    • ClickHouse MaterializeMySQL引擎
    • ClickHouse 监控及备份
  • hbase

    • Hbase 介绍及安装
    • Hbase 优化
    • Hbase phoenix安装及使用
    • Hbase LSM-TREE
  • hive

    • Hive 介绍及安装
    • Hive 内外部表、分区表、分桶表概念及hiveSQL命令
    • Hive 数据类型
    • Hive 函数 MySQL联合
    • Hive 数据倾斜和优化
    • Hive Sqoop安装及指令
  • flink

    • Flink 介绍及安装
    • Flink 配置介绍及Demo
    • Flink API讲解
    • Flink 运行架构
    • Flink 时间语义及Watermark
    • Flink 状态管理
    • Flink 容错,检查点,保存点
    • Flink 状态一致性
    • Flink Table API 和 Flink SQL
    • Flink CEP编程
    • Flink Joining编程
    • Flink CDC
  • flume

    • Flume 日志收集系统介绍及安装
    • Flume Source支持的类型
    • Flume Sink支持的类型
    • Flume Channel支持的类型
    • Flume Selector
    • Flume Interceptor拦截器类型
    • Flume Process
  • sqlite

    • SQLite介绍
目录

Hadoop MapReduce自定义格式输入输出

# 自定义格式输入组件

自定义输入组件会在 MapperTask 之前执行,并作为 MapperTask 的输入 key 和 value。

# 定义 RecordReader

package com.example.demo.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.io.InputStream;

/**
 * @author big uncle
 * @date 2021/5/14 16:49
 * @module
 **/
public class LineNumberInputRecordReader extends RecordReader<IntWritable, Text> {

    /**
     * 件切片
    **/
    private FileSplit fs;
    /**
     * 输入key
    **/
    private IntWritable key;
    /**
     * 输入value
    **/
    private Text value;
    /**
     * 行读取器
    **/
    private LineReader reader;
    /**
     * 记录行号
    **/
    private int count;


    /**
     * 初始化方法,用于初始化文件切片和行读取器
    **/
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // 初始化文件切片
        fs = (FileSplit) inputSplit;
        // 通过切片获取切片路径
        Path path = fs.getPath();
        // 获取job的环境变量参数
        Configuration conf = taskAttemptContext.getConfiguration();
        // 获取HDFS文件系统对象
        FileSystem system = path.getFileSystem(conf);
        // 获取切片对应的文件数据的输入流
        InputStream in = system.open(path);
        // 初始化航都读取器
        reader = new LineReader(in);
    }

    /**
     * 此方法会被掉用多次,如果return true会继续被调用,直到return false
     * 一行一行读取数据,直到读完为止
    **/
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        key = new IntWritable();
        value = new Text();
        Text tmp = new Text();
        // 每调一次,会读取一行
        int length = reader.readLine(tmp);
        if(length==0){
            // 文件数据以读完
            return false;
        }
        count++;
        // 将行号复赋值给输入key
        key.set(count);
        // 将每行内容赋值给value
        value.set(tmp);
        return true;
    }

    /**
     * 此方法用于将输入key传给Mapper组件
     * nextKeyValue方法被调用一次,该方法也被调用一次
    **/
    @Override
    public IntWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }
    /**
     * 此方法用于将输入value传给Mapper组件
     * nextKeyValue方法被调用一次,该方法也被调用一次
    **/
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
        reader = null;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114

# 定义 FileInputFormat

package com.example.demo.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * @author big uncle
 * @date 2021/5/14 16:47
 * 自定义格式输入组件,决定Mapper的输入Key和输入Value类型
 * 第一个泛型是Mapper的输入Key
 * 第二个泛型是Mapper的输入Value
 **/
public class LineNumberInputFormat extends FileInputFormat<IntWritable, Text> {

    @Override
    public RecordReader<IntWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new LineNumberInputRecordReader();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 在 job 中设置环境

// 设置自定义格式化输入 默认是 TextInputFormat
job.setInputFormatClass(LineNumberInputFormat.class);
1
2

# 自定义格式输出组件

# 定义 RecordWriter

package com.example.demo.mr;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.checkerframework.checker.units.qual.K;

import java.io.IOException;

/**
 * @author big uncle
 * @date 2021/5/14 17:52
 * @module
 **/
public class AuthRecodeWriter<K,V> extends RecordWriter<K,V> {

    private FSDataOutputStream outputStream;

    public AuthRecodeWriter(FSDataOutputStream outputStream) {
        this.outputStream = outputStream;
    }

    @Override
    public void write(K k, V v) throws IOException, InterruptedException {
        // 将输入key挟到文件里,如果只有mapper 则是mapper的输出key,
        // 既有 mapper和 reduce,则是reduce的输出key
        outputStream.write(k.toString().getBytes());
        // 输出k v分隔符,默认是Tab制表符
        outputStream.write("|".getBytes());

        outputStream.write(v.toString().getBytes());
        // 输出行于行的分隔符
        outputStream.write("@@@".getBytes());
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        outputStream.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# 定义 FileOutputFormat

package com.example.demo.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author big uncle
 * @date 2021/5/14 17:48
 * @module
 **/
public class AuthOutputFormat<K,V>  extends FileOutputFormat<K,V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        Path path = super.getDefaultWorkFile(taskAttemptContext,"");
        Configuration conf = taskAttemptContext.getConfiguration();
        FileSystem system = path.getFileSystem(conf);
        // 获取输出流
        FSDataOutputStream outputStream = system.create(path);

        return new AuthRecodeWriter<K,V>(outputStream);
    }

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

读取多个文件,把文件结果输出到同一个结果文件,结果内容会以交替的形式存在。

# 在 job 中设置环境

// 设置自定义数组组件 默认是 TextOutFormat,kv分隔符默认tab制表符,行与行默认换行符
job.setOutputFormatClass(AuthOutputFormat.class);
1
2

# 多输入源

package com.example.demo.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author big uncle
 * @date 2021/5/11 11:13
 * @module
 **/
public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        // 获取job对象1
        Job job = Job.getInstance(configuration);
        // 设置job方法入口的驱动类
        job.setJarByClass(WordCountDriver.class);

        // 读取多个文件
        MultipleInputs.addInputPath(job,new Path("hdfs://node113:9000/test/aa.txt"), TextInputFormat.class,WordCountMapper1.class);
        MultipleInputs.addInputPath(job,new Path("hdfs://node113:9000/test/bb.txt"), TextInputFormat.class,WordCountMapper2.class);

        // 设置Mapper输出的key的类型
        job.setMapOutputKeyClass(Text.class);
        // 输出Mapper输出的value的类型
        job.setMapOutputValueClass(Flow.class);

        // 输出到指定目录,要求结果路径事先不存在
        FileOutputFormat.setOutputPath(job,new Path("hdfs://node113:9000/test1/result"));
        // 提交job
        job.waitForCompletion(true);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# 多输出源

# Reduce

package com.example.demo.mr;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.io.IOException;

/**
 * @author big uncle
 * @date 2021/5/15 10:44
 * @module
 **/
public class MultipleOutputReduce extends Reducer<Text,Text,Text,Text> {

    /**
     * 多输出源
    **/
    private MultipleOutputs<Text,Text> outputs;

    /**
     * 初始化方法
    **/
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        outputs=new MultipleOutputs(context);
    }

    /**
     * 处理内容
    **/
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for(Text text:values){
            if("tom".equals(key)){
                // 输出到 tomFile 文件
                outputs.write("tomFile",key,text);
            }else if("rose".equals(key)){
                outputs.write("roseFile",key,text);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# job

package com.example.demo.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 * @author big uncle
 * @date 2021/5/11 11:13
 * @module
 **/
public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        // 获取job对象1
        Job job = Job.getInstance(configuration);
        // 设置job方法入口的驱动类
        job.setJarByClass(WordCountDriver.class);

        // 读取多个文件
        MultipleInputs.addInputPath(job,new Path("hdfs://node113:9000/test/aa.txt"), TextInputFormat.class,WordCountMapper1.class);
        MultipleInputs.addInputPath(job,new Path("hdfs://node113:9000/test/bb.txt"), TextInputFormat.class,WordCountMapper2.class);

        // 设置Mapper输出的key的类型
        job.setMapOutputKeyClass(Text.class);
        // 输出Mapper输出的value的类型
        job.setMapOutputValueClass(Flow.class);

        // 设置Reduce
        job.setReducerClass(MultipleOutputReduce.class);
        // 多输出源
        MultipleOutputs.addNamedOutput(job,"tomFile", TextOutputFormat.class,Text.class,Text.class);
        MultipleOutputs.addNamedOutput(job,"roseFile", TextOutputFormat.class,Text.class,Text.class);

        // 输出到指定目录,要求结果路径事先不存在
        FileOutputFormat.setOutputPath(job,new Path("hdfs://node113:9000/test1/result"));
        // 提交job
        job.waitForCompletion(true);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
上次更新: 6/11/2025, 4:10:30 PM
Hadoop MapReduce配置和编写job及数据倾斜的解决
ClickHouse 介绍及安装

← Hadoop MapReduce配置和编写job及数据倾斜的解决 ClickHouse 介绍及安装→

Theme by Vdoing | Copyright © 2023-2025
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式