from typing import Tuple, Union import argparse import os import glob import multiprocessing from concurrent.futures import ProcessPoolExecutor, as_completed from tqdm import tqdm import trimesh import numpy as np from scipy.interpolate import interp1d def get_feature_edges( mesh: trimesh.Trimesh, angle_threshold: float = 30.0 ) -> Tuple[np.ndarray, np.ndarray]: """ 从网格中提取特征边及对应二面角 参数: mesh: 输入的三角网格对象(trimesh.Trimesh) angle_threshold: 特征边判定阈值(单位:度),默认30度 返回: (edges, dihedrals) 元组: - edges: 边坐标数组,形状为(N,2,3) - dihedrals: 对应二面角数组,形状为(N,) """ # 获取所有边 edges = mesh.edges_unique edge_vectors = mesh.vertices[edges] # 计算每条边的二面角 dihedrals = [] for edge in edges: # 获取共享该边的面 faces = mesh.face_adjacency_edges edge_faces = np.where(faces == edge)[0] print(len(edge_faces)) if len(edge_faces) == 2: # 只处理有两个相邻面的边 # 计算面法向量夹角(二面角) normals = mesh.face_normals[mesh.face_adjacency[edge_faces]] angle = np.degrees(np.arccos(np.clip(np.dot(normals[0], normals[1]), -1, 1))) dihedrals.append(angle) else: dihedrals.append(0) dihedrals = np.array(dihedrals) # 筛选特征边 feature_mask = dihedrals > angle_threshold feature_edges = edge_vectors[feature_mask] feature_dihedrals = dihedrals[feature_mask] return feature_edges, feature_dihedrals def sample_feature_edges( edges: np.ndarray, dihedrals: np.ndarray, min_length: float = 4e-3, min_samples: int = 3, max_samples: int = 20, sampling_dist: str = 'cosine', return_indices: bool = False ) -> Union[Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray, np.ndarray]]: """ 对特征边进行采样 参数: edges: 边坐标数组,形状为(N,2,3) dihedrals: 对应二面角数组,形状为(N,) min_length: 最小边长度阈值(单位:米),默认0.1 min_samples: 每条边最少采样点数,默认3 max_samples: 每条边最多采样点数,默认20 sampling_dist: 采样分布类型: 'linear' - 线性均匀采样 'cosine' - 余弦分布(中间密集) 'quadratic' - 二次方分布 return_indices: 是否返回原始边索引 返回: 默认返回 (sampled_points, sampled_dihedrals) 当return_indices=True时返回 (sampled_points, sampled_dihedrals, edge_indices) """ sampled_points = [] sampled_dihedrals = [] for edge, dihedral in zip(edges, dihedrals): # 计算边长度 length = np.linalg.norm(edge[1] - edge[0]) # 过滤短边 if length < min_length: continue # 动态计算采样点数(基于长度) n_samples = min(max_samples, max(min_samples, int(length * 10))) # 生成采样参数(使用平方分布使中心区域采样更密集) t = np.linspace(0, 1, n_samples) t = 0.5 * (1 - np.cos(t * np.pi)) # 转换为非线性分布 # 插值采样 interpolator = interp1d([0, 1], edge, axis=0) points = interpolator(t) sampled_points.extend(points) sampled_dihedrals.extend([dihedral] * n_samples) return np.array(sampled_points), np.array(sampled_dihedrals) def save_ptangle(filename, points, angles): """ 保存为.ptangle格式: 每行: x y z angle(度) """ data = np.column_stack([points, angles]) np.savetxt(filename, data, fmt='%.6f %.6f %.6f %.2f') def load_ptangle(filename): """ 加载.ptangle文件 返回: (points, angles) """ data = np.loadtxt(filename) return data[:, :3], data[:, 3] def process_mesh_to_ptangle(obj_path, output_path, edge_min_samples=3, edge_max_samples=20, angle_threshold=30, min_edge_length=0.01): """ 完整处理流程: 1. 加载OBJ网格 2. 检测特征边 3. 采样特征点 4. 保存为.ptangle格式 返回: dict: 包含处理状态和结果信息的字典 """ result = { 'status': 'unknown', 'file': obj_path, 'points': 0, 'max_angle': 0, 'error': None } try: # 1. 加载网格 try: mesh = trimesh.load(obj_path) if not isinstance(mesh, trimesh.Trimesh): raise ValueError("Input must be a triangular mesh") print(f"成功加载网格: 顶点数={len(mesh.vertices)}, 面数={len(mesh.faces)}") except Exception as e: raise RuntimeError(f"网格加载失败: {str(e)}") from e # 2. 网格修复 try: if not mesh.is_watertight: print("检测到非水密网格,正在填充孔洞...") mesh.fill_holes() except Exception as e: raise RuntimeError(f"网格修复失败: {str(e)}") from e # 3. 特征边检测 try: edges, dihedrals = get_feature_edges( mesh=mesh, angle_threshold=angle_threshold, ) print(f"找到 {len(edges)} 条特征边") if len(edges) == 0: print("警告: 未检测到任何特征边") except Exception as e: raise RuntimeError(f"特征边检测失败: {str(e)}") from e # 4. 特征边采样 try: points, angles = sample_feature_edges( edges=edges, dihedrals=dihedrals, min_length=min_edge_length, min_samples=edge_min_samples, max_samples=edge_max_samples ) print(f"生成 {len(points)} 个采样点") if len(points) == 0: raise RuntimeError("采样结果为空") except Exception as e: raise RuntimeError(f"采样失败: {str(e)}") from e # 5. 保存结果 try: save_ptangle(output_path, points, angles) print(f"结果已保存到 {output_path}") except Exception as e: raise RuntimeError(f"保存结果失败: {str(e)}") from e # 更新成功结果 result.update({ 'status': 'success', 'points': len(points), 'max_angle': float(np.max(angles)) if len(angles) > 0 else 0.0 }) except Exception as e: result.update({ 'status': 'error', 'error': str(e) }) print(f"\n处理失败: {str(e)}") print("\n处理完成! 状态:", result['status']) return result def main(): """ NOTE: 有问题 批量处理OBJ网格,采样特征点及其二面角 保存为.ptangle格式(每行:x y z dihedral_angle) """ parser = argparse.ArgumentParser(description='网格特征点采样工具') parser.add_argument('-i', '--input_dir', required=True, help='输入OBJ文件或目录') parser.add_argument('-o', '--output_dir', required=True, help='输出.ptangle文件或目录') parser.add_argument('--edge_min_samples', type=int, default=3, help='每条边最小采样点数 (默认: 3)') parser.add_argument('--edge_max_samples', type=int, default=30, help='每条边最大采样点数 (默认: 30)') parser.add_argument('-t', '--angle_threshold', type=float, default=30.0, help='特征二面角阈值(度) (默认: 30)') parser.add_argument('--min-length', type=float, default=4e-3, help='最小特征边长度(默认: 0.01)') parser.add_argument('--samples-per-edge', type=int, default=3, help='每条边的采样点数(默认: 3)') parser.add_argument('--merge-groups', action='store_true', help='合并连接的特征边组') parser.add_argument('-f', '--force', action='store_true', help='覆盖已存在的输出文件') parser.add_argument('-v', '--verbose', action='store_true', help='显示详细处理信息') args = parser.parse_args() # 创建输出目录 os.makedirs(args.output_dir, exist_ok=True) # 获取所有OBJ文件 obj_files = glob.glob(os.path.join(args.input_dir, "*.obj")) if not obj_files: print(f"错误: 在 {args.input_dir} 中未找到OBJ文件") return # 准备任务列表 tasks = [] for obj_path in obj_files: file_id = os.path.splitext(os.path.basename(obj_path))[0] output_path = os.path.join(args.output_dir, f"{file_id}.ptangle") if not args.force and os.path.exists(output_path): if args.verbose: print(f"跳过已存在文件: {output_path}") continue tasks.append((obj_path, output_path, args.edge_min_samples, args.edge_max_samples, args.angle_threshold, args.min_length)) # 并行处理配置 cpu_count = min(multiprocessing.cpu_count(), 8) max_workers = max(1, cpu_count - 1) # 处理状态统计 stats = { 'total': len(tasks), 'success': 0, 'failure': 0, 'max_angle': 0.0 } # 进度条包装函数 def update_stats(result): if result['status'] == 'success': stats['success'] += 1 stats['max_angle'] = max(stats['max_angle'], result['max_angle']) if args.verbose: print(f"成功处理 {os.path.basename(result['file'])}: " f"{result['points']}点, 最大角度{result['max_angle']:.1f}°") else: stats['failure'] += 1 print(f"处理失败 {os.path.basename(result['file'])}: {result['error']}") # 并行处理 with ProcessPoolExecutor(max_workers=max_workers) as executor: futures = [] for task in tasks: futures.append(executor.submit(process_mesh_to_ptangle, *task)) # 进度显示 with tqdm(total=len(tasks), desc="处理进度") as pbar: for future in as_completed(futures): update_stats(future.result()) pbar.update(1) pbar.set_postfix({ '成功': stats['success'], '失败': stats['failure'], '最大角度': f"{stats['max_angle']:.1f}°" }) # 输出总结 print("\n" + "="*50) print(f"处理完成! 总文件数: {stats['total']}") print(f"成功: {stats['success']} | 失败: {stats['failure']}") print(f"最大二面角: {stats['max_angle']:.1f}°") print(f"成功率: {stats['success']/stats['total']:.1%}") print("="*50) if __name__ == "__main__": main()