Hadoop HDFS 与 MapReduce 入门实践

FreeGuideOnline 最新 2026-06-12

Hadoop 生态系统基础与实操:HDFS 与 MapReduce 入门实践

引言:从“大数据”到 Hadoop

随着数据爆炸式增长,传统处理工具在 TB 甚至 PB 级数据面前显得力不从心。Hadoop 正是为解决这类问题而生。它提供了一种分布式存储和计算方案,让普通机器集群也能高效地存储和处理海量数据。

本教程将带你快速入门 Hadoop 生态中最核心的两个组件——HDFS(Hadoop 分布式文件系统)和 MapReduce(分布式计算框架)。你将了解它们的原理,并通过实操掌握基本命令和编程模型。

第一部分:HDFS——Hadoop 分布式文件系统

1.1 什么是 HDFS?

HDFS 的设计理念是“一次写入、多次读取”,适合大文件存取和流式数据访问。它把文件切分成多个数据块(默认 128 MB),并将这些块分散存储在多个节点上,同时为每个块创建多个副本(默认为 3 份),从而保证高容错性。

1.2 HDFS 核心架构

HDFS 采用主从(Master/Slave)架构,两个关键角色:

  • NameNode(主节点)
    管理文件系统的命名空间、维护文件与块的映射关系、处理客户端请求。它不存储实际数据,所有元数据存储在内存中,因此对内存要求高。

  • DataNode(从节点)
    实际存储数据块,并定期向 NameNode 发送心跳(默认 3 秒)和块报告,证明自己正常运行。

1.3 HDFS 基本操作实操

假设你已安装 Hadoop 环境(若未安装可使用 Docker 或伪分布式模式)。以下命令在终端执行。

1. 创建目录

hdfs dfs -mkdir /user/hadoop/input

该命令在 HDFS 中创建 /user/hadoop/input 目录。

2. 上传文件

hdfs dfs -put localfile.txt /user/hadoop/input/

将本地 localfile.txt 上传到 HDFS 指定目录。

3. 查看文件列表

hdfs dfs -ls /user/hadoop/input

4. 查看文件内容

hdfs dfs -cat /user/hadoop/input/localfile.txt

5. 下载文件到本地

hdfs dfs -get /user/hadoop/input/localfile.txt ./

6. 删除文件或目录

hdfs dfs -rm /user/hadoop/input/localfile.txt        # 删除文件
hdfs dfs -rm -r /user/hadoop/input                   # 递归删除目录

7. 查看文件块信息

hdfs fsck /user/hadoop/input/localfile.txt -files -blocks

该命令可查看文件被分成多少块以及每个块所在节点位置。

第二部分:MapReduce——分布式计算引擎

2.1 MapReduce 核心思想

MapReduce 将计算分为两个阶段:

  • Map(映射)阶段
    处理原始数据,将输入键值对 (K1, V1) 转换为中间键值对 (K2, V2)

  • Reduce(归约)阶段
    对中间结果按键分组,聚合同一键的所有值,输出最终结果 (K3, V3)

中间过程还包括 Shuffle & Sort,负责将 Map 输出的数据按分区传输到对应的 Reduce 端,并进行排序和分组。

2.2 经典案例:单词统计(WordCount)

以一个经典例子理解 MapReduce:统计文本文件中每个单词出现的次数。

输入

Hello Hadoop
Hello World

期望输出

Hadoop  1
Hello   2
World   1

Map 函数逻辑 对每一行文本,将其拆分成单词,并输出每个单词和一个计数 1。

Map(line):
  for word in line.split():
    emit(word, 1)

Reduce 函数逻辑 对于每个单词,对其所有值求和,得到总次数。

Reduce(word, values):
  sum = 0
  for v in values:
    sum += v
  emit(word, sum)

2.3 WordCount 代码实现(Java 版)

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Mapper 类
    public static class TokenizerMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\\s+");
            for (String token : tokens) {
                word.set(token);
                context.write(word, one);
            }
        }
    }

    // Reducer 类
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);   // 可选,map 局部聚合
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

编译与运行(假设 jar 包名为 wordcount.jar):

hadoop jar wordcount.jar WordCount /user/hadoop/input /user/hadoop/output

执行成功后,查看结果:

hdfs dfs -cat /user/hadoop/output/part-r-00000

2.4 MapReduce 运行流程总结

  1. 提交作业:客户端向 YARN 提交 MapReduce 作业。
  2. 启动 AM:ResourceManager 分配一个 Container 运行 ApplicationMaster。
  3. 划分任务:AM 根据输入分片(InputSplit)创建若干个 Map Task 和 Reduce Task。
  4. Map 阶段:每个 Map Task 处理一个分片,输出中间结果到本地磁盘。
  5. Shuffle:Reduce Task 从各 Map 端拉取属于自己的分区数据,合并排序。
  6. Reduce 阶段:执行归约逻辑,生成最终输出文件。
  7. 结束:任务完成,资源释放。

第三部分:进阶概念与调优建议

3.1 HDFS 高可用与联邦

  • 高可用(HA):通过部署一对 Active/Standby NameNode,配合 JournalNode 实现元数据同步,避免单点故障。
  • 联邦(Federation):多个 NameNode 分管不同目录块池,扩展命名空间和提高横向扩展能力。

3.2 MapReduce 调优点

  • 合并小文件:使用 CombineFileInputFormat 或将小文件预先合并上传到 HDFS。
  • 适当增加复制因子:可通过 dfs.replication 调整副本数以提高数据本地性。
  • 设置压缩:Map 输出压缩使用 mapreduce.map.output.compress=true,减少 Shuffle 数据量。
  • 调整内存mapreduce.map.memory.mbmapreduce.reduce.memory.mb 避免内存溢出。
  • 使用 Combiner:在 Map 端做局部聚合,降低网络传输压力。

第四部分:常见问题与排错

4.1 HDFS 无法上传文件,提示 SafeMode

NameNode 启动后会进入安全模式,此时只读不可写。通常等待数据块报告达到比例后自动离开,或手动退出:

hdfs dfsadmin -safemode leave

4.2 MapReduce 作业长时间卡在 MAP 100% 不进入 Reduce

可能原因:

  • Reduce 个数设置过大或过小。
  • 某个 Map 输出数据分布不均(数据倾斜)。
  • 网络拥塞导致 Shuffle 缓慢。 可检查日志或使用 Web UI(默认 http://: 8088)查看任务详情。

4.3 DataNode 频繁掉线

检查 DataNode 与 NameNode 的网络连接,确认主机名解析正常,同时观察 DataNode 的 CPU 和磁盘 IO 是否饱和。日志通常位于 $HADOOP_HOME/logs 下。

总结

通过本教程,你掌握了:

  • HDFS 的基本架构与常用命令行操作。
  • MapReduce 编程模型与一个可运行的 Java 示例。
  • 运行流程与常见优化手段。

Hadoop 生态远不止这些,后续可以进一步探索 YARN 资源调度、Hive 数据仓库、Spark 内存计算等组件。但 HDFS 与 MapReduce 是理解整个大数据体系的根基,动手实践是掌握它们的最佳路径。


下一篇教程预告:Hive 数据仓库入门与 HQL 实战。