You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

323 lines
11 KiB

from typing import Tuple, Union
import argparse
import os
import glob
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm
import trimesh
import numpy as np
from scipy.spatial import cKDTree
from scipy.interpolate import interp1d
def get_feature_edges(
mesh: trimesh.Trimesh,
angle_threshold: float = 30.0
) -> Tuple[np.ndarray, np.ndarray]:
"""
从网格中提取特征边及对应二面角
参数:
mesh: 输入的三角网格对象(trimesh.Trimesh)
angle_threshold: 特征边判定阈值(单位:度),默认30度
返回:
(edges, dihedrals) 元组:
- edges: 边坐标数组,形状为(N,2,3)
- dihedrals: 对应二面角数组,形状为(N,)
"""
# 获取所有边
edges = mesh.edges_unique
edge_vectors = mesh.vertices[edges]
# 计算每条边的二面角
dihedrals = []
for edge in edges:
# 获取共享该边的面
faces = mesh.face_adjacency_edges
edge_faces = np.where(faces == edge)[0]
print(len(edge_faces))
if len(edge_faces) == 2: # 只处理有两个相邻面的边
# 计算面法向量夹角(二面角)
normals = mesh.face_normals[mesh.face_adjacency[edge_faces]]
angle = np.degrees(np.arccos(np.clip(np.dot(normals[0], normals[1]), -1, 1)))
dihedrals.append(angle)
else:
dihedrals.append(0)
dihedrals = np.array(dihedrals)
# 筛选特征边
feature_mask = dihedrals > angle_threshold
feature_edges = edge_vectors[feature_mask]
feature_dihedrals = dihedrals[feature_mask]
return feature_edges, feature_dihedrals
def sample_feature_edges(
edges: np.ndarray,
dihedrals: np.ndarray,
min_length: float = 4e-3,
min_samples: int = 3,
max_samples: int = 20,
sampling_dist: str = 'cosine',
return_indices: bool = False
) -> Union[Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray, np.ndarray]]:
"""
对特征边进行采样
参数:
edges: 边坐标数组,形状为(N,2,3)
dihedrals: 对应二面角数组,形状为(N,)
min_length: 最小边长度阈值(单位:米),默认0.1
min_samples: 每条边最少采样点数,默认3
max_samples: 每条边最多采样点数,默认20
sampling_dist: 采样分布类型:
'linear' - 线性均匀采样
'cosine' - 余弦分布(中间密集)
'quadratic' - 二次方分布
return_indices: 是否返回原始边索引
返回:
默认返回 (sampled_points, sampled_dihedrals)
当return_indices=True时返回 (sampled_points, sampled_dihedrals, edge_indices)
"""
sampled_points = []
sampled_dihedrals = []
for edge, dihedral in zip(edges, dihedrals):
# 计算边长度
length = np.linalg.norm(edge[1] - edge[0])
# 过滤短边
if length < min_length:
continue
# 动态计算采样点数(基于长度)
n_samples = min(max_samples, max(min_samples, int(length * 10)))
# 生成采样参数(使用平方分布使中心区域采样更密集)
t = np.linspace(0, 1, n_samples)
t = 0.5 * (1 - np.cos(t * np.pi)) # 转换为非线性分布
# 插值采样
interpolator = interp1d([0, 1], edge, axis=0)
points = interpolator(t)
sampled_points.extend(points)
sampled_dihedrals.extend([dihedral] * n_samples)
return np.array(sampled_points), np.array(sampled_dihedrals)
def save_ptangle(filename, points, angles):
"""
保存为.ptangle格式:
每行: x y z angle(度)
"""
data = np.column_stack([points, angles])
np.savetxt(filename, data, fmt='%.6f %.6f %.6f %.2f')
def load_ptangle(filename):
"""
加载.ptangle文件
返回: (points, angles)
"""
data = np.loadtxt(filename)
return data[:, :3], data[:, 3]
def process_mesh_to_ptangle(obj_path, output_path, edge_min_samples=3, edge_max_samples=20, angle_threshold=30, min_edge_length=0.01):
"""
完整处理流程:
1. 加载OBJ网格
2. 检测特征边
3. 采样特征点
4. 保存为.ptangle格式
返回:
dict: 包含处理状态和结果信息的字典
"""
result = {
'status': 'unknown',
'file': obj_path,
'points': 0,
'max_angle': 0,
'error': None
}
try:
# 1. 加载网格
try:
mesh = trimesh.load(obj_path)
if not isinstance(mesh, trimesh.Trimesh):
raise ValueError("Input must be a triangular mesh")
print(f"成功加载网格: 顶点数={len(mesh.vertices)}, 面数={len(mesh.faces)}")
except Exception as e:
raise RuntimeError(f"网格加载失败: {str(e)}") from e
# 2. 网格修复
try:
if not mesh.is_watertight:
print("检测到非水密网格,正在填充孔洞...")
mesh.fill_holes()
except Exception as e:
raise RuntimeError(f"网格修复失败: {str(e)}") from e
# 3. 特征边检测
try:
edges, dihedrals = get_feature_edges(
mesh=mesh,
angle_threshold=angle_threshold,
)
print(f"找到 {len(edges)} 条特征边")
if len(edges) == 0:
print("警告: 未检测到任何特征边")
except Exception as e:
raise RuntimeError(f"特征边检测失败: {str(e)}") from e
# 4. 特征边采样
try:
points, angles = sample_feature_edges(
edges=edges,
dihedrals=dihedrals,
min_length=min_edge_length,
min_samples=edge_min_samples,
max_samples=edge_max_samples
)
print(f"生成 {len(points)} 个采样点")
if len(points) == 0:
raise RuntimeError("采样结果为空")
except Exception as e:
raise RuntimeError(f"采样失败: {str(e)}") from e
# 5. 保存结果
try:
save_ptangle(output_path, points, angles)
print(f"结果已保存到 {output_path}")
except Exception as e:
raise RuntimeError(f"保存结果失败: {str(e)}") from e
# 更新成功结果
result.update({
'status': 'success',
'points': len(points),
'max_angle': float(np.max(angles)) if len(angles) > 0 else 0.0
})
except Exception as e:
result.update({
'status': 'error',
'error': str(e)
})
print(f"\n处理失败: {str(e)}")
print("\n处理完成! 状态:", result['status'])
return result
def main():
"""
NOTE: 有问题
批量处理OBJ网格,采样特征点及其二面角
保存为.ptangle格式(每行:x y z dihedral_angle)
"""
parser = argparse.ArgumentParser(description='网格特征点采样工具')
parser.add_argument('-i', '--input_dir', required=True,
help='输入OBJ文件或目录')
parser.add_argument('-o', '--output_dir', required=True,
help='输出.ptangle文件或目录')
parser.add_argument('--edge_min_samples', type=int, default=3,
help='每条边最小采样点数 (默认: 3)')
parser.add_argument('--edge_max_samples', type=int, default=30,
help='每条边最大采样点数 (默认: 30)')
parser.add_argument('-t', '--angle_threshold', type=float, default=30.0,
help='特征二面角阈值(度) (默认: 30)')
parser.add_argument('--min-length', type=float, default=4e-3,
help='最小特征边长度(默认: 0.01)')
parser.add_argument('--samples-per-edge', type=int, default=3,
help='每条边的采样点数(默认: 3)')
parser.add_argument('--merge-groups', action='store_true',
help='合并连接的特征边组')
parser.add_argument('-f', '--force', action='store_true',
help='覆盖已存在的输出文件')
parser.add_argument('-v', '--verbose', action='store_true',
help='显示详细处理信息')
args = parser.parse_args()
# 创建输出目录
os.makedirs(args.output_dir, exist_ok=True)
# 获取所有OBJ文件
obj_files = glob.glob(os.path.join(args.input_dir, "*.obj"))
if not obj_files:
print(f"错误: 在 {args.input_dir} 中未找到OBJ文件")
return
# 准备任务列表
tasks = []
for obj_path in obj_files:
file_id = os.path.splitext(os.path.basename(obj_path))[0]
output_path = os.path.join(args.output_dir, f"{file_id}.ptangle")
if not args.force and os.path.exists(output_path):
if args.verbose:
print(f"跳过已存在文件: {output_path}")
continue
tasks.append((obj_path, output_path, args.edge_min_samples, args.edge_max_samples, args.angle_threshold, args.min_length))
# 并行处理配置
cpu_count = min(multiprocessing.cpu_count(), 8)
max_workers = max(1, cpu_count - 1)
# 处理状态统计
stats = {
'total': len(tasks),
'success': 0,
'failure': 0,
'max_angle': 0.0
}
# 进度条包装函数
def update_stats(result):
if result['status'] == 'success':
stats['success'] += 1
stats['max_angle'] = max(stats['max_angle'], result['max_angle'])
if args.verbose:
print(f"成功处理 {os.path.basename(result['file'])}: "
f"{result['points']}点, 最大角度{result['max_angle']:.1f}°")
else:
stats['failure'] += 1
print(f"处理失败 {os.path.basename(result['file'])}: {result['error']}")
# 并行处理
with ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = []
for task in tasks:
futures.append(executor.submit(process_mesh_to_ptangle, *task))
# 进度显示
with tqdm(total=len(tasks), desc="处理进度") as pbar:
for future in as_completed(futures):
update_stats(future.result())
pbar.update(1)
pbar.set_postfix({
'成功': stats['success'],
'失败': stats['failure'],
'最大角度': f"{stats['max_angle']:.1f}°"
})
# 输出总结
print("\n" + "="*50)
print(f"处理完成! 总文件数: {stats['total']}")
print(f"成功: {stats['success']} | 失败: {stats['failure']}")
print(f"最大二面角: {stats['max_angle']:.1f}°")
print(f"成功率: {stats['success']/stats['total']:.1%}")
print("="*50)
if __name__ == "__main__":
main()