"""Batch-convert STEP (B-rep) CAD files into normalized OBJ triangle meshes."""

import os
import glob
from tqdm import tqdm
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
import argparse
import time
import numpy as np
from OCC.Core.STEPControl import STEPControl_Reader
from OCC.Core.BRepMesh import BRepMesh_IncrementalMesh
from OCC.Core.TopoDS import TopoDS_Shape
from OCC.Core.IFSelect import (
IFSelect_RetDone,
IFSelect_RetError,
IFSelect_RetFail,
IFSelect_RetStop,
IFSelect_RetVoid
) # 操作状态码
from OCC.Core.StlAPI import StlAPI_Writer
from OCC.Core.Bnd import Bnd_Box
from OCC.Core.BRepBndLib import brepbndlib
from OCC.Core.gp import gp_Trsf, gp_Vec
import tempfile
import trimesh
class STEPFileReadError(Exception):
    """Raised when STEPControl_Reader fails to read a STEP file.

    Attributes:
        status: the IFSelect return code reported by the reader.
        file_path: path of the STEP file that failed to load.
    """

    # Human-readable description for each non-success reader status code.
    _STATUS_MESSAGES = {
        IFSelect_RetVoid: "未执行任何操作",
        IFSelect_RetError: "文件读取过程中发生错误",
        IFSelect_RetFail: "文件读取完全失败",
        IFSelect_RetStop: "读取过程中断",
    }

    def __init__(self, status, file_path):
        self.status = status
        self.file_path = file_path
        detail = self._STATUS_MESSAGES.get(status, '未知错误')
        super().__init__(f"STEP文件读取失败: {detail}\n文件: {file_path}")
def load_step(step_path):
    """Load a STEP file and return its single combined B-rep shape.

    Raises:
        STEPFileReadError: if the reader reports any status other than done.
    """
    reader = STEPControl_Reader()
    read_status = reader.ReadFile(step_path)
    if read_status == IFSelect_RetDone:
        reader.TransferRoots()
        return reader.OneShape()
    raise STEPFileReadError(read_status, step_path)
def normalize_brep(brep_shape):
    """Normalize a B-rep shape into the unit cube [-1, 1]^3.

    The shape is translated so its bounding-box center sits at the origin and
    uniformly scaled so the largest bounding-box extent becomes 2.

    Raises:
        ValueError: if the bounding box has zero extent (degenerate shape).
    """
    # Compute the axis-aligned bounding box of the original shape.
    bbox = Bnd_Box()
    brepbndlib.Add(brep_shape, bbox)
    xmin, ymin, zmin, xmax, ymax, zmax = bbox.Get()

    cx, cy, cz = (xmin + xmax) / 2, (ymin + ymax) / 2, (zmin + zmax) / 2
    max_dim = max(xmax - xmin, ymax - ymin, zmax - zmin)
    if max_dim <= 0:
        raise ValueError("Degenerate bounding box: cannot normalize shape")
    scale = 2.0 / max_dim  # largest extent maps to 2, i.e. the [-1, 1] range

    # BUG FIX: gp_Trsf applies x -> scale * x + translation, and
    # SetScaleFactor does NOT rescale a previously-set translation part.
    # The original used SetTranslation(-center) + SetScaleFactor(scale),
    # which yields scale*x - center instead of scale*(x - center), leaving
    # off-origin models outside [-1, 1].  The translation must be
    # -scale * center.
    transform = gp_Trsf()
    transform.SetTranslation(gp_Vec(-cx * scale, -cy * scale, -cz * scale))
    transform.SetScaleFactor(scale)
    from OCC.Core.BRepBuilderAPI import BRepBuilderAPI_Transform
    return BRepBuilderAPI_Transform(brep_shape, transform, True).Shape()
def brep_to_mesh(brep_shape, linear_deflection=0.01):
    """Triangulate a B-rep shape and return it as a trimesh mesh.

    Args:
        brep_shape: shape to triangulate (OCC meshes it in place).
        linear_deflection: chordal deviation controlling mesh density.

    Raises:
        RuntimeError: if the intermediate STL file cannot be written.
    """
    mesher = BRepMesh_IncrementalMesh(brep_shape, linear_deflection)
    mesher.Perform()

    stl_writer = StlAPI_Writer()
    stl_writer.SetASCIIMode(False)
    # BUG FIX: the original wrote to an open NamedTemporaryFile by name,
    # which fails on Windows (the file cannot be reopened while held open).
    # A TemporaryDirectory gives a plain path and still guarantees cleanup.
    with tempfile.TemporaryDirectory() as tmp_dir:
        stl_path = os.path.join(tmp_dir, "mesh.stl")
        if not stl_writer.Write(brep_shape, stl_path):
            raise RuntimeError(f"Failed to write intermediate STL: {stl_path}")
        return trimesh.load(stl_path)
