321 lines
12 KiB
Python
321 lines
12 KiB
Python
import os
|
||
import sys
|
||
import argparse
|
||
import numpy as np
|
||
import json
|
||
import glob
|
||
import time
|
||
import cv2
|
||
from mtcnn.mtcnn import MTCNN
|
||
import kp
|
||
|
||
# 引入之前的功能
|
||
from nef_test import (
|
||
load_image_safe, landmarks, affine_matrix, extract_vector_data,
|
||
load_vector, cosine_similarity, SCPU_FW_PATH, NCPU_FW_PATH, visualize_alignment
|
||
)
|
||
|
||
# 預設參數
|
||
MODEL_FILE_PATH = 'R34_G369K.nef'
|
||
VECTOR_DATABASE_DIR = 'face_vectors'
|
||
SIMILARITY_THRESHOLD = 0.5 # 相似度閾值,可根據需要調整
|
||
|
||
def get_face_vector(device_group, model_nef_descriptor, image_path):
|
||
"""從圖像中擷取人臉向量"""
|
||
|
||
# 創建MTCNN檢測器
|
||
detector = MTCNN(device="CPU:0")
|
||
|
||
# 載入圖像
|
||
try:
|
||
img_rgb, img_bgr = load_image_safe(image_path)
|
||
print(f" - 已載入圖像: {image_path}")
|
||
except Exception as e:
|
||
print(f"錯誤: {str(e)}")
|
||
return None
|
||
|
||
# 獲取人臉特徵點並計算仿射變換矩陣
|
||
try:
|
||
lmks = landmarks(detector, img_rgb)
|
||
mat, size = affine_matrix(lmks)
|
||
print(" - 已檢測到人臉特徵點")
|
||
except Exception as e:
|
||
print(f"錯誤: {str(e)}")
|
||
return None
|
||
|
||
# 應用仿射變換
|
||
aligned_img = cv2.warpAffine(img_bgr, mat, size)
|
||
|
||
# visualize_alignment(img_bgr, lmks, aligned_img)
|
||
|
||
# 轉換為BGR565格式並調整大小
|
||
aligned_img_bgr565 = cv2.cvtColor(aligned_img, cv2.COLOR_BGR2BGR565)
|
||
img_bgr565 = cv2.resize(aligned_img_bgr565, (112, 112), interpolation=cv2.INTER_LINEAR)
|
||
print(" - 已對齊圖像並格式化")
|
||
|
||
# 準備通用圖像推理輸入描述符
|
||
generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor(
|
||
model_id=model_nef_descriptor.models[0].id,
|
||
inference_number=0,
|
||
input_node_image_list=[
|
||
kp.GenericInputNodeImage(
|
||
image=img_bgr565,
|
||
image_format=kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,
|
||
resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
|
||
padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
|
||
normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
|
||
)
|
||
]
|
||
)
|
||
|
||
# 開始推理工作
|
||
try:
|
||
kp.inference.generic_image_inference_send(
|
||
device_group=device_group,
|
||
generic_inference_input_descriptor=generic_inference_input_descriptor
|
||
)
|
||
|
||
generic_raw_result = kp.inference.generic_image_inference_receive(device_group=device_group)
|
||
print(" - 推理成功完成")
|
||
except kp.ApiKPException as exception:
|
||
print(f' - 錯誤: 推理失敗,錯誤 = {exception}')
|
||
return None
|
||
|
||
# 獲取推理節點輸出
|
||
inf_node_output_list = []
|
||
for node_idx in range(generic_raw_result.header.num_output_node):
|
||
inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(
|
||
node_idx=node_idx,
|
||
generic_raw_result=generic_raw_result,
|
||
channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
|
||
)
|
||
inf_node_output_list.append(inference_float_node_output)
|
||
|
||
# 獲取人臉向量
|
||
if len(inf_node_output_list) > 0:
|
||
face_vector = inf_node_output_list[0]
|
||
# 轉換向量為標準NumPy數組
|
||
face_vector_np = extract_vector_data(face_vector)
|
||
|
||
# Ensure the vector is flattened to 1D
|
||
face_vector_np = face_vector_np.flatten()
|
||
|
||
# Print shape for debugging
|
||
print(f" - Face vector shape: {face_vector_np.shape}")
|
||
|
||
return face_vector_np
|
||
else:
|
||
print("錯誤: 推理結果中沒有找到輸出節點")
|
||
return None
|
||
|
||
def load_face_database(database_dir):
|
||
"""載入人臉資料庫"""
|
||
face_db = []
|
||
|
||
# 確保資料庫目錄存在
|
||
if not os.path.exists(database_dir):
|
||
print(f"警告: 人臉資料庫目錄 '{database_dir}' 不存在,將創建該目錄")
|
||
os.makedirs(database_dir)
|
||
return face_db
|
||
|
||
# 查找所有支持的向量文件
|
||
vector_files = []
|
||
vector_files.extend(glob.glob(os.path.join(database_dir, "*.npy")))
|
||
vector_files.extend(glob.glob(os.path.join(database_dir, "*.json")))
|
||
vector_files.extend(glob.glob(os.path.join(database_dir, "*.pkl")))
|
||
|
||
# 載入每個向量文件
|
||
for vector_file in vector_files:
|
||
try:
|
||
# 根據文件擴展名自動確定格式
|
||
file_ext = os.path.splitext(vector_file)[1].lower()
|
||
if file_ext == '.npy':
|
||
vector = load_vector(vector_file, format='numpy')
|
||
# 嘗試從文件名中提取標識信息
|
||
name = os.path.basename(vector_file).replace('.npy', '')
|
||
metadata = {'name': name}
|
||
face_db.append({
|
||
'vector': vector,
|
||
'metadata': metadata,
|
||
'file_path': vector_file
|
||
})
|
||
elif file_ext == '.json':
|
||
vector, metadata = load_vector(vector_file, format='json')
|
||
face_db.append({
|
||
'vector': vector,
|
||
'metadata': metadata or {'name': os.path.basename(vector_file).replace('.json', '')},
|
||
'file_path': vector_file
|
||
})
|
||
elif file_ext == '.pkl':
|
||
vector = load_vector(vector_file, format='pickle')
|
||
# 嘗試從文件名中提取標識信息
|
||
name = os.path.basename(vector_file).replace('.pkl', '')
|
||
metadata = {'name': name}
|
||
face_db.append({
|
||
'vector': vector,
|
||
'metadata': metadata,
|
||
'file_path': vector_file
|
||
})
|
||
|
||
print(f"已載入人臉向量: {vector_file}")
|
||
except Exception as e:
|
||
print(f"警告: 無法載入向量文件 '{vector_file}': {str(e)}")
|
||
|
||
print(f"成功載入 {len(face_db)} 個人臉向量")
|
||
return face_db
|
||
|
||
def recognize_face(new_face_vector, face_database, threshold=SIMILARITY_THRESHOLD):
|
||
"""識別人臉,將新的人臉向量與資料庫中的向量進行比較"""
|
||
if not face_database:
|
||
return None, 0.0
|
||
|
||
max_similarity = 0.0
|
||
best_match = None
|
||
|
||
for face_entry in face_database:
|
||
stored_vector = face_entry['vector']
|
||
similarity = cosine_similarity(new_face_vector, stored_vector)
|
||
|
||
if similarity > max_similarity:
|
||
max_similarity = similarity
|
||
best_match = face_entry
|
||
|
||
# 如果最高相似度超過閾值,則返回匹配結果
|
||
if max_similarity >= threshold:
|
||
return best_match, max_similarity
|
||
else:
|
||
return None, max_similarity
|
||
|
||
def add_face_to_database(face_vector, database_dir, name=None, image_path=None, format='json'):
|
||
"""添加新的人臉向量到資料庫"""
|
||
if not os.path.exists(database_dir):
|
||
os.makedirs(database_dir)
|
||
|
||
# 創建唯一的文件名
|
||
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
||
if name:
|
||
file_name = f"{name}_{timestamp}"
|
||
else:
|
||
file_name = f"unknown_{timestamp}"
|
||
|
||
# 創建元數據
|
||
metadata = {
|
||
'name': name or 'unknown',
|
||
'timestamp': time.strftime("%Y-%m-%d %H:%M:%S"),
|
||
'image_path': image_path
|
||
}
|
||
|
||
# 根據格式保存向量
|
||
if format == 'numpy':
|
||
file_path = os.path.join(database_dir, f"{file_name}.npy")
|
||
np.save(file_path, face_vector)
|
||
elif format == 'pickle':
|
||
import pickle
|
||
file_path = os.path.join(database_dir, f"{file_name}.pkl")
|
||
with open(file_path, 'wb') as f:
|
||
pickle.dump(face_vector, f)
|
||
else: # 預設使用JSON格式
|
||
file_path = os.path.join(database_dir, f"{file_name}.json")
|
||
data = {
|
||
'vector': face_vector.tolist(),
|
||
'metadata': metadata
|
||
}
|
||
with open(file_path, 'w') as f:
|
||
json.dump(data, f)
|
||
|
||
print(f"已將新的人臉向量保存到: {file_path}")
|
||
return file_path
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description='人臉識別和比對系統')
|
||
parser.add_argument('-p', '--port_id', help='使用指定的端口ID連接設備', default=28, type=int)
|
||
parser.add_argument('-m', '--model', help=f'模型文件路徑 (.nef) (預設: {MODEL_FILE_PATH})',
|
||
default=MODEL_FILE_PATH, type=str)
|
||
parser.add_argument('-i', '--img', help='待識別的圖像文件路徑', required=True, type=str)
|
||
parser.add_argument('-d', '--database', help=f'人臉向量資料庫目錄 (預設: {VECTOR_DATABASE_DIR})',
|
||
default=VECTOR_DATABASE_DIR, type=str)
|
||
parser.add_argument('-t', '--threshold', help=f'相似度閾值 (預設: {SIMILARITY_THRESHOLD})',
|
||
default=SIMILARITY_THRESHOLD, type=float)
|
||
parser.add_argument('-a', '--add', help='將新人臉添加到資料庫 (如果未識別)', action='store_true')
|
||
parser.add_argument('-n', '--name', help='新人臉的名稱 (與 --add 一起使用)', default=None, type=str)
|
||
parser.add_argument('-f', '--format', help='向量儲存格式: numpy, pickle, 或 json (預設: json)',
|
||
default='json', choices=['numpy', 'pickle', 'json'], type=str)
|
||
args = parser.parse_args()
|
||
|
||
# 連接設備
|
||
try:
|
||
print('[連接設備]')
|
||
device_group = kp.core.connect_devices(usb_port_ids=[args.port_id])
|
||
print(' - 成功')
|
||
except kp.ApiKPException as exception:
|
||
print(f'錯誤: 連接設備失敗, 端口ID = \'{args.port_id}\',錯誤信息: [{str(exception)}]')
|
||
return
|
||
|
||
# 設置USB通信超時
|
||
print('[設置設備超時]')
|
||
kp.core.set_timeout(device_group=device_group, milliseconds=5000)
|
||
print(' - 成功')
|
||
|
||
# 上傳固件到設備
|
||
try:
|
||
print('[上傳固件]')
|
||
kp.core.load_firmware_from_file(device_group=device_group,
|
||
scpu_fw_path=SCPU_FW_PATH,
|
||
ncpu_fw_path=NCPU_FW_PATH)
|
||
print(' - 成功')
|
||
except kp.ApiKPException as exception:
|
||
print(f'錯誤: 上傳固件失敗,錯誤 = \'{str(exception)}\'')
|
||
return
|
||
|
||
# 上傳模型到設備
|
||
try:
|
||
print('[上傳模型]')
|
||
model_nef_descriptor = kp.core.load_model_from_file(device_group=device_group,
|
||
file_path=args.model)
|
||
print(' - 成功')
|
||
except kp.ApiKPException as exception:
|
||
print(f'錯誤: 上傳模型失敗,錯誤 = \'{str(exception)}\'')
|
||
return
|
||
|
||
# 載入人臉資料庫
|
||
print('[載入人臉資料庫]')
|
||
face_database = load_face_database(args.database)
|
||
|
||
# 從圖像中提取人臉向量
|
||
print('[處理輸入圖像]')
|
||
new_face_vector = get_face_vector(device_group, model_nef_descriptor, args.img)
|
||
|
||
if new_face_vector is not None:
|
||
# 識別人臉
|
||
print('[識別人臉]')
|
||
match, similarity = recognize_face(new_face_vector, face_database, args.threshold)
|
||
|
||
if match:
|
||
metadata = match['metadata']
|
||
name = metadata.get('name', '未知')
|
||
print(f"[結果] 識別為: {name}")
|
||
print(f" - 相似度: {similarity:.4f}")
|
||
print(f" - 來源文件: {match['file_path']}")
|
||
if 'timestamp' in metadata:
|
||
print(f" - 記錄時間: {metadata['timestamp']}")
|
||
else:
|
||
print(f"[結果] 未能識別人臉 (最高相似度: {similarity:.4f}, 閾值: {args.threshold})")
|
||
|
||
# 如果指定了--add參數,則將新的人臉添加到資料庫
|
||
if args.add:
|
||
print('[添加新人臉到資料庫]')
|
||
file_path = add_face_to_database(
|
||
new_face_vector,
|
||
args.database,
|
||
name=args.name,
|
||
image_path=args.img,
|
||
format=args.format
|
||
)
|
||
print(f"[添加完成] 已添加新人臉: {args.name or '未命名'}")
|
||
|
||
# 清理
|
||
kp.core.disconnect_devices(device_group=device_group)
|
||
print("[清理] 設備已斷開連接")
|
||
|
||
if __name__ == '__main__':
|
||
main() |