You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
321 lines
11 KiB
321 lines
11 KiB
from typing import Tuple, Union
|
|
|
|
import argparse
|
|
import os
|
|
import glob
|
|
import multiprocessing
|
|
from concurrent.futures import ProcessPoolExecutor, as_completed
|
|
from tqdm import tqdm
|
|
import trimesh
|
|
import numpy as np
|
|
from scipy.interpolate import interp1d
|
|
|
|
def get_feature_edges(
|
|
mesh: trimesh.Trimesh,
|
|
angle_threshold: float = 30.0
|
|
) -> Tuple[np.ndarray, np.ndarray]:
|
|
"""
|
|
从网格中提取特征边及对应二面角
|
|
|
|
参数:
|
|
mesh: 输入的三角网格对象(trimesh.Trimesh)
|
|
angle_threshold: 特征边判定阈值(单位:度),默认30度
|
|
|
|
返回:
|
|
(edges, dihedrals) 元组:
|
|
- edges: 边坐标数组,形状为(N,2,3)
|
|
- dihedrals: 对应二面角数组,形状为(N,)
|
|
"""
|
|
# 获取所有边
|
|
edges = mesh.edges_unique
|
|
edge_vectors = mesh.vertices[edges]
|
|
|
|
# 计算每条边的二面角
|
|
dihedrals = []
|
|
for edge in edges:
|
|
# 获取共享该边的面
|
|
faces = mesh.face_adjacency_edges
|
|
edge_faces = np.where(faces == edge)[0]
|
|
|
|
print(len(edge_faces))
|
|
|
|
if len(edge_faces) == 2: # 只处理有两个相邻面的边
|
|
# 计算面法向量夹角(二面角)
|
|
normals = mesh.face_normals[mesh.face_adjacency[edge_faces]]
|
|
angle = np.degrees(np.arccos(np.clip(np.dot(normals[0], normals[1]), -1, 1)))
|
|
dihedrals.append(angle)
|
|
else:
|
|
dihedrals.append(0)
|
|
|
|
dihedrals = np.array(dihedrals)
|
|
|
|
# 筛选特征边
|
|
feature_mask = dihedrals > angle_threshold
|
|
feature_edges = edge_vectors[feature_mask]
|
|
feature_dihedrals = dihedrals[feature_mask]
|
|
|
|
|
|
return feature_edges, feature_dihedrals
|
|
|
|
|
|
def sample_feature_edges(
|
|
edges: np.ndarray,
|
|
dihedrals: np.ndarray,
|
|
min_length: float = 4e-3,
|
|
min_samples: int = 3,
|
|
max_samples: int = 20,
|
|
sampling_dist: str = 'cosine',
|
|
return_indices: bool = False
|
|
) -> Union[Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray, np.ndarray]]:
|
|
"""
|
|
对特征边进行采样
|
|
|
|
参数:
|
|
edges: 边坐标数组,形状为(N,2,3)
|
|
dihedrals: 对应二面角数组,形状为(N,)
|
|
min_length: 最小边长度阈值(单位:米),默认0.1
|
|
min_samples: 每条边最少采样点数,默认3
|
|
max_samples: 每条边最多采样点数,默认20
|
|
sampling_dist: 采样分布类型:
|
|
'linear' - 线性均匀采样
|
|
'cosine' - 余弦分布(中间密集)
|
|
'quadratic' - 二次方分布
|
|
return_indices: 是否返回原始边索引
|
|
|
|
返回:
|
|
默认返回 (sampled_points, sampled_dihedrals)
|
|
当return_indices=True时返回 (sampled_points, sampled_dihedrals, edge_indices)
|
|
"""
|
|
sampled_points = []
|
|
sampled_dihedrals = []
|
|
|
|
for edge, dihedral in zip(edges, dihedrals):
|
|
# 计算边长度
|
|
length = np.linalg.norm(edge[1] - edge[0])
|
|
|
|
# 过滤短边
|
|
if length < min_length:
|
|
continue
|
|
|
|
# 动态计算采样点数(基于长度)
|
|
n_samples = min(max_samples, max(min_samples, int(length * 10)))
|
|
|
|
# 生成采样参数(使用平方分布使中心区域采样更密集)
|
|
t = np.linspace(0, 1, n_samples)
|
|
t = 0.5 * (1 - np.cos(t * np.pi)) # 转换为非线性分布
|
|
|
|
# 插值采样
|
|
interpolator = interp1d([0, 1], edge, axis=0)
|
|
points = interpolator(t)
|
|
|
|
sampled_points.extend(points)
|
|
sampled_dihedrals.extend([dihedral] * n_samples)
|
|
|
|
return np.array(sampled_points), np.array(sampled_dihedrals)
|
|
|
|
|
|
def save_ptangle(filename, points, angles):
|
|
"""
|
|
保存为.ptangle格式:
|
|
每行: x y z angle(度)
|
|
"""
|
|
data = np.column_stack([points, angles])
|
|
np.savetxt(filename, data, fmt='%.6f %.6f %.6f %.2f')
|
|
|
|
def load_ptangle(filename):
|
|
"""
|
|
加载.ptangle文件
|
|
返回: (points, angles)
|
|
"""
|
|
data = np.loadtxt(filename)
|
|
return data[:, :3], data[:, 3]
|
|
|
|
def process_mesh_to_ptangle(obj_path, output_path, edge_min_samples=3, edge_max_samples=20, angle_threshold=30, min_edge_length=0.01):
|
|
"""
|
|
完整处理流程:
|
|
1. 加载OBJ网格
|
|
2. 检测特征边
|
|
3. 采样特征点
|
|
4. 保存为.ptangle格式
|
|
|
|
返回:
|
|
dict: 包含处理状态和结果信息的字典
|
|
"""
|
|
result = {
|
|
'status': 'unknown',
|
|
'file': obj_path,
|
|
'points': 0,
|
|
'max_angle': 0,
|
|
'error': None
|
|
}
|
|
|
|
try:
|
|
# 1. 加载网格
|
|
try:
|
|
mesh = trimesh.load(obj_path)
|
|
if not isinstance(mesh, trimesh.Trimesh):
|
|
raise ValueError("Input must be a triangular mesh")
|
|
print(f"成功加载网格: 顶点数={len(mesh.vertices)}, 面数={len(mesh.faces)}")
|
|
except Exception as e:
|
|
raise RuntimeError(f"网格加载失败: {str(e)}") from e
|
|
|
|
# 2. 网格修复
|
|
try:
|
|
if not mesh.is_watertight:
|
|
print("检测到非水密网格,正在填充孔洞...")
|
|
mesh.fill_holes()
|
|
except Exception as e:
|
|
raise RuntimeError(f"网格修复失败: {str(e)}") from e
|
|
|
|
# 3. 特征边检测
|
|
try:
|
|
edges, dihedrals = get_feature_edges(
|
|
mesh=mesh,
|
|
angle_threshold=angle_threshold,
|
|
)
|
|
print(f"找到 {len(edges)} 条特征边")
|
|
if len(edges) == 0:
|
|
print("警告: 未检测到任何特征边")
|
|
except Exception as e:
|
|
raise RuntimeError(f"特征边检测失败: {str(e)}") from e
|
|
|
|
# 4. 特征边采样
|
|
try:
|
|
points, angles = sample_feature_edges(
|
|
edges=edges,
|
|
dihedrals=dihedrals,
|
|
min_length=min_edge_length,
|
|
min_samples=edge_min_samples,
|
|
max_samples=edge_max_samples
|
|
)
|
|
print(f"生成 {len(points)} 个采样点")
|
|
if len(points) == 0:
|
|
raise RuntimeError("采样结果为空")
|
|
except Exception as e:
|
|
raise RuntimeError(f"采样失败: {str(e)}") from e
|
|
|
|
# 5. 保存结果
|
|
try:
|
|
save_ptangle(output_path, points, angles)
|
|
print(f"结果已保存到 {output_path}")
|
|
except Exception as e:
|
|
raise RuntimeError(f"保存结果失败: {str(e)}") from e
|
|
|
|
# 更新成功结果
|
|
result.update({
|
|
'status': 'success',
|
|
'points': len(points),
|
|
'max_angle': float(np.max(angles)) if len(angles) > 0 else 0.0
|
|
})
|
|
|
|
except Exception as e:
|
|
result.update({
|
|
'status': 'error',
|
|
'error': str(e)
|
|
})
|
|
print(f"\n处理失败: {str(e)}")
|
|
|
|
print("\n处理完成! 状态:", result['status'])
|
|
return result
|
|
|
|
def main():
|
|
"""
|
|
NOTE: 有问题
|
|
批量处理OBJ网格,采样特征点及其二面角
|
|
保存为.ptangle格式(每行:x y z dihedral_angle)
|
|
"""
|
|
parser = argparse.ArgumentParser(description='网格特征点采样工具')
|
|
parser.add_argument('-i', '--input_dir', required=True,
|
|
help='输入OBJ文件或目录')
|
|
parser.add_argument('-o', '--output_dir', required=True,
|
|
help='输出.ptangle文件或目录')
|
|
parser.add_argument('--edge_min_samples', type=int, default=3,
|
|
help='每条边最小采样点数 (默认: 3)')
|
|
parser.add_argument('--edge_max_samples', type=int, default=30,
|
|
help='每条边最大采样点数 (默认: 30)')
|
|
parser.add_argument('-t', '--angle_threshold', type=float, default=30.0,
|
|
help='特征二面角阈值(度) (默认: 30)')
|
|
parser.add_argument('--min-length', type=float, default=4e-3,
|
|
help='最小特征边长度(默认: 0.01)')
|
|
parser.add_argument('--samples-per-edge', type=int, default=3,
|
|
help='每条边的采样点数(默认: 3)')
|
|
parser.add_argument('--merge-groups', action='store_true',
|
|
help='合并连接的特征边组')
|
|
parser.add_argument('-f', '--force', action='store_true',
|
|
help='覆盖已存在的输出文件')
|
|
parser.add_argument('-v', '--verbose', action='store_true',
|
|
help='显示详细处理信息')
|
|
args = parser.parse_args()
|
|
|
|
# 创建输出目录
|
|
os.makedirs(args.output_dir, exist_ok=True)
|
|
|
|
# 获取所有OBJ文件
|
|
obj_files = glob.glob(os.path.join(args.input_dir, "*.obj"))
|
|
if not obj_files:
|
|
print(f"错误: 在 {args.input_dir} 中未找到OBJ文件")
|
|
return
|
|
|
|
# 准备任务列表
|
|
tasks = []
|
|
for obj_path in obj_files:
|
|
file_id = os.path.splitext(os.path.basename(obj_path))[0]
|
|
output_path = os.path.join(args.output_dir, f"{file_id}.ptangle")
|
|
|
|
if not args.force and os.path.exists(output_path):
|
|
if args.verbose:
|
|
print(f"跳过已存在文件: {output_path}")
|
|
continue
|
|
tasks.append((obj_path, output_path, args.edge_min_samples, args.edge_max_samples, args.angle_threshold, args.min_length))
|
|
|
|
# 并行处理配置
|
|
cpu_count = min(multiprocessing.cpu_count(), 8)
|
|
max_workers = max(1, cpu_count - 1)
|
|
|
|
# 处理状态统计
|
|
stats = {
|
|
'total': len(tasks),
|
|
'success': 0,
|
|
'failure': 0,
|
|
'max_angle': 0.0
|
|
}
|
|
|
|
# 进度条包装函数
|
|
def update_stats(result):
|
|
if result['status'] == 'success':
|
|
stats['success'] += 1
|
|
stats['max_angle'] = max(stats['max_angle'], result['max_angle'])
|
|
if args.verbose:
|
|
print(f"成功处理 {os.path.basename(result['file'])}: "
|
|
f"{result['points']}点, 最大角度{result['max_angle']:.1f}°")
|
|
else:
|
|
stats['failure'] += 1
|
|
print(f"处理失败 {os.path.basename(result['file'])}: {result['error']}")
|
|
|
|
# 并行处理
|
|
with ProcessPoolExecutor(max_workers=max_workers) as executor:
|
|
futures = []
|
|
for task in tasks:
|
|
futures.append(executor.submit(process_mesh_to_ptangle, *task))
|
|
|
|
# 进度显示
|
|
with tqdm(total=len(tasks), desc="处理进度") as pbar:
|
|
for future in as_completed(futures):
|
|
update_stats(future.result())
|
|
pbar.update(1)
|
|
pbar.set_postfix({
|
|
'成功': stats['success'],
|
|
'失败': stats['failure'],
|
|
'最大角度': f"{stats['max_angle']:.1f}°"
|
|
})
|
|
|
|
# 输出总结
|
|
print("\n" + "="*50)
|
|
print(f"处理完成! 总文件数: {stats['total']}")
|
|
print(f"成功: {stats['success']} | 失败: {stats['failure']}")
|
|
print(f"最大二面角: {stats['max_angle']:.1f}°")
|
|
print(f"成功率: {stats['success']/stats['total']:.1%}")
|
|
print("="*50)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|