Hadoop HDFS 与 MapReduce 入门实践
Hadoop 生态系统基础与实操:HDFS 与 MapReduce 入门实践
引言:从“大数据”到 Hadoop
随着数据爆炸式增长,传统处理工具在 TB 甚至 PB 级数据面前显得力不从心。Hadoop 正是为解决这类问题而生。它提供了一种分布式存储和计算方案,让普通机器集群也能高效地存储和处理海量数据。
本教程将带你快速入门 Hadoop 生态中最核心的两个组件——HDFS(Hadoop 分布式文件系统)和 MapReduce(分布式计算框架)。你将了解它们的原理,并通过实操掌握基本命令和编程模型。
第一部分:HDFS——Hadoop 分布式文件系统
1.1 什么是 HDFS?
HDFS 的设计理念是“一次写入、多次读取”,适合大文件存取和流式数据访问。它把文件切分成多个数据块(默认 128 MB),并将这些块分散存储在多个节点上,同时为每个块创建多个副本(默认为 3 份),从而保证高容错性。
1.2 HDFS 核心架构
HDFS 采用主从(Master/Slave)架构,两个关键角色:
-
NameNode(主节点)
管理文件系统的命名空间、维护文件与块的映射关系、处理客户端请求。它不存储实际数据,所有元数据存储在内存中,因此对内存要求高。 -
DataNode(从节点)
实际存储数据块,并定期向 NameNode 发送心跳(默认 3 秒)和块报告,证明自己正常运行。
1.3 HDFS 基本操作实操
假设你已安装 Hadoop 环境(若未安装可使用 Docker 或伪分布式模式)。以下命令在终端执行。
1. 创建目录
hdfs dfs -mkdir /user/hadoop/input
该命令在 HDFS 中创建 /user/hadoop/input 目录。
2. 上传文件
hdfs dfs -put localfile.txt /user/hadoop/input/
将本地 localfile.txt 上传到 HDFS 指定目录。
3. 查看文件列表
hdfs dfs -ls /user/hadoop/input
4. 查看文件内容
hdfs dfs -cat /user/hadoop/input/localfile.txt
5. 下载文件到本地
hdfs dfs -get /user/hadoop/input/localfile.txt ./
6. 删除文件或目录
hdfs dfs -rm /user/hadoop/input/localfile.txt # 删除文件
hdfs dfs -rm -r /user/hadoop/input # 递归删除目录
7. 查看文件块信息
hdfs fsck /user/hadoop/input/localfile.txt -files -blocks
该命令可查看文件被分成多少块以及每个块所在节点位置。
第二部分:MapReduce——分布式计算引擎
2.1 MapReduce 核心思想
MapReduce 将计算分为两个阶段:
-
Map(映射)阶段
处理原始数据,将输入键值对(K1, V1)转换为中间键值对(K2, V2)。 -
Reduce(归约)阶段
对中间结果按键分组,聚合同一键的所有值,输出最终结果(K3, V3)。
中间过程还包括 Shuffle & Sort,负责将 Map 输出的数据按分区传输到对应的 Reduce 端,并进行排序和分组。
2.2 经典案例:单词统计(WordCount)
以一个经典例子理解 MapReduce:统计文本文件中每个单词出现的次数。
输入
Hello Hadoop
Hello World
期望输出
Hadoop 1
Hello 2
World 1
Map 函数逻辑 对每一行文本,将其拆分成单词,并输出每个单词和一个计数 1。
Map(line):
for word in line.split():
emit(word, 1)
Reduce 函数逻辑 对于每个单词,对其所有值求和,得到总次数。
Reduce(word, values):
sum = 0
for v in values:
sum += v
emit(word, sum)
2.3 WordCount 代码实现(Java 版)
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
// Mapper 类
public static class TokenizerMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] tokens = value.toString().split("\\s+");
for (String token : tokens) {
word.set(token);
context.write(word, one);
}
}
}
// Reducer 类
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class); // 可选,map 局部聚合
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
编译与运行(假设 jar 包名为 wordcount.jar):
hadoop jar wordcount.jar WordCount /user/hadoop/input /user/hadoop/output
执行成功后,查看结果:
hdfs dfs -cat /user/hadoop/output/part-r-00000
2.4 MapReduce 运行流程总结
- 提交作业:客户端向 YARN 提交 MapReduce 作业。
- 启动 AM:ResourceManager 分配一个 Container 运行 ApplicationMaster。
- 划分任务:AM 根据输入分片(InputSplit)创建若干个 Map Task 和 Reduce Task。
- Map 阶段:每个 Map Task 处理一个分片,输出中间结果到本地磁盘。
- Shuffle:Reduce Task 从各 Map 端拉取属于自己的分区数据,合并排序。
- Reduce 阶段:执行归约逻辑,生成最终输出文件。
- 结束:任务完成,资源释放。
第三部分:进阶概念与调优建议
3.1 HDFS 高可用与联邦
- 高可用(HA):通过部署一对 Active/Standby NameNode,配合 JournalNode 实现元数据同步,避免单点故障。
- 联邦(Federation):多个 NameNode 分管不同目录块池,扩展命名空间和提高横向扩展能力。
3.2 MapReduce 调优点
- 合并小文件:使用
CombineFileInputFormat或将小文件预先合并上传到 HDFS。 - 适当增加复制因子:可通过
dfs.replication调整副本数以提高数据本地性。 - 设置压缩:Map 输出压缩使用
mapreduce.map.output.compress=true,减少 Shuffle 数据量。 - 调整内存:
mapreduce.map.memory.mb和mapreduce.reduce.memory.mb避免内存溢出。 - 使用 Combiner:在 Map 端做局部聚合,降低网络传输压力。
第四部分:常见问题与排错
4.1 HDFS 无法上传文件,提示 SafeMode
NameNode 启动后会进入安全模式,此时只读不可写。通常等待数据块报告达到比例后自动离开,或手动退出:
hdfs dfsadmin -safemode leave
4.2 MapReduce 作业长时间卡在 MAP 100% 不进入 Reduce
可能原因:
- Reduce 个数设置过大或过小。
- 某个 Map 输出数据分布不均(数据倾斜)。
- 网络拥塞导致 Shuffle 缓慢。 可检查日志或使用 Web UI(默认 http://: 8088)查看任务详情。
4.3 DataNode 频繁掉线
检查 DataNode 与 NameNode 的网络连接,确认主机名解析正常,同时观察 DataNode 的 CPU 和磁盘 IO 是否饱和。日志通常位于 $HADOOP_HOME/logs 下。
总结
通过本教程,你掌握了:
- HDFS 的基本架构与常用命令行操作。
- MapReduce 编程模型与一个可运行的 Java 示例。
- 运行流程与常见优化手段。
Hadoop 生态远不止这些,后续可以进一步探索 YARN 资源调度、Hive 数据仓库、Spark 内存计算等组件。但 HDFS 与 MapReduce 是理解整个大数据体系的根基,动手实践是掌握它们的最佳路径。
下一篇教程预告:Hive 数据仓库入门与 HQL 实战。