Parquet文件
Parquet 文件概述
Parquet 是一种开源的 列式存储文件格式,专为大数据处理场景设计。它通过高效的编码和压缩技术,优化了数据存储和查询性能,尤其适合 OLAP(联机分析处理)类任务。
核心优势
- 列式存储
- 数据按列而非行存储,查询时仅需读取相关列,大幅减少 I/O 和内存开销。
- 示例:若查询仅需
user_id
和timestamp
,Parquet 可跳过其他列数据。
- 高压缩率
- 原因:同一列数据类型一致,重复值多(如枚举字段),支持高效的编码和压缩算法(如字典编码、RLE、Snappy、ZSTD)。
- 效果:压缩率通常比 CSV/JSON 高 2~10 倍,存储成本显著降低。
- 高效查询性能
- 列式存储结合元数据(如 Min/Max、统计信息),支持谓词下推(Predicate Pushdown),提前过滤无关数据块。
- 兼容性与扩展性
- 支持复杂嵌套数据结构(通过 Dremel 算法),兼容 Avro、Thrift 等数据模型。
- 跨平台(Hadoop、Spark、云服务)通用,生态工具丰富。
压缩率高的原因
- 列局部性:同列数据值相似,利于压缩算法发现重复模式。
- 智能编码:
- 字典编码:将重复值映射为短整数(如
"Male"=1, "Female"=2
)。 - 位打包:对整型数据紧凑存储。
- Delta 编码:存储相邻值的差值(适用于时间序列)。
- 可选压缩算法:支持 Snappy(速度优先)、GZIP(压缩率优先)、ZSTD(平衡型)等。
典型应用场景
- 大数据分析
- 场景:数据仓库(如 Hive)、交互式查询(如 Presto)、ETL 处理(如 Spark)。
- 案例:分析十亿级日志数据,仅扫描
error_code
列快速定位问题。
- 云数据湖
- 结合 Amazon S3、Azure Data Lake,用于低成本存储海量数据。
- 机器学习特征存储
- 高效读取部分特征列,加速模型训练。
常用集成工具与平台
类别 | 工具/平台 |
---|---|
计算引擎 | Apache Spark, Apache Hive, Presto, Impala, Flink |
云服务 | AWS Athena/Glue, Google BigQuery, Azure Synapse, Databricks |
文件系统 | HDFS, Amazon S3, Google Cloud Storage |
序列化框架 | Apache Avro, Apache Thrift(定义数据结构) |
查询优化 | Apache Arrow(内存加速), Parquet-MR(Hadoop 读写库) |
Python处理Parquet
以下是使用 Python 操作 Parquet 文件的完整指南,包含创建、读取和切割的代码示例:
一、环境准备
# 安装必要库
pip install pandas pyarrow fastparquet dask
二、创建 Parquet 文件
方法 1: 使用 Pandas
import pandas as pd
import numpy as np
# 创建示例 DataFramedata = {
'id': range(1, 6), 'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'], 'age': [25, 30, 35, 40, 45], 'timestamp': pd.date_range('2023-01-01', periods=5)}
df = pd.DataFrame(data)
# 写入 Parquet 文件(默认使用 snappy 压缩)
df.to_parquet('data.parquet', engine='pyarrow') # 或 engine='fastparquet'
方法 2: 使用 PyArrow
import pyarrow as pa
import pyarrow.parquet as pq
# 创建 PyArrow Tabletable = pa.Table.from_pandas(df)
# 写入文件(指定压缩算法)
pq.write_table(table, 'data.parquet', compression='ZSTD')
三、读取 Parquet 文件
基本读取
# 读取整个文件
df = pd.read_parquet('data.parquet', engine='pyarrow')
# 只读取特定列
df = pd.read_parquet('data.parquet', columns=['id', 'name'])
分块读取(大型文件)
# 使用 PyArrow 分块读取
parquet_file = pq.ParquetFile('large_data.parquet')
for batch in parquet_file.iter_batches(batch_size=1000):
chunk_df = batch.to_pandas() # 处理每个分块...
四、切割 Parquet 文件
场景 1: 按行数切割
# 将 DataFrame 分为两个文件
df1 = df.iloc[:3] # 前3行
df2 = df.iloc[3:] # 剩余行
df1.to_parquet('split_part1.parquet')
df2.to_parquet('split_part2.parquet')
场景 2: 按条件筛选
# 按年龄筛选
young_df = df[df['age'] < 35]
old_df = df[df['age'] >= 35]
# 写入不同文件
young_df.to_parquet('young_data.parquet')
old_df.to_parquet('old_data.parquet')
场景 3: 按列值分割(高效方法)
# 使用 PyArrow 直接操作(避免加载全量数据)
table = pq.read_table('data.parquet', columns=['id', 'age'])
# 过滤年龄大于30的数据
filtered_table = table.filter(pa.compute.greater(table['age'], 30))
pq.write_table(filtered_table, 'filtered_data.parquet')
五、处理大型文件(Dask 示例)
import dask.dataframe as dd
# 分块读取大型 Parquetddf = dd.read_parquet('large_data.parquet', engine='pyarrow')
# 并行筛选和写入
filtered_ddf = ddf[ddf['age'] > 30]
filtered_ddf.to_parquet('output_dir/', partition_on='age') # 按年龄分区存储
六、关键参数说明
参数 | 作用 | 常用值 |
---|---|---|
engine |
指定读写引擎 | 'pyarrow' , 'fastparquet' |
compression |
压缩算法 | 'snappy' , 'gzip' , 'zstd' |
columns |
指定读取列 | ['col1', 'col2'] |
partition_on |
按列分区存储(目录结构) | 'date' , 'category' |
七、注意事项
- 引擎选择:
pyarrow
:性能更好,支持最新 Parquet 格式fastparquet
:兼容性更佳,适合旧系统
- 压缩算法:
snappy
:快速压缩/解压(默认推荐)gzip
:高压缩率,速度较慢zstd
:平衡型(需 PyArrow>=2.0)
- 内存管理:
- 超过 1GB 的数据建议使用 Dask 分块处理
- 使用
pd.read_parquet(..., filters=...)
提前过滤数据
以上代码可直接在 Jupyter Notebook 或 Python 脚本中运行,根据数据规模选择合适的方法。
Java处理Parquet
以下是使用 Java 操作 Parquet 文件的完整代码示例,包含创建、读取和切割操作:
一、环境依赖 (Maven)
<dependencies>
<!-- Parquet Core --> <dependency> <groupId>org.apache.parquet</groupId> <artifactId>parquet-avro</artifactId> <version>1.13.1</version> </dependency> <!-- Avro 数据模型 --> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.11.3</version> </dependency> <!-- Hadoop Common(文件操作需要) --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.3.6</version> </dependency></dependencies>
二、创建 Parquet 文件
步骤 1:定义 Avro Schema
// 创建 schema.avsc 文件
{
"type": "record", "name": "User", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "age", "type": "int"}, {"name": "timestamp", "type": "long"} ]}
步骤 2:生成 Avro 类
java -jar avro-tools-1.11.3.jar compile schema schema.avsc .```
#### 步骤 3:写入 Parquet 文件
```java
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
public class ParquetWriterExample {
public static void main(String[] args) throws IOException { Path path = new Path("users.parquet"); Schema schema = new Schema.Parser().parse(new File("schema.avsc")); try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(path) .withSchema(schema) .withCompressionCodec(CompressionCodecName.SNAPPY) .build()) { // 写入示例数据
for (int i = 1; i <= 5; i++) { GenericRecord record = new GenericData.Record(schema); record.put("id", i); record.put("name", "User" + i); record.put("age", 20 + i); record.put("timestamp", System.currentTimeMillis()); writer.write(record); } } }}
三、读取 Parquet 文件
基本读取
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
public class ParquetReaderExample {
public static void main(String[] args) throws IOException { Path path = new Path("users.parquet"); try (ParquetReader<GenericRecord> reader = AvroParquetReader
.<GenericRecord>builder(path) .build()) { GenericRecord record;
while ((record = reader.read()) != null) { System.out.println("Read record: " + record); } } }}
四、切割 Parquet 文件
场景 1:按条件过滤切割
public class ParquetSplitter {
public static void splitByAge(Path inputPath, Path outputPath1, Path outputPath2, int ageThreshold) throws IOException {
// 创建两个写入器
try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputPath).build(); ParquetWriter<GenericRecord> writer1 = createWriter(outputPath1); ParquetWriter<GenericRecord> writer2 = createWriter(outputPath2)) { GenericRecord record;
while ((record = reader.read()) != null) { int age = (Integer) record.get("age"); if (age < ageThreshold) { writer1.write(record); } else { writer2.write(record); } } } }
private static ParquetWriter<GenericRecord> createWriter(Path path) throws IOException { return AvroParquetWriter.<GenericRecord>builder(path) .withCompressionCodec(CompressionCodecName.SNAPPY) .build(); }
public static void main(String[] args) throws IOException { splitByAge( new Path("users.parquet"), new Path("young_users.parquet"), new Path("old_users.parquet"), 25 ); }}
场景 2:按行数分块切割
public class ParquetRowSplitter {
public static void splitByRowCount(Path inputPath, String outputPrefix, int chunkSize) throws IOException {
int fileCounter = 0;
int recordCounter = 0; ParquetWriter<GenericRecord> currentWriter = null; try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputPath).build()) {
GenericRecord record;
while ((record = reader.read()) != null) { if (recordCounter % chunkSize == 0) { if (currentWriter != null) currentWriter.close(); currentWriter = createWriter(new Path(outputPrefix + fileCounter + ".parquet")); fileCounter++; } currentWriter.write(record); recordCounter++; } } finally { if (currentWriter != null) currentWriter.close(); } }
// main 方法省略...
}
五、高级操作
1. 使用投影读取部分列
// 只读取 id 和 name 列
ParquetReader<GenericRecord> reader = AvroParquetReader
.<GenericRecord>builder(path) .withProjection(Schema.createRecord(Arrays.asList( new Schema.Field("id", Schema.create(Schema.Type.INT), "", null), new Schema.Field("name", Schema.create(Schema.Type.STRING), "", null) ))) .build();
2. 使用谓词下推过滤
// 使用 Filter API 提前过滤
FilterPredicate filter = FilterApi.gt(FilterApi.intColumn("age"), 25);
ParquetReader<GenericRecord> reader = AvroParquetReader
.<GenericRecord>builder(path) .withFilter(filter) .build();
六、性能优化建议
- 批量写入:使用
ParquetWriter
的自动缓冲机制(默认行组大小 128MB) - 压缩选择:
.withCompressionCodec(CompressionCodecName.ZSTD) // 更高压缩率
- 列式处理:优先使用投影减少 I/O
- 内存管理:处理大文件时使用分块读取(示例中的切割代码)
七、常见问题解决
- Schema 不匹配:确保读写使用相同的 Avro Schema
- Hadoop 依赖:若不在 Hadoop 环境运行,需添加
hadoop-common
依赖 - 版本冲突:保持 Parquet/Avro/Hadoop 版本兼容性
以上代码示例展示了 Java 中 Parquet 文件的核心操作流程,可直接用于大数据处理流水线的开发。对于超大规模数据,建议结合 Apache Spark 进行分布式处理。
总结
Parquet 凭借 列式存储、高压缩率、高性能查询,成为大数据生态的核心存储格式,尤其适合需要低成本存储、快速分析海量数据的场景。与 Spark、Presto 等工具深度集成,是构建数据湖和现代数仓的首选格式。