banner
毅种循环

毅种循环

请将那贻笑罪过以逐字吟咏 如对冰川投以游丝般倾诉

泄露地图AK一键检测利用

背景#

在一次 APK 反编译时发现了一个与地图(map)相关的 KEY,尝试了几个接口都无法调用成功,也不清楚它具体对应哪个平台的哪些接口和服务,于是配合 GPT 写了这份代码,旨在实现 AK 的自动化检测与利用。

代码#

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import re
import argparse
import requests
import random
import time
import certifi
import json
import traceback
import os
from termcolor import colored
from typing import List, Dict, Tuple, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib3.exceptions import InsecureRequestWarning

# --- Banner ---
BANNER = """


                                                                                           
                            by: 地图API密钥全自动检测工具
"""

# --- Configuration ---

# Emojis for output enhancement.
# Each constant is a short prefix glyph interpolated into console messages by
# the reporting and main-workflow code in this file.
EMOJI_DETECT = "🔍"      # start of a detection run / per-key header
EMOJI_PLATFORM = "🏷️"    # detected platform line
EMOJI_KEY_FORMAT = "🔑"  # key format description line
EMOJI_SERVICE = "▶️"     # one tested service
EMOJI_SUCCESS = "✅"     # successful call / successful file write
EMOJI_FAIL = "❌"        # failed call / rejected key
EMOJI_CLOCK = "⏱️"       # request latency line
EMOJI_INFO = "ℹ️"        # informational detail (parameters, result summary)
EMOJI_ERROR = "❗"       # transport-level or internal error
EMOJI_WARNING = "⚠️"     # API-level failure reason
EMOJI_NETWORK = "🌐"     # network / timeout / response-format hint
EMOJI_QUOTA = "🚦"       # quota or rate-limit hint
EMOJI_PERM = "🔒"        # invalid key / permission / whitelist hint
EMOJI_WRITE = "💾"       # writing the JSON output file
EMOJI_TOOL = "🛠️"        # tool name in the CLI description and final message


# Disable SSL warnings (use cautiously).
# NOTE(review): the only request issued in this file passes
# verify=certifi.where(), so certificate verification stays enabled; this
# call merely silences urllib3's InsecureRequestWarning should it ever fire.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Safe coordinate bounds.
# Maps a region name to the longitude/latitude rectangle that
# GeoGenerator.random_coord samples from ('china' is its default region).
SAFE_BOUNDS = {
    'global': {'lon_min': -180, 'lon_max': 180, 'lat_min': -90, 'lat_max': 90},
    'china': {'lon_min': 73.66, 'lon_max': 135.05, 'lat_min': 3.86, 'lat_max': 53.55}
}

# Test address pool.
# One entry is picked at random as the "address" parameter of the geocoding
# requests built by APITester._generate_service_configs.
TEST_ADDRESS_POOL = [
    "北京市朝阳区望京soho",
    "上海市浦东新区陆家嘴环路1288号",
    "广州市天河区珠江新城临江大道5号",
    "深圳市福田区深南大道6001号",
    "成都市锦江区红星路三段1号"
]

# --- Helper Classes ---

class GeoGenerator:
    """Generates random geographic test data (coordinates and IPv4 addresses)."""

    @staticmethod
    def random_coord(region='china'):
        """Return a random ``(latitude, longitude)`` pair inside a region.

        Args:
            region: Key into ``SAFE_BOUNDS`` selecting the bounding rectangle.

        Returns:
            A ``(lat, lon)`` tuple of floats rounded to 6 decimal places.

        Raises:
            KeyError: If ``region`` is not a key of ``SAFE_BOUNDS``.
        """
        bounds = SAFE_BOUNDS[region]
        return (
            round(random.uniform(bounds['lat_min'], bounds['lat_max']), 6),
            round(random.uniform(bounds['lon_min'], bounds['lon_max']), 6)
        )

    @staticmethod
    def random_ip():
        """Return a random, globally routable IPv4 address as a dotted string.

        The first octet is drawn from 1..223 (no multicast/reserved classes)
        and the last octet from 1..254. Candidates are re-drawn until the
        address is globally routable, which rejects not only the RFC 1918 and
        loopback ranges but also link-local, carrier-grade NAT and
        documentation ranges that an IP-geolocation service cannot resolve.
        """
        # Local import keeps this block self-contained without touching the
        # file-level import list.
        import ipaddress

        while True:
            ip = (
                f"{random.randint(1, 223)}.{random.randint(0, 255)}."
                f"{random.randint(0, 255)}.{random.randint(1, 254)}"
            )
            if ipaddress.ip_address(ip).is_global:
                return ip