def process_step_brep2mesh(step_path, output_obj_path=None, linear_deflection=0.01):
    """Process one STEP file: load, normalize, triangulate, optionally export.

    Args:
        step_path: path to the input .step/.stp file.
        output_obj_path: if given, the mesh is also exported there as OBJ.
        linear_deflection: meshing precision forwarded to brep_to_mesh.

    Returns:
        dict with the trimesh object, float32 vertices, int32 faces, the
        watertightness flag, and the OBJ path (or None).

    Raises:
        STEPFileReadError: if the STEP file cannot be read.
        ValueError: if the normalized mesh leaves the unit cube.
    """
    # 1. Load STEP -> B-rep, 2. normalize to [-1, 1]^3, 3. triangulate.
    brep = load_step(step_path)
    normalized_brep = normalize_brep(brep)
    mesh = brep_to_mesh(normalized_brep, linear_deflection)

    # 4. Validate normalization with an explicit check: the original used
    # `assert`, which is silently stripped under `python -O`.
    vertices = mesh.vertices
    if not (np.all(vertices >= -1.0001) and np.all(vertices <= 1.0001)):
        raise ValueError(f"Normalized mesh exceeds [-1, 1] cube: {step_path}")

    # 5. Optional OBJ export.
    if output_obj_path:
        mesh.export(output_obj_path)

    return {
        'mesh': mesh,
        'vertices': vertices.astype(np.float32),
        'faces': mesh.faces.astype(np.int32),
        'is_watertight': mesh.is_watertight,
        'obj_path': output_obj_path,
    }
def prepare_task(step_path, args):
    """Build the (input_path, output_obj_path) pair for one STEP file.

    Runs in the main process.  A file living in a subdirectory of
    args.input_dir is named after that subdirectory; a top-level file keeps
    its own stem.
    """
    rel_dir = os.path.dirname(os.path.relpath(step_path, args.input_dir))
    if rel_dir:
        stem = os.path.basename(rel_dir)
    else:
        stem = os.path.splitext(os.path.basename(step_path))[0]
    return step_path, os.path.join(args.output_dir, f"{stem}.obj")
