You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

321 lines
11 KiB

from typing import Tuple, Union
import argparse
import os
import glob
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm
import trimesh
import numpy as np
from scipy.interpolate import interp1d
def get_feature_edges(
mesh: trimesh.Trimesh,
angle_threshold: float = 30.0
) -> Tuple[np.ndarray, np.ndarray]:
"""
从网格中提取特征边及对应二面角
参数:
mesh: 输入的三角网格对象(trimesh.Trimesh)
angle_threshold: 特征边判定阈值(单位:度),默认30度
返回:
(edges, dihedrals) 元组:
- edges: 边坐标数组,形状为(N,2,3)
- dihedrals: 对应二面角数组,形状为(N,)
"""
# 获取所有边
edges = mesh.edges_unique
edge_vectors = mesh.vertices[edges]
# 计算每条边的二面角
dihedrals = []
for edge in edges:
# 获取共享该边的面
faces = mesh.face_adjacency_edges
edge_faces = np.where(faces == edge)[0]
print(len(edge_faces))
if len(edge_faces) == 2: # 只处理有两个相邻面的边
# 计算面法向量夹角(二面角)
normals = mesh.face_normals[mesh.face_adjacency[edge_faces]]
angle = np.degrees(np.arccos(np.clip(np.dot(normals[0], normals[1]), -1, 1)))
dihedrals.append(angle)
else:
dihedrals.append(0)
dihedrals = np.array(dihedrals)
# 筛选特征边
feature_mask = dihedrals > angle_threshold
feature_edges = edge_vectors[feature_mask]
feature_dihedrals = dihedrals[feature_mask]
return feature_edges, feature_dihedrals
def sample_feature_edges(
edges: np.ndarray,
dihedrals: np.ndarray,
min_length: float = 4e-3,
min_samples: int = 3,
max_samples: int = 20,
sampling_dist: str = 'cosine',
return_indices: bool = False
) -> Union[Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray, np.ndarray]]:
"""
对特征边进行采样
参数:
edges: 边坐标数组,形状为(N,2,3)
dihedrals: 对应二面角数组,形状为(N,)
min_length: 最小边长度阈值(单位:米),默认0.1
min_samples: 每条边最少采样点数,默认3
max_samples: 每条边最多采样点数,默认20
sampling_dist: 采样分布类型:
'linear' - 线性均匀采样
'cosine' - 余弦分布(中间密集)
'quadratic' - 二次方分布
return_indices: 是否返回原始边索引
返回:
默认返回 (sampled_points, sampled_dihedrals)
当return_indices=True时返回 (sampled_points, sampled_dihedrals, edge_indices)
"""
sampled_points = []
sampled_dihedrals = []
for edge, dihedral in zip(edges, dihedrals):
# 计算边长度
length = np.linalg.norm(edge[1] - edge[0])
# 过滤短边
if length < min_length:
continue
# 动态计算采样点数(基于长度)
n_samples = min(max_samples, max(min_samples, int(length * 10)))
# 生成采样参数(使用平方分布使中心区域采样更密集)
t = np.linspace(0, 1, n_samples)
t = 0.5 * (1 - np.cos(t * np.pi)) # 转换为非线性分布
# 插值采样
interpolator = interp1d([0, 1], edge, axis=0)
points = interpolator(t)
sampled_points.extend(points)
sampled_dihedrals.extend([dihedral] * n_samples)
return np.array(sampled_points), np.array(sampled_dihedrals)
def save_ptangle(filename, points, angles):
"""
保存为.ptangle格式:
每行: x y z angle(度)
"""
data = np.column_stack([points, angles])
np.savetxt(filename, data, fmt='%.6f %.6f %.6f %.2f')
def load_ptangle(filename):
"""
加载.ptangle文件
返回: (points, angles)
"""
data = np.loadtxt(filename)
return data[:, :3], data[:, 3]
def process_mesh_to_ptangle(obj_path, output_path, edge_min_samples=3, edge_max_samples=20, angle_threshold=30, min_edge_length=0.01):
"""
完整处理流程:
1. 加载OBJ网格
2. 检测特征边
3. 采样特征点
4. 保存为.ptangle格式
返回:
dict: 包含处理状态和结果信息的字典
"""
result = {
'status': 'unknown',
'file': obj_path,
'points': 0,
'max_angle': 0,
'error': None
}
try:
# 1. 加载网格
try:
mesh = trimesh.load(obj_path)
if not isinstance(mesh, trimesh.Trimesh):
raise ValueError("Input must be a triangular mesh")
print(f"成功加载网格: 顶点数={len(mesh.vertices)}, 面数={len(mesh.faces)}")
except Exception as e:
raise RuntimeError(f"网格加载失败: {str(e)}") from e
# 2. 网格修复
try:
if not mesh.is_watertight:
print("检测到非水密网格,正在填充孔洞...")
mesh.fill_holes()
except Exception as e:
raise RuntimeError(f"网格修复失败: {str(e)}") from e
# 3. 特征边检测
try:
edges, dihedrals = get_feature_edges(
mesh=mesh,
angle_threshold=angle_threshold,
)
print(f"找到 {len(edges)} 条特征边")
if len(edges) == 0:
print("警告: 未检测到任何特征边")
except Exception as e:
raise RuntimeError(f"特征边检测失败: {str(e)}") from e
# 4. 特征边采样
try:
points, angles = sample_feature_edges(
edges=edges,
dihedrals=dihedrals,
min_length=min_edge_length,
min_samples=edge_min_samples,
max_samples=edge_max_samples
)
print(f"生成 {len(points)} 个采样点")
if len(points) == 0:
raise RuntimeError("采样结果为空")
except Exception as e:
raise RuntimeError(f"采样失败: {str(e)}") from e
# 5. 保存结果
try:
save_ptangle(output_path, points, angles)
print(f"结果已保存到 {output_path}")
except Exception as e:
raise RuntimeError(f"保存结果失败: {str(e)}") from e
# 更新成功结果
result.update({
'status': 'success',
'points': len(points),
'max_angle': float(np.max(angles)) if len(angles) > 0 else 0.0
})
except Exception as e:
result.update({
'status': 'error',
'error': str(e)
})
print(f"\n处理失败: {str(e)}")
print("\n处理完成! 状态:", result['status'])
return result
def main():
"""
NOTE: 有问题
批量处理OBJ网格,采样特征点及其二面角
保存为.ptangle格式(每行:x y z dihedral_angle)
"""
parser = argparse.ArgumentParser(description='网格特征点采样工具')
parser.add_argument('-i', '--input_dir', required=True,
help='输入OBJ文件或目录')
parser.add_argument('-o', '--output_dir', required=True,
help='输出.ptangle文件或目录')
parser.add_argument('--edge_min_samples', type=int, default=3,
help='每条边最小采样点数 (默认: 3)')
parser.add_argument('--edge_max_samples', type=int, default=30,
help='每条边最大采样点数 (默认: 30)')
parser.add_argument('-t', '--angle_threshold', type=float, default=30.0,
help='特征二面角阈值(度) (默认: 30)')
parser.add_argument('--min-length', type=float, default=4e-3,
help='最小特征边长度(默认: 0.01)')
parser.add_argument('--samples-per-edge', type=int, default=3,
help='每条边的采样点数(默认: 3)')
parser.add_argument('--merge-groups', action='store_true',
help='合并连接的特征边组')
parser.add_argument('-f', '--force', action='store_true',
help='覆盖已存在的输出文件')
parser.add_argument('-v', '--verbose', action='store_true',
help='显示详细处理信息')
args = parser.parse_args()
# 创建输出目录
os.makedirs(args.output_dir, exist_ok=True)
# 获取所有OBJ文件
obj_files = glob.glob(os.path.join(args.input_dir, "*.obj"))
if not obj_files:
print(f"错误: 在 {args.input_dir} 中未找到OBJ文件")
return
# 准备任务列表
tasks = []
for obj_path in obj_files:
file_id = os.path.splitext(os.path.basename(obj_path))[0]
output_path = os.path.join(args.output_dir, f"{file_id}.ptangle")
if not args.force and os.path.exists(output_path):
if args.verbose:
print(f"跳过已存在文件: {output_path}")
continue
tasks.append((obj_path, output_path, args.edge_min_samples, args.edge_max_samples, args.angle_threshold, args.min_length))
# 并行处理配置
cpu_count = min(multiprocessing.cpu_count(), 8)
max_workers = max(1, cpu_count - 1)
# 处理状态统计
stats = {
'total': len(tasks),
'success': 0,
'failure': 0,
'max_angle': 0.0
}
# 进度条包装函数
def update_stats(result):
if result['status'] == 'success':
stats['success'] += 1
stats['max_angle'] = max(stats['max_angle'], result['max_angle'])
if args.verbose:
print(f"成功处理 {os.path.basename(result['file'])}: "
f"{result['points']}点, 最大角度{result['max_angle']:.1f}°")
else:
stats['failure'] += 1
print(f"处理失败 {os.path.basename(result['file'])}: {result['error']}")
# 并行处理
with ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = []
for task in tasks:
futures.append(executor.submit(process_mesh_to_ptangle, *task))
# 进度显示
with tqdm(total=len(tasks), desc="处理进度") as pbar:
for future in as_completed(futures):
update_stats(future.result())
pbar.update(1)
pbar.set_postfix({
'成功': stats['success'],
'失败': stats['failure'],
'最大角度': f"{stats['max_angle']:.1f}°"
})
# 输出总结
print("\n" + "="*50)
print(f"处理完成! 总文件数: {stats['total']}")
print(f"成功: {stats['success']} | 失败: {stats['failure']}")
print(f"最大二面角: {stats['max_angle']:.1f}°")
print(f"成功率: {stats['success']/stats['total']:.1%}")
print("="*50)
if __name__ == "__main__":
main()