""" CAD模型处理脚本 功能:将STEP格式的CAD模型转换为结构化数据,包括: - 几何信息:面、边、顶点的坐标数据 - 拓扑信息:面-边-顶点的邻接关系 - 空间信息:包围盒数据 """ import os import pickle # 用于数据序列化 import argparse # 命令行参数解析 import numpy as np from tqdm import tqdm # 进度条显示 from concurrent.futures import ProcessPoolExecutor, as_completed, TimeoutError # 并行处理 import logging from datetime import datetime # 创建logs目录 os.makedirs('logs', exist_ok=True) # 设置日志记录器 logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) # 创建格式化器 formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') # 创建文件处理器 current_time = datetime.now().strftime('%Y%m%d_%H%M%S') log_file = f'logs/process_brep_{current_time}.log' file_handler = logging.FileHandler(log_file, encoding='utf-8') file_handler.setLevel(logging.INFO) file_handler.setFormatter(formatter) # 添加文件处理器到日志记录器 logger.addHandler(file_handler) # 记录脚本开始执行 logger.info("="*50) logger.info("Script started") logger.info(f"Log file: {log_file}") logger.info("="*50) # 导入OpenCASCADE相关库 from OCC.Core.STEPControl import STEPControl_Reader # STEP文件读取器 from OCC.Core.TopExp import TopExp_Explorer, topexp # 拓扑结构遍历 from OCC.Core.TopAbs import TopAbs_FACE, TopAbs_EDGE, TopAbs_VERTEX # 拓扑类型定义 from OCC.Core.BRep import BRep_Tool # B-rep工具 from OCC.Core.BRepMesh import BRepMesh_IncrementalMesh # 网格剖分 from OCC.Core.TopLoc import TopLoc_Location # 位置变换 from OCC.Core.IFSelect import IFSelect_RetDone # 操作状态码 from OCC.Core.TopTools import TopTools_IndexedDataMapOfShapeListOfShape # 形状映射 from OCC.Core.BRepBndLib import brepbndlib # 包围盒计算 from OCC.Core.Bnd import Bnd_Box # 包围盒 from OCC.Core.TopoDS import TopoDS_Shape, topods, TopoDS_Vertex # 拓扑数据结构 # 设置最大面数阈值,用于加速处理 MAX_FACE = 70 def normalize(surfs, edges, corners): """ 将CAD模型归一化到单位立方体空间 参数: surfs: 面的点集列表 edges: 边的点集列表 corners: 顶点坐标列表 返回: surfs_wcs: 原始坐标系下的面点集 edges_wcs: 原始坐标系下的边点集 surfs_ncs: 归一化坐标系下的面点集 edges_ncs: 归一化坐标系下的边点集 corner_wcs: 归一化后的顶点坐标 """ if len(corners) == 0: return None, None, None, None, None # 计算包围盒和缩放因子 corners_array = np.array(corners) center = (corners_array.max(0) + corners_array.min(0)) / 2 # 计算中心点 scale = 1.0 / (corners_array.max(0) - corners_array.min(0)).max() # 计算缩放系数 # 归一化面的点集 surfs_wcs = [] # 原始世界坐标系下的面点集 surfs_ncs = [] # 归一化坐标系下的面点集 for surf in surfs: surf_wcs = np.array(surf) surf_ncs = (surf_wcs - center) * scale # 归一化变换 surfs_wcs.append(surf_wcs) surfs_ncs.append(surf_ncs) # 归一化边的点集 edges_wcs = [] # 原始世界坐标系下的边点集 edges_ncs = [] # 归一化坐标系下的边点集 for edge in edges: edge_wcs = np.array(edge) edge_ncs = (edge_wcs - center) * scale # 归一化变换 edges_wcs.append(edge_wcs) edges_ncs.append(edge_ncs) # 归一化顶点坐标 corner_wcs = (corners_array - center) * scale return (np.array(surfs_wcs, dtype=object), np.array(edges_wcs, dtype=object), np.array(surfs_ncs, dtype=object), np.array(edges_ncs, dtype=object), corner_wcs) def get_adjacency_info(shape): """ 获取CAD模型中面、边、顶点之间的邻接关系 参数: shape: CAD模型的形状对象 返回: edgeFace_adj: 边-面邻接矩阵 (num_edges × num_faces) faceEdge_adj: 面-边邻接矩阵 (num_faces × num_edges) edgeCorner_adj: 边-顶点邻接矩阵 (num_edges × 2) """ # 创建边-面映射关系 edge_face_map = TopTools_IndexedDataMapOfShapeListOfShape() topexp.MapShapesAndAncestors(shape, TopAbs_EDGE, TopAbs_FACE, edge_face_map) # 获取所有几何元素 faces = [] # 存储所有面 edges = [] # 存储所有边 vertices = [] # 存储所有顶点 # 创建拓扑结构探索器 face_explorer = TopExp_Explorer(shape, TopAbs_FACE) edge_explorer = TopExp_Explorer(shape, TopAbs_EDGE) vertex_explorer = TopExp_Explorer(shape, TopAbs_VERTEX) # 收集所有几何元素 while face_explorer.More(): faces.append(topods.Face(face_explorer.Current())) face_explorer.Next() while edge_explorer.More(): edges.append(topods.Edge(edge_explorer.Current())) edge_explorer.Next() while vertex_explorer.More(): vertices.append(topods.Vertex(vertex_explorer.Current())) vertex_explorer.Next() # 创建邻接矩阵 num_faces = len(faces) num_edges = len(edges) num_vertices = len(vertices) edgeFace_adj = np.zeros((num_edges, num_faces), dtype=np.int32) faceEdge_adj = np.zeros((num_faces, num_edges), dtype=np.int32) edgeCorner_adj = np.zeros((num_edges, 2), dtype=np.int32) # 填充边-面邻接矩阵 for i, edge in enumerate(edges): # 检查每个面是否与当前边相连 for j, face in enumerate(faces): edge_explorer = TopExp_Explorer(face, TopAbs_EDGE) while edge_explorer.More(): if edge.IsSame(edge_explorer.Current()): edgeFace_adj[i, j] = 1 faceEdge_adj[j, i] = 1 break edge_explorer.Next() # 获取边的两个端点 v1 = TopoDS_Vertex() v2 = TopoDS_Vertex() topexp.Vertices(edge, v1, v2) # 记录边的端点索引 if not v1.IsNull() and not v2.IsNull(): v1_vertex = topods.Vertex(v1) v2_vertex = topods.Vertex(v2) for k, vertex in enumerate(vertices): if v1_vertex.IsSame(vertex): edgeCorner_adj[i, 0] = k if v2_vertex.IsSame(vertex): edgeCorner_adj[i, 1] = k return edgeFace_adj, faceEdge_adj, edgeCorner_adj def get_bbox(shape, subshape): """ 计算形状的包围盒 参数: shape: 完整的CAD模型形状 subshape: 需要计算包围盒的子形状(面或边) 返回: 包围盒的六个参数 [xmin, ymin, zmin, xmax, ymax, zmax] """ bbox = Bnd_Box() brepbndlib.Add(subshape, bbox) xmin, ymin, zmin, xmax, ymax, zmax = bbox.Get() return np.array([xmin, ymin, zmin, xmax, ymax, zmax]) def parse_solid(step_path): """Parse the surface, curve, face, edge, vertex in a CAD solid using OCC.""" # Load STEP file reader = STEPControl_Reader() status = reader.ReadFile(step_path) if status != IFSelect_RetDone: raise Exception("Failed to read STEP file") reader.TransferRoots() shape = reader.OneShape() # Create mesh mesh = BRepMesh_IncrementalMesh(shape, 0.01) mesh.Perform() # Initialize explorers face_explorer = TopExp_Explorer(shape, TopAbs_FACE) edge_explorer = TopExp_Explorer(shape, TopAbs_EDGE) vertex_explorer = TopExp_Explorer(shape, TopAbs_VERTEX) face_pnts = [] edge_pnts = [] corner_pnts = [] surf_bbox_wcs = [] edge_bbox_wcs = [] # Extract face points while face_explorer.More(): face = topods.Face(face_explorer.Current()) loc = TopLoc_Location() triangulation = BRep_Tool.Triangulation(face, loc) if triangulation is not None: points = [] for i in range(1, triangulation.NbNodes() + 1): node = triangulation.Node(i) pnt = node.Transformed(loc.Transformation()) points.append([pnt.X(), pnt.Y(), pnt.Z()]) if points: points = np.array(points, dtype=np.float32) if len(points.shape) == 2 and points.shape[1] == 3: face_pnts.append(points) surf_bbox_wcs.append(get_bbox(shape, face)) face_explorer.Next() # Extract edge points num_samples = 100 while edge_explorer.More(): edge = topods.Edge(edge_explorer.Current()) curve, first, last = BRep_Tool.Curve(edge) if curve is not None: points = [] for i in range(num_samples): param = first + (last - first) * float(i) / (num_samples - 1) pnt = curve.Value(param) points.append([pnt.X(), pnt.Y(), pnt.Z()]) if points: points = np.array(points, dtype=np.float32) if len(points.shape) == 2 and points.shape[1] == 3: edge_pnts.append(points) edge_bbox_wcs.append(get_bbox(shape, edge)) edge_explorer.Next() # Extract vertex points while vertex_explorer.More(): vertex = topods.Vertex(vertex_explorer.Current()) pnt = BRep_Tool.Pnt(vertex) corner_pnts.append([pnt.X(), pnt.Y(), pnt.Z()]) vertex_explorer.Next() # 获取邻接信息 edgeFace_adj, faceEdge_adj, edgeCorner_adj = get_adjacency_info(shape) # 转换为numpy数组,但保持列表形式 face_pnts = list(face_pnts) # 确保是列表 edge_pnts = list(edge_pnts) # 确保是列表 corner_pnts = np.array(corner_pnts, dtype=np.float32) surf_bbox_wcs = np.array(surf_bbox_wcs, dtype=np.float32) edge_bbox_wcs = np.array(edge_bbox_wcs, dtype=np.float32) # Normalize the CAD model surfs_wcs, edges_wcs, surfs_ncs, edges_ncs, corner_wcs = normalize(face_pnts, edge_pnts, corner_pnts) # Create result dictionary data = { 'surf_wcs': surfs_wcs, 'edge_wcs': edges_wcs, 'surf_ncs': surfs_ncs, 'edge_ncs': edges_ncs, 'corner_wcs': corner_wcs.astype(np.float32), 'edgeFace_adj': edgeFace_adj, 'edgeCorner_adj': edgeCorner_adj, 'faceEdge_adj': faceEdge_adj, 'surf_bbox_wcs': surf_bbox_wcs, 'edge_bbox_wcs': edge_bbox_wcs, 'corner_unique': np.unique(corner_wcs, axis=0).astype(np.float32) } return data def load_step(step_path): """Load STEP file and return solids""" reader = STEPControl_Reader() reader.ReadFile(step_path) reader.TransferRoots() return [reader.OneShape()] def process_single_step( step_path:str, output_path:str=None, timeout:int=300 ) -> dict: """Process single STEP file""" try: # 解析STEP文件 data = parse_solid(step_path) if data is None: logger.error("Failed to parse STEP file") return None # 保存结果 if output_path: try: logger.info(f"Saving results to: {output_path}") os.makedirs(os.path.dirname(output_path), exist_ok=True) with open(output_path, 'wb') as f: pickle.dump(data, f) logger.info("Results saved successfully") except Exception as e: logger.error(f'Not saving due to error: {str(e)}') return data except Exception as e: logger.error(f'Not saving due to error: {str(e)}') return 0 def test(step_file_path, output_path=None): """ 测试函数:转换单个STEP文件并保存结果 """ try: logger.info(f"Processing STEP file: {step_file_path}") # 解析STEP文件 data = parse_solid(step_file_path) if data is None: logger.error("Failed to parse STEP file") return None # 打印统计信息 logger.info("\nStatistics:") logger.info(f"Number of surfaces: {len(data['surf_wcs'])}") logger.info(f"Number of edges: {len(data['edge_wcs'])}") logger.info(f"Number of corners: {len(data['corner_unique'])}") # 保存结果 if output_path: logger.info(f"Saving results to: {output_path}") os.makedirs(os.path.dirname(output_path), exist_ok=True) with open(output_path, 'wb') as f: pickle.dump(data, f) logger.info("Results saved successfully") return data except Exception as e: logger.error(f"Error processing STEP file: {str(e)}") return None def process_furniture_step(data_path): """ 处理家具数据集的STEP文件 参数: data_path: 数据集路径 返回: 包含训练、验证和测试集的STEP文件路径字典 { 'train': [step_file_path1, step_file_path2, ...], 'val': [step_file_path1, step_file_path2, ...], 'test': [step_file_path1, step_file_path2, ...] } """ step_dirs = {} for split in ['train', 'val', 'test']: tmp_step_dirs = [] split_path = os.path.join(data_path, split) if os.path.exists(split_path): for f in os.listdir(split_path): if f.endswith('.step'): tmp_step_dirs.append(f) step_dirs[split] = tmp_step_dirs return step_dirs def main(): """ 主函数:处理多个STEP文件 """ # 定义路径常量 INPUT = '/mnt/disk2/dataset/furniture/step/furniture_dataset_step/' OUTPUT = 'test_data/pkl/' RESULT = 'test_data/result/' # 用于存储成功/失败文件记录 # 确保输出目录存在 os.makedirs(OUTPUT, exist_ok=True) os.makedirs(RESULT, exist_ok=True) # 获取所有STEP文件 step_dirs_dict = process_furniture_step(INPUT) total_processed = 0 total_success = 0 # 按数据集分割处理文件 for split in ['train', 'val', 'test']: current_step_dirs = step_dirs_dict[split] if not current_step_dirs: logger.warning(f"No files found in {split} split") continue # 确保分割目录存在 split_output_dir = os.path.join(OUTPUT, split) split_result_dir = os.path.join(RESULT, split) os.makedirs(split_output_dir, exist_ok=True) os.makedirs(split_result_dir, exist_ok=True) success_files = [] # 存储成功处理的文件名 failed_files = [] # 存储失败的文件名及原因 # 并行处理文件 with ProcessPoolExecutor(max_workers=os.cpu_count() // 2) as executor: futures = {} for step_file in current_step_dirs: input_path = os.path.join(INPUT, split, step_file) output_path = os.path.join(split_output_dir, step_file.replace('.step', '.pkl')) future = executor.submit(process_single_step, input_path, output_path, timeout=300) futures[future] = step_file # 处理结果 for future in tqdm(as_completed(futures), total=len(current_step_dirs), desc=f"Processing {split} set"): try: status = future.result(timeout=300) if status is not None: success_files.append(futures[future]) total_success += 1 except TimeoutError: logger.error(f"Timeout occurred while processing {futures[future]}") failed_files.append((futures[future], "Timeout")) except Exception as e: logger.error(f"Error processing {futures[future]}: {str(e)}") failed_files.append((futures[future], str(e))) finally: total_processed += 1 # 保存成功文件列表 success_file_path = os.path.join(split_result_dir, 'success.txt') with open(success_file_path, 'w', encoding='utf-8') as f: f.write('\n'.join(success_files)) logger.info(f"Saved {len(success_files)} successful files to {success_file_path}") # 保存失败文件列表(包含错误信息) failed_file_path = os.path.join(split_result_dir, 'failed.txt') with open(failed_file_path, 'w', encoding='utf-8') as f: for file, error in failed_files: f.write(f"{file}: {error}\n") logger.info(f"Saved {len(failed_files)} failed files to {failed_file_path}") # 打印最终统计信息 if total_processed > 0: success_rate = (total_success / total_processed) * 100 logger.info(f"Processing completed:") logger.info(f"Total files processed: {total_processed}") logger.info(f"Successfully processed: {total_success}") logger.info(f"Success rate: {success_rate:.2f}%") else: logger.warning("No files were processed") if __name__ == '__main__': main()