def worker_process(params):
    """Process one (step_path, obj_path, deflection) task in a child process.

    Returns:
        dict with 'status' of 'success', 'failed', or 'skipped'; never
        raises — all errors are reported through the returned dict.
    """
    step_path, obj_path, deflection = params
    lockfile = f"{obj_path}.lock"
    lock_acquired = False
    try:
        if not os.path.exists(step_path):
            return {'status': 'failed', 'error': '文件不存在', 'file': step_path}

        # BUG FIX: the original checked os.path.exists(lockfile) and then
        # opened it — a race between workers — and its `finally` removed the
        # lockfile even on the 'skipped' path, deleting a lock owned by
        # ANOTHER process.  Create the lock atomically and remember whether
        # this process owns it.
        try:
            fd = os.open(lockfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        except FileExistsError:
            return {'status': 'skipped', 'error': '文件正在被其他进程处理', 'file': step_path}
        lock_acquired = True
        with os.fdopen(fd, 'w') as f:
            f.write(str(os.getpid()))

        # Core pipeline: load -> normalize -> triangulate.
        brep = load_step(step_path)
        if brep.IsNull():
            return {'status': 'failed', 'error': 'BREP数据为空', 'file': step_path}
        mesh = brep_to_mesh(normalize_brep(brep), deflection)

        # Explicit validation instead of `assert` (stripped under -O).
        vertices = mesh.vertices.astype(np.float32)
        if not (np.all(vertices >= -1.0001) and np.all(vertices <= 1.0001)):
            return {'status': 'failed', 'error': '归一化结果超出[-1,1]范围', 'file': step_path}

        mesh.export(obj_path)
        return {
            'status': 'success',
            'file': step_path,
            'vertices': len(vertices),
            'faces': len(mesh.faces),
            'watertight': mesh.is_watertight,
        }
    except Exception as e:
        return {
            'status': 'failed',
            'file': step_path,
            'error': str(e),
        }
    finally:
        # Only remove the lock this process created.
        if lock_acquired and os.path.exists(lockfile):
            os.remove(lockfile)
def main():
    """Batch-convert STEP (B-rep) files to normalized OBJ meshes.

    Expected directory layout:
        input_dir/
            id1/
                id1_xxx.step
            id2/
                id2_xx.step
        output_dir/
            id1.obj
            id2.obj
    """
    # Command-line configuration.
    parser = argparse.ArgumentParser(description='STEP文件批量处理工具')
    parser.add_argument('-i', '--input_dir', required=True,
                        help='输入目录路径,包含STEP文件的文件夹')
    parser.add_argument('-o', '--output_dir', required=True,
                        help='输出目录路径,用于保存OBJ文件')
    parser.add_argument('-d', '--deflection', type=float, default=0.01,
                        help='网格精度参数 (默认: 0.01)')
    parser.add_argument('-f', '--force', action='store_true',
                        help='覆盖已存在的输出文件')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='显示详细处理信息')
    args = parser.parse_args()

    os.makedirs(args.output_dir, exist_ok=True)

    # Collect .step and .stp files recursively.
    step_files = glob.glob(os.path.join(args.input_dir, "**/*.step"), recursive=True)
    step_files += glob.glob(os.path.join(args.input_dir, "**/*.stp"), recursive=True)
    if not step_files:
        # BUG FIX: the original referenced the undefined name `input_dir`
        # here, raising NameError instead of printing the message.
        print(f"未找到STEP文件,请检查输入目录: {args.input_dir}")
        return
    print(f"找到 {len(step_files)} 个STEP文件,开始处理...")

    cpu_count = min(multiprocessing.cpu_count(), 8)
    max_workers = max(1, cpu_count - 1)  # keep one core for the main process
    print(f"使用 {max_workers} cpu 并行处理")

    # Build the task list in the main process, skipping existing outputs.
    tasks = []
    for step_path in step_files:
        step_path, obj_path = prepare_task(step_path, args)
        if not args.force and os.path.exists(obj_path):
            if args.verbose:
                print(f"跳过: {obj_path}")
            continue
        tasks.append((step_path, obj_path, args.deflection))

    # Guard: with every output already present, the final success-rate
    # division would otherwise raise ZeroDivisionError.
    if not tasks:
        print("没有需要处理的文件")
        return

    with ProcessPoolExecutor(
        max_workers=max_workers,
        mp_context=multiprocessing.get_context('spawn')
    ) as executor:
        success_count = 0
        failure_count = 0
        start_time = time.time()

        # BUG FIX: the original re-iterated as_completed(futures) after every
        # submitted chunk, draining ALL outstanding futures before the next
        # chunk was submitted (serializing the chunks) and re-fetching
        # results it had already seen (papered over with a dedupe set).
        # Submit everything once and consume each future exactly once.
        futures = [executor.submit(worker_process, task) for task in tasks]

        pbar = tqdm(total=len(tasks), desc="整体进度", unit="文件")
        for future in as_completed(futures):
            result = future.result()
            if result['status'] == 'success':
                success_count += 1
                pbar.write(
                    f"[成功] {os.path.basename(result['file'])} "
                    f"顶点:{result['vertices']} 面:{result['faces']} "
                    f"水密:{result['watertight']}"
                )
            else:
                failure_count += 1
                if args.verbose:
                    pbar.write(
                        f"[失败] {os.path.basename(result['file'])} "
                        f"错误: {result['error'][:100]}"
                    )
            # Progress bar and live statistics.
            pbar.update(1)
            done = success_count + failure_count
            pbar.set_postfix({
                '成功': success_count,
                '失败': failure_count,
                '成功率': f"{success_count/done:.1%}",
                '速度': f"{pbar.n/(time.time()-start_time):.1f}文件/秒"
            })

        # Final statistics.
        pbar.close()
        print("\n" + "="*50)
        print(f"处理完成! 总耗时: {time.strftime('%H:%M:%S', time.gmtime(time.time()-start_time))}")
        print(f"成功: {success_count} | 失败: {failure_count} | 总数: {len(tasks)}")
        print(f"成功率: {success_count/len(tasks):.1%}")
        print("="*50)


if __name__ == "__main__":
    main()