class KeyValidator:
    """Validates key format and detects the most likely map platform."""

    @staticmethod
    def detect_platform(key: str) -> Tuple[str, str]:
        """Classify a key by its character set and length.

        Args:
            key: The candidate API key, already stripped of whitespace.

        Returns:
            A ``(platform, description)`` tuple where ``platform`` is one of:

            * ``'amap'``    - exactly 32 hexadecimal characters
            * ``'baidu'``   - exactly 32 alphanumeric characters that are not
              all hexadecimal
            * ``'unknown'`` - anything else

        A pure-hex key always resolves to ``'amap'`` because that check runs
        first; the previous nested "ambiguous" branch could never be reached
        and has been removed. ``re.fullmatch`` is used instead of
        ``re.match('^...$')`` so a key followed by a newline is rejected
        rather than silently accepted.
        """
        if re.fullmatch(r'[0-9a-fA-F]{32}', key):
            return 'amap', '高德 (32位 Hex)'
        if re.fullmatch(r'[A-Za-z0-9]{32}', key):
            return 'baidu', '百度 (32位 Alnum)'
        return 'unknown', '未知格式'


class APITester:
    """Executes API test calls against the AMap and Baidu map web services."""

    def __init__(self):
        # Pool of browser User-Agent strings; one is chosen at random per request.
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0'
        ]

    def _generate_service_configs(self, platform: str) -> Dict:
        """Build the per-service request configs for one platform.

        Fresh random coordinates, address and IP are generated on every call
        so repeated runs do not send identical requests.

        Args:
            platform: ``'amap'`` or ``'baidu'``.

        Returns:
            A dict mapping a service display name to a config dict with keys
            ``url``, ``params`` and optionally ``is_static``. An empty dict is
            returned for any other platform.
        """
        # Bail out before generating random data nobody will use.
        if platform not in ('amap', 'baidu'):
            return {}

        coord = GeoGenerator.random_coord()  # (lat, lon)
        random_addr = random.choice(TEST_ADDRESS_POOL)
        random_ip = GeoGenerator.random_ip()

        if platform == 'amap':
            # AMap expects "lon,lat" ordering.
            return {
                "静态地图": {
                    "url": "https://restapi.amap.com/v3/staticmap",
                    "params": {"location": f"{coord[1]},{coord[0]}", "zoom": 10, "size": "400*300", "markers": f"mid,,A:{coord[1]},{coord[0]}"},
                    "is_static": True
                },
                "地理编码": {
                    "url": "https://restapi.amap.com/v3/geocode/geo",
                    "params": {"address": random_addr}
                },
                "逆地理编码": {
                    "url": "https://restapi.amap.com/v3/geocode/regeo",
                    "params": {"location": f"{coord[1]},{coord[0]}"}
                },
                "路径规划": {
                    "url": "https://restapi.amap.com/v3/direction/driving",
                    "params": {"origin": "116.481028,39.989643", "destination": "116.434446,39.90816"}
                },
                "IP定位": {
                    "url": "https://restapi.amap.com/v3/ip",
                    "params": {"ip": random_ip}
                }
            }

        # platform == 'baidu'
        return {
            "静态地图": {
                "url": "https://api.map.baidu.com/staticimage/v2",
                "params": {"center": f"{coord[1]},{coord[0]}", "zoom": 10, "width": 400, "height": 300, "markers": f"{coord[1]},{coord[0]}"},
                "is_static": True
            },
            "地理编码": {
                "url": "https://api.map.baidu.com/geocoding/v3/",
                "params": {"address": random_addr, "output": "json"}
            },
            "逆地理编码": {
                "url": "https://api.map.baidu.com/reverse_geocoding/v3/",
                "params": {"location": f"{coord[0]},{coord[1]}", "output": "json"}
            },
            "路径规划": {
                "url": "https://api.map.baidu.com/direction/v2/driving",
                "params": {"origin": "40.01116,116.339303", "destination": "39.936404,116.452562"}
            },
            "IP定位": {
                "url": "https://api.map.baidu.com/location/ip",
                "params": {"ip": random_ip, "coor": "bd09ll"}
            }
        }

    def _call_api(self, url: str, params: dict, platform: str, service_name: str, is_static: bool = False) -> Tuple[bool, dict]:
        """Execute a single API request with retries.

        Args:
            url: Endpoint to GET.
            params: Query parameters. An optional ``'retry'`` entry (default 2)
                sets the number of *extra* attempts and is not sent to the API.
            platform: ``'amap'`` or ``'baidu'``; selects how the JSON status
                field is interpreted.
            service_name: Display name of the service being tested.
            is_static: True for image endpoints; the body is streamed and an
                ``image/*`` response with HTTP 200 counts as success.

        Returns:
            ``(success, detail)`` where ``detail`` has the keys ``params``,
            ``latency`` (ms), ``error`` (transport/parse problem),
            ``error_info`` (API-level failure reason) and ``response``.
        """
        current_params = params.copy()
        retries = current_params.pop('retry', 2) + 1
        detail = {'params': params, 'latency': None, 'error': None, 'error_info': None, 'response': None}

        for attempt in range(retries):
            response = None
            try:
                headers = {'User-Agent': random.choice(self.user_agents)}
                start_time = time.time()
                response = requests.get(
                    url,
                    params=current_params,
                    headers=headers,
                    timeout=15,
                    verify=certifi.where(),
                    stream=is_static
                )
                detail['latency'] = round((time.time() - start_time) * 1000, 2)
                # A response arrived, so an error recorded by an earlier failed
                # attempt (e.g. a timeout) no longer describes the outcome.
                detail['error'] = None

                content_type = response.headers.get('Content-Type', '').lower()

                if is_static and response.status_code == 200 and content_type.startswith('image/'):
                    detail['response'] = {'content_type': content_type, 'status_code': 200}
                    return True, detail  # the finally clause closes the response

                if response.status_code != 200:
                    error_msg = f"HTTP {response.status_code}"
                    try:
                        error_data = response.json()
                        error_msg += f". API Msg: {json.dumps(error_data, ensure_ascii=False)}"
                    except (ValueError, requests.exceptions.RequestException):
                        # ValueError covers every JSON decode error flavour.
                        error_msg += f". Response: {response.text[:200]}..."
                    detail['error'] = error_msg
                    raise requests.HTTPError(error_msg, response=response)

                try:
                    if 'application/json' in content_type or 'text/javascript' in content_type:
                        data = response.json()
                    elif platform == 'amap' and service_name == "IP定位" and 'text/html' in content_type and response.text.startswith('{') and response.text.endswith('}'):
                        # This branch accepts a JSON object served as text/html.
                        data = json.loads(response.text)
                    else:
                        raise ValueError(f"非预期响应类型: {content_type}")

                    detail['response'] = data

                    success = False
                    error_info = ""
                    if platform == 'amap':
                        success = str(data.get('status')) == '1'
                        if not success:
                            error_info = data.get('info', '未知高德错误') + f" (infocode: {data.get('infocode', '')})"
                    elif platform == 'baidu':
                        success = data.get('status') == 0
                        if not success:
                            error_info = data.get('message', '未知百度错误') + f" (status: {data.get('status', '')})"
                    else:
                        error_info = "未知平台错误"

                    detail['error_info'] = error_info if not success else None
                    return success, detail

                except json.JSONDecodeError as e:
                    detail['error'] = f"JSON解析失败 ({content_type}): {str(e)} | 响应: {response.text[:200]}..."
                    return False, detail
                except ValueError as e:
                    detail['error'] = f"{str(e)} | 响应: {response.text[:200]}..."
                    return False, detail

            except requests.exceptions.Timeout:
                detail['error'] = "请求超时"
                if attempt == retries - 1:
                    return False, detail
                time.sleep(1 + 1.5 * attempt)
            except requests.exceptions.RequestException as e:
                # Response.__bool__ is `.ok`, i.e. False for 4xx/5xx, so the
                # presence check must compare against None explicitly.
                status_note = f" (Status: {response.status_code})" if response is not None else ""
                detail['error'] = f"请求错误: {type(e).__name__}{status_note}"
                if isinstance(e, requests.exceptions.HTTPError) and str(e):
                    # Keep the HTTP status/body summary built above instead of
                    # discarding it.
                    detail['error'] += f" - {e}"
                if attempt == retries - 1:
                    return False, detail
                time.sleep(1 + 1.5 * attempt)
            except Exception as e:
                detail['error'] = f"未知处理错误: {type(e).__name__}: {str(e)}"
                print(f"{EMOJI_ERROR} {colored('内部脚本错误', 'red')}: {traceback.format_exc()[:500]}...")
                return False, detail
            finally:
                # `is not None` so error responses are closed as well.
                if response is not None:
                    response.close()

        # Only reachable when the loop ran zero times (retry <= -1).
        detail['error'] = f"超过最大重试次数 ({retries}). 最后错误: {detail['error']}"
        return False, detail

    def test_service(self, platform: str, service_name: str, key: str, service_config: dict) -> Tuple[bool, dict]:
        """Test a single service with its config.

        The key is injected under the platform's parameter name (``key`` for
        AMap, ``ak`` otherwise) into a copy of the configured params.

        Returns:
            The ``(success, detail)`` tuple produced by ``_call_api``.
        """
        params = service_config['params'].copy()
        params['key' if platform == 'amap' else 'ak'] = key
        is_static = service_config.get('is_static', False)
        return self._call_api(service_config['url'], params, platform, service_name, is_static)


