import argparse
import os
import glob
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm
import trimesh
import numpy as np


def sample_points(obj_path: str, num_samples: int) -> np.ndarray:
    """Sample surface points (with normals) from an OBJ file."""
    # Note: trimesh.load may return a Scene for multi-part OBJ files;
    # this script only handles single triangle meshes.
    mesh = trimesh.load(obj_path)
    assert isinstance(mesh, trimesh.Trimesh), "Input must be a triangle mesh"
    # Sample point positions and take the normal of the face each point lies on
    points, face_indices = mesh.sample(num_samples, return_index=True)
    normals = mesh.face_normals[face_indices]
    # Combine into rows of [x, y, z, xn, yn, zn, sdf=0]
    return np.column_stack([points, normals, np.zeros(len(points))])
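# Quick sanity check for sample_points (illustrative sketch; "bunny.obj" is a
# placeholder path, not a file shipped with this repository):
#   pts = sample_points("bunny.obj", 1000)
#   print(pts.shape)  # expected: (1000, 7) -> x, y, z, xn, yn, zn, sdf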


def process_single_file(obj_path: str, output_dir: str, num_samples: int) -> dict:
    """Sample one OBJ file and write the result to disk."""
    try:
        # Build the output file name (e.g. <id>_50k.xyz)
        file_id = os.path.splitext(os.path.basename(obj_path))[0]
        output_path = os.path.join(output_dir, f"{file_id}_{num_samples//1000}k.xyz")
        # Sample and save
        data = sample_points(obj_path, num_samples)
        np.savetxt(output_path, data,
                   fmt="%.6f %.6f %.6f %.6f %.6f %.6f %.3f")
        return {
            'status': 'success',
            'file': obj_path,
            'output': output_path,
            'points': len(data)
        }
    except Exception as e:
        return {
            'status': 'failed',
            'file': obj_path,
            'error': str(e)
        }


def main():
    """
    Batch-sample surface points from OBJ files.
    Each output line contains: x y z xn yn zn sdf
    """
    parser = argparse.ArgumentParser(description='OBJ point sampling tool')
    parser.add_argument('-i', '--input_dir', required=True,
                        help='Input directory containing the OBJ files')
    parser.add_argument('-o', '--output_dir', required=True,
                        help='Output directory path')
    parser.add_argument('-n', '--num_samples', type=int, default=50000,
                        help='Number of points sampled per file (default: 50,000)')
    parser.add_argument('-f', '--force', action='store_true',
                        help='Overwrite existing output files')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Print detailed per-file information')
    args = parser.parse_args()
    # Create the output directory
    os.makedirs(args.output_dir, exist_ok=True)

    # Collect all OBJ files
    obj_files = glob.glob(os.path.join(args.input_dir, "*.obj"))
    if not obj_files:
        print(f"Error: no OBJ files found in {args.input_dir}")
        return

    # Build the task list, skipping outputs that already exist
    tasks = []
    for obj_path in obj_files:
        file_id = os.path.splitext(os.path.basename(obj_path))[0]
        output_path = os.path.join(args.output_dir, f"{file_id}_{args.num_samples//1000}k.xyz")
        if not args.force and os.path.exists(output_path):
            if args.verbose:
                print(f"Skipping existing output: {output_path}")
            continue
        tasks.append((obj_path, args.output_dir, args.num_samples))

    # Avoid launching the pool (and dividing by zero in the summary) when
    # every output already exists
    if not tasks:
        print("Nothing to do: all output files already exist (use --force to overwrite)")
        return
    # Process files in parallel, keeping one core free and capping at 8 workers
    cpu_count = min(multiprocessing.cpu_count(), 8)
    max_workers = max(1, cpu_count - 1)
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_single_file, *task) for task in tasks]

        # Track progress as results come in
        success = failure = 0
        with tqdm(total=len(tasks), desc="Processing") as pbar:
            for future in as_completed(futures):
                result = future.result()
                if result['status'] == 'success':
                    success += 1
                    if args.verbose:
                        pbar.write(f"OK: {result['output']} (points: {result['points']})")
                else:
                    failure += 1
                    pbar.write(f"Failed: {os.path.basename(result['file'])} - {result['error']}")
                pbar.update(1)
                pbar.set_postfix({
                    'ok': success,
                    'failed': failure,
                    'progress': f"{pbar.n}/{pbar.total}"
                })

    # Print a summary
    print("\n" + "=" * 50)
    print(f"Done. Total files processed: {len(tasks)}")
    print(f"Success: {success} | Failed: {failure}")
    print(f"Success rate: {success / len(tasks):.1%}")
    print("=" * 50)


if __name__ == "__main__":
    main()
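

# Example usage and output loading (illustrative; the script name and directory
# paths below are placeholders, not part of this repository):
#
#   python sample_obj_points.py -i ./meshes -o ./points -n 50000 -v
#
# Each resulting .xyz file can be read back with NumPy:
#   data = np.loadtxt("model_50k.xyz")              # shape: (num_samples, 7)
#   points, normals, sdf = data[:, :3], data[:, 3:6], data[:, 6]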