import os
import glob
from tqdm import tqdm
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
import argparse
import time
from brep2sdf.utils.logger import logger

import numpy as np

from OCC.Core.STEPControl import STEPControl_Reader
from OCC.Core.BRepMesh import BRepMesh_IncrementalMesh
from OCC.Core.IFSelect import (
    IFSelect_RetDone,
    IFSelect_RetError,
    IFSelect_RetFail,
    IFSelect_RetStop,
    IFSelect_RetVoid
)  # operation status codes
from OCC.Core.StlAPI import StlAPI_Writer
from OCC.Core.Bnd import Bnd_Box
from OCC.Core.BRepBndLib import brepbndlib
from OCC.Core.gp import gp_Trsf, gp_Vec
import tempfile
import trimesh


class STEPFileReadError(Exception):
    """Custom exception for STEP file read failures."""

    def __init__(self, status, file_path):
        self.status = status
        self.file_path = file_path
        status_messages = {
            IFSelect_RetVoid: "no operation was performed",
            IFSelect_RetError: "an error occurred while reading the file",
            IFSelect_RetFail: "reading the file failed completely",
            IFSelect_RetStop: "reading was interrupted"
        }
        message = (
            f"Failed to read STEP file: {status_messages.get(status, 'unknown error')}\n"
            f"File: {file_path}"
        )
        super().__init__(message)


def load_step(step_path):
    """Load a STEP file and return its B-rep shape."""
    reader = STEPControl_Reader()
    status = reader.ReadFile(step_path)
    if status != IFSelect_RetDone:
        raise STEPFileReadError(status, step_path)
    reader.TransferRoots()
    return reader.OneShape()


def brep_to_mesh(brep_shape, linear_deflection=0.01):
    """Triangulate a B-rep shape and normalize the resulting mesh."""
    mesh = BRepMesh_IncrementalMesh(brep_shape, linear_deflection)
    mesh.Perform()

    # Export to a temporary STL file and reload it with trimesh
    stl_writer = StlAPI_Writer()
    stl_writer.SetASCIIMode(False)
    with tempfile.NamedTemporaryFile(suffix='.stl') as tmp:
        stl_writer.Write(brep_shape, tmp.name)
        mesh = trimesh.load(tmp.name)

    # Normalize the mesh
    if len(mesh.vertices) == 0:
        raise ValueError("Mesh has no vertices")

    # Compute the bounding box
    bounds = np.array([mesh.vertices.min(axis=0), mesh.vertices.max(axis=0)])
    center = bounds.mean(axis=0)
    scale = 1.8 / max(bounds[1] - bounds[0])  # leave a 0.1 margin on each side

    # Apply the transform
    mesh.vertices = (mesh.vertices - center) * scale

    # Validate the result
    if not np.isfinite(mesh.vertices).all():
        raise ValueError("Normalized mesh contains invalid values (NaN or Inf)")

    bounds = np.array([mesh.vertices.min(), mesh.vertices.max()])
    if not (-1.0 <= bounds.min() and bounds.max() <= 1.0):
        raise ValueError(
            f"Normalized mesh is out of range: [{bounds.min():.3f}, {bounds.max():.3f}]"
        )

    return mesh
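
# Usage sketch (the path below is hypothetical): load a STEP file and inspect
# the normalized mesh returned by brep_to_mesh(). Because of the 1.8 scale
# factor above, the vertices are expected to lie inside the [-0.9, 0.9] cube.
#
#   shape = load_step("model.step")
#   mesh = brep_to_mesh(shape, linear_deflection=0.01)
#   print(mesh.bounds)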
def process_step_brep2mesh(step_path, output_obj_path=None, linear_deflection=0.01):
    """Process a single STEP file."""
    try:
        # 1. Load the STEP file
        brep = load_step(step_path)
        if brep.IsNull():
            raise ValueError("Loaded B-rep data is empty")

        # 2. Convert to a triangle mesh and normalize
        mesh = brep_to_mesh(brep, linear_deflection)
        if mesh is None:
            raise ValueError("Mesh conversion failed")

        # 3. Validate the mesh
        vertices = mesh.vertices
        faces = mesh.faces
        if len(vertices) == 0 or len(faces) == 0:
            raise ValueError(
                f"Generated mesh is invalid: {len(vertices)} vertices, {len(faces)} faces"
            )

        # 4. Save the OBJ file
        if output_obj_path:
            try:
                mesh.export(output_obj_path)
            except Exception as e:
                raise IOError(f"Failed to save OBJ file: {str(e)}")

        return {
            'mesh': mesh,
            'vertices': vertices.astype(np.float32),
            'faces': faces.astype(np.int32),
            'is_watertight': mesh.is_watertight,
            'obj_path': output_obj_path,
            'stats': {
                'num_vertices': len(vertices),
                'num_faces': len(faces),
                'bbox': [vertices.min(), vertices.max()]
            }
        }

    except Exception as e:
        # logger.error(f"Failed to process STEP file {step_path}: {str(e)}")
        raise RuntimeError(f"Failed to process STEP file: {str(e)}") from e


def prepare_task(step_path, args):
    """Prepare task parameters (runs in the main process)."""
    rel_path = os.path.relpath(step_path, args.input_dir)
    dir_name = os.path.dirname(rel_path)
    base_name = os.path.basename(step_path)

    # Build the output file name: use the containing folder name if there is one,
    # otherwise fall back to the STEP file's base name
    obj_name = f"{os.path.basename(dir_name)}.obj" if dir_name else \
        f"{os.path.splitext(base_name)[0]}.obj"

    return step_path, os.path.join(args.output_dir, obj_name)


def worker_process(params):
    """Worker function (runs in a child process)."""
    step_path, obj_path, deflection = params
    lock_acquired = False
    try:
        if not os.path.exists(step_path):
            return {
                'status': 'failed',
                'file': step_path,
                'error': f"File does not exist: {step_path}"
            }

        # Use a lock file to avoid processing the same output twice
        lockfile = f"{obj_path}.lock"
        if os.path.exists(lockfile):
            return {
                'status': 'failed',
                'file': step_path,
                'error': "File is being processed by another process"
            }

        with open(lockfile, 'w') as f:
            f.write(str(os.getpid()))
        lock_acquired = True

        try:
            # Convert the file with process_step_brep2mesh
            result = process_step_brep2mesh(step_path, obj_path, deflection)
            return {
                'status': 'success',
                'file': step_path,
                'vertices': len(result['vertices']),
                'faces': len(result['faces']),
                'watertight': result['is_watertight']
            }
        except Exception as e:
            return {
                'status': 'failed',
                'file': step_path,
                'error': str(e)
            }

    except Exception as e:
        return {
            'status': 'failed',
            'file': step_path,
            'error': str(e)
        }
    finally:
        # Only remove the lock file this process created
        if lock_acquired and os.path.exists(lockfile):
            os.remove(lockfile)
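
# Debugging sketch (hypothetical paths): worker_process() can also be called
# directly, outside the process pool, to inspect the result dict for one file:
#
#   result = worker_process(("model.step", "model.obj", 0.01))
#   print(result['status'], result.get('error'))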
def main():
    '''
    Batch-process B-rep (.step) files and save the normalized meshes as OBJ.

    Folder layout:
    input_dir/
    ├── id1/
    │   └── id1_xxx.step
    ├── id2/
    │   └── id2_xx.step

    output_dir/
    ├── id1.obj
    ├── id2.obj
    '''
    # Command-line arguments
    parser = argparse.ArgumentParser(description='Batch STEP file processing tool')
    parser.add_argument('-i', '--input_dir', required=True,
                        help='Input directory containing STEP files')
    parser.add_argument('-o', '--output_dir', required=True,
                        help='Output directory for OBJ files')
    parser.add_argument('-d', '--deflection', type=float, default=0.01,
                        help='Mesh precision parameter (default: 0.01)')
    parser.add_argument('-f', '--force', action='store_true',
                        help='Overwrite existing output files')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Print detailed processing information')
    args = parser.parse_args()

    # Create the output directory
    os.makedirs(args.output_dir, exist_ok=True)

    # Collect all STEP files
    step_files = glob.glob(os.path.join(args.input_dir, "**/*.step"), recursive=True)
    step_files += glob.glob(os.path.join(args.input_dir, "**/*.stp"), recursive=True)
    if not step_files:
        print(f"No STEP files found, please check the input directory: {args.input_dir}")
        return

    print(f"Found {len(step_files)} STEP files, starting processing...")

    # Decide how many workers to use
    cpu_count = min(multiprocessing.cpu_count(), 8)
    max_workers = max(1, cpu_count - 1)  # keep one core free
    print(f"Processing in parallel with {max_workers} worker processes")

    # Prepare tasks (in the main process)
    tasks = []
    for step_path in step_files:
        step_path, obj_path = prepare_task(step_path, args)
        if not args.force and os.path.exists(obj_path):
            if args.verbose:
                print(f"Skipping: {obj_path}")
            continue
        tasks.append((step_path, obj_path, args.deflection))

    if not tasks:
        print("Nothing to do: all output files already exist (use -f to overwrite).")
        return

    # Run the process pool
    with ProcessPoolExecutor(
        max_workers=max_workers,
        mp_context=multiprocessing.get_context('spawn')
    ) as executor:
        # Submit tasks in chunks
        chunk_size = 50
        futures = []
        success_count = 0
        failure_count = 0
        processed_files = set()
        start_time = time.time()

        # Progress bar
        pbar = tqdm(total=len(tasks), desc="Overall progress", unit="file")

        for i in range(0, len(tasks), chunk_size):
            chunk = tasks[i:i + chunk_size]
            futures.extend(executor.submit(worker_process, task) for task in chunk)

            # Handle completed tasks; as_completed() re-iterates the full futures
            # list each round, so files counted earlier are skipped below
            for future in as_completed(futures):
                result = future.result()
                if result['file'] in processed_files:
                    # Already counted in a previous round
                    continue
                processed_files.add(result['file'])

                # Update counters
                if result['status'] == 'success':
                    success_count += 1
                    # Detailed success log
                    pbar.write(
                        f"[OK] {os.path.basename(result['file'])} "
                        f"vertices: {result['vertices']} faces: {result['faces']} "
                        f"watertight: {result['watertight']}"
                    )
                else:
                    failure_count += 1
                    if args.verbose:
                        pbar.write(
                            f"[FAILED] {os.path.basename(result['file'])} "
                            f"error: {result['error'][:100]}"
                        )

                # Update the progress bar and live statistics
                pbar.update(1)
                pbar.set_postfix({
                    'ok': success_count,
                    'failed': failure_count,
                    'rate': f"{success_count/(success_count+failure_count):.1%}",
                    'speed': f"{pbar.n/(time.time()-start_time):.1f} files/s"
                })

        # Final statistics
        pbar.close()
        print("\n" + "="*50)
        print(f"Done! Total time: {time.strftime('%H:%M:%S', time.gmtime(time.time()-start_time))}")
        print(f"Succeeded: {success_count} | Failed: {failure_count} | Total: {len(tasks)}")
        print(f"Success rate: {success_count/len(tasks):.1%}")
        print("="*50)


def test_single_step(step_path, output_obj_path=None, linear_deflection=0.01):
    """Test processing of a single STEP file.

    Args:
        step_path: Path to the STEP file
        output_obj_path: Output OBJ file path (optional)
        linear_deflection: Mesh precision parameter
    """
    print(f"Processing STEP file: {step_path}")

    try:
        result = process_step_brep2mesh(step_path, output_obj_path, linear_deflection)

        # Print the results
        print("\nSuccess!")
        print(f"Vertex count: {len(result['vertices'])}")
        print(f"Face count: {len(result['faces'])}")
        print(f"Watertight: {result['is_watertight']}")
        print(f"Bounding box: {result['stats']['bbox']}")

        if output_obj_path:
            print(f"Saved to: {output_obj_path}")

        return result

    except Exception as e:
        print(f"\nProcessing failed: {str(e)}")
        return None


if __name__ == "__main__":
    main()
    '''
    test_single_step(
        "/home/wch/brep2sdf/data/step/00002736/00002736_82034c87704b46a891e498d6_step_004.step",
        "/home/wch/brep2sdf/data/gt_mesh/00002736.obj"
    )
    '''
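
# Example invocation (the script name and paths are hypothetical):
#   python preprocess_step_to_mesh.py -i data/step -o data/gt_mesh -d 0.01 -v
# Add -f to overwrite OBJ files that already exist in the output directory.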