LarryDpk
发布于 2025-04-12 / 21 阅读
0

Parquet文件

Parquet文件

Parquet 文件概述

Parquet 是一种开源的 列式存储文件格式,专为大数据处理场景设计。它通过高效的编码和压缩技术,优化了数据存储和查询性能,尤其适合 OLAP(联机分析处理)类任务。


核心优势

  1. 列式存储
  • 数据按列而非行存储,查询时仅需读取相关列,大幅减少 I/O 和内存开销。
  • 示例:若查询仅需 user_idtimestamp,Parquet 可跳过其他列数据。
  1. 高压缩率
  • 原因:同一列数据类型一致,重复值多(如枚举字段),支持高效的编码和压缩算法(如字典编码、RLE、Snappy、ZSTD)。
  • 效果:压缩率通常比 CSV/JSON 高 2~10 倍,存储成本显著降低。
  1. 高效查询性能
  • 列式存储结合元数据(如 Min/Max、统计信息),支持谓词下推(Predicate Pushdown),提前过滤无关数据块。
  1. 兼容性与扩展性
  • 支持复杂嵌套数据结构(通过 Dremel 算法),兼容 Avro、Thrift 等数据模型。
  • 跨平台(Hadoop、Spark、云服务)通用,生态工具丰富。

压缩率高的原因

  1. 列局部性:同列数据值相似,利于压缩算法发现重复模式。
  2. 智能编码
  • 字典编码:将重复值映射为短整数(如 "Male"=1, "Female"=2)。
  • 位打包:对整型数据紧凑存储。
  • Delta 编码:存储相邻值的差值(适用于时间序列)。
  1. 可选压缩算法:支持 Snappy(速度优先)、GZIP(压缩率优先)、ZSTD(平衡型)等。

典型应用场景

  1. 大数据分析
  • 场景:数据仓库(如 Hive)、交互式查询(如 Presto)、ETL 处理(如 Spark)。
  • 案例:分析十亿级日志数据,仅扫描 error_code 列快速定位问题。
  1. 云数据湖
  • 结合 Amazon S3、Azure Data Lake,用于低成本存储海量数据。
  1. 机器学习特征存储
  • 高效读取部分特征列,加速模型训练。

常用集成工具与平台

类别 工具/平台
计算引擎 Apache Spark, Apache Hive, Presto, Impala, Flink
云服务 AWS Athena/Glue, Google BigQuery, Azure Synapse, Databricks
文件系统 HDFS, Amazon S3, Google Cloud Storage
序列化框架 Apache Avro, Apache Thrift(定义数据结构)
查询优化 Apache Arrow(内存加速), Parquet-MR(Hadoop 读写库)

Python处理Parquet

以下是使用 Python 操作 Parquet 文件的完整指南,包含创建、读取和切割的代码示例:


一、环境准备

# 安装必要库  
pip install pandas pyarrow fastparquet dask  

二、创建 Parquet 文件

方法 1: 使用 Pandas

import pandas as pd  
import numpy as np  
  
# 创建示例 DataFramedata = {  
 'id': range(1, 6), 'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'], 'age': [25, 30, 35, 40, 45], 'timestamp': pd.date_range('2023-01-01', periods=5)}  
df = pd.DataFrame(data)  
  
# 写入 Parquet 文件(默认使用 snappy 压缩)  
df.to_parquet('data.parquet', engine='pyarrow')  # 或 engine='fastparquet'  

方法 2: 使用 PyArrow

import pyarrow as pa  
import pyarrow.parquet as pq  
  
# 创建 PyArrow Tabletable = pa.Table.from_pandas(df)  
  
# 写入文件(指定压缩算法)  
pq.write_table(table, 'data.parquet', compression='ZSTD')  

三、读取 Parquet 文件

基本读取

# 读取整个文件  
df = pd.read_parquet('data.parquet', engine='pyarrow')  
  
# 只读取特定列  
df = pd.read_parquet('data.parquet', columns=['id', 'name'])  

分块读取(大型文件)

# 使用 PyArrow 分块读取  
parquet_file = pq.ParquetFile('large_data.parquet')  
  
for batch in parquet_file.iter_batches(batch_size=1000):  
 chunk_df = batch.to_pandas() # 处理每个分块...  