class ReportGenerator:
    """Generates console output for test results."""

    @staticmethod
    def print_result(service: str, platform: str, success: bool, detail: dict):
        """Print a formatted report for one service test.

        Args:
            service: Display name of the tested service.
            platform: ``'amap'`` or ``'baidu'``.
            success: Whether the API call succeeded.
            detail: Dict with the keys ``latency``, ``params``, ``response``,
                ``error`` and ``error_info`` as produced by the API tester.
        """
        status_icon = EMOJI_SUCCESS if success else EMOJI_FAIL
        status_color = 'green' if success else 'red'
        service_status = colored("成功" if success else "失败", status_color)

        print(f"{EMOJI_SERVICE} {service.ljust(7)} {status_icon} {service_status}")

        indent = "  │"

        latency = detail.get('latency')
        latency_str = f"{latency:.2f} ms" if latency is not None else "N/A"
        print(f"{indent} {EMOJI_CLOCK} 耗时: {latency_str}")

        # Show request parameters, never the credential itself; long values
        # are truncated to 30 characters.
        shown = []
        for name, value in detail.get('params', {}).items():
            if name in ('key', 'ak'):
                continue
            if isinstance(value, float):
                shown.append(f"{name}={value:.4f}")
            else:
                text = str(value)
                shown.append(f"{name}={text[:30]}{'...' if len(text) > 30 else ''}")
        params_str = " | ".join(shown)
        if params_str:
            print(f"{indent} {EMOJI_INFO} 参数: {colored(params_str, 'cyan')}")

        if success:
            success_info_str = ReportGenerator._extract_success_info(service, platform, detail.get('response'))
            if success_info_str:
                print(f"{indent} {EMOJI_INFO} 结果: {colored(success_info_str, 'green')}")
        else:
            # Failure reasons are only reported for failed calls: an `error`
            # left over from an earlier retry must not be shown on a success.
            error_msg = detail.get('error')
            error_info = detail.get('error_info')
            if error_info:
                print(f"{indent} {EMOJI_WARNING} 原因: {colored(error_info, 'yellow')}")
                ReportGenerator._print_error_analysis(error_info)
            elif error_msg:
                print(f"{indent} {EMOJI_ERROR} 原因: {colored(error_msg, 'red')}")
                ReportGenerator._print_error_analysis(error_msg)
            else:
                print(f"{indent} {EMOJI_ERROR} 原因: {colored('未知原因失败', 'red')}")
        print("  └" + "─" * 30)

    @staticmethod
    def _extract_success_info(service: str, platform: str, response_data: Optional[Any]) -> str:
        """Extract a brief summary from successful API response data.

        Args:
            service: Display name of the service (selects the summary format).
            platform: ``'amap'`` or ``'baidu'`` (selects the response schema).
            response_data: Parsed JSON dict, or the ``{'content_type', ...}``
                marker dict stored for image responses.

        Returns:
            A one-line summary, or ``""`` when ``response_data`` is not a dict.
            Unexpected response shapes yield a coloured parse-error message
            instead of raising.
        """
        if not isinstance(response_data, dict):
            return ""
        try:
            if response_data.get('content_type', '').startswith('image/'):
                return f"成功获取图片 ({response_data['content_type']})"

            if service == "IP定位":
                city = "未知"
                if platform == 'amap':
                    city = response_data.get('city')
                    # Only a non-empty string counts as a city; anything else
                    # falls back to the adcode.
                    if not (isinstance(city, str) and city):
                        city = response_data.get('adcode', 'N/A')
                elif platform == 'baidu':
                    city = response_data.get('content', {}).get('address_detail', {}).get('city', 'N/A')
                return f"定位城市: {city}" if city and city != 'N/A' else "定位成功 (无城市信息)"

            if service == "地理编码":
                loc = "未知"
                if platform == 'amap':
                    # Treat an empty or missing "geocodes" list the same way
                    # instead of raising IndexError on [0].
                    geocodes = response_data.get('geocodes') or [{}]
                    loc = geocodes[0].get('location', 'N/A')
                elif platform == 'baidu':
                    loc_dict = response_data.get('result', {}).get('location', {})
                    loc = f"{loc_dict.get('lat', 'N/A')},{loc_dict.get('lng', 'N/A')}" if loc_dict else 'N/A'
                return f"获取坐标: {loc}"

            if service == "逆地理编码":
                addr = "未知"
                if platform == 'amap':
                    addr = response_data.get('regeocode', {}).get('formatted_address', 'N/A')
                elif platform == 'baidu':
                    addr = response_data.get('result', {}).get('formatted_address', 'N/A')
                # A non-string or empty value (e.g. []) is reported as N/A
                # rather than printed verbatim.
                if not (isinstance(addr, str) and addr):
                    addr = 'N/A'
                return f"获取地址: {addr[:40]}{'...' if len(addr) > 40 else ''}"

            if service == "路径规划":
                dist, dur = "N/A", "N/A"
                try:
                    if platform == 'amap':
                        path = response_data.get('route', {}).get('paths', [{}])[0]
                        dist, dur = path.get('distance'), path.get('duration')
                    elif platform == 'baidu':
                        route = response_data.get('result', {}).get('routes', [{}])[0]
                        dist, dur = route.get('distance'), route.get('duration')
                    return f"距离: {dist}米, 时间: {dur}秒"
                except (IndexError, KeyError, TypeError):
                    return "路径规划成功"

            return "调用成功"
        except Exception as e:
            return colored(f"结果解析出错: {e}", 'magenta')

    @staticmethod
    def _print_error_analysis(error_text: str):
        """Print category hints based on keywords found in an error message.

        Several hints may be printed for one message; nothing is printed when
        no keyword matches.
        """
        indent = "  │  "
        error_lower = error_text.lower()

        if any(word in error_lower for word in ('quota', '配额', '并发', 'qps', 'limit')):
            print(f"{indent}{EMOJI_QUOTA} {colored('[疑似配额/并发限制]', 'yellow')}")
        if any(word in error_lower for word in ('invalid user key', 'key不正确', 'ak不存在', '权限校验失败', 'permission denied', 'key status', '无效')):
            print(f"{indent}{EMOJI_PERM} {colored('[密钥无效或服务未开通/权限不足]', 'red')}")
        if any(word in error_lower for word in ('domain', 'ip白名单', 'referer', 'sn校验失败')):
            print(f"{indent}{EMOJI_PERM} {colored('[IP/域名/Referer/SN白名单校验失败]', 'magenta')}")
        if 'timeout' in error_lower or '超时' in error_lower:
            print(f"{indent}{EMOJI_NETWORK} {colored('[请求超时]', 'blue')}")
        if 'json解析失败' in error_lower or '非预期响应类型' in error_lower:
            print(f"{indent}{EMOJI_NETWORK} {colored('[响应格式错误或非预期]', 'magenta')}")
        if '请求错误' in error_text or any(word in error_lower for word in ('http', 'connection', 'ssl')):
            print(f"{indent}{EMOJI_NETWORK} {colored('[网络/连接/HTTP错误]', 'blue')}")


