Browse Source

step2pkl

main
王琛涵 4 months ago
parent
commit
19cf75f134
  1. 478
      scripts/process_brep.py

478
scripts/process_brep.py

@ -1,168 +1,255 @@
import os
import pickle
import os
import pickle
import argparse
import numpy as np
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed, TimeoutError
from convert_utils import *
from occwl.io import load_step
from OCC.Core.STEPControl import STEPControl_Reader
from OCC.Core.TopExp import TopExp_Explorer, topexp
from OCC.Core.TopAbs import TopAbs_FACE, TopAbs_EDGE, TopAbs_VERTEX
from OCC.Core.BRep import BRep_Tool
from OCC.Core.BRepMesh import BRepMesh_IncrementalMesh
from OCC.Core.TopLoc import TopLoc_Location
from OCC.Core.IFSelect import IFSelect_RetDone
from OCC.Core.TopTools import TopTools_IndexedDataMapOfShapeListOfShape
from OCC.Core.BRepBndLib import brepbndlib
from OCC.Core.Bnd import Bnd_Box
from OCC.Core.TopoDS import TopoDS_Shape
from OCC.Core.TopoDS import topods
from OCC.Core.TopoDS import TopoDS_Vertex
# To speed up processing, define maximum threshold
MAX_FACE = 70
def normalize(surf_pnts, edge_pnts, corner_pnts):
"""
Various levels of normalization
"""
# Global normalization to -1~1
total_points = np.array(surf_pnts).reshape(-1, 3)
min_vals = np.min(total_points, axis=0)
max_vals = np.max(total_points, axis=0)
global_offset = min_vals + (max_vals - min_vals)/2
global_scale = max(max_vals - min_vals)
assert global_scale != 0, 'scale is zero'
surfs_wcs, edges_wcs, surfs_ncs, edges_ncs = [],[],[],[]
# Normalize corner
corner_wcs = (corner_pnts - global_offset[np.newaxis,:]) / (global_scale * 0.5)
# Normalize surface
for surf_pnt in surf_pnts:
# Normalize CAD to WCS
surf_pnt_wcs = (surf_pnt - global_offset[np.newaxis,np.newaxis,:]) / (global_scale * 0.5)
surfs_wcs.append(surf_pnt_wcs)
# Normalize Surface to NCS
min_vals = np.min(surf_pnt_wcs.reshape(-1,3), axis=0)
max_vals = np.max(surf_pnt_wcs.reshape(-1,3), axis=0)
local_offset = min_vals + (max_vals - min_vals)/2
local_scale = max(max_vals - min_vals)
pnt_ncs = (surf_pnt_wcs - local_offset[np.newaxis,np.newaxis,:]) / (local_scale * 0.5)
surfs_ncs.append(pnt_ncs)
# Normalize edge
for edge_pnt in edge_pnts:
# Normalize CAD to WCS
edge_pnt_wcs = (edge_pnt - global_offset[np.newaxis,:]) / (global_scale * 0.5)
edges_wcs.append(edge_pnt_wcs)
# Normalize Edge to NCS
min_vals = np.min(edge_pnt_wcs.reshape(-1,3), axis=0)
max_vals = np.max(edge_pnt_wcs.reshape(-1,3), axis=0)
local_offset = min_vals + (max_vals - min_vals)/2
local_scale = max(max_vals - min_vals)
pnt_ncs = (edge_pnt_wcs - local_offset) / (local_scale * 0.5)
edges_ncs.append(pnt_ncs)
assert local_scale != 0, 'scale is zero'
surfs_wcs = np.stack(surfs_wcs)
surfs_ncs = np.stack(surfs_ncs)
edges_wcs = np.stack(edges_wcs)
edges_ncs = np.stack(edges_ncs)
return surfs_wcs, edges_wcs, surfs_ncs, edges_ncs, corner_wcs
def parse_solid(solid):
"""
Parse the surface, curve, face, edge, vertex in a CAD solid.
Args:
- solid (occwl.solid): A single brep solid in occwl data format.
Returns:
- data: A dictionary containing all parsed data
"""
assert isinstance(solid, Solid)
# Split closed surface and closed curve to halve
solid = solid.split_all_closed_faces(num_splits=0)
solid = solid.split_all_closed_edges(num_splits=0)
if len(list(solid.faces())) > MAX_FACE:
return None
def normalize(surfs, edges, corners):
"""Normalize the CAD model to unit cube"""
if len(corners) == 0:
return None, None, None, None, None
# Get bounding box
corners_array = np.array(corners)
center = (corners_array.max(0) + corners_array.min(0)) / 2
scale = 1.0 / (corners_array.max(0) - corners_array.min(0)).max()
# Normalize surfaces
surfs_wcs = []
surfs_ncs = []
for surf in surfs:
surf_wcs = np.array(surf) # 确保是numpy数组
surf_ncs = (surf_wcs - center) * scale
surfs_wcs.append(surf_wcs)
surfs_ncs.append(surf_ncs)
# Normalize edges
edges_wcs = []
edges_ncs = []
for edge in edges:
edge_wcs = np.array(edge) # 确保是numpy数组
edge_ncs = (edge_wcs - center) * scale
edges_wcs.append(edge_wcs)
edges_ncs.append(edge_ncs)
# Normalize corners
corner_wcs = (corners_array - center) * scale
# 返回时保持列表形式
return (np.array(surfs_wcs, dtype=object),
np.array(edges_wcs, dtype=object),
np.array(surfs_ncs, dtype=object),
np.array(edges_ncs, dtype=object),
corner_wcs)
def get_adjacency_info(shape):
"""获取形状的邻接信息"""
# 创建数据映射
edge_face_map = TopTools_IndexedDataMapOfShapeListOfShape()
topexp.MapShapesAndAncestors(shape, TopAbs_EDGE, TopAbs_FACE, edge_face_map)
# 获取所有面和边
faces = []
edges = []
vertices = []
face_explorer = TopExp_Explorer(shape, TopAbs_FACE)
edge_explorer = TopExp_Explorer(shape, TopAbs_EDGE)
vertex_explorer = TopExp_Explorer(shape, TopAbs_VERTEX)
# 收集所有面、边和顶点
while face_explorer.More():
faces.append(topods.Face(face_explorer.Current()))
face_explorer.Next()
while edge_explorer.More():
edges.append(topods.Edge(edge_explorer.Current()))
edge_explorer.Next()
while vertex_explorer.More():
vertices.append(topods.Vertex(vertex_explorer.Current()))
vertex_explorer.Next()
# 创建邻接矩阵
num_faces = len(faces)
num_edges = len(edges)
num_vertices = len(vertices)
edgeFace_adj = np.zeros((num_edges, num_faces), dtype=np.int32)
faceEdge_adj = np.zeros((num_faces, num_edges), dtype=np.int32)
edgeCorner_adj = np.zeros((num_edges, 2), dtype=np.int32)
# 填充边-面邻接矩阵
for i, edge in enumerate(edges):
# 获取与边相连的面
for j, face in enumerate(faces):
# 使用 explorer 检查边是否属于面
edge_explorer = TopExp_Explorer(face, TopAbs_EDGE)
while edge_explorer.More():
if edge.IsSame(edge_explorer.Current()):
edgeFace_adj[i, j] = 1
faceEdge_adj[j, i] = 1
break
edge_explorer.Next()
# 获取边的顶点
v1 = TopoDS_Vertex()
v2 = TopoDS_Vertex()
topexp.Vertices(edge, v1, v2)
if not v1.IsNull() and not v2.IsNull():
v1_vertex = topods.Vertex(v1)
v2_vertex = topods.Vertex(v2)
# 找到顶点的索引
for k, vertex in enumerate(vertices):
if v1_vertex.IsSame(vertex):
edgeCorner_adj[i, 0] = k
if v2_vertex.IsSame(vertex):
edgeCorner_adj[i, 1] = k
return edgeFace_adj, faceEdge_adj, edgeCorner_adj
def get_bbox(shape, subshape):
"""计算包围盒"""
bbox = Bnd_Box()
brepbndlib.Add(subshape, bbox)
xmin, ymin, zmin, xmax, ymax, zmax = bbox.Get()
return np.array([xmin, ymin, zmin, xmax, ymax, zmax])
def parse_solid(step_path):
"""Parse the surface, curve, face, edge, vertex in a CAD solid using OCC."""
# Load STEP file
reader = STEPControl_Reader()
status = reader.ReadFile(step_path)
if status != IFSelect_RetDone:
raise Exception("Failed to read STEP file")
reader.TransferRoots()
shape = reader.OneShape()
# Create mesh
mesh = BRepMesh_IncrementalMesh(shape, 0.01)
mesh.Perform()
# Initialize explorers
face_explorer = TopExp_Explorer(shape, TopAbs_FACE)
edge_explorer = TopExp_Explorer(shape, TopAbs_EDGE)
vertex_explorer = TopExp_Explorer(shape, TopAbs_VERTEX)
face_pnts = []
edge_pnts = []
corner_pnts = []
surf_bbox_wcs = []
edge_bbox_wcs = []
# Extract face points
while face_explorer.More():
face = topods.Face(face_explorer.Current())
loc = TopLoc_Location()
triangulation = BRep_Tool.Triangulation(face, loc)
# Extract all B-rep primitives and their adjacency information
face_pnts, edge_pnts, edge_corner_pnts, edgeFace_IncM, faceEdge_IncM = extract_primitive(solid)
if triangulation is not None:
points = []
for i in range(1, triangulation.NbNodes() + 1):
node = triangulation.Node(i)
pnt = node.Transformed(loc.Transformation())
points.append([pnt.X(), pnt.Y(), pnt.Z()])
if points:
points = np.array(points, dtype=np.float32)
if len(points.shape) == 2 and points.shape[1] == 3:
face_pnts.append(points)
surf_bbox_wcs.append(get_bbox(shape, face))
face_explorer.Next()
# Extract edge points
num_samples = 100
while edge_explorer.More():
edge = topods.Edge(edge_explorer.Current())
curve, first, last = BRep_Tool.Curve(edge)
if curve is not None:
points = []
for i in range(num_samples):
param = first + (last - first) * float(i) / (num_samples - 1)
pnt = curve.Value(param)
points.append([pnt.X(), pnt.Y(), pnt.Z()])
if points:
points = np.array(points, dtype=np.float32)
if len(points.shape) == 2 and points.shape[1] == 3:
edge_pnts.append(points)
edge_bbox_wcs.append(get_bbox(shape, edge))
edge_explorer.Next()
# Extract vertex points
while vertex_explorer.More():
vertex = topods.Vertex(vertex_explorer.Current())
pnt = BRep_Tool.Pnt(vertex)
corner_pnts.append([pnt.X(), pnt.Y(), pnt.Z()])
vertex_explorer.Next()
# 获取邻接信息
edgeFace_adj, faceEdge_adj, edgeCorner_adj = get_adjacency_info(shape)
# 转换为numpy数组,但保持列表形式
face_pnts = list(face_pnts) # 确保是列表
edge_pnts = list(edge_pnts) # 确保是列表
corner_pnts = np.array(corner_pnts, dtype=np.float32)
surf_bbox_wcs = np.array(surf_bbox_wcs, dtype=np.float32)
edge_bbox_wcs = np.array(edge_bbox_wcs, dtype=np.float32)
# Normalize the CAD model
surfs_wcs, edges_wcs, surfs_ncs, edges_ncs, corner_wcs = normalize(face_pnts, edge_pnts, edge_corner_pnts)
# Remove duplicate and merge corners
corner_wcs = np.round(corner_wcs,4)
corner_unique = []
for corner_pnt in corner_wcs.reshape(-1,3):
if len(corner_unique) == 0:
corner_unique = corner_pnt.reshape(1,3)
else:
# Check if it exist or not
exists = np.any(np.all(corner_unique == corner_pnt, axis=1))
if exists:
continue
else:
corner_unique = np.concatenate([corner_unique, corner_pnt.reshape(1,3)], 0)
# Edge-corner adjacency
edgeCorner_IncM = []
for edge_corner in corner_wcs:
start_corner_idx = np.where((corner_unique == edge_corner[0]).all(axis=1))[0].item()
end_corner_idx = np.where((corner_unique == edge_corner[1]).all(axis=1))[0].item()
edgeCorner_IncM.append([start_corner_idx, end_corner_idx])
edgeCorner_IncM = np.array(edgeCorner_IncM)
# Surface global bbox
surf_bboxes = []
for pnts in surfs_wcs:
min_point, max_point = get_bbox(pnts.reshape(-1,3))
surf_bboxes.append( np.concatenate([min_point, max_point]))
surf_bboxes = np.vstack(surf_bboxes)
# Edge global bbox
edge_bboxes = []
for pnts in edges_wcs:
min_point, max_point = get_bbox(pnts.reshape(-1,3))
edge_bboxes.append(np.concatenate([min_point, max_point]))
edge_bboxes = np.vstack(edge_bboxes)
# Convert to float32 to save space
surfs_wcs, edges_wcs, surfs_ncs, edges_ncs, corner_wcs = normalize(face_pnts, edge_pnts, corner_pnts)
# Create result dictionary
data = {
'surf_wcs':surfs_wcs.astype(np.float32),
'edge_wcs':edges_wcs.astype(np.float32),
'surf_ncs':surfs_ncs.astype(np.float32),
'edge_ncs':edges_ncs.astype(np.float32),
'corner_wcs':corner_wcs.astype(np.float32),
'edgeFace_adj': edgeFace_IncM,
'edgeCorner_adj':edgeCorner_IncM,
'faceEdge_adj':faceEdge_IncM,
'surf_bbox_wcs':surf_bboxes.astype(np.float32),
'edge_bbox_wcs':edge_bboxes.astype(np.float32),
'corner_unique':corner_unique.astype(np.float32),
'surf_wcs': surfs_wcs,
'edge_wcs': edges_wcs,
'surf_ncs': surfs_ncs,
'edge_ncs': edges_ncs,
'corner_wcs': corner_wcs.astype(np.float32),
'edgeFace_adj': edgeFace_adj,
'edgeCorner_adj': edgeCorner_adj,
'faceEdge_adj': faceEdge_adj,
'surf_bbox_wcs': surf_bbox_wcs,
'edge_bbox_wcs': edge_bbox_wcs,
'corner_unique': np.unique(corner_wcs, axis=0).astype(np.float32)
}
return data
def load_step(step_path):
"""Load STEP file and return solids"""
reader = STEPControl_Reader()
reader.ReadFile(step_path)
reader.TransferRoots()
return [reader.OneShape()]
def process(step_folder, timeout=300):
"""
Helper function to load step files and process in parallel
Args:
- step_folder (str): Path to the STEP parent folder.
Returns:
- Complete status: Valid (1) / Non-valid (0).
"""
def process(step_path, timeout=300):
"""Process single STEP file"""
try:
# Load cad data
# print(step_folder)
if step_folder.endswith('.step'):
step_path = step_folder
process_furniture = True
else:
for _, _, files in os.walk(step_folder):
assert len(files) == 1
step_path = os.path.join(step_folder, files[0])
process_furniture = False
# Check single solid
cad_solid = load_step(step_path)
if len(cad_solid)!=1:
@ -170,13 +257,13 @@ def process(step_folder, timeout=300):
return 0
# Start data parsing
data = parse_solid(cad_solid[0])
data = parse_solid(step_path)
if data is None:
print ('Exceeding threshold...')
return 0 # number of faces or edges exceed pre-determined threshold
return 0
# Save the parsed result
if process_furniture:
if 'furniture' in step_path:
data_uid = step_path.split('/')[-2] + '_' + step_path.split('/')[-1]
sub_folder = step_path.split('/')[-3]
else:
@ -184,7 +271,7 @@ def process(step_folder, timeout=300):
sub_folder = data_uid[:4]
if data_uid.endswith('.step'):
data_uid = data_uid[:-5] # furniture avoid .step
data_uid = data_uid[:-5]
data['uid'] = data_uid
save_folder = os.path.join(OUTPUT, sub_folder)
@ -201,8 +288,67 @@ def process(step_folder, timeout=300):
print('not saving due to error...', str(e))
return 0
def test(step_file_path, output_path=None):
"""
测试函数转换单个STEP文件并保存结果
"""
try:
print(f"Processing STEP file: {step_file_path}")
# 解析STEP文件
data = parse_solid(step_file_path)
if data is None:
print("Failed to parse STEP file")
return None
# 打印统计信息
print("\nStatistics:")
print(f"Number of surfaces: {len(data['surf_wcs'])}")
print(f"Number of edges: {len(data['edge_wcs'])}")
print(f"Number of corners: {len(data['corner_unique'])}")
# 保存结果
if output_path:
print(f"\nSaving results to: {output_path}")
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, 'wb') as f:
pickle.dump(data, f)
print("Results saved successfully")
return data
except Exception as e:
print(f"Error processing STEP file: {str(e)}")
return None
def load_furniture_step(data_path):
"""Load furniture STEP files"""
step_dirs = []
for split in ['train', 'val', 'test']:
split_path = os.path.join(data_path, split)
if os.path.exists(split_path):
for f in os.listdir(split_path):
if f.endswith('.step'):
step_dirs.append(os.path.join(split_path, f))
return step_dirs
def load_abc_step(data_path, is_deepcad=False):
"""Load ABC/DeepCAD STEP files"""
step_dirs = []
for f in sorted(os.listdir(data_path)):
if os.path.isdir(os.path.join(data_path, f)):
if is_deepcad:
step_path = os.path.join(data_path, f, f+'.step')
else:
step_path = os.path.join(data_path, f, 'shape.step')
if os.path.exists(step_path):
step_dirs.append(step_path)
return step_dirs
if __name__ == '__main__':
def main():
"""
主函数处多个STEP文件
"""
parser = argparse.ArgumentParser()
parser.add_argument("--input", type=str, help="Data folder path", required=True)
parser.add_argument("--option", type=str, choices=['abc', 'deepcad', 'furniture'], default='abc',
@ -210,6 +356,7 @@ if __name__ == '__main__':
parser.add_argument("--interval", type=int, help="Data range index, only required for abc/deepcad")
args = parser.parse_args()
global OUTPUT
if args.option == 'deepcad':
OUTPUT = 'deepcad_parsed'
elif args.option == 'abc':
@ -240,9 +387,26 @@ if __name__ == '__main__':
print(f"Timeout occurred while processing {futures[future]}")
except Exception as e:
print(f"An error occurred while processing {futures[future]}: {e}")
#convert_iter = Pool(os.cpu_count()).imap(process, step_dirs)
#for status in tqdm(convert_iter, total=len(step_dirs)):
# valid += status
print(f'Done... Data Converted Ratio {100.0*valid/len(step_dirs)}%')
print(f'Done... Data Converted Ratio {100.0*valid/len(step_dirs)}%')
if __name__ == '__main__':
import sys
if len(sys.argv) > 1 and sys.argv[1] == '--test':
# 测试模式
if len(sys.argv) < 3:
print("Usage: python process_brep.py --test <step_file_path> [output_path]")
sys.exit(1)
step_file = sys.argv[2]
output_file = sys.argv[3] if len(sys.argv) > 3 else None
print("Running in test mode...")
result = test(step_file, output_file)
if result is not None:
print("\nTest completed successfully!")
else:
# 正常批处理模式
main()
Loading…
Cancel
Save