You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

348 lines
12 KiB

import os
import glob
from tqdm import tqdm
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
import argparse
import time
from brep2sdf.utils.logger import logger
import numpy as np
from OCC.Core.STEPControl import STEPControl_Reader
from OCC.Core.BRepMesh import BRepMesh_IncrementalMesh
from OCC.Core.IFSelect import (
IFSelect_RetDone,
IFSelect_RetError,
IFSelect_RetFail,
IFSelect_RetStop,
IFSelect_RetVoid
) # 操作状态码器
from OCC.Core.StlAPI import StlAPI_Writer # Add this import
from OCC.Core.Bnd import Bnd_Box
from OCC.Core.BRepBndLib import brepbndlib
from OCC.Core.gp import gp_Trsf, gp_Vec
import tempfile
import trimesh
class STEPFileReadError(Exception):
"""自定义STEP文件读取异常类"""
def __init__(self, status, file_path):
self.status = status
self.file_path = file_path
status_messages = {
IFSelect_RetVoid: "未执行任何操作",
IFSelect_RetError: "文件读取过程中发生错误",
IFSelect_RetFail: "文件读取完全失败",
IFSelect_RetStop: "读取过程中断"
}
message = f"STEP文件读取失败: {status_messages.get(status, '未知错误')}\n文件: {file_path}"
super().__init__(message)
def load_step(step_path):
"""加载STEP文件并返回B-rep模型"""
reader = STEPControl_Reader()
status = reader.ReadFile(step_path)
if status != IFSelect_RetDone:
raise STEPFileReadError(status, step_path)
reader.TransferRoots()
return reader.OneShape()
def brep_to_mesh(brep_shape, linear_deflection=0.01):
"""将B-rep转换为三角网格并归一化"""
mesh = BRepMesh_IncrementalMesh(brep_shape, linear_deflection)
mesh.Perform()
# 导出为STL临时文件
stl_writer = StlAPI_Writer()
stl_writer.SetASCIIMode(False)
with tempfile.NamedTemporaryFile(suffix='.stl') as tmp:
stl_writer.Write(brep_shape, tmp.name)
mesh = trimesh.load(tmp.name)
# 对mesh进行归一化
if len(mesh.vertices) == 0:
raise ValueError("网格顶点为空")
# 计算边界框
bounds = np.array([mesh.vertices.min(axis=0), mesh.vertices.max(axis=0)])
center = bounds.mean(axis=0)
scale = 1.8 / max(bounds[1] - bounds[0]) # 留出0.1的余量
# 应用变换
mesh.vertices = (mesh.vertices - center) * scale
# 验证结果
if not np.isfinite(mesh.vertices).all():
raise ValueError("归一化后的网格包含无效值(NaN或Inf)")
bounds = np.array([mesh.vertices.min(), mesh.vertices.max()])
if not (-1.0 <= bounds.min() and bounds.max() <= 1.0):
raise ValueError(f"归一化后的网格超出范围: [{bounds.min():.3f}, {bounds.max():.3f}]")
return mesh
def process_step_brep2mesh(step_path, output_obj_path=None, linear_deflection=0.01):
"""处理单个STEP文件"""
try:
# 1. 加载STEP文件
brep = load_step(step_path)
if brep.IsNull():
raise ValueError("加载的BREP数据为空")
# 2. 转换为三角网格并归一化
mesh = brep_to_mesh(brep, linear_deflection)
if mesh is None:
raise ValueError("网格转换失败")
# 3. 验证网格
vertices = mesh.vertices
faces = mesh.faces
if len(vertices) == 0 or len(faces) == 0:
raise ValueError(f"生成的网格无效: {len(vertices)}个顶点, {len(faces)}个面")
# 4. 保存OBJ文件
if output_obj_path:
try:
mesh.export(output_obj_path)
except Exception as e:
raise IOError(f"保存OBJ文件失败: {str(e)}")
return {
'mesh': mesh,
'vertices': vertices.astype(np.float32),
'faces': faces.astype(np.int32),
'is_watertight': mesh.is_watertight,
'obj_path': output_obj_path,
'stats': {
'num_vertices': len(vertices),
'num_faces': len(faces),
'bbox': [vertices.min(), vertices.max()]
}
}
except Exception as e:
# logger.error(f"处理STEP文件失败 {step_path}: {str(e)}")
raise RuntimeError(f"处理STEP文件失败: {str(e)}") from e
def prepare_task(step_path, args):
"""准备任务参数(在主进程执行)"""
rel_path = os.path.relpath(step_path, args.input_dir)
dir_name = os.path.dirname(rel_path)
base_name = os.path.basename(step_path)
# 生成输出文件名
obj_name = f"{os.path.basename(dir_name)}.obj" if dir_name else \
f"{os.path.splitext(base_name)[0]}.obj"
return step_path, os.path.join(args.output_dir, obj_name)
def worker_process(params):
"""工作进程函数(在子进程执行)"""
step_path, obj_path, deflection = params
try:
if not os.path.exists(step_path):
return {
'status': 'failed',
'file': step_path,
'error': f"文件不存在: {step_path}"
}
# 添加文件锁防止重复处理
lockfile = f"{obj_path}.lock"
if os.path.exists(lockfile):
return {
'status': 'failed',
'file': step_path,
'error': "文件正在被其他进程处理"
}
with open(lockfile, 'w') as f:
f.write(str(os.getpid()))
try:
# 使用process_step_brep2mesh处理文件
result = process_step_brep2mesh(step_path, obj_path, deflection)
return {
'status': 'success',
'file': step_path,
'vertices': len(result['vertices']),
'faces': len(result['faces']),
'watertight': result['is_watertight']
}
except Exception as e:
return {
'status': 'failed',
'file': step_path,
'error': str(e)
}
except Exception as e:
return {
'status': 'failed',
'file': step_path,
'error': str(e)
}
finally:
if 'lockfile' in locals() and os.path.exists(lockfile):
os.remove(lockfile)
def main():
'''
批量处理 brep(.step文件),保存归一化 mesh 文件到 obj
文件夹结构:
input_dir/
├── id1/
│ └── id1_xxx.step
├── id2/
│ └── id2_xx.step
output_dir/
├── id1.obj
├── id2.obj
'''
# 配置命令行参数
parser = argparse.ArgumentParser(description='STEP文件批量处理工具')
parser.add_argument('-i', '--input_dir', required=True,
help='输入目录路径,包含STEP文件的文件夹')
parser.add_argument('-o', '--output_dir', required=True,
help='输出目录路径,用于保存OBJ文件')
parser.add_argument('-d', '--deflection', type=float, default=0.01,
help='网格精度参数 (默认: 0.01)')
parser.add_argument('-f', '--force', action='store_true',
help='覆盖已存在的输出文件')
parser.add_argument('-v', '--verbose', action='store_true',
help='显示详细处理信息')
args = parser.parse_args()
# 创建输出目录
os.makedirs(args.output_dir, exist_ok=True)
# 获取所有STEP文件
step_files = glob.glob(os.path.join(args.input_dir, "**/*.step"), recursive=True)
step_files += glob.glob(os.path.join(args.input_dir, "**/*.stp"), recursive=True)
if not step_files:
print(f"未找到STEP文件,请检查输入目录: {input_dir}")
return
print(f"找到 {len(step_files)} 个STEP文件,开始处理...")
# 准备所有任务
cpu_count = min(multiprocessing.cpu_count(), 8)
max_workers = max(1, cpu_count - 1) # 保留1个核心
print(f"使用 {max_workers} cpu 并行处理")
# 准备任务(主进程执行)
tasks = []
for step_path in step_files:
step_path, obj_path = prepare_task(step_path, args)
if not args.force and os.path.exists(obj_path):
if args.verbose:
print(f"跳过: {obj_path}")
continue
tasks.append((step_path, obj_path, args.deflection))
# 进程池执行
with ProcessPoolExecutor(
max_workers=max_workers,
mp_context=multiprocessing.get_context('spawn')
) as executor:
# 分块提交任务
chunk_size = 50
futures = []
success_count = 0
failure_count = 0
processed_files = set()
start_time = time.time()
# 初始化进度条
pbar = tqdm(total=len(tasks), desc="整体进度", unit="文件")
for i in range(0, len(tasks), chunk_size):
chunk = tasks[i:i + chunk_size]
futures.extend(executor.submit(worker_process, task) for task in chunk)
# 处理已完成任务
for future in as_completed(futures):
result = future.result()
if result['file'] in processed_files: # 跳过已处理文件
continue
processed_files.add(result['file'])
# 更新计数器
if result['status'] == 'success':
success_count += 1
# 详细成功日志
pbar.write(
f"[成功] {os.path.basename(result['file'])} "
f"顶点:{result['vertices']} 面:{result['faces']} "
f"水密:{result['watertight']}"
)
else:
failure_count += 1
if args.verbose:
pbar.write(
f"[失败] {os.path.basename(result['file'])} "
f"错误: {result['error'][:100]}"
)
# 更新进度条和实时统计
pbar.update(1)
pbar.set_postfix({
'成功': success_count,
'失败': failure_count,
'成功率': f"{success_count/(success_count+failure_count):.1%}",
'速度': f"{pbar.n/(time.time()-start_time):.1f}文件/秒"
})
# 最终统计
pbar.close()
print("\n" + "="*50)
print(f"处理完成! 总耗时: {time.strftime('%H:%M:%S', time.gmtime(time.time()-start_time))}")
print(f"成功: {success_count} | 失败: {failure_count} | 总数: {len(tasks)}")
print(f"成功率: {success_count/len(tasks):.1%}")
print("="*50)
def test_single_step(step_path, output_obj_path=None, linear_deflection=0.01):
"""测试处理单个STEP文件
Args:
step_path: STEP文件路径
output_obj_path: 输出OBJ文件路径(可选)
linear_deflection: 网格精度参数
"""
print(f"开始处理STEP文件: {step_path}")
try:
result = process_step_brep2mesh(step_path, output_obj_path, linear_deflection)
# 打印处理结果
print("\n处理成功!")
print(f"顶点数量: {len(result['vertices'])}")
print(f"面片数量: {len(result['faces'])}")
print(f"是否水密: {result['is_watertight']}")
print(f"边界框: {result['stats']['bbox']}")
if output_obj_path:
print(f"已保存到: {output_obj_path}")
return result
except Exception as e:
print(f"\n处理失败: {str(e)}")
return None
if __name__ == "__main__":
main()
'''
test_single_step(
"/home/wch/brep2sdf/data/step/00002736/00002736_82034c87704b46a891e498d6_step_004.step",
"/home/wch/brep2sdf/data/gt_mesh/00002736.obj"
)
'''