# --- Main Execution ---

def _positive_int(value: str) -> int:
    """argparse ``type=`` callable accepting only integers >= 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"线程数必须为正整数: {value}")
    return number


def parse_args(argv: Optional[List[str]] = None):
    """Parse command line arguments.

    Args:
        argv: Argument list to parse; ``None`` (the default) lets argparse
            read ``sys.argv[1:]``.

    Returns:
        The parsed ``argparse.Namespace`` with ``keys``, ``file``,
        ``threads``, ``output`` and ``skip_static``.

    Raises:
        SystemExit: With status 1 when neither ``-k`` nor ``-f`` was given
            (after printing the help text), or with argparse's own status on
            invalid arguments.
    """
    parser = argparse.ArgumentParser(
        description=f"{EMOJI_TOOL} 地图API密钥全自动检测工具 (v3.6)",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument('-k', '--keys', nargs='+', help="直接在命令行指定一个或多个密钥")
    parser.add_argument('-f', '--file', help="包含密钥的文件路径 (每行一个密钥)")
    # A thread count below 1 would make ThreadPoolExecutor raise later on, so
    # it is rejected here with a normal argparse error.
    parser.add_argument('-t', '--threads', type=_positive_int, default=5, help="并发测试线程数 (默认: 5)")
    parser.add_argument('-o', '--output', help="将成功的检测结果输出到指定的JSON文件")
    parser.add_argument('--skip-static', action='store_true', help="跳过静态地图服务的检测")

    # Lenient first pass: report a missing key source with the full help text
    # even when unrelated unknown options are present.
    temp_args, _ = parser.parse_known_args(argv)
    if not temp_args.keys and not temp_args.file:
        # The banner is not printed here: the script entry point already
        # prints it before calling parse_args, and repeating it showed it twice.
        parser.print_help()
        print(colored("\n错误: 必须通过 -k 或 -f 提供至少一个密钥。", "red"))
        # SystemExit instead of the site-module exit() helper, which is meant
        # for interactive sessions and may be absent (e.g. python -S).
        raise SystemExit(1)

    return parser.parse_args(argv)

def _mask_key(key: str) -> str:
    """Return ``key`` with its middle elided for display (short keys unchanged)."""
    return f"{key[:6]}...{key[-4:]}" if len(key) > 10 else key


def main(keys_to_test: List[str], num_threads: int, output_file: Optional[str], skip_static: bool):
    """Run the detection workflow for a list of keys.

    For every key the platform is detected, one task per service is submitted
    to a thread pool, and the results are printed grouped by key and platform.
    Successful results are optionally written to a JSON file.

    Args:
        keys_to_test: Candidate API keys; blank entries are skipped.
        num_threads: Worker thread count; values below 1 are treated as 1.
        output_file: Path of the JSON file for successful results, or None.
        skip_static: When True, services flagged ``is_static`` are not tested.
    """
    tester = APITester()
    futures_map = {}
    results_for_output = []
    # ThreadPoolExecutor rejects max_workers < 1.
    worker_count = max(1, num_threads)

    print(f"{EMOJI_DETECT} 开始检测 {len(keys_to_test)} 个密钥,使用 {worker_count} 个线程...")
    if skip_static:
        print(f"{EMOJI_INFO} 已设置跳过静态地图检测。")

    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        for key in keys_to_test:
            key = key.strip()
            if not key:
                continue

            platform, key_format_desc = KeyValidator.detect_platform(key)
            masked_key = _mask_key(key)

            if platform == 'unknown':
                print(colored(f"\n{EMOJI_FAIL} 无效密钥格式: {key}", "red"))
                continue
            if platform == 'ambiguous':
                print(colored(f"\n{EMOJI_WARNING} 检测到模糊密钥: {masked_key}", "yellow"))
                print(colored(f"  格式 ({key_format_desc}) 可能属于高德或百度,将尝试两者。", "yellow"))
                platforms_to_try = ['amap', 'baidu']
            else:
                platforms_to_try = [platform]

            for p in platforms_to_try:
                print(colored(f"\n{EMOJI_DETECT} 正在检测 {p.upper()} 服务 - 密钥: {masked_key}", "blue", attrs=['bold']))
                print(f"  {EMOJI_PLATFORM} 平台识别: {p.upper()}")
                print(f"  {EMOJI_KEY_FORMAT} Key格式: {key_format_desc}")

                platform_services = tester._generate_service_configs(p)

                for service_name, service_config in platform_services.items():
                    if skip_static and service_config.get('is_static', False):
                        continue

                    future = executor.submit(tester.test_service, p, service_name, key, service_config)
                    futures_map[future] = {'key': key, 'platform': p, 'service_name': service_name}

        print(colored("\n--- 检测结果 ---", attrs=['bold']))

        # Walk the futures in submission order (dicts keep insertion order) so
        # all results of one key/platform are printed together under a header.
        # Printing in completion order left results unattributable once more
        # than one key was tested.
        current_group = None
        for future, context in futures_map.items():
            key = context['key']
            platform = context['platform']
            service_name = context['service_name']
            masked_key = _mask_key(key)

            if (key, platform) != current_group:
                current_group = (key, platform)
                print(colored(f"\n{EMOJI_KEY_FORMAT} {platform.upper()} - 密钥: {masked_key}", "blue", attrs=['bold']))

            try:
                success, detail = future.result()
                ReportGenerator.print_result(service_name, platform, success, detail)

                if success and output_file:
                    results_for_output.append({
                        'key': key,
                        'platform': platform,
                        'service_name': service_name,
                        'request_params': detail.get('params'),
                        'response_summary': ReportGenerator._extract_success_info(service_name, platform, detail.get('response')),
                        'latency_ms': detail.get('latency'),
                    })

            except Exception as exc:
                # Boundary handler: one crashed task must not abort the report.
                print(colored(f"{EMOJI_FAIL} {service_name.ljust(7)}", 'red'))
                print(f"  └─ {colored(f'执行 {service_name} (平台: {platform}, 密钥: {masked_key}) 时发生内部错误: {exc}', 'red')}")

    if output_file:
        print(colored(f"\n{EMOJI_WRITE} 正在将 {len(results_for_output)} 条成功结果写入到: {output_file}", "blue"))
        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(results_for_output, f, ensure_ascii=False, indent=4)
            print(colored(f"{EMOJI_SUCCESS} 成功写入 JSON 文件。", "green"))
        except OSError as e:
            print(colored(f"{EMOJI_ERROR} 写入文件失败: {e}", "red"))
        except TypeError as e:
            print(colored(f"{EMOJI_ERROR} 序列化结果为JSON时失败 (可能包含无法序列化的数据): {e}", "red"))


if __name__ == "__main__":
    # Script entry point: show the banner, collect keys from -k and/or -f,
    # de-duplicate them and run the detection workflow.
    print(colored(BANNER, 'cyan'))

    args = parse_args()  # exits by itself when no key source was given
    keys = []

    if args.keys:
        keys.extend(args.keys)
    if args.file:
        try:
            with open(args.file, 'r', encoding='utf-8') as f:
                keys.extend(line.strip() for line in f if line.strip())
        except FileNotFoundError:
            print(colored(f"{EMOJI_ERROR} 错误:密钥文件 '{args.file}' 未找到。", "red"))
            # SystemExit rather than the site-module exit() helper, which is
            # intended for interactive use and is not always available.
            raise SystemExit(1)
        except Exception as e:
            print(colored(f"{EMOJI_ERROR} 错误:读取密钥文件 '{args.file}' 时出错: {e}", "red"))
            raise SystemExit(1)

    # Strip before de-duplicating so "key" and " key " count as one key;
    # sorting keeps the test order deterministic.
    unique_keys = sorted({k.strip() for k in keys if k.strip()})

    main(unique_keys, args.threads, args.output, args.skip_static)
    print(colored(f"\n{EMOJI_TOOL} 检测完成。", attrs=['bold']))

地图 API 密钥全自动检测工具

简介

地图 API 密钥全自动检测工具是一款高效、全面的解决方案,旨在简化渗透测试过程中地图 API 密钥的管理和风险评估工作。无论是渗透测试人员还是安全研究人员,在分析应用程序或系统对地图 API 的依赖时,常常面临密钥有效性验证、服务权限探测和潜在安全风险评估等多重挑战。本工具通过自动化执行这些关键任务,极大地提升了水洞效率,并帮助识别与地图服务相关的潜在安全漏洞。

主要功能

  • 密钥有效性验证: 通过多接口、多服务调用,自动验证地图 API 密钥 (高德 / 百度) 的有效状态,快速识别可能被滥用或已泄露的密钥。
  • 批量密钥测试: 支持批量测试多个地图 API 密钥,提高渗透测试效率,快速评估大量密钥的风险。

技术特点

  • Python 开发: 采用 Python 语言开发,具有良好的跨平台性和可读性。
  • Requests 库: 使用 Requests 库发送 HTTP 请求,简化 API 调用过程。

快速开始

  1. 运行脚本:
    python your_script_name.py -k your_amap_key

输出如下:

image

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。