
| from camel.agents import ChatAgent, TaskSpecifyAgent from camel.models import ModelFactory from camel.prompts import TextPrompt from camel.types import ModelPlatformType, TaskType from camel.societies import RolePlaying from camel.societies.workforce import Workforce from camel.toolkits import SearchToolkit, MathToolkit from camel.memories import ChatHistoryBlock from camel.memories.records import MemoryRecord from camel.messages import BaseMessage from camel.types import OpenAIBackendRole from camel.tasks import Task from colorama import Fore import os import random import json import re import time import traceback from typing import Dict, List, Tuple, Optional, Callable, Any from datetime import datetime from functools import wraps from dotenv import load_dotenv
load_dotenv() api_key = os.getenv('MODELSCOPE_SDK_TOKEN')
class RetryConfig: """重试配置类""" def __init__(self, max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 60.0, backoff_factor: float = 2.0, jitter: bool = True): self.max_retries = max_retries self.base_delay = base_delay self.max_delay = max_delay self.backoff_factor = backoff_factor self.jitter = jitter
class RetryableError(Exception): """可重试的错误类型""" pass
class NonRetryableError(Exception): """不可重试的错误类型""" pass
def smart_retry(config: RetryConfig = None, retryable_exceptions: Tuple = (Exception,), non_retryable_exceptions: Tuple = (KeyboardInterrupt, SystemExit)): """智能重试装饰器""" if config is None: config = RetryConfig() def decorator(func: Callable) -> Callable: @wraps(func) def wrapper(*args, **kwargs) -> Any: last_exception = None for attempt in range(config.max_retries + 1): try: return func(*args, **kwargs) except non_retryable_exceptions as e: print(Fore.RED + f"不可重试错误: {e}") raise e except retryable_exceptions as e: last_exception = e if attempt == config.max_retries: print(Fore.RED + f"重试{config.max_retries}次后仍然失败: {e}") break delay = min( config.base_delay * (config.backoff_factor ** attempt), config.max_delay ) if config.jitter: delay *= (0.5 + random.random() * 0.5) print(Fore.YELLOW + f"第{attempt + 1}次尝试失败: {e}") print(Fore.CYAN + f"{delay:.1f}秒后重试...") time.sleep(delay) if last_exception: raise last_exception return wrapper return decorator
class ErrorHandler: """错误处理和分类系统""" @staticmethod def classify_error(error: Exception) -> str: """分类错误类型""" error_str = str(error).lower() error_type = type(error).__name__ if any(keyword in error_str for keyword in ['connection', 'timeout', 'network', 'http']): return "network_error" if any(keyword in error_str for keyword in ['api', 'rate limit', 'quota', 'unauthorized']): return "api_error" if any(keyword in error_str for keyword in ['model', 'inference', 'generation']): return "model_error" if any(keyword in error_str for keyword in ['json', 'parse', 'format', 'decode']): return "data_error" if any(keyword in error_str for keyword in ['memory', 'out of memory', 'oom']): return "memory_error" return "unknown_error" @staticmethod def get_retry_config(error_type: str) -> RetryConfig: """根据错误类型获取重试配置""" configs = { "network_error": RetryConfig(max_retries=5, base_delay=2.0, backoff_factor=1.5), "api_error": RetryConfig(max_retries=3, base_delay=5.0, backoff_factor=2.0), "model_error": RetryConfig(max_retries=2, base_delay=3.0, backoff_factor=2.0), "data_error": RetryConfig(max_retries=2, base_delay=1.0, backoff_factor=1.5), "memory_error": RetryConfig(max_retries=1, base_delay=10.0, backoff_factor=1.0), "unknown_error": RetryConfig(max_retries=2, base_delay=2.0, backoff_factor=2.0) } return configs.get(error_type, RetryConfig()) @staticmethod def handle_error_with_fallback(error: Exception, fallback_func: Callable = None) -> Any: """带降级策略的错误处理""" error_type = ErrorHandler.classify_error(error) print(Fore.RED + f"❌ 检测到错误类型: {error_type}") print(Fore.RED + f"❌ 错误详情: {error}") if fallback_func: try: print(Fore.YELLOW + "🔄 尝试使用降级策略...") return fallback_func() except Exception as fallback_error: print(Fore.RED + f"❌ 降级策略也失败了: {fallback_error}") return None
@smart_retry(RetryConfig(max_retries=3, base_delay=2.0)) def create_model_with_retry(): """带重试机制的模型创建""" try: return ModelFactory.create( model_platform=ModelPlatformType.OPENAI_COMPATIBLE_MODEL, model_type="Qwen/Qwen2.5-72B-Instruct", url='https://api-inference.modelscope.cn/v1/', api_key=api_key ) except Exception as e: error_type = ErrorHandler.classify_error(e) print(Fore.RED + f"❌ 模型创建失败 ({error_type}): {e}") raise RetryableError(f"模型创建失败: {e}")
model = create_model_with_retry()
class RequirementPriorityManager: """用户需求优先级管理系统""" def __init__(self): self.requirement_types = { 'sport_preference': {'priority': 10, 'name': '运动类型', 'negotiable': False}, 'location': {'priority': 9, 'name': '地理位置', 'negotiable': True}, 'budget_range': {'priority': 8, 'name': '预算范围', 'negotiable': True}, 'preferred_time': {'priority': 7, 'name': '运动时间', 'negotiable': True}, 'transport_mode': {'priority': 6, 'name': '交通方式', 'negotiable': True}, 'special_needs': {'priority': 5, 'name': '特殊需求', 'negotiable': True}, 'frequency': {'priority': 4, 'name': '运动频率', 'negotiable': True}, 'experience_level': {'priority': 3, 'name': '经验水平', 'negotiable': True} } def get_priority_questions(self, user_needs: Dict) -> List[str]: """生成优先级确认问题""" questions = [] negotiable_requirements = [] for req_key, req_info in self.requirement_types.items(): if req_info['negotiable'] and req_key in user_needs: negotiable_requirements.append({ 'key': req_key, 'name': req_info['name'], 'value': user_needs[req_key], 'priority': req_info['priority'] }) negotiable_requirements.sort(key=lambda x: x['priority'], reverse=True) questions.append("为了帮您找到最合适的运动场馆,请告诉我以下需求的重要程度:") for i, req in enumerate(negotiable_requirements[:5]): questions.append(f"{i+1}. {req['name']}(当前:{req['value']})- 这个要求对您有多重要?(必须满足/希望满足/可以妥协)") return questions def parse_priority_response(self, response: str, user_needs: Dict) -> Dict: """解析用户的优先级回答""" priority_levels = { 'must_have': [], 'prefer_have': [], 'can_compromise': [] } response_lower = response.lower() for req_key, req_info in self.requirement_types.items(): if req_key in user_needs and req_info['negotiable']: req_name = req_info['name'] if '必须' in response and req_name in response: priority_levels['must_have'].append(req_key) elif '希望' in response and req_name in response: priority_levels['prefer_have'].append(req_key) elif '妥协' in response and req_name in response: priority_levels['can_compromise'].append(req_key) else: if req_info['priority'] >= 8: priority_levels['must_have'].append(req_key) elif req_info['priority'] >= 6: priority_levels['prefer_have'].append(req_key) else: priority_levels['can_compromise'].append(req_key) return priority_levels
class GeographicExpansionManager: """地理搜索范围扩展管理系统""" def __init__(self): self.beijing_areas = { '东城区': { 'adjacent': ['西城区', '朝阳区', '丰台区'], 'nearby': ['海淀区', '石景山区'], 'districts': ['东华门街道', '景山街道', '交道口街道', '安定门街道', '北新桥街道', '东四街道', '朝阳门街道', '建国门街道', '东直门街道', '和平里街道', '前门街道', '崇文门外街道', '东花市街道', '龙潭街道', '体育馆路街道', '天坛街道', '永定门外街道'] }, '西城区': { 'adjacent': ['东城区', '海淀区', '丰台区'], 'nearby': ['朝阳区', '石景山区'], 'districts': ['西长安街街道', '新街口街道', '月坛街道', '展览路街道', '德胜街道', '金融街街道', '什刹海街道', '大栅栏街道', '天桥街道', '椿树街道', '陶然亭街道', '广安门内街道', '牛街街道', '白纸坊街道', '广安门外街道'] }, '朝阳区': { 'adjacent': ['东城区', '西城区', '海淀区', '丰台区'], 'nearby': ['石景山区', '通州区'], 'districts': ['建外街道', '朝外街道', '呼家楼街道', '三里屯街道', '左家庄街道', '香河园街道', '和平街街道', '安贞街道', '亚运村街道', '小关街道', '酒仙桥街道', '麦子店街道', '团结湖街道', '六里屯街道', '八里庄街道', '双井街道', '劲松街道', '潘家园街道', '垡头街道', '南磨房街道', '高碑店街道', '将台街道', '太阳宫街道', '大屯街道', '望京街道', '小红门街道', '十八里店街道', '平房街道', '东风街道', '奥运村街道', '来广营街道', '常营街道', '三间房街道', '管庄街道', '金盏街道', '孙河街道', '崔各庄街道', '东坝街道', '黑庄户街道', '豆各庄街道', '王四营街道', '东风街道', '六里屯街道'] }, '海淀区': { 'adjacent': ['西城区', '朝阳区', '丰台区', '石景山区'], 'nearby': ['东城区', '昌平区'], 'districts': ['万寿路街道', '永定路街道', '羊坊店街道', '甘家口街道', '八里庄街道', '紫竹院街道', '北下关街道', '北太平庄街道', '学院路街道', '中关村街道', '海淀街道', '青龙桥街道', '清华园街道', '燕园街道', '香山街道', '清河街道', '花园路街道', '西三旗街道', '马连洼街道', '田村路街道', '上地街道', '万柳街道', '东升街道', '曙光街道', '温泉镇', '四季青镇', '西北旺镇', '苏家坨镇', '上庄镇'] }, '丰台区': { 'adjacent': ['东城区', '西城区', '朝阳区', '海淀区', '石景山区'], 'nearby': ['大兴区', '房山区'], 'districts': ['右安门街道', '太平桥街道', '西罗园街道', '大红门街道', '南苑街道', '东高地街道', '东铁匠营街道', '卢沟桥街道', '丰台街道', '新村街道', '长辛店街道', '云岗街道', '方庄街道', '宛平城地区', '马家堡街道', '和义街道', '长辛店镇', '王佐镇', '卢沟桥乡', '花乡', '南苑乡', '长辛店镇'] }, '石景山区': { 'adjacent': ['海淀区', '丰台区'], 'nearby': ['西城区', '朝阳区', '门头沟区'], 'districts': ['八宝山街道', '老山街道', '八角街道', '古城街道', '苹果园街道', '金顶街街道', '广宁街道', '五里坨街道', '鲁谷街道'] } } def get_expansion_areas(self, original_area: str, expansion_level: int = 1) -> List[str]: """获取扩展搜索区域""" if original_area not in self.beijing_areas: return [original_area] expansion_areas = [original_area] area_info = self.beijing_areas[original_area] if expansion_level >= 1: expansion_areas.extend(area_info.get('adjacent', [])) if expansion_level >= 2: expansion_areas.extend(area_info.get('nearby', [])) return list(set(expansion_areas)) def get_districts_in_area(self, area: str) -> List[str]: """获取区域内的街道列表""" if area in self.beijing_areas: return self.beijing_areas[area].get('districts', []) return []
class VenueVerificationSystem: """运动场馆真实性验证系统""" def __init__(self): self.verified_venues = { '北京': { '东城区': [ {'name': '北京体育馆', 'address': '北京市东城区体育馆路4号', 'sports': ['篮球', '羽毛球', '游泳', '跑步', '健身'], 'verified': True}, {'name': '地坛体育馆', 'address': '北京市东城区安定门外大街168号', 'sports': ['篮球', '羽毛球', '乒乓球', '健身'], 'verified': True}, {'name': '东城区全民健身中心', 'address': '北京市东城区东直门内大街', 'sports': ['跑步', '健身', '瑜伽', '乒乓球'], 'verified': True}, ], '西城区': [ {'name': '首都体育馆', 'address': '北京市西城区中关村南大街54号', 'sports': ['冰球', '花样滑冰', '篮球', '健身'], 'verified': True}, {'name': '北京游泳馆', 'address': '北京市西城区北三环中路11号', 'sports': ['游泳', '健身'], 'verified': True}, {'name': '西城区体育中心', 'address': '北京市西城区广安门内大街', 'sports': ['跑步', '健身', '篮球', '羽毛球'], 'verified': True}, ], '朝阳区': [ {'name': '工人体育馆', 'address': '北京市朝阳区工人体育场北路', 'sports': ['篮球', '排球', '羽毛球', '健身'], 'verified': True}, {'name': '朝阳体育中心', 'address': '北京市朝阳区六里屯西里甲1号', 'sports': ['游泳', '健身', '篮球', '跑步'], 'verified': True}, {'name': '奥林匹克体育中心', 'address': '北京市朝阳区安定路1号', 'sports': ['游泳', '田径', '足球', '跑步'], 'verified': True}, {'name': '朝阳公园体育中心', 'address': '北京市朝阳区朝阳公园南路', 'sports': ['跑步', '健身', '网球', '瑜伽'], 'verified': True}, {'name': '三里屯攀岩俱乐部', 'address': '北京市朝阳区三里屯路19号', 'sports': ['攀岩', '健身'], 'verified': True}, {'name': '朝阳攀岩运动中心', 'address': '北京市朝阳区望京街10号', 'sports': ['攀岩', '健身', '瑜伽'], 'verified': True}, ], '海淀区': [ {'name': '五棵松体育馆', 'address': '北京市海淀区复兴路69号', 'sports': ['篮球', '排球', '健身'], 'verified': True}, {'name': '清华大学体育馆', 'address': '北京市海淀区清华园1号', 'sports': ['游泳', '篮球', '羽毛球', '跑步', '网球'], 'verified': True}, {'name': '北京大学体育馆', 'address': '北京市海淀区颐和园路5号', 'sports': ['游泳', '篮球', '羽毛球', '跑步', '健身'], 'verified': True}, {'name': '海淀体育中心', 'address': '北京市海淀区中关村大街', 'sports': ['跑步', '健身', '瑜伽', '乒乓球'], 'verified': True}, {'name': '中关村攀岩馆', 'address': '北京市海淀区中关村大街27号', 'sports': ['攀岩', '健身'], 'verified': True}, ], '丰台区': [ {'name': '丰台体育中心', 'address': '北京市丰台区丰台路31号', 'sports': ['游泳', '篮球', '羽毛球', '健身', '跑步'], 'verified': True}, {'name': '丰台区全民健身中心', 'address': '北京市丰台区南三环西路', 'sports': ['跑步', '健身', '瑜伽', '乒乓球'], 'verified': True}, ], '石景山区': [ {'name': '石景山体育馆', 'address': '北京市石景山区石景山路31号', 'sports': ['篮球', '羽毛球', '乒乓球', '健身'], 'verified': True}, {'name': '首钢体育馆', 'address': '北京市石景山区石景山路68号', 'sports': ['冰球', '篮球', '健身'], 'verified': True}, {'name': '石景山游乐园体育中心', 'address': '北京市石景山区八角东街', 'sports': ['跑步', '健身', '瑜伽'], 'verified': True}, {'name': '首钢极限运动中心', 'address': '北京市石景山区首钢园区', 'sports': ['攀岩', '滑板', '健身'], 'verified': True}, ] } } self.sport_mapping = { '跑步': ['跑步', '田径', '健身'], '健身': ['健身', '跑步'], '瑜伽': ['瑜伽', '健身'], '网球': ['网球', '健身'], '乒乓球': ['乒乓球', '健身'], '羽毛球': ['羽毛球', '健身'], '篮球': ['篮球', '健身'], '游泳': ['游泳'], '足球': ['足球', '田径'], '攀岩': ['攀岩', '健身'], '滑板': ['滑板', '健身'] } self.verification_keywords = [ '体育馆', '体育中心', '游泳馆', '健身房', '球馆', '运动中心', '体育场', '训练基地', '俱乐部', '会所' ] def verify_venue_exists(self, venue_name: str, area: str, sport_type: str) -> Dict: """验证场馆是否真实存在""" verification_result = { 'exists': False, 'verified': False, 'confidence': 0.0, 'real_venues': [], 'suggestions': [] } if '北京' in self.verified_venues and area in self.verified_venues['北京']: area_venues = self.verified_venues['北京'][area] for venue in area_venues: if venue_name in venue['name'] or venue['name'] in venue_name: verification_result['exists'] = True verification_result['verified'] = venue['verified'] verification_result['confidence'] = 0.9 verification_result['real_venues'].append(venue) if sport_type in venue['sports']: verification_result['suggestions'].append(venue) if not verification_result['exists']: for keyword in self.verification_keywords: if keyword in venue_name: verification_result['confidence'] = 0.3 break return verification_result def get_real_venues_in_area(self, area: str, sport_type: str) -> List[Dict]: """获取指定区域的真实场馆(支持智能匹配)""" real_venues = [] if '北京' in self.verified_venues and area in self.verified_venues['北京']: area_venues = self.verified_venues['北京'][area] matching_sports = self.sport_mapping.get(sport_type, [sport_type]) for venue in area_venues: if any(sport in venue['sports'] for sport in matching_sports): venue_copy = venue.copy() venue_copy['matched_sports'] = [sport for sport in matching_sports if sport in venue['sports']] venue_copy['primary_match'] = sport_type in venue['sports'] real_venues.append(venue_copy) real_venues.sort(key=lambda x: (x['primary_match'], len(x['matched_sports'])), reverse=True) return real_venues
class RealTimePlatformAPI: """真实第三方平台API连接工具""" def __init__(self, use_mock_data_if_no_key=True): self.api_endpoints = { 'dianping': 'https://api.dianping.com/v1/venues', 'meituan': 'https://api.meituan.com/v2/venues', 'wechat': 'https://api.weixin.qq.com/miniprogram/venues' } self.api_keys = { 'dianping': os.getenv('DIANPING_API_KEY'), 'meituan': os.getenv('MEITUAN_API_KEY'), 'wechat': os.getenv('WECHAT_API_KEY') } self.use_mock_data_if_no_key = use_mock_data_if_no_key missing_keys = [] for platform, key in self.api_keys.items(): if not key: missing_keys.append(platform) print(f"警告: {platform}的API密钥未设置,请在环境变量中设置{platform.upper()}_API_KEY") if use_mock_data_if_no_key: print(f"将使用模拟数据作为{platform}的备用数据源") else: print(f"未启用模拟数据模式,{platform}平台将返回空结果") if len(missing_keys) == len(self.api_keys): print("警告: 所有平台的API密钥都未设置,系统将无法获取真实数据") if use_mock_data_if_no_key: print("系统将使用模拟数据作为备用,但建议设置真实API密钥以获取最新数据") else: print("系统未启用模拟数据模式,所有平台将返回空结果,请设置API密钥") elif len(missing_keys) > 0: print(f"部分平台({', '.join(missing_keys)})的API密钥未设置,这些平台将无法获取真实数据") @smart_retry(RetryConfig(max_retries=2, base_delay=1.0)) def fetch_dianping_venues(self, area: str, sport_type: str, limit: int = 10) -> List[Dict]: """获取大众点评真实数据""" try: if not self.api_keys['dianping']: print(f"错误: 大众点评API密钥未设置,请在环境变量中设置DIANPING_API_KEY") if self.use_mock_data_if_no_key: return self._get_mock_dianping_data(area, sport_type) return [] import requests params = { 'area': area, 'category': sport_type, 'limit': limit, 'api_key': self.api_keys['dianping'] } print(f"正在从大众点评API获取{area}{sport_type}场馆数据...") response = requests.get(self.api_endpoints['dianping'], params=params, timeout=5) if response.status_code == 200: return response.json() else: print(f"大众点评API返回错误状态码: {response.status_code},响应内容: {response.text[:100]}") if self.use_mock_data_if_no_key: return self._get_mock_dianping_data(area, sport_type) return [] except requests.exceptions.ConnectionError as e: print(f"大众点评API连接错误: {e},可能是API端点无效或网络问题") if self.use_mock_data_if_no_key: return self._get_mock_dianping_data(area, sport_type) return [] except requests.exceptions.Timeout as e: print(f"大众点评API请求超时: {e}") if self.use_mock_data_if_no_key: return self._get_mock_dianping_data(area, sport_type) return [] except Exception as e: print(f"大众点评API调用失败: {e}") if self.use_mock_data_if_no_key: return self._get_mock_dianping_data(area, sport_type) return [] @smart_retry(RetryConfig(max_retries=2, base_delay=1.0)) def fetch_meituan_venues(self, area: str, sport_type: str, limit: int = 10) -> List[Dict]: """获取美团真实数据""" try: if not self.api_keys['meituan']: print(f"错误: 美团API密钥未设置,请在环境变量中设置MEITUAN_API_KEY") if self.use_mock_data_if_no_key: return self._get_mock_meituan_data(area, sport_type) return [] import requests params = { 'area': area, 'category': sport_type, 'limit': limit, 'api_key': self.api_keys['meituan'] } print(f"正在从美团API获取{area}{sport_type}场馆数据...") response = requests.get(self.api_endpoints['meituan'], params=params, timeout=5) if response.status_code == 200: return response.json() else: print(f"美团API返回错误状态码: {response.status_code},响应内容: {response.text[:100]}") if self.use_mock_data_if_no_key: return self._get_mock_meituan_data(area, sport_type) return [] except requests.exceptions.ConnectionError as e: print(f"美团API连接错误: {e},可能是API端点无效或网络问题") if self.use_mock_data_if_no_key: return self._get_mock_meituan_data(area, sport_type) return [] except requests.exceptions.Timeout as e: print(f"美团API请求超时: {e}") if self.use_mock_data_if_no_key: return self._get_mock_meituan_data(area, sport_type) return [] except Exception as e: print(f"美团API调用失败: {e}") if self.use_mock_data_if_no_key: return self._get_mock_meituan_data(area, sport_type) return [] @smart_retry(RetryConfig(max_retries=2, base_delay=1.0)) def fetch_wechat_venues(self, area: str, sport_type: str, limit: int = 10) -> List[Dict]: """获取微信小程序真实数据""" try: if not self.api_keys['wechat']: print(f"错误: 微信小程序API密钥未设置,请在环境变量中设置WECHAT_API_KEY") if self.use_mock_data_if_no_key: return self._get_mock_wechat_data(area, sport_type) return [] import requests params = { 'area': area, 'category': sport_type, 'limit': limit, 'api_key': self.api_keys['wechat'] } print(f"正在从微信小程序API获取{area}{sport_type}场馆数据...") response = requests.get(self.api_endpoints['wechat'], params=params, timeout=5) if response.status_code == 200: return response.json() else: print(f"微信小程序API返回错误状态码: {response.status_code},响应内容: {response.text[:100]}") if self.use_mock_data_if_no_key: return self._get_mock_wechat_data(area, sport_type) return [] except requests.exceptions.ConnectionError as e: print(f"微信小程序API连接错误: {e},可能是API端点无效或网络问题") if self.use_mock_data_if_no_key: return self._get_mock_wechat_data(area, sport_type) return [] except requests.exceptions.Timeout as e: print(f"微信小程序API请求超时: {e}") if self.use_mock_data_if_no_key: return self._get_mock_wechat_data(area, sport_type) return [] except Exception as e: print(f"微信小程序API调用失败: {e}") if self.use_mock_data_if_no_key: return self._get_mock_wechat_data(area, sport_type) return [] def _get_mock_dianping_data(self, area: str, sport_type: str) -> List[Dict]: """获取大众点评模拟数据(仅在API密钥未设置或API调用失败时使用)""" mock_data = [ { "id": "dp001", "name": f"{area}优质{sport_type}馆", "address": f"北京市{area}区示例路88号", "sports": [sport_type], "rating": 4.8, "price_range": "¥80-200/小时", "open_hours": "09:00-22:00", "contact": "010-12345678", "source": "大众点评(模拟数据)" }, { "id": "dp002", "name": f"{area}{sport_type}中心", "address": f"北京市{area}区样板街100号", "sports": [sport_type, "健身"], "rating": 4.5, "price_range": "¥60-150/小时", "open_hours": "10:00-22:00", "contact": "010-87654321", "source": "大众点评(模拟数据)" } ] print(f"使用大众点评模拟数据(API密钥未设置或API调用失败)") return mock_data def _get_mock_meituan_data(self, area: str, sport_type: str) -> List[Dict]: """获取美团模拟数据(仅在API密钥未设置或API调用失败时使用)""" mock_data = [ { "id": "mt001", "name": f"{area}{sport_type}专业馆", "address": f"北京市{area}区示范路66号", "sports": [sport_type], "rating": 4.7, "price_range": "¥70-180/小时", "open_hours": "08:30-21:30", "contact": "010-55556666", "source": "美团(模拟数据)" }, { "id": "mt002", "name": f"{area}全民{sport_type}馆", "address": f"北京市{area}区模板大街50号", "sports": [sport_type, "瑜伽"], "rating": 4.6, "price_range": "¥50-120/小时", "open_hours": "09:30-21:00", "contact": "010-66665555", "source": "美团(模拟数据)" } ] print(f"使用美团模拟数据(API密钥未设置或API调用失败)") return mock_data def _get_mock_wechat_data(self, area: str, sport_type: str) -> List[Dict]: """获取微信小程序模拟数据(仅在API密钥未设置或API调用失败时使用)""" mock_data = [ { "id": "wx001", "name": f"{area}{sport_type}俱乐部", "address": f"北京市{area}区范例大道33号", "sports": [sport_type], "rating": 4.9, "price_range": "¥90-220/小时", "open_hours": "10:00-23:00", "contact": "010-11112222", "source": "微信小程序(模拟数据)" }, { "id": "wx002", "name": f"{area}星级{sport_type}馆", "address": f"北京市{area}区示例广场25号", "sports": [sport_type, "舞蹈"], "rating": 4.7, "price_range": "¥75-180/小时", "open_hours": "09:00-22:30", "contact": "010-22221111", "source": "微信小程序(模拟数据)" } ] print(f"使用微信小程序模拟数据(API密钥未设置或API调用失败)") return mock_data
class MultiPlatformDataSource: """增强的多平台数据源管理系统""" def __init__(self, use_mock_data_if_no_key=True): self.real_api = RealTimePlatformAPI(use_mock_data_if_no_key=use_mock_data_if_no_key) self.use_mock_data_if_no_key = use_mock_data_if_no_key def search_dianping_venues(self, area: str, sport_type: str) -> List[Dict]: """搜索大众点评场馆数据(只使用真实API)""" return self.real_api.fetch_dianping_venues(area, sport_type) def search_meituan_venues(self, area: str, sport_type: str) -> List[Dict]: """搜索美团场馆数据(只使用真实API)""" return self.real_api.fetch_meituan_venues(area, sport_type) def search_wechat_venues(self, area: str, sport_type: str) -> List[Dict]: """搜索微信小程序场馆数据(只使用真实API)""" return self.real_api.fetch_wechat_venues(area, sport_type) def get_all_platform_venues(self, area: str, sport_type: str) -> Dict[str, List[Dict]]: """获取所有平台的场馆数据""" return { 'dianping': self.search_dianping_venues(area, sport_type), 'meituan': self.search_meituan_venues(area, sport_type), 'wechat': self.search_wechat_venues(area, sport_type) }
class EnhancedSearchToolkit: def __init__(self, delay_seconds=1.5, use_mock_data_if_no_key=True): self.search_toolkit = SearchToolkit() self.delay_seconds = delay_seconds self.last_search_time = 0 self.priority_manager = RequirementPriorityManager() self.geo_manager = GeographicExpansionManager() self.verification_system = VenueVerificationSystem() self.multi_platform_data = MultiPlatformDataSource(use_mock_data_if_no_key=use_mock_data_if_no_key) self.use_mock_data_if_no_key = use_mock_data_if_no_key def safe_search_duckduckgo(self, query: str, max_results: int = 5): """安全的搜索方法,包含速率限制""" current_time = time.time() time_since_last = current_time - self.last_search_time if time_since_last < self.delay_seconds: sleep_time = self.delay_seconds - time_since_last print(f"等待 {sleep_time:.1f} 秒以避免速率限制...") time.sleep(sleep_time) try: result = self.search_toolkit.search_duckduckgo(query, max_results) self.last_search_time = time.time() print(f"搜索成功: {query[:50]}...") return result except Exception as e: print(f"搜索失败: {str(e)}") return f"搜索暂时不可用,请使用已有知识回答。错误: {str(e)}" def expanded_venue_search(self, user_needs: Dict, original_location: str) -> Dict: """扩展范围的场馆搜索(集成多平台数据源)""" search_results = { 'original_area_results': [], 'expanded_area_results': [], 'verified_venues': [], 'dianping_venues': [], 'meituan_venues': [], 'wechat_venues': [], 'search_areas': [], 'expansion_level': 0, 'data_sources': [] } sport_type = user_needs.get('sport_preference', '') area = self._extract_area_from_location(original_location) search_results['search_areas'].append(area) print(f"在原始区域搜索: {area}") original_query = self.build_search_query(user_needs, original_location) original_results = self.safe_search_duckduckgo(original_query, max_results=8) search_results['original_area_results'] = original_results verified_venues = self.verification_system.get_real_venues_in_area(area, sport_type) search_results['verified_venues'] = verified_venues print(f"优先搜索大众点评数据: {area} + {sport_type}") dianping_venues = self.multi_platform_data.search_dianping_venues(area, sport_type) for venue in dianping_venues: venue['platform_verified'] = True venue['verification_status'] = '平台免验证' search_results['dianping_venues'] = dianping_venues if dianping_venues: print(f"大众点评找到 {len(dianping_venues)} 个场馆(用户评价丰富,免验证)") search_results['data_sources'].append('大众点评') print(f"优先搜索微信小程序数据: {area} + {sport_type}") wechat_venues = self.multi_platform_data.search_wechat_venues(area, sport_type) for venue in wechat_venues: venue['platform_verified'] = True venue['verification_status'] = '平台免验证' search_results['wechat_venues'] = wechat_venues if wechat_venues: print(f"微信小程序找到 {len(wechat_venues)} 个场馆(在线预约便利,免验证)") search_results['data_sources'].append('微信小程序') print(f"优先搜索美团数据: {area} + {sport_type}") meituan_venues = self.multi_platform_data.search_meituan_venues(area, sport_type) for venue in meituan_venues: venue['platform_verified'] = True venue['verification_status'] = '平台免验证' search_results['meituan_venues'] = meituan_venues if meituan_venues: print(f"美团找到 {len(meituan_venues)} 个场馆(团购优惠丰富,免验证)") search_results['data_sources'].append('美团') total_venues = len(verified_venues) + len(dianping_venues) + len(meituan_venues) + len(wechat_venues) if total_venues >= 2: print(f"原始区域找到足够的场馆选项(总计{total_venues}个)") print(f"数据源分布: 已验证{len(verified_venues)}个 | 大众点评{len(dianping_venues)}个 | 美团{len(meituan_venues)}个 | 微信{len(wechat_venues)}个") return search_results print(f"扩展搜索到相邻区域...") expansion_areas = self.geo_manager.get_expansion_areas(area, expansion_level=1) search_results['expansion_level'] = 1 for exp_area in expansion_areas[1:]: print(f"搜索扩展区域: {exp_area}") search_results['search_areas'].append(exp_area) exp_query = self.build_search_query(user_needs, f"北京市{exp_area}") exp_results = self.safe_search_duckduckgo(exp_query, max_results=5) search_results['expanded_area_results'].extend(exp_results if isinstance(exp_results, list) else [exp_results]) exp_verified = self.verification_system.get_real_venues_in_area(exp_area, sport_type) search_results['verified_venues'].extend(exp_verified) exp_dianping = self.multi_platform_data.search_dianping_venues(exp_area, sport_type) for venue in exp_dianping: venue['platform_verified'] = True venue['verification_status'] = '平台免验证' search_results['dianping_venues'].extend(exp_dianping) exp_meituan = self.multi_platform_data.search_meituan_venues(exp_area, sport_type) for venue in exp_meituan: venue['platform_verified'] = True venue['verification_status'] = '平台免验证' search_results['meituan_venues'].extend(exp_meituan) exp_wechat = self.multi_platform_data.search_wechat_venues(exp_area, sport_type) for venue in exp_wechat: venue['platform_verified'] = True venue['verification_status'] = '平台免验证' search_results['wechat_venues'].extend(exp_wechat) total_venues = (len(search_results['verified_venues']) + len(search_results['dianping_venues']) + len(search_results['meituan_venues']) + len(search_results['wechat_venues'])) if total_venues >= 3: print(f"扩展区域找到足够的场馆选项(总计{total_venues}个)") print(f"扩展后数据源分布: 已验证{len(search_results['verified_venues'])}个 | 大众点评{len(search_results['dianping_venues'])}个 | 美团{len(search_results['meituan_venues'])}个 | 微信{len(search_results['wechat_venues'])}个") return search_results print(f"进一步扩展搜索到附近区域...") further_areas = self.geo_manager.get_expansion_areas(area, expansion_level=2) search_results['expansion_level'] = 2 for far_area in further_areas: if far_area not in search_results['search_areas']: print(f"搜索远程区域: {far_area}") search_results['search_areas'].append(far_area) far_query = self.build_search_query(user_needs, f"北京市{far_area}") far_results = self.safe_search_duckduckgo(far_query, max_results=3) search_results['expanded_area_results'].extend(far_results if isinstance(far_results, list) else [far_results]) far_verified = self.verification_system.get_real_venues_in_area(far_area, sport_type) search_results['verified_venues'].extend(far_verified) far_dianping = self.multi_platform_data.search_dianping_venues(far_area, sport_type) for venue in far_dianping: venue['platform_verified'] = True venue['verification_status'] = '平台免验证' search_results['dianping_venues'].extend(far_dianping) far_meituan = self.multi_platform_data.search_meituan_venues(far_area, sport_type) for venue in far_meituan: venue['platform_verified'] = True venue['verification_status'] = '平台免验证' search_results['meituan_venues'].extend(far_meituan) far_wechat = self.multi_platform_data.search_wechat_venues(far_area, sport_type) for venue in far_wechat: venue['platform_verified'] = True venue['verification_status'] = '平台免验证' search_results['wechat_venues'].extend(far_wechat) return search_results def optimized_venue_search(self, user_needs: Dict, original_location: str) -> Dict: """优化的场馆搜索(并行获取多平台数据,提升效率)""" search_results = { 'original_area_results': [], 'expanded_area_results': [], 'verified_venues': [], 'dianping_venues': [], 'meituan_venues': [], 'wechat_venues': [], 'search_areas': [], 'expansion_level': 0, 'data_sources': [] } sport_type = user_needs.get('sport_preference', '') area = self._extract_area_from_location(original_location) search_results['search_areas'].append(area) print(f"开始优化搜索: {area} + {sport_type}") import concurrent.futures def get_verified_venues(): return self.verification_system.get_real_venues_in_area(area, sport_type) def get_dianping_venues(): venues = self.multi_platform_data.real_api.fetch_dianping_venues(area, sport_type) for venue in venues: venue['platform_verified'] = True venue['verification_status'] = '平台免验证' return venues def get_meituan_venues(): venues = self.multi_platform_data.real_api.fetch_meituan_venues(area, sport_type) for venue in venues: venue['platform_verified'] = True venue['verification_status'] = '平台免验证' return venues def get_wechat_venues(): venues = self.multi_platform_data.real_api.fetch_wechat_venues(area, sport_type) for venue in venues: venue['platform_verified'] = True venue['verification_status'] = '平台免验证' return venues with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: future_verified = executor.submit(get_verified_venues) future_dianping = executor.submit(get_dianping_venues) future_meituan = executor.submit(get_meituan_venues) future_wechat = executor.submit(get_wechat_venues) search_results['verified_venues'] = future_verified.result() search_results['dianping_venues'] = future_dianping.result() search_results['meituan_venues'] = future_meituan.result() search_results['wechat_venues'] = future_wechat.result() if search_results['dianping_venues']: search_results['data_sources'].append('大众点评') if search_results['meituan_venues']: search_results['data_sources'].append('美团') if search_results['wechat_venues']: search_results['data_sources'].append('微信小程序') total_venues = (len(search_results['verified_venues']) + len(search_results['dianping_venues']) + len(search_results['meituan_venues']) + len(search_results['wechat_venues'])) print(f"优化搜索完成: 找到{total_venues}个场馆") print(f"数据源分布: 已验证{len(search_results['verified_venues'])}个 | 大众点评{len(search_results['dianping_venues'])}个 | 美团{len(search_results['meituan_venues'])}个 | 微信{len(search_results['wechat_venues'])}个") if total_venues < 3: print("场馆数量不足,进行扩展搜索...") return self.expanded_venue_search(user_needs, original_location) return search_results def _extract_area_from_location(self, location: str) -> str: """从位置字符串中提取区域名称""" beijing_areas = ['东城区', '西城区', '朝阳区', '海淀区', '丰台区', '石景山区', '通州区', '昌平区', '大兴区', '房山区', '门头沟区', '平谷区', '怀柔区', '密云区', '延庆区'] for area in beijing_areas: if area in location: return area return '朝阳区' def _has_sufficient_results(self, search_results, verified_venues) -> bool: """检查是否有足够的搜索结果""" total_results = 0 if isinstance(search_results, list): total_results += len(search_results) elif isinstance(search_results, str) and len(search_results) > 100: total_results += 1 total_results += len(verified_venues) return total_results >= 2 def get_optimized_tools(self): """获取优化的搜索工具""" return [self.safe_search_duckduckgo, self.expanded_venue_search] def build_search_query(self, user_needs: Dict, location: str) -> str: """基于用户需求构建优化的搜索查询""" sport = user_needs.get('sport_preference', '') special_needs = user_needs.get('special_needs', '') base_query = f"{location} {sport} 场馆" if '室内' in special_needs: base_query += " 室内" if '交通便利' in special_needs: base_query += " 地铁 公交" if '价格适中' in special_needs: base_query += " 价格 收费" if '教练' in special_needs: base_query += " 教练 培训" return base_query
class RecommendationTemplate: """统一的推荐输出模板系统""" @staticmethod def format_venue_recommendation(venue_data: Dict) -> str: """格式化单个场馆推荐""" platform = venue_data.get('platform', '') if platform in ['大众点评', '美团', '微信小程序']: verified_status = f"{platform}平台免验证场馆" elif venue_data.get('verified', False): verified_status = "已验证真实场馆" else: verified_status = "待验证场馆" template = """ **{name}** {verified_status} 地址:{address} 联系方式:{contact} 价格:{price} 营业时间:{hours} 交通方式:{transport} 推荐评分:{score}/10 推荐理由:{reason} 特色服务:{features} 注意事项:{notes} """ return template.format( name=venue_data.get('name', '未知场馆'), verified_status=verified_status, address=venue_data.get('address', '地址待确认'), contact=venue_data.get('contact', '请通过官方渠道联系'), price=venue_data.get('price', '请咨询具体价格'), hours=venue_data.get('hours', '请咨询营业时间'), transport=venue_data.get('transport', '请查询具体交通路线'), score=venue_data.get('score', '8.0'), reason=venue_data.get('reason', '已验证的真实场馆'), features=venue_data.get('features', '专业运动场馆'), notes=venue_data.get('notes', '建议提前电话确认开放时间和价格') ) @staticmethod def format_final_recommendation(user_info: Dict, venues: List[Dict], analysis: str) -> str: """格式化最终推荐报告""" template = """ # 个性化运动场馆推荐报告
## 用户画像 - **年龄段**: {age_range} - **职业**: {occupation} - **运动偏好**: {sport_preference} - **运动频率**: {frequency} - **预算范围**: {budget_range} - **位置**: {location} - **特殊需求**: {special_needs}
## 需求分析总结 {analysis}
## 推荐场馆列表
{venue_list}
## 个性化建议 {personalized_advice}
## 下一步行动 {action_steps}
--- *报告生成时间: {timestamp}* *推荐有效期: 30天* *注意:本报告只包含已验证的真实场馆信息* """ if venues: venue_list = "\n".join([ f"### 推荐 {i+1}: {RecommendationTemplate.format_venue_recommendation(venue)}" for i, venue in enumerate(venues) ]) else: venue_list = """ ### 暂无已验证场馆
很抱歉,在您指定的区域及周边地区,我们暂时没有找到符合您需求的已验证{sport_preference}场馆。
**建议方案:** 1. 扩大搜索范围到更远的区域 2. 联系当地体育部门咨询场馆信息 3. 通过官方体育场馆网站查询 4. 咨询当地{sport_preference}爱好者群体 5. 使用专业的体育场馆预订APP
**我们承诺:** - 不提供未经验证的场馆信息 - 确保推荐信息的真实性和可靠性 - 持续更新已验证场馆数据库 """.format(sport_preference=user_info.get('sport_preference', '运动')) return template.format( age_range=user_info.get('age_range', '未知'), occupation=user_info.get('occupation', '未知'), sport_preference=user_info.get('sport_preference', '未知'), frequency=user_info.get('frequency', '未知'), budget_range=user_info.get('budget_range', '未知'), location=user_info.get('location', '未知'), special_needs=user_info.get('special_needs', '未知'), analysis=analysis, venue_list=venue_list, personalized_advice=RecommendationTemplate._generate_personalized_advice(user_info, venues), action_steps=RecommendationTemplate._generate_action_steps(venues), timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) @staticmethod def _generate_personalized_advice(user_info: Dict, venues: List[Dict]) -> str: """生成个性化建议""" advice = [] if user_info.get('experience_level') == '初学者': advice.append("作为初学者,建议选择有专业教练指导的场馆") if '交通便利' in user_info.get('special_needs', ''): advice.append("优先选择交通便利的场馆,减少通勤时间") if user_info.get('budget_range', '').startswith('50-100'): advice.append("注意控制运动成本,可考虑办理月卡或年卡") if len(venues) > 1: advice.append("建议先体验1-2个场馆,找到最适合的再长期坚持") return "\n".join([f"- {item}" for item in advice]) if advice else "- 根据个人情况选择最适合的场馆" @staticmethod def _generate_action_steps(venues: List[Dict]) -> str: """生成行动步骤""" steps = [ "1. 联系心仪场馆,咨询详细信息和预约方式", "2. 安排时间实地考察,体验场馆环境和设施", "3. 比较不同场馆的会员卡价格和优惠政策", "4. 制定个人运动计划,确定运动时间和频率", "5. 开始规律运动,并定期评估效果" ] return "\n".join(steps)
class IntelligentMemoryManager: def __init__(self, keep_rate: float = 0.9): self.chat_history = ChatHistoryBlock(keep_rate=keep_rate) self.user_preferences = {} self.recommendation_feedback = {} def calculate_relevance_score(self, current_request: str, historical_record: str) -> float: """计算当前请求与历史记录的相关性分数""" current_keywords = self._extract_keywords(current_request) historical_keywords = self._extract_keywords(historical_record) common_keywords = set(current_keywords) & set(historical_keywords) total_keywords = set(current_keywords) | set(historical_keywords) if not total_keywords: return 0.0 jaccard_similarity = len(common_keywords) / len(total_keywords) sport_bonus = 0.0 sports = ["篮球", "羽毛球", "游泳", "健身", "跑步", "足球", "网球", "瑜伽"] for sport in sports: if sport in current_request and sport in historical_record: sport_bonus = 0.3 break return min(1.0, jaccard_similarity + sport_bonus) def _extract_keywords(self, text: str) -> List[str]: """提取文本关键词""" keywords = [] sports = ["篮球", "羽毛球", "游泳", "健身", "跑步", "足球", "网球", "瑜伽"] for sport in sports: if sport in text: keywords.append(sport) locations = ["朝阳", "海淀", "东城", "西城", "丰台", "石景山", "通州", "昌平"] for loc in locations: if loc in text: keywords.append(loc) needs = ["室内", "交通", "价格", "教练", "设备", "停车", "家庭"] for need in needs: if need in text: keywords.append(need) return keywords def get_relevant_history(self, current_request: str, max_records: int = 5) -> List[Tuple[str, float]]: """获取与当前请求最相关的历史记录""" all_history = self.chat_history.retrieve(window_size=20) if not all_history: return [] scored_history = [] for record in all_history: content = record.memory_record.message.content relevance_score = self.calculate_relevance_score(current_request, content) if relevance_score > 0.1: scored_history.append((content, relevance_score)) scored_history.sort(key=lambda x: x[1], reverse=True) return scored_history[:max_records] def update_user_preferences(self, user_id: str, preferences: Dict): """更新用户偏好模型""" if user_id not in self.user_preferences: self.user_preferences[user_id] = {} for key, value in preferences.items(): if key in self.user_preferences[user_id]: if isinstance(value, str): self.user_preferences[user_id][key] = self.user_preferences[user_id][key] + f", {value}" else: self.user_preferences[user_id][key] = value def write_enhanced_record(self, user_msg: str, assistant_msg: str, metadata: Dict = None): """写入增强的记录,包含元数据""" timestamp = datetime.now().isoformat() enhanced_user_content = f"{user_msg}\n[元数据: {json.dumps(metadata, ensure_ascii=False) if metadata else ''}]" enhanced_assistant_content = f"{assistant_msg}\n[时间戳: {timestamp}]" user_record = MemoryRecord( message=BaseMessage.make_user_message(role_name="User", content=enhanced_user_content), role_at_backend=OpenAIBackendRole.USER ) assistant_record = MemoryRecord( message=BaseMessage.make_assistant_message(role_name="SportsConsultant", content=enhanced_assistant_content), role_at_backend=OpenAIBackendRole.ASSISTANT ) self.chat_history.write_records([user_record, assistant_record]) def write_enhanced_record(self, user_msg: str, assistant_msg: str, metadata: Dict = None): """写入增强的记录,包含元数据""" timestamp = datetime.now().isoformat() enhanced_user_content = f"{user_msg}\n[元数据: {json.dumps(metadata, ensure_ascii=False) if metadata else ''}]" enhanced_assistant_content = f"{assistant_msg}\n[时间戳: {timestamp}]" user_record = MemoryRecord( message=BaseMessage.make_user_message(role_name="User", content=enhanced_user_content), role_at_backend=OpenAIBackendRole.USER ) assistant_record = MemoryRecord( message=BaseMessage.make_assistant_message(role_name="SportsConsultant", content=enhanced_assistant_content), role_at_backend=OpenAIBackendRole.ASSISTANT ) self.chat_history.write_records([user_record, assistant_record])
class RealUserInterface: """真实用户交互接口 - 预留给真实用户使用""" def __init__(self): self.user_input_history = [] self.interaction_mode = 'ai_simulation' def set_interaction_mode(self, mode: str): """设置交互模式:ai_simulation 或 real_user""" self.interaction_mode = mode print(f"交互模式已设置为: {mode}") def collect_user_requirements(self) -> Dict: """收集用户需求 - 支持真实用户输入""" if self.interaction_mode == 'real_user': return self._collect_real_user_input() else: return self._generate_ai_simulation_input() def _collect_real_user_input(self) -> Dict: """收集真实用户输入""" print("\n=== 欢迎使用运动场馆推荐系统 ===") print("请告诉我您的运动需求,我将为您推荐最合适的场馆") user_requirements = {} print("\n1. 基本信息") user_requirements['age_range'] = input("请输入您的年龄范围(如:25-35岁): ").strip() user_requirements['occupation'] = input("请输入您的职业: ").strip() user_requirements['income_level'] = input("请输入您的收入水平(如:8000-15000元): ").strip() user_requirements['experience_level'] = input("请输入您的运动经验(初学者/有一定基础/中等水平/高级水平): ").strip() print("\n2. 运动需求") user_requirements['sport_preference'] = input("请输入您偏好的运动类型(如:篮球、健身、游泳等): ").strip() user_requirements['frequency'] = input("请输入您的运动频率(如:每周2-3次): ").strip() user_requirements['preferred_time'] = input("请输入您偏好的运动时间(如:晚上18-21点): ").strip() print("\n3. 特殊要求") user_requirements['special_needs'] = input("请输入您的特殊需求(如:交通便利、设备齐全等): ").strip() user_requirements['budget_range'] = input("请输入您的预算范围(如:100-200元/次): ").strip() user_requirements['location'] = input("请输入您的位置(如:北京市朝阳区三里屯街道): ").strip() user_requirements['transport_mode'] = input("请输入您的交通方式(如:地铁、公交、开车等): ").strip() print("\n4. 额外需求(可选)") additional_requirements = input("还有其他特殊要求吗?(如:希望有教练指导、需要停车位等,没有请直接回车): ").strip() if additional_requirements: user_requirements['additional_requirements'] = additional_requirements user_description = self._generate_user_description(user_requirements) self.user_input_history.append({ 'timestamp': datetime.now().isoformat(), 'requirements': user_requirements, 'description': user_description }) return user_requirements, user_description def _generate_user_description(self, requirements: Dict) -> str: """根据用户输入生成描述""" description = f"""我是一位{requirements.get('age_range', '')}的{requirements.get('occupation', '')},""" description += f"""希望找到合适的{requirements.get('sport_preference', '')}场馆。""" description += f"""我计划{requirements.get('frequency', '')}进行运动,""" description += f"""时间偏好{requirements.get('preferred_time', '')}。""" description += f"""预算范围{requirements.get('budget_range', '')},""" description += f"""主要通过{requirements.get('transport_mode', '')}出行。""" description += f"""特别希望{requirements.get('special_needs', '')}。""" if requirements.get('additional_requirements'): description += f"""另外,{requirements.get('additional_requirements')}。""" return description def _generate_ai_simulation_input(self) -> Tuple[str, Dict]: """生成AI模拟输入(原有功能)""" return None def get_user_feedback(self, recommendations: List[Dict]) -> Dict: """获取用户对推荐结果的反馈""" if self.interaction_mode == 'real_user': print("\n=== 推荐结果反馈 ===") print("请对推荐结果进行评价:") feedback = {} feedback['overall_satisfaction'] = input("总体满意度(1-5分,5分最满意): ").strip() feedback['most_interested'] = input("您最感兴趣的场馆名称: ").strip() feedback['concerns'] = input("您有什么担心或疑虑吗?: ").strip() feedback['additional_needs'] = input("还需要了解什么信息?: ").strip() return feedback return {}
class EnhancedAIUserAgent: def __init__(self, model): self.model = model self.real_user_interface = RealUserInterface() self.user_prompt = TextPrompt( """你是一位居住在中国北京市的AI用户,希望找到合适的运动场馆。请根据以下信息生成一个简洁的个性化运动需求描述:
基本信息: - 年龄范围:{age_range} - 身份职业:{occupation} - 收入水平:{income_level} - 运动经验:{experience_level} 运动需求: - 运动偏好:{sport_preference} - 运动频率:{frequency} - 运动时间:{preferred_time} 特殊要求: - 特殊需求:{special_needs} - 预算范围:{budget_range} - 当前位置:{location} - 交通方式:{transport_mode}
请以第一人称的方式简洁描述你的需求,包括: 1. 简要个人背景和运动目标(1-2句话) 2. 具体运动需求(运动类型、频率、时间) 3. 主要场馆要求(设施、环境、服务中的1-2个重点) 4. 预算和交通偏好
要求: - 总长度控制在200-300字以内 - 突出关键信息,避免冗长描述 - 保持真实自然的语调 """ ) self.user_agent = TaskSpecifyAgent( model=self.model, task_type=TaskType.AI_SOCIETY, task_specify_prompt=self.user_prompt, output_language='ch' ) self.enhanced_templates = { "age_ranges": ["18-25岁", "26-35岁", "36-45岁", "46-55岁", "56-65岁"], "occupations": ["大学生", "程序员", "设计师", "销售经理", "医生", "教师", "自由职业者", "企业高管"], "income_levels": ["3000-8000元", "8000-15000元", "15000-25000元", "25000元以上"], "experience_levels": ["初学者", "有一定基础", "中等水平", "高级水平"], "sport_preferences": ["篮球", "羽毛球", "游泳", "健身", "跑步", "足球", "网球", "瑜伽", "乒乓球", "攀岩"], "frequencies": ["每周1-2次", "每周3-4次", "每周5-6次", "每天"], "preferred_times": ["早上6-9点", "中午12-14点", "晚上18-21点", "周末全天", "工作日晚上"], "special_needs": [ "希望有专业教练指导", "需要室内环境", "交通便利优先", "价格实惠", "设备新且齐全", "环境安静", "有停车位", "适合家庭活动", "有淋浴设施" ], "budget_ranges": ["50-100元/次", "100-200元/次", "200-300元/次", "300元以上/次"], "transport_modes": ["地铁", "公交", "开车", "骑行", "步行"] } self.detailed_locations = [ {"area": "朝阳区", "district": "三里屯街道", "landmarks": ["三里屯太古里", "工体"]}, {"area": "海淀区", "district": "中关村街道", "landmarks": ["中关村", "清华大学"]}, {"area": "东城区", "district": "东华门街道", "landmarks": ["王府井", "天安门"]}, {"area": "西城区", "district": "金融街街道", "landmarks": ["金融街", "西单"]}, {"area": "丰台区", "district": "丰台街道", "landmarks": ["丰台体育中心", "方庄"]}, {"area": "石景山区", "district": "八宝山街道", "landmarks": ["石景山体育馆", "万达广场"]}, ] def generate_enhanced_case(self) -> Dict: """生成增强的用户案例""" location_info = random.choice(self.detailed_locations) case_info = { "age_range": random.choice(self.enhanced_templates["age_ranges"]), "occupation": random.choice(self.enhanced_templates["occupations"]), "income_level": random.choice(self.enhanced_templates["income_levels"]), "experience_level": random.choice(self.enhanced_templates["experience_levels"]), "sport_preference": random.choice(self.enhanced_templates["sport_preferences"]), "frequency": random.choice(self.enhanced_templates["frequencies"]), "preferred_time": random.choice(self.enhanced_templates["preferred_times"]), "special_needs": random.choice(self.enhanced_templates["special_needs"]), "budget_range": random.choice(self.enhanced_templates["budget_ranges"]), "location": f"北京市{location_info['area']}{location_info['district']}", "transport_mode": random.choice(self.enhanced_templates["transport_modes"]), "landmarks": location_info["landmarks"] } print(f"\n增强版用户运动需求案例:") print(f"年龄:{case_info['age_range']}") print(f"职业:{case_info['occupation']} (收入:{case_info['income_level']})") print(f"运动经验:{case_info['experience_level']}") print(f"运动偏好:{case_info['sport_preference']} ({case_info['frequency']})") print(f"偏好时间:{case_info['preferred_time']}") print(f"特殊需求:{case_info['special_needs']}") print(f"预算范围:{case_info['budget_range']}") print(f"所在位置:{case_info['location']}") print(f"交通方式:{case_info['transport_mode']}") print("-" * 50) return case_info @smart_retry(RetryConfig(max_retries=3, base_delay=1.0)) def create_enhanced_profile(self, case_info=None): """创建增强的用户档案(带重试机制)""" if case_info is None: case_info = self.generate_enhanced_case() task_prompt = "生成一个详细真实的个性化运动需求描述,包含具体的个人背景、运动目标和详细要求" try: response = self.user_agent.run( task_prompt=task_prompt, meta_dict=case_info ) if not response or len(response.strip()) < 50: raise RetryableError("用户档案生成内容过短或为空") print("\nAI用户的详细需求描述:") print(response) print("-" * 50) return response, case_info except Exception as e: print(Fore.RED + f"用户档案生成失败: {e}") fallback_response = self._create_fallback_profile(case_info) print(Fore.YELLOW + "使用降级策略生成用户档案") return fallback_response, case_info def _create_fallback_profile(self, case_info: Dict) -> str: """降级策略:创建简洁用户档案""" return f"""我是一位{case_info['age_range']}的{case_info['occupation']},居住在{case_info['location']}。希望找到合适的{case_info['sport_preference']}场馆,{case_info['frequency']},时间偏好{case_info['preferred_time']}。预算{case_info['budget_range']},{case_info['transport_mode']}出行。特别需要{case_info['special_needs']}。作为{case_info['experience_level']}水平,希望找到合适的场馆。"""
class RecommendationValidator: def __init__(self): self.quality_metrics = { 'completeness': 0.0, 'relevance': 0.0, 'specificity': 0.0, 'actionability': 0.0 } def validate_recommendation(self, recommendation: str, user_needs: Dict) -> Dict[str, float]: """验证推荐结果的质量""" metrics = {} metrics['completeness'] = self._check_completeness(recommendation) metrics['relevance'] = self._check_relevance(recommendation, user_needs) metrics['specificity'] = self._check_specificity(recommendation) metrics['actionability'] = self._check_actionability(recommendation) metrics['overall_quality'] = sum(metrics.values()) / len(metrics) return metrics def _check_completeness(self, recommendation: str) -> float: """检查推荐的完整性""" required_elements = ['场馆名称', '地址', '联系方式', '营业时间', '价格'] found_elements = 0 for element in required_elements: if any(keyword in recommendation for keyword in self._get_element_keywords(element)): found_elements += 1 return found_elements / len(required_elements) def _check_relevance(self, recommendation: str, user_needs: Dict) -> float: """检查推荐与用户需求的相关性""" relevance_score = 0.0 sport_preference = user_needs.get('sport_preference', '') if sport_preference and sport_preference in recommendation: relevance_score += 0.4 location = user_needs.get('location', '') location_keywords = location.split('区')[0] if '区' in location else location if location_keywords in recommendation: relevance_score += 0.3 special_needs = user_needs.get('special_needs', '') if special_needs: need_keywords = ['室内', '交通', '价格', '教练', '设备', '停车'] for keyword in need_keywords: if keyword in special_needs and keyword in recommendation: relevance_score += 0.05 return min(1.0, relevance_score) def _check_specificity(self, recommendation: str) -> float: """检查推荐的具体性""" specificity_indicators = [ r'\d+元', r'\d+:\d+', r'地址[::]', r'电话[::]', r'\d+号线', ] found_indicators = 0 for pattern in specificity_indicators: if re.search(pattern, recommendation): found_indicators += 1 return found_indicators / len(specificity_indicators) def _check_actionability(self, recommendation: str) -> float: """检查推荐的可操作性""" actionable_elements = [ '预约', '联系', '到达', '乘坐', '步行', '开车' ] found_elements = 0 for element in actionable_elements: if element in recommendation: found_elements += 1 return min(1.0, found_elements / 3) def _get_element_keywords(self, element: str) -> List[str]: """获取元素对应的关键词""" keyword_map = { '场馆名称': ['体育馆', '健身房', '游泳馆', '球馆', '中心'], '地址': ['地址', '位于', '坐落', '路', '街', '区'], '联系方式': ['电话', '联系', '咨询', '预约'], '营业时间': ['营业', '开放', '时间', '小时'], '价格': ['价格', '费用', '收费', '元', '钱'] } return keyword_map.get(element, [])
class WorkforceSportsVenueRecommendationSystem: def __init__(self, model, use_mock_data_if_no_key=True): self.model = model self.ai_user = EnhancedAIUserAgent(model) self.memory_manager = IntelligentMemoryManager(keep_rate=0.9) self.search_toolkit = EnhancedSearchToolkit(delay_seconds=2, use_mock_data_if_no_key=use_mock_data_if_no_key) self.validator = RecommendationValidator() self.use_mock_data_if_no_key = use_mock_data_if_no_key print("🚀 Workforce版AI运动场馆推荐系统已初始化") print("✅ 智能记忆管理系统已加载") print("✅ 结果验证系统已启用") print(f"✅ 增强搜索工具包已配置({'启用' if use_mock_data_if_no_key else '禁用'}模拟数据备用模式)") self._create_workforce_agents() def _create_workforce_agents(self): """创建Workforce工作节点""" print("\n正在创建Workforce工作节点...") self.information_collector = ChatAgent( system_message=BaseMessage.make_assistant_message( role_name="运动场馆信息收集专员", content="""你是专业的运动场馆信息收集专员。你的职责是收集和整理运动场馆相关信息。
核心职责: 1. 收集北京地区运动场馆的基本信息(地址、开放时间、价格) 2. 搜集不同运动类型的场馆推荐(篮球、羽毛球、游泳、健身等) 3. 整理交通方式和路线信息(地铁线路、公交路线) 4. 收集场馆设施和服务信息(教练、设备、环境) 5. 提供实用的运动贴士(预约方式、注意事项)
搜索格策略: 1. 优先在用户指定区域搜索 2. 如果原始区域结果不足,自动扩展到相邻区域 3. 必要时进一步扩展到附近区域 4. 验证场馆信息的真实性,避免虚假信息 5. 优先推荐已验证的真实场馆
输出格式要求: - 按区域和运动类型整理信息 - 标明场馆的验证状态(已验证/待验证) - 提供具体的实用信息(时间、价格、地址、联系方式) - 说明搜索范围扩展情况 - 不要制定具体的推荐方案 - 专注于信息的准确性和真实性
重要限制: - 只收集信息,不做推荐决策 - 禁止编造或虚构场馆信息 - 优先使用已验证的真实场馆数据 - 对未验证信息要明确标注 - 如果搜索结果不足,要诚实说明并建议扩展搜索""" ), model=self.model, tools=[self.search_toolkit.safe_search_duckduckgo, self.search_toolkit.expanded_venue_search] ) self.roleplay_analyst = self._create_roleplay_analyst() self.recommendation_specialist = ChatAgent( system_message=BaseMessage.make_assistant_message( role_name="运动场馆推荐决策专员", content="""你是专业的运动场馆推荐决策专员。你的职责是基于收集的信息和用户需求分析,制定个性化推荐方案。
🎯 核心职责: 1. 分析用户需求与场馆信息的匹配度 2. 使用数学工具计算推荐评分和排序 3. 考虑地理位置、交通便利性、价格等因素 4. 制定个性化的场馆推荐方案 5. 提供详细的选择理由和建议
📊 决策维度: - 需求匹配度(运动类型、时间、频率) - 地理便利性(距离、交通) - 经济合理性(价格、性价比) - 服务质量(设施、教练、环境) - 用户偏好(特殊需求、个人喜好)
⚠️ 严格要求: - 只能推荐已验证的真实场馆 - 绝对禁止编造、虚构任何场馆信息 - 如果没有合适的已验证场馆,诚实说明情况 - 不要创造虚假的场馆名称、地址或联系方式 - 所有推荐必须基于提供的已验证场馆数据
📋 输出要求: - 只推荐已验证数据库中的真实场馆 - 每个推荐必须标明验证状态 - 如果已验证场馆不足,说明实际情况 - 提供真实可行的行动建议 - 使用数学工具进行量化分析时只基于真实数据""" ), model=self.model, tools=MathToolkit().get_tools() ) self.quality_assessor = ChatAgent( system_message=BaseMessage.make_assistant_message( role_name="推荐质量评估专员", content="""你是专业的推荐质量评估专员。你的职责是评估推荐结果的质量并提出改进建议。
🎯 评估维度: 1. 完整性:推荐信息是否完整(地址、时间、价格、联系方式) 2. 相关性:推荐是否符合用户具体需求 3. 可操作性:用户是否能够根据推荐采取行动 4. 实用性:推荐是否具有实际价值 5. 个性化程度:是否体现了用户的个性化需求
📊 评分标准: - 每个维度0-1分,总分0-5分 - 4分以上为优秀推荐 - 3-4分为良好推荐 - 3分以下需要改进
📋 输出要求: - 提供详细的质量评估报告 - 指出推荐的优点和不足 - 提供具体的改进建议 - 给出最终质量评分 - 输出格式必须为JSON格式,包含评分和改进建议""" ), model=self.model ) self.priority_consultant = ChatAgent( system_message=BaseMessage.make_assistant_message( role_name="需求优先级确认专员", content="""你是专业的需求优先级确认专员。当原始搜索区域找不到合适场馆时,你的职责是帮助用户确认需求优先级。
🎯 核心职责: 1. 分析用户的多项需求和约束条件 2. 引导用户明确各项需求的重要程度 3. 识别哪些需求是必须满足的,哪些可以妥协 4. 为扩大搜索范围提供决策依据 5. 确保用户理解妥协的必要性和合理性
� 确认策略: - 列出用户的所有需求项目 - 询问每项需求的重要程度(必须满足/希望满足/可以妥协) - 解释扩大搜索范围的必要性 - 提供妥协建议和替代方案 - 确保用户做出知情决策
📋 输出要求: - 清晰列出需求优先级分类 - 说明扩大搜索的理由和范围 - 提供具体的妥协建议 - 征求用户的确认和同意""" ), model=self.model ) self.improvement_executor = ChatAgent( system_message=BaseMessage.make_assistant_message( role_name="推荐改进执行专员", content="""你是专业的推荐改进执行专员。你的职责是根据质量评估的改进建议,优化和完善推荐结果。
🎯 核心职责: 1. 分析质量评估专员提出的改进建议 2. 识别推荐结果中的不足和缺陷 3. 补充缺失的信息和细节 4. 优化推荐的表达方式和结构 5. 确保最终输出符合统一模板标准 6. 验证推荐场馆的真实性,避免虚假信息
🔧 改进策略: - 信息补全:基于已验证场馆数据补充信息 - 真实性验证:确保推荐场馆真实存在 - 结构优化:重新组织推荐内容的逻辑结构 - 个性化增强:加强针对用户特定需求的个性化建议 - 可操作性提升:提供更具体的行动指导 - 格式标准化:使用统一的输出模板
⚠️ 严格禁止: - 绝对不能编造、虚构任何场馆信息 - 不能创建虚假的场馆名称、地址、电话 - 不能生成不存在的场馆数据 - 如果已验证场馆不足,必须诚实说明
📋 输出要求: - 必须采用统一的推荐模板格式 - 只使用已验证的真实场馆数据 - 清楚标明每个场馆的验证状态 - 如果场馆数量不足,提供替代建议 - 生成诚实、真实的推荐报告""" ), model=self.model ) print("✅ 信息收集专员已创建(配备增强搜索工具)") print("✅ RolePlaying需求分析专员已创建") print("✅ 推荐决策专员已创建(配备数学工具)") print("✅ 质量评估专员已创建") print("✅ 需求优先级确认专员已创建") print("✅ 改进执行专员已创建(真实性验证+统一模板输出)") def _create_roleplay_analyst(self): """创建RolePlaying需求分析专员""" roleplay_session = RolePlaying( assistant_role_name="资深运动需求分析师", assistant_agent_kwargs=dict( model=self.model ), user_role_name="运动咨询用户", user_agent_kwargs=dict( model=self.model ), task_prompt="""作为资深运动需求分析师,请深入分析用户的运动需求和偏好。
🎯 核心职责: 1. 深入挖掘用户的真实运动需求和动机 2. 分析用户的生活方式和时间安排 3. 识别用户的潜在需求和担忧 4. 评估用户的运动经验和能力水平 5. 理解用户的预算和价值观
💡 分析方法: - 通过对话深入了解用户背景 - 识别用户的显性和隐性需求 - 分析用户的决策因素和优先级 - 评估用户的实际约束条件
📋 输出要求: - 提供详细的用户需求分析报告 - 识别关键决策因素 - 提出针对性的建议方向""", with_task_specify=False, output_language='中文' ) return roleplay_session def _extract_venue_data_from_recommendation(self, recommendation: str, case_info: Dict) -> List[Dict]: """从推荐文本中提取场馆数据(优先使用平台场馆)""" sport = case_info.get('sport_preference', '') location = case_info.get('location', '') area = self.search_toolkit._extract_area_from_location(location) platform_venues = [] dianping_venues = self.search_toolkit.multi_platform_data.search_dianping_venues(area, sport) for venue in dianping_venues: formatted_venue = { 'name': venue['name'], 'address': venue['address'], 'contact': '请通过大众点评平台联系', 'price': venue.get('price_range', '请咨询具体价格'), 'hours': venue.get('opening_hours', '请咨询营业时间'), 'transport': self._get_transport_info(venue['address']), 'score': str(venue.get('rating', 4.5)), 'reason': f"大众点评平台免验证{sport}场馆,用户评价{venue.get('reviews', 0)}条", 'features': ', '.join(venue.get('features', [f'专业{sport}场馆'])), 'notes': '大众点评平台场馆,用户评价丰富', 'verified': True, 'platform': '大众点评' } platform_venues.append(formatted_venue) meituan_venues = self.search_toolkit.multi_platform_data.search_meituan_venues(area, sport) for venue in meituan_venues: formatted_venue = { 'name': venue['name'], 'address': venue['address'], 'contact': '请通过美团平台联系', 'price': venue.get('price_range', '请咨询具体价格'), 'hours': venue.get('opening_hours', '请咨询营业时间'), 'transport': self._get_transport_info(venue['address']), 'score': str(venue.get('rating', 4.3)), 'reason': f"美团平台免验证{sport}场馆,{venue.get('group_discount', '团购优惠')}", 'features': ', '.join(venue.get('features', [f'专业{sport}场馆'])), 'notes': f"美团平台场馆,{venue.get('group_discount', '团购优惠丰富')}", 'verified': True, 'platform': '美团' } platform_venues.append(formatted_venue) wechat_venues = self.search_toolkit.multi_platform_data.search_wechat_venues(area, sport) for venue in wechat_venues: formatted_venue = { 'name': venue['name'], 'address': venue['address'], 'contact': f"请通过微信小程序'{venue.get('miniprogram_name', '相关小程序')}'联系", 'price': venue.get('price_range', '请咨询具体价格'), 'hours': venue.get('opening_hours', '请咨询营业时间'), 'transport': self._get_transport_info(venue['address']), 'score': str(venue.get('rating', 4.5)), 'reason': f"微信小程序平台免验证{sport}场馆,在线预约便利", 'features': ', '.join(venue.get('features', [f'专业{sport}场馆'])), 'notes': '微信小程序平台场馆,在线预约便利', 'verified': True, 'platform': '微信小程序' } platform_venues.append(formatted_venue) if len(platform_venues) < 3: verified_venues = self.search_toolkit.verification_system.get_real_venues_in_area(area, sport) if len(verified_venues) < 2: expansion_areas = self.search_toolkit.geo_manager.get_expansion_areas(area, expansion_level=2) for exp_area in expansion_areas[1:]: exp_venues = self.search_toolkit.verification_system.get_real_venues_in_area(exp_area, sport) verified_venues.extend(exp_venues) for venue in verified_venues[:3-len(platform_venues)]: formatted_venue = { 'name': venue['name'], 'address': venue['address'], 'contact': '请通过官方渠道联系', 'price': '请咨询具体价格', 'hours': '请咨询营业时间', 'transport': self._get_transport_info(venue['address']), 'score': '8.0', 'reason': f"已验证的真实{sport}场馆", 'features': f"支持{sport}运动的专业场馆", 'notes': '建议提前电话确认开放时间和价格', 'verified': True, 'platform': '已验证' } platform_venues.append(formatted_venue) if not platform_venues: print("⚠️ 警告:未找到平台场馆和已验证场馆") else: print(f"✅ 找到{len(platform_venues)}个推荐场馆(优先使用平台场馆)") return platform_venues[:5] def _get_transport_info(self, address: str) -> str: """根据地址生成交通信息""" if '东城区' in address: return '地铁1号线、2号线、5号线等多条线路可达' elif '西城区' in address: return '地铁1号线、2号线、4号线等多条线路可达' elif '朝阳区' in address: return '地铁6号线、10号线、14号线等多条线路可达' elif '海淀区' in address: return '地铁4号线、10号线、13号线等多条线路可达' elif '丰台区' in address: return '地铁9号线、10号线、14号线等多条线路可达' elif '石景山区' in address: return '地铁1号线、6号线可达' else: return '请查询具体交通路线' def _extract_quality_score(self, assessment: str) -> float: """从评估文本中提取质量分数""" try: import re score_match = re.search(r'(\d+\.?\d*)/5', assessment) if score_match: return float(score_match.group(1)) return 4.0 except: return 4.0 def _create_workforce(self): """创建Workforce实例""" tasks = [ Task( content="收集用户所需运动类型的场馆信息", id="task_1", additional_info="使用搜索工具收集最新的场馆信息" ), Task( content="深度分析用户运动需求和偏好", id="task_2", additional_info="通过RolePlaying进行需求挖掘" ), Task( content="基于信息和需求分析制定推荐方案", id="task_3", additional_info="使用数学工具进行量化决策" ), Task( content="评估推荐质量并提出改进建议", id="task_4", additional_info="全面评估推荐结果的质量" ) ] workers = [ self.information_collector, self.roleplay_analyst, self.recommendation_specialist, self.quality_assessor ] workforce = Workforce( workers=workers, tasks=tasks ) return workforce @smart_retry(RetryConfig(max_retries=3, base_delay=2.0, backoff_factor=1.5)) def start_workforce_recommendation(self, use_random_case=True, custom_case=None): """启动Workforce版推荐流程(增强版)""" print("\n🎯 Workforce版AI运动场馆推荐系统启动") print("=" * 80) if use_random_case: user_description, case_info = self.ai_user.create_enhanced_profile() else: user_description, case_info = self.ai_user.create_enhanced_profile(custom_case) relevant_history = self.memory_manager.get_relevant_history(user_description, max_records=3) history_context = "" if relevant_history: history_context = "📚 相关历史经验:\n" for i, (content, score) in enumerate(relevant_history): history_context += f"历史记录{i+1}(相关度:{score:.2f}): {content[:200]}...\n" print(f"🧠 找到{len(relevant_history)}条相关历史记录") else: history_context = "这是新用户的首次咨询" print("🆕 新用户首次咨询") print(f"\n🔄 开始Workforce协作流程...") print("=" * 60) print(f"\n📊 阶段1:信息收集(智能扩展搜索)") print("-" * 40) expanded_search_results = self.search_toolkit.optimized_venue_search(case_info, case_info['location']) collection_prompt = f""" 请收集以下用户需求相关的运动场馆信息: 用户需求:{user_description} 搜索重点: - 运动类型:{case_info['sport_preference']} - 位置区域:{case_info['location']} - 特殊需求:{case_info['special_needs']} - 预算范围:{case_info['budget_range']} 扩展搜索结果: - 搜索区域:{expanded_search_results['search_areas']} - 扩展级别:{expanded_search_results['expansion_level']} - 已验证场馆数量:{len(expanded_search_results['verified_venues'])} - 大众点评场馆数量:{len(expanded_search_results['dianping_venues'])}(免验证) - 美团场馆数量:{len(expanded_search_results['meituan_venues'])}(免验证) - 微信小程序场馆数量:{len(expanded_search_results['wechat_venues'])}(免验证) 已验证的真实场馆: {json.dumps(expanded_search_results['verified_venues'], ensure_ascii=False, indent=2)} 大众点评平台场馆(免验证): {json.dumps(expanded_search_results['dianping_venues'], ensure_ascii=False, indent=2)} 美团平台场馆(免验证): {json.dumps(expanded_search_results['meituan_venues'], ensure_ascii=False, indent=2)} 微信小程序平台场馆(免验证): {json.dumps(expanded_search_results['wechat_venues'], ensure_ascii=False, indent=2)} 请基于扩展搜索结果和所有场馆信息(包括平台免验证场馆),整理完整的场馆信息。 重要: 1. 已验证场馆和平台场馆(大众点评/美团/微信小程序)都可以直接使用 2. 平台场馆已经过平台审核,无需额外验证 3. 不要编造任何场馆数据 """ collection_result = self.information_collector.step( BaseMessage.make_user_message(role_name="User", content=collection_prompt) ) collected_info = collection_result.msg.content if collection_result.msg else "信息收集失败" total_platform_venues = (len(expanded_search_results['dianping_venues']) + len(expanded_search_results['meituan_venues']) + len(expanded_search_results['wechat_venues'])) total_venues = len(expanded_search_results['verified_venues']) + total_platform_venues print(f"信息收集完成:搜索了{len(expanded_search_results['search_areas'])}个区域") print(f"找到{len(expanded_search_results['verified_venues'])}个已验证场馆") print(f"找到{total_platform_venues}个平台免验证场馆(大众点评{len(expanded_search_results['dianping_venues'])}个,美团{len(expanded_search_results['meituan_venues'])}个,微信{len(expanded_search_results['wechat_venues'])}个)") print(f"总计{total_venues}个可用场馆") needs_priority_check = ( total_venues < 3 or expanded_search_results['expansion_level'] > 0 ) priority_confirmation = "" if needs_priority_check: print(f"\n阶段1.5:需求优先级确认") print("-" * 40) priority_prompt = f""" 由于在用户指定区域({case_info['location']})找到的合适场馆较少, 我们已经扩展搜索到{len(expanded_search_results['search_areas'])}个区域。 用户需求: - 运动类型:{case_info['sport_preference']}(核心需求,不可妥协) - 位置区域:{case_info['location']}(已扩展搜索) - 特殊需求:{case_info['special_needs']} - 预算范围:{case_info['budget_range']} - 运动时间:{case_info['preferred_time']} - 交通方式:{case_info['transport_mode']} 请帮助用户确认需求优先级: 1. 分析哪些需求是必须满足的 2. 哪些需求可以适当妥协 3. 为扩大搜索范围提供建议 4. 说明妥协的合理性和必要性 目标:在保证核心需求的前提下,找到最适合的运动场馆。 """ priority_result = self.priority_consultant.step( BaseMessage.make_user_message(role_name="User", content=priority_prompt) ) priority_confirmation = priority_result.msg.content if priority_result.msg else "优先级确认失败" print(f"需求优先级确认完成") print(f"\n阶段2:深度需求分析(RolePlaying模式)") print("-" * 40) analysis_prompt = f""" 请对以下用户进行深度需求分析: 用户基本信息:{user_description} 请通过对话深入了解用户的真实需求、偏好和约束条件。 """ roleplay_input = self.roleplay_analyst.init_chat() analysis_results = [] for i in range(3): try: assistant_response, user_response = self.roleplay_analyst.step(roleplay_input) if assistant_response.msg: analysis_results.append(assistant_response.msg.content) print(f"分析师第{i+1}轮:{assistant_response.msg.content[:150]}...") if user_response.msg: print(f"用户第{i+1}轮:{user_response.msg.content[:150]}...") roleplay_input = assistant_response.msg else: break if assistant_response.terminated or user_response.terminated: break except Exception as e: print(f"RolePlaying第{i+1}轮出现问题:{e}") break needs_analysis = "\n".join(analysis_results) if analysis_results else "需求分析未完成" print(f"需求分析完成") print(f"\n阶段3:推荐决策制定(真实场馆优先)") print("-" * 40) decision_prompt = f""" 基于以下信息制定个性化推荐方案: 用户需求:{user_description} 收集的场馆信息:{collected_info} 深度需求分析:{needs_analysis} 需求优先级确认:{priority_confirmation} 已验证的真实场馆数据: {json.dumps(expanded_search_results['verified_venues'], ensure_ascii=False, indent=2)} 大众点评平台场馆(免验证): {json.dumps(expanded_search_results['dianping_venues'], ensure_ascii=False, indent=2)} 美团平台场馆(免验证): {json.dumps(expanded_search_results['meituan_venues'], ensure_ascii=False, indent=2)} 微信小程序平台场馆(免验证): {json.dumps(expanded_search_results['wechat_venues'], ensure_ascii=False, indent=2)} 搜索范围信息: - 搜索区域:{expanded_search_results['search_areas']} - 扩展级别:{expanded_search_results['expansion_level']} 请使用数学工具进行量化分析,制定3-5个具体的场馆推荐方案。 重要要求: 1. 优先推荐平台免验证场馆(大众点评/美团/微信小程序),其次推荐已验证场馆 2. 平台场馆已经过平台审核,质量有保障,可以直接推荐 3. 每个推荐必须包含:场馆名称、地址、联系方式、价格、推荐理由 4. 标明场馆的验证状态(平台免验证/已验证) 5. 说明推荐场馆与用户需求的匹配度 6. 如果推荐了扩展区域的场馆,要说明交通方案 7. 绝对不要编造或虚构场馆信息 推荐优先级:大众点评场馆 > 美团场馆 > 微信小程序场馆 > 已验证场馆 > 其他场馆 """ decision_result = self.recommendation_specialist.step( BaseMessage.make_user_message(role_name="User", content=decision_prompt) ) recommendation = decision_result.msg.content if decision_result.msg else "推荐生成失败" print(f"推荐方案完成:基于{len(expanded_search_results['verified_venues'])}个已验证场馆") print(f"\n阶段4:质量评估") print("-" * 40) assessment_prompt = f""" 请评估以下推荐结果的质量: 原始用户需求:{user_description} 最终推荐方案:{recommendation} 请从完整性、相关性、可操作性、实用性、个性化程度等维度进行评估, 并给出具体的评分和改进建议。 请以JSON格式输出评估结果,包含: {{ "scores": {{ "completeness": 0.0-1.0, "relevance": 0.0-1.0, "actionability": 0.0-1.0, "practicality": 0.0-1.0, "personalization": 0.0-1.0 }}, "overall_score": 0.0-5.0, "improvement_suggestions": ["建议1", "建议2", "建议3"], "missing_information": ["缺失信息1", "缺失信息2"], "optimization_areas": ["优化方向1", "优化方向2"] }} """ assessment_result = self.quality_assessor.step( BaseMessage.make_user_message(role_name="User", content=assessment_prompt) ) quality_assessment = assessment_result.msg.content if assessment_result.msg else "质量评估失败" print(f"质量评估完成:{quality_assessment[:200]}...") print(f"\n阶段5:改进执行与标准化输出") print("-" * 40) improvement_prompt = f""" 根据质量评估的结果,请优化和完善推荐方案: 原始用户信息:{case_info} 用户需求描述:{user_description} 收集的场馆信息:{collected_info} 需求分析结果:{needs_analysis} 初始推荐方案:{recommendation} 质量评估结果:{quality_assessment} 请根据评估建议进行以下改进: 1. 补充缺失的场馆详细信息 2. 优化推荐内容的结构和逻辑 3. 增强个性化建议 4. 提供更具体的行动指导 5. 使用统一的标准化模板格式 请生成最终的标准化推荐报告,确保信息完整、结构清晰、个性化程度高。 """ improvement_result = self.improvement_executor.step( BaseMessage.make_user_message(role_name="User", content=improvement_prompt) ) final_recommendation = improvement_result.msg.content if improvement_result.msg else "改进执行失败" print(f"改进执行完成:{final_recommendation[:200]}...") try: venues_data = self._extract_venue_data_from_recommendation(final_recommendation, case_info) standardized_report = RecommendationTemplate.format_final_recommendation( user_info=case_info, venues=venues_data, analysis=needs_analysis[:500] + "..." if len(needs_analysis) > 500 else needs_analysis ) print(f"标准化报告生成完成") except Exception as e: print(f"标准化处理出现问题,使用改进后的推荐: {e}") standardized_report = final_recommendation quality_metrics = self.validator.validate_recommendation(final_recommendation, case_info) print(f"\n📊 推荐质量评估:") print(f"完整性: {quality_metrics['completeness']:.2f}") print(f"相关性: {quality_metrics['relevance']:.2f}") print(f"具体性: {quality_metrics['specificity']:.2f}") print(f"可操作性: {quality_metrics['actionability']:.2f}") print(f"总体质量: {quality_metrics['overall_quality']:.2f}") metadata = { "case_info": case_info, "workflow_type": "workforce_enhanced", "stages_completed": 5, "quality_score": self._extract_quality_score(quality_assessment), "quality_metrics": quality_metrics, "tool_usage": "workforce_collaboration", "history_context_used": len(relevant_history) > 0, "timestamp": datetime.now().isoformat() } self.memory_manager.write_enhanced_record( user_description, standardized_report, metadata ) user_id = f"user_{hash(user_description) % 10000}" self.memory_manager.update_user_preferences(user_id, case_info) print("\n💾 增强版Workforce协作结果已保存到记忆系统") print("🎯 用户偏好模型已更新") print("=" * 80) return { "case_info": case_info, "user_description": user_description, "search_results": expanded_search_results, "priority_confirmation": priority_confirmation, "collected_info": collected_info, "needs_analysis": needs_analysis, "initial_recommendation": recommendation, "quality_assessment": quality_assessment, "final_recommendation": final_recommendation, "standardized_report": standardized_report, "verified_venues_count": len(expanded_search_results['verified_venues']), "platform_venues_count": { "dianping": len(expanded_search_results['dianping_venues']), "meituan": len(expanded_search_results['meituan_venues']), "wechat": len(expanded_search_results['wechat_venues']) }, "total_venues_count": len(expanded_search_results['verified_venues']) + len(expanded_search_results['dianping_venues']) + len(expanded_search_results['meituan_venues']) + len(expanded_search_results['wechat_venues']), "search_areas": expanded_search_results['search_areas'], "expansion_level": expanded_search_results['expansion_level'], "timestamp": datetime.now().isoformat() } def get_enhanced_memory_analysis(self): """增强的记忆分析(从优化版本适配)""" all_history = self.memory_manager.chat_history.retrieve(window_size=20) if not all_history: print("📊 暂无历史记录") return print("\n📊 增强记忆系统分析报告:") print("=" * 60) user_requests = [] recommendations = [] quality_scores = [] for record in all_history: content = record.memory_record.message.content if record.memory_record.message.role_name == "User": user_requests.append(content) if "[元数据:" in content: try: metadata_str = content.split("[元数据:")[1].split("]")[0] metadata = json.loads(metadata_str) if "quality_metrics" in metadata: quality_scores.append(metadata["quality_metrics"]["overall_quality"]) except: pass elif record.memory_record.message.role_name == "SportsConsultant": recommendations.append(content) print(f"📈 总处理请求数: {len(user_requests)}") print(f"📋 总推荐方案数: {len(recommendations)}") if quality_scores: avg_quality = sum(quality_scores) / len(quality_scores) print(f"⭐ 平均推荐质量: {avg_quality:.2f}") print(f"🏆 最高质量分数: {max(quality_scores):.2f}") print(f"📉 最低质量分数: {min(quality_scores):.2f}") sport_preferences = {} location_preferences = {} for request in user_requests: sports = ["篮球", "羽毛球", "游泳", "健身", "跑步", "足球", "网球", "瑜伽", "乒乓球", "攀岩"] for sport in sports: if sport in request: sport_preferences[sport] = sport_preferences.get(sport, 0) + 1 locations = ["朝阳", "海淀", "东城", "西城", "丰台", "石景山"] for location in locations: if location in request: location_preferences[location] = location_preferences.get(location, 0) + 1 if sport_preferences: print(f"🏃 运动偏好排行: {dict(sorted(sport_preferences.items(), key=lambda x: x[1], reverse=True))}") if location_preferences: print(f"📍 区域偏好排行: {dict(sorted(location_preferences.items(), key=lambda x: x[1], reverse=True))}") print("=" * 60) def benchmark_system_performance(self, num_tests: int = 3): """系统性能基准测试(从优化版本适配)""" print(f"\n🧪 开始Workforce系统性能基准测试 ({num_tests}次测试)") print("=" * 60) results = [] total_start_time = datetime.now() for i in range(num_tests): print(f"\n测试 {i+1}/{num_tests}") start_time = datetime.now() result = self.start_workforce_recommendation(use_random_case=True) end_time = datetime.now() duration = (end_time - start_time).total_seconds() quality_score = 0.0 try: if 'quality_assessment' in result: quality_score = self._extract_quality_score(result['quality_assessment']) except: quality_score = 0.5 test_result = { "test_id": i+1, "duration": duration, "quality_score": quality_score, "total_venues": result.get('total_venues_count', 0), "verified_venues": result.get('verified_venues_count', 0), "platform_venues": sum(result.get('platform_venues_count', {}).values()), "expansion_level": result.get('expansion_level', 0) } results.append(test_result) print(f"⏱️ 耗时: {duration:.2f}秒") print(f"⭐ 质量: {test_result['quality_score']:.2f}") print(f"🏢 场馆数: {test_result['total_venues']}个") total_end_time = datetime.now() total_duration = (total_end_time - total_start_time).total_seconds() avg_duration = sum(r["duration"] for r in results) / len(results) avg_quality = sum(r["quality_score"] for r in results) / len(results) avg_venues = sum(r["total_venues"] for r in results) / len(results) avg_verified = sum(r["verified_venues"] for r in results) / len(results) avg_platform = sum(r["platform_venues"] for r in results) / len(results) print(f"\n📊 Workforce系统基准测试结果汇总:") print(f"总耗时: {total_duration:.2f}秒") print(f"平均单次耗时: {avg_duration:.2f}秒") print(f"平均质量分数: {avg_quality:.2f}") print(f"平均场馆数量: {avg_venues:.1f}个") print(f"平均已验证场馆: {avg_verified:.1f}个") print(f"平均平台场馆: {avg_platform:.1f}个") print(f"系统稳定性: {'优秀' if avg_quality > 0.7 else '良好' if avg_quality > 0.5 else '需要改进'}") print("=" * 60) return results
def main(use_mock_data_if_no_key=True): """主函数 - 演示Workforce版运动场馆推荐系统 Args: use_mock_data_if_no_key: 是否在API密钥未设置或API调用失败时使用模拟数据作为备用 """ print("启动Workforce版运动场馆推荐系统") print("=" * 80) print(f"API调用模式: {'优先使用真实API,备用模拟数据' if use_mock_data_if_no_key else '仅使用真实API,无备用数据'}") print("=" * 80) try: system = WorkforceSportsVenueRecommendationSystem(model, use_mock_data_if_no_key=use_mock_data_if_no_key) result = system.start_workforce_recommendation(use_random_case=True) print("\n增强版Workforce协作完成!") print("=" * 80) print(f"用户画像:{result['case_info']['occupation']} - {result['case_info']['sport_preference']}") print(f"信息收集:完成(搜索{len(result['search_areas'])}个区域)") print(f"扩展搜索:级别{result['expansion_level']}(找到{result['verified_venues_count']}个已验证场馆)") print(f"平台场馆:大众点评{result['platform_venues_count']['dianping']}个,美团{result['platform_venues_count']['meituan']}个,微信{result['platform_venues_count']['wechat']}个(免验证)") print(f"总计场馆:{result['total_venues_count']}个可用场馆") if result['priority_confirmation']: print(f"优先级确认:完成") print(f"需求分析:完成(RolePlaying模式)") print(f"推荐决策:完成(基于真实场馆数据)") print(f"质量评估:完成") print(f"改进执行:完成(真实性验证+统一模板输出)") print(f"\n初始推荐方案:") print("-" * 60) print(result['initial_recommendation'][:300] + "..." if len(result['initial_recommendation']) > 300 else result['initial_recommendation']) print(f"\n质量评估结果:") print("-" * 60) print(result['quality_assessment'][:300] + "..." if len(result['quality_assessment']) > 300 else result['quality_assessment']) print(f"\n改进后推荐方案:") print("-" * 60) print(result['final_recommendation'][:300] + "..." if len(result['final_recommendation']) > 300 else result['final_recommendation']) print(f"\n标准化推荐报告:") print("-" * 60) print(result['standardized_report']) print(f"\n🧠 记忆系统分析演示:") print("=" * 80) system.get_enhanced_memory_analysis() print(f"\n🧪 是否进行系统性能基准测试?") print("这将运行多次推荐流程来评估系统性能") print("输入 'y' 或 'yes' 确认,其他键跳过") print("\n🎉 Workforce版增强系统演示完成!") print("✅ 智能记忆管理系统成功实现用户偏好学习") print("✅ 多Agent协作提升了推荐准确度和完整性") print("✅ 结果验证系统确保了推荐质量") print("✅ 真实平台数据源提供了可靠的场馆信息") except Exception as e: print(Fore.RED + f"❌ 系统运行出错: {e}") traceback.print_exc()
if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='Workforce版运动场馆推荐系统') parser.add_argument('--no-mock', action='store_true', help='禁用模拟数据备用模式,仅使用真实API') parser.add_argument('--mock', action='store_true', help='启用模拟数据备用模式(默认)') args = parser.parse_args() use_mock_data = not args.no_mock if not args.mock else True main(use_mock_data_if_no_key=use_mock_data)
|