四、切割 Parquet 文件

场景 1: 按行数切割

# 将 DataFrame 分为两个文件  
df1 = df.iloc[:3]  # 前3行  
df2 = df.iloc[3:]  # 剩余行  
  
df1.to_parquet('split_part1.parquet')  
df2.to_parquet('split_part2.parquet')  

场景 2: 按条件筛选

# 按年龄筛选  
young_df = df[df['age'] < 35]  
old_df = df[df['age'] >= 35]  
  
# 写入不同文件  
young_df.to_parquet('young_data.parquet')  
old_df.to_parquet('old_data.parquet')  

场景 3: 按列值分割(高效方法)

# 使用 PyArrow 直接操作(避免加载全量数据)  
table = pq.read_table('data.parquet', columns=['id', 'age'])  
  
# 过滤年龄大于30的数据  
filtered_table = table.filter(pa.compute.greater(table['age'], 30))  
pq.write_table(filtered_table, 'filtered_data.parquet')  

五、处理大型文件(Dask 示例)

import dask.dataframe as dd  
  
# 分块读取大型 Parquetddf = dd.read_parquet('large_data.parquet', engine='pyarrow')  
  
# 并行筛选和写入  
filtered_ddf = ddf[ddf['age'] > 30]  
filtered_ddf.to_parquet('output_dir/', partition_on='age')  # 按年龄分区存储  

六、关键参数说明

参数 作用 常用值
engine 指定读写引擎 'pyarrow', 'fastparquet'
compression 压缩算法 'snappy', 'gzip', 'zstd'
columns 指定读取列 ['col1', 'col2']
partition_on 按列分区存储(目录结构) 'date', 'category'

七、注意事项

  1. 引擎选择
  • pyarrow:性能更好,支持最新 Parquet 格式
  • fastparquet:兼容性更佳,适合旧系统
  1. 压缩算法
  • snappy:快速压缩/解压(默认推荐)
  • gzip:高压缩率,速度较慢
  • zstd:平衡型(需 PyArrow>=2.0)
  1. 内存管理
  • 超过 1GB 的数据建议使用 Dask 分块处理
  • 使用 pd.read_parquet(..., filters=...) 提前过滤数据

以上代码可直接在 Jupyter Notebook 或 Python 脚本中运行,根据数据规模选择合适的方法。

Java处理Parquet

以下是使用 Java 操作 Parquet 文件的完整代码示例,包含创建、读取和切割操作:


一、环境依赖 (Maven)

<dependencies>  
 <!-- Parquet Core --> <dependency> <groupId>org.apache.parquet</groupId> <artifactId>parquet-avro</artifactId> <version>1.13.1</version> </dependency> <!-- Avro 数据模型 --> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.11.3</version> </dependency>     <!-- Hadoop Common(文件操作需要) --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.3.6</version> </dependency></dependencies>  

二、创建 Parquet 文件

步骤 1:定义 Avro Schema

// 创建 schema.avsc 文件  
{  
 "type": "record", "name": "User", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "age", "type": "int"}, {"name": "timestamp", "type": "long"} ]}  

步骤 2:生成 Avro 类

java -jar avro-tools-1.11.3.jar compile schema schema.avsc .```  
  
#### 步骤 3:写入 Parquet 文件  
```java  
import org.apache.avro.generic.GenericRecord;  
import org.apache.hadoop.fs.Path;  
import org.apache.parquet.avro.AvroParquetWriter;  
import org.apache.parquet.hadoop.ParquetWriter;  
import org.apache.parquet.hadoop.metadata.CompressionCodecName;  
  
public class ParquetWriterExample {  
 public static void main(String[] args) throws IOException { Path path = new Path("users.parquet"); Schema schema = new Schema.Parser().parse(new File("schema.avsc"));         try (ParquetWriter<GenericRecord> writer = AvroParquetWriter  
 .<GenericRecord>builder(path) .withSchema(schema) .withCompressionCodec(CompressionCodecName.SNAPPY) .build()) { // 写入示例数据  
 for (int i = 1; i <= 5; i++) { GenericRecord record = new GenericData.Record(schema); record.put("id", i); record.put("name", "User" + i); record.put("age", 20 + i); record.put("timestamp", System.currentTimeMillis()); writer.write(record); } } }}  

