import os import glob from tqdm import tqdm import multiprocessing from concurrent.futures import ProcessPoolExecutor, as_completed import argparse import time import numpy as np from OCC.Core.STEPControl import STEPControl_Reader from OCC.Core.BRepMesh import BRepMesh_IncrementalMesh from OCC.Core.TopoDS import TopoDS_Shape from OCC.Core.IFSelect import ( IFSelect_RetDone, IFSelect_RetError, IFSelect_RetFail, IFSelect_RetStop, IFSelect_RetVoid ) # 操作状态码 from OCC.Core.StlAPI import StlAPI_Writer from OCC.Core.Bnd import Bnd_Box from OCC.Core.BRepBndLib import brepbndlib from OCC.Core.gp import gp_Trsf, gp_Vec import tempfile import trimesh class STEPFileReadError(Exception): """自定义STEP文件读取异常类""" def __init__(self, status, file_path): self.status = status self.file_path = file_path status_messages = { IFSelect_RetVoid: "未执行任何操作", IFSelect_RetError: "文件读取过程中发生错误", IFSelect_RetFail: "文件读取完全失败", IFSelect_RetStop: "读取过程中断" } message = f"STEP文件读取失败: {status_messages.get(status, '未知错误')}\n文件: {file_path}" super().__init__(message) def load_step(step_path): """加载STEP文件并返回B-rep模型""" reader = STEPControl_Reader() status = reader.ReadFile(step_path) if status != IFSelect_RetDone: raise STEPFileReadError(status, step_path) reader.TransferRoots() return reader.OneShape() def normalize_brep(brep_shape): """归一化B-rep模型到单位立方体空间[-1,1]³""" # 计算原始包围盒 bbox = Bnd_Box() brepbndlib.Add(brep_shape, bbox) xmin, ymin, zmin, xmax, ymax, zmax = bbox.Get() # 计算变换参数 center = gp_Vec((xmin+xmax)/2, (ymin+ymax)/2, (zmin+zmax)/2) max_dim = max(xmax-xmin, ymax-ymin, zmax-zmin) scale = 2.0 / max_dim # 缩放到[-1,1]范围 # 创建并应用变换 transform = gp_Trsf() transform.SetTranslation(-center) transform.SetScaleFactor(scale) from OCC.Core.BRepBuilderAPI import BRepBuilderAPI_Transform return BRepBuilderAPI_Transform(brep_shape, transform, True).Shape() def brep_to_mesh(brep_shape, linear_deflection=0.01): """将B-rep转换为三角网格""" mesh = BRepMesh_IncrementalMesh(brep_shape, linear_deflection) mesh.Perform() # 导出为STL临时文件 stl_writer = StlAPI_Writer() stl_writer.SetASCIIMode(False) with tempfile.NamedTemporaryFile(suffix='.stl') as tmp: stl_writer.Write(brep_shape, tmp.name) return trimesh.load(tmp.name) def process_step_brep2mesh(step_path, output_obj_path=None, linear_deflection=0.01): """处理单个STEP文件""" # 1. 加载STEP文件 brep = load_step(step_path) # 2. 归一化B-rep normalized_brep = normalize_brep(brep) # 3. 转换为三角网格 mesh = brep_to_mesh(normalized_brep, linear_deflection) # 4. 验证归一化结果 vertices = mesh.vertices assert np.all(vertices >= -1.0001) and np.all(vertices <= 1.0001) # 5. 保存OBJ文件 if output_obj_path: mesh.export(output_obj_path) return { 'mesh': mesh, 'vertices': vertices.astype(np.float32), 'faces': mesh.faces.astype(np.int32), 'is_watertight': mesh.is_watertight, 'obj_path': output_obj_path } def prepare_task(step_path, args): """准备任务参数(在主进程执行)""" rel_path = os.path.relpath(step_path, args.input_dir) dir_name = os.path.dirname(rel_path) base_name = os.path.basename(step_path) # 生成输出文件名 obj_name = f"{os.path.basename(dir_name)}.obj" if dir_name else \ f"{os.path.splitext(base_name)[0]}.obj" return step_path, os.path.join(args.output_dir, obj_name) def worker_process(params): """工作进程函数(在子进程执行)""" step_path, obj_path, deflection = params try: if not os.path.exists(step_path): return {'status': 'failed', 'error': '文件不存在', 'file': step_path} # 添加文件锁防止重复处理 lockfile = f"{obj_path}.lock" if os.path.exists(lockfile): return {'status': 'skipped', 'error': '文件正在被其他进程处理', 'file': step_path} with open(lockfile, 'w') as f: f.write(str(os.getpid())) # 核心处理逻辑 brep = load_step(step_path) if brep.IsNull(): return {'status': 'failed', 'error': 'BREP数据为空', 'file': step_path} mesh = brep_to_mesh(normalize_brep(brep), deflection) # 验证结果 vertices = mesh.vertices.astype(np.float32) assert np.all(vertices >= -1.0001) and np.all(vertices <= 1.0001) # 保存文件 mesh.export(obj_path) return { 'status': 'success', 'file': step_path, 'vertices': len(vertices), 'faces': len(mesh.faces), 'watertight': mesh.is_watertight } except Exception as e: return { 'status': 'failed', 'file': step_path, 'error': str(e) } finally: if 'lockfile' in locals() and os.path.exists(lockfile): os.remove(lockfile) def main(): ''' 批量处理 brep(.step文件),保存归一化 mesh 文件到 obj 文件夹结构: input_dir/ ├── id1/ │ └── id1_xxx.step ├── id2/ │ └── id2_xx.step output_dir/ ├── id1.obj ├── id2.obj ''' # 配置命令行参数 parser = argparse.ArgumentParser(description='STEP文件批量处理工具') parser.add_argument('-i', '--input_dir', required=True, help='输入目录路径,包含STEP文件的文件夹') parser.add_argument('-o', '--output_dir', required=True, help='输出目录路径,用于保存OBJ文件') parser.add_argument('-d', '--deflection', type=float, default=0.01, help='网格精度参数 (默认: 0.01)') parser.add_argument('-f', '--force', action='store_true', help='覆盖已存在的输出文件') parser.add_argument('-v', '--verbose', action='store_true', help='显示详细处理信息') args = parser.parse_args() # 创建输出目录 os.makedirs(args.output_dir, exist_ok=True) # 获取所有STEP文件 step_files = glob.glob(os.path.join(args.input_dir, "**/*.step"), recursive=True) step_files += glob.glob(os.path.join(args.input_dir, "**/*.stp"), recursive=True) if not step_files: print(f"未找到STEP文件,请检查输入目录: {input_dir}") return print(f"找到 {len(step_files)} 个STEP文件,开始处理...") # 准备所有任务 cpu_count = min(multiprocessing.cpu_count(), 8) max_workers = max(1, cpu_count - 1) # 保留1个核心 print(f"使用 {max_workers} cpu 并行处理") # 准备任务(主进程执行) tasks = [] for step_path in step_files: step_path, obj_path = prepare_task(step_path, args) if not args.force and os.path.exists(obj_path): if args.verbose: print(f"跳过: {obj_path}") continue tasks.append((step_path, obj_path, args.deflection)) # 进程池执行 with ProcessPoolExecutor( max_workers=max_workers, mp_context=multiprocessing.get_context('spawn') ) as executor: # 分块提交任务 chunk_size = 50 futures = [] success_count = 0 failure_count = 0 processed_files = set() start_time = time.time() # 初始化进度条 pbar = tqdm(total=len(tasks), desc="整体进度", unit="文件") for i in range(0, len(tasks), chunk_size): chunk = tasks[i:i + chunk_size] futures.extend(executor.submit(worker_process, task) for task in chunk) # 处理已完成任务 for future in as_completed(futures): result = future.result() if result['file'] in processed_files: # 跳过已处理文件 continue processed_files.add(result['file']) # 更新计数器 if result['status'] == 'success': success_count += 1 # 详细成功日志 pbar.write( f"[成功] {os.path.basename(result['file'])} " f"顶点:{result['vertices']} 面:{result['faces']} " f"水密:{result['watertight']}" ) else: failure_count += 1 if args.verbose: pbar.write( f"[失败] {os.path.basename(result['file'])} " f"错误: {result['error'][:100]}" ) # 更新进度条和实时统计 pbar.update(1) pbar.set_postfix({ '成功': success_count, '失败': failure_count, '成功率': f"{success_count/(success_count+failure_count):.1%}", '速度': f"{pbar.n/(time.time()-start_time):.1f}文件/秒" }) # 最终统计 pbar.close() print("\n" + "="*50) print(f"处理完成! 总耗时: {time.strftime('%H:%M:%S', time.gmtime(time.time()-start_time))}") print(f"成功: {success_count} | 失败: {failure_count} | 总数: {len(tasks)}") print(f"成功率: {success_count/len(tasks):.1%}") print("="*50) if __name__ == "__main__": main()