三、读取 Parquet 文件

基本读取

import org.apache.parquet.avro.AvroParquetReader;  
import org.apache.parquet.hadoop.ParquetReader;  
  
public class ParquetReaderExample {  
 public static void main(String[] args) throws IOException { Path path = new Path("users.parquet");         try (ParquetReader<GenericRecord> reader = AvroParquetReader  
 .<GenericRecord>builder(path) .build()) {             GenericRecord record;  
 while ((record = reader.read()) != null) { System.out.println("Read record: " + record); } } }}  

四、切割 Parquet 文件

场景 1:按条件过滤切割

public class ParquetSplitter {  
 public static void splitByAge(Path inputPath, Path outputPath1, Path outputPath2, int ageThreshold)            throws IOException {  
 // 创建两个写入器  
 try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputPath).build(); ParquetWriter<GenericRecord> writer1 = createWriter(outputPath1); ParquetWriter<GenericRecord> writer2 = createWriter(outputPath2)) {             GenericRecord record;  
 while ((record = reader.read()) != null) { int age = (Integer) record.get("age"); if (age < ageThreshold) { writer1.write(record); } else { writer2.write(record); } } } }  
 private static ParquetWriter<GenericRecord> createWriter(Path path) throws IOException { return AvroParquetWriter.<GenericRecord>builder(path) .withCompressionCodec(CompressionCodecName.SNAPPY) .build(); }  
 public static void main(String[] args) throws IOException { splitByAge( new Path("users.parquet"), new Path("young_users.parquet"), new Path("old_users.parquet"), 25 ); }}  

场景 2:按行数分块切割

public class ParquetRowSplitter {  
 public static void splitByRowCount(Path inputPath, String outputPrefix, int chunkSize)            throws IOException {  
         int fileCounter = 0;  
 int recordCounter = 0; ParquetWriter<GenericRecord> currentWriter = null;         try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputPath).build()) {  
             GenericRecord record;  
 while ((record = reader.read()) != null) { if (recordCounter % chunkSize == 0) { if (currentWriter != null) currentWriter.close(); currentWriter = createWriter(new Path(outputPrefix + fileCounter + ".parquet")); fileCounter++; } currentWriter.write(record); recordCounter++; } } finally { if (currentWriter != null) currentWriter.close(); } }  
 // main 方法省略...  
}  

五、高级操作

1. 使用投影读取部分列

// 只读取 id 和 name 列  
ParquetReader<GenericRecord> reader = AvroParquetReader  
 .<GenericRecord>builder(path) .withProjection(Schema.createRecord(Arrays.asList( new Schema.Field("id", Schema.create(Schema.Type.INT), "", null), new Schema.Field("name", Schema.create(Schema.Type.STRING), "", null) ))) .build();  

2. 使用谓词下推过滤

// 使用 Filter API 提前过滤  
FilterPredicate filter = FilterApi.gt(FilterApi.intColumn("age"), 25);  
  
ParquetReader<GenericRecord> reader = AvroParquetReader  
 .<GenericRecord>builder(path) .withFilter(filter) .build();  

六、性能优化建议

  1. 批量写入:使用 ParquetWriter 的自动缓冲机制(默认行组大小 128MB)
  2. 压缩选择
 .withCompressionCodec(CompressionCodecName.ZSTD) // 更高压缩率  
  1. 列式处理:优先使用投影减少 I/O
  2. 内存管理:处理大文件时使用分块读取(示例中的切割代码)

七、常见问题解决

  1. Schema 不匹配:确保读写使用相同的 Avro Schema
  2. Hadoop 依赖:若不在 Hadoop 环境运行,需添加 hadoop-common 依赖
  3. 版本冲突:保持 Parquet/Avro/Hadoop 版本兼容性

以上代码示例展示了 Java 中 Parquet 文件的核心操作流程,可直接用于大数据处理流水线的开发。对于超大规模数据,建议结合 Apache Spark 进行分布式处理。


总结

Parquet 凭借 列式存储、高压缩率、高性能查询,成为大数据生态的核心存储格式,尤其适合需要低成本存储、快速分析海量数据的场景。与 Spark、Presto 等工具深度集成,是构建数据湖和现代数仓的首选格式。

parquet