banner
毅种循环

毅种循环

请将那贻笑罪过以逐字吟咏 如对冰川投以游丝般倾诉

泄露マップAKワンクリック検出利用

背景#

APK の逆コンパイル中に、map に関する KEY が存在することがわかりました。いくつかのインターフェースを試しましたが、成功しませんでした。具体的にどのインターフェースとサービスに対応しているのかわからなかったので、GPT と協力してこのコードを書きました。自動化 AK の利用を実現することを目的としています。

コード#

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import re
import argparse
import requests
import random
import time
import certifi
import json
import traceback
import os
from termcolor import colored
from typing import List, Dict, Tuple, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib3.exceptions import InsecureRequestWarning

# --- Banner ---
BANNER = """


                                                                                           
                            by: 地図API密钥全自动检测工具
"""

# --- Configuration ---

# Emojis for output enhancement
EMOJI_DETECT = "🔍"
EMOJI_PLATFORM = "🏷️"
EMOJI_KEY_FORMAT = "🔑"
EMOJI_SERVICE = "▶️"
EMOJI_SUCCESS = "✅"
EMOJI_FAIL = "❌"
EMOJI_CLOCK = "⏱️"
EMOJI_INFO = "ℹ️"
EMOJI_ERROR = "❗"
EMOJI_WARNING = "⚠️"
EMOJI_NETWORK = "🌐"
EMOJI_QUOTA = "🚦"
EMOJI_PERM = "🔒"
EMOJI_WRITE = "💾"
EMOJI_TOOL = "🛠️"


# Disable SSL warnings (use cautiously)
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Safe coordinate bounds
SAFE_BOUNDS = {
    'global': {'lon_min': -180, 'lon_max': 180, 'lat_min': -90, 'lat_max': 90},
    'china': {'lon_min': 73.66, 'lon_max': 135.05, 'lat_min': 3.86, 'lat_max': 53.55}
}

# Test address pool
TEST_ADDRESS_POOL = [
    "北京市朝阳区望京soho",
    "上海市浦东新区陆家嘴环路1288号",
    "广州市天河区珠江新城临江大道5号",
    "深圳市福田区深南大道6001号",
    "成都市锦江区红星路三段1号"
]

# --- Helper Classes ---

class GeoGenerator:
    """ランダムな地理データを生成します。"""
    @staticmethod
    def random_coord(region='china'):
        bounds = SAFE_BOUNDS[region]
        return (
            round(random.uniform(bounds['lat_min'], bounds['lat_max']), 6),
            round(random.uniform(bounds['lon_min'], bounds['lon_max']), 6)
        )

    @staticmethod
    def random_ip():
        # より見つけやすい公共IP範囲を生成します
        while True:
            ip = f"{random.randint(1, 223)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}"
            parts = [int(p) for p in ip.split('.')]
            if parts[0] == 10: continue
            if parts[0] == 127: continue
            if parts[0] == 172 and 16 <= parts[1] <= 31: continue
            if parts[0] == 192 and parts[1] == 168: continue
            if parts[0] >= 224: continue
            return ip


class KeyValidator:
    """キーのフォーマットを検証し、プラットフォームを検出します。"""
    @staticmethod
    def detect_platform(key: str) -> Tuple[str, str]:
        """プラットフォームを検出し、フォーマットの説明を提供します。"""
        if re.match(r'^[0-9a-fA-F]{32}$', key):
            return 'amap', '高德 (32ビット Hex)'
        if re.match(r'^[A-Za-z0-9]{32}$', key):
            if re.match(r'^[0-9a-fA-F]{32}$', key):
                 return 'ambiguous', '高德または百度の可能性 (純Hex文字)'
            return 'baidu', '百度 (32ビット Alnum)'
        return 'unknown', '未知のフォーマット'


class APITester:
    """APIテスト呼び出しを実行します。"""
    def __init__(self):
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0'
        ]

    def _generate_service_configs(self, platform: str) -> Dict:
         """新しいランダムデータのためにサービス設定を動的に生成します。"""
         coord = GeoGenerator.random_coord()
         random_addr = random.choice(TEST_ADDRESS_POOL)
         random_ip = GeoGenerator.random_ip()

         if platform == 'amap':
            return {
                "静的地図": {
                    "url": "https://restapi.amap.com/v3/staticmap",
                    "params": {"location": f"{coord[1]},{coord[0]}", "zoom": 10, "size": "400*300", "markers": f"mid,,A:{coord[1]},{coord[0]}"},
                    "is_static": True
                },
                "地理コーディング": {
                    "url": "https://restapi.amap.com/v3/geocode/geo",
                    "params": {"address": random_addr}
                },
                "逆地理コーディング": {
                    "url": "https://restapi.amap.com/v3/geocode/regeo",
                    "params": {"location": f"{coord[1]},{coord[0]}"}
                },
                "ルート計画": {
                    "url": "https://restapi.amap.com/v3/direction/driving",
                    "params": {"origin": "116.481028,39.989643", "destination": "116.434446,39.90816"}
                },
                "IP位置情報": {
                    "url": "https://restapi.amap.com/v3/ip",
                    "params": {"ip": random_ip}
                }
            }
         elif platform == 'baidu':
            return {
                "静的地図": {
                    "url": "https://api.map.baidu.com/staticimage/v2",
                    "params": {"center": f"{coord[1]},{coord[0]}", "zoom": 10, "width": 400, "height": 300, "markers": f"{coord[1]},{coord[0]}"},
                    "is_static": True
                },
                "地理コーディング": {
                    "url": "https://api.map.baidu.com/geocoding/v3/",
                    "params": {"address": random_addr, "output": "json"}
                },
                "逆地理コーディング": {
                    "url": "https://api.map.baidu.com/reverse_geocoding/v3/",
                    "params": {"location": f"{coord[0]},{coord[1]}", "output": "json"}
                },
                "ルート計画": {
                    "url": "https://api.map.baidu.com/direction/v2/driving",
                    "params": {"origin": "40.01116,116.339303", "destination": "39.936404,116.452562"}
                },
                "IP位置情報": {
                    "url": "https://api.map.baidu.com/location/ip",
                    "params": {"ip": random_ip, "coor": "bd09ll"}
                }
            }
         return {}

    def _call_api(self, url: str, params: dict, platform: str, service_name: str, is_static: bool = False) -> Tuple[bool, dict]:
        """リトライを伴う単一のAPIリクエストを実行します。"""
        current_params = params.copy()
        retries = current_params.pop('retry', 2) + 1
        detail = {'params': params, 'latency': None, 'error': None, 'error_info': None, 'response': None}

        for attempt in range(retries):
            response = None
            try:
                headers = {'User-Agent': random.choice(self.user_agents)}
                start_time = time.time()
                response = requests.get(
                    url,
                    params=current_params,
                    headers=headers,
                    timeout=15,
                    verify=certifi.where(),
                    stream=is_static
                )
                latency = round((time.time() - start_time) * 1000, 2)
                detail['latency'] = latency

                content_type = response.headers.get('Content-Type', '').lower()

                if is_static and response.status_code == 200 and content_type.startswith('image/'):
                    detail['response'] = {'content_type': content_type, 'status_code': 200}
                    response.close()
                    return True, detail

                if response.status_code != 200:
                    error_msg = f"HTTP {response.status_code}"
                    try:
                        error_data = response.json()
                        error_msg += f". API Msg: {json.dumps(error_data, ensure_ascii=False)}"
                    except (json.JSONDecodeError, requests.exceptions.RequestException):
                        error_msg += f". Response: {response.text[:200]}..."
                    detail['error'] = error_msg
                    raise requests.HTTPError(error_msg, response=response)

                data = None
                try:
                    if 'application/json' in content_type or 'text/javascript' in content_type:
                         data = response.json()
                    elif platform == 'amap' and service_name == "IP位置情報" and 'text/html' in content_type and response.text.startswith('{') and response.text.endswith('}'):
                         data = json.loads(response.text)
                    else:
                         raise ValueError(f"非予期の応答タイプ: {content_type}")

                    detail['response'] = data

                    success = False
                    error_info = ""
                    if platform == 'amap':
                        success = str(data.get('status')) == '1'
                        if not success: error_info = data.get('info', '未知の高德エラー') + f" (infocode: {data.get('infocode', '')})"
                    elif platform == 'baidu':
                        success = data.get('status') == 0
                        if not success: error_info = data.get('message', '未知の百度エラー') + f" (status: {data.get('status', '')})"
                    else: error_info = "未知のプラットフォームエラー"

                    detail['error_info'] = error_info if not success else None
                    return success, detail

                except json.JSONDecodeError as e:
                    detail['error'] = f"JSON解析失敗 ({content_type}): {str(e)} | 応答: {response.text[:200]}..."
                    return False, detail
                except ValueError as e:
                    detail['error'] = f"{str(e)} | 応答: {response.text[:200]}..."
                    return False, detail

            except requests.exceptions.Timeout:
                detail['error'] = "リクエストタイムアウト"
                if attempt == retries - 1: return False, detail
                time.sleep(1 + 1.5 * attempt)
            except requests.exceptions.RequestException as e:
                detail['error'] = f"リクエストエラー: {type(e).__name__}" + (f" (Status: {response.status_code})" if response else "")
                if attempt == retries - 1: return False, detail
                time.sleep(1 + 1.5 * attempt)
            except Exception as e:
                 detail['error'] = f"未知の処理エラー: {type(e).__name__}: {str(e)}"
                 print(f"{EMOJI_ERROR} {colored('内部スクリプトエラー', 'red')}: {traceback.format_exc()[:500]}...")
                 return False, detail
            finally:
                if response:
                    response.close()

        detail['error'] = f"最大リトライ回数を超えました ({retries}). 最後のエラー: {detail['error']}"
        return False, detail

    def test_service(self, platform: str, service_name: str, key: str, service_config: dict) -> Tuple[bool, dict]:
        """サービスの設定を使って単一のサービスをテストします。"""
        params = service_config['params'].copy()
        params['key' if platform == 'amap' else 'ak'] = key
        is_static = service_config.get('is_static', False)
        return self._call_api(service_config['url'], params, platform, service_name, is_static)


class ReportGenerator:
    """テスト結果のコンソール出力を生成します。"""
    @staticmethod
    def print_result(service: str, platform: str, success: bool, detail: dict):
        status_icon = EMOJI_SUCCESS if success else EMOJI_FAIL
        status_color = 'green' if success else 'red'
        service_status = colored("成功" if success else "失敗", status_color)

        print(f"{EMOJI_SERVICE} {service.ljust(7)} {status_icon} {service_status}")

        indent = "  │"

        latency = detail.get('latency')
        latency_str = f"{latency:.2f} ms" if latency is not None else "N/A"
        print(f"{indent} {EMOJI_CLOCK} 耗時: {latency_str}")

        params_str = " | ".join([f"{k}={v:.4f}" if isinstance(v, float) else f"{k}={str(v)[:30]}{'...' if len(str(v))>30 else ''}"
                                 for k, v in detail.get('params', {}).items() if k not in ['key', 'ak']])
        if params_str:
             print(f"{indent} {EMOJI_INFO} パラメータ: {colored(params_str, 'cyan')}")

        if success:
            success_info_str = ReportGenerator._extract_success_info(service, platform, detail.get('response'))
            if success_info_str:
                print(f"{indent} {EMOJI_INFO} 結果: {colored(success_info_str, 'green')}")

        error_msg = detail.get('error')
        error_info = detail.get('error_info')

        if error_info:
            print(f"{indent} {EMOJI_WARNING} 原因: {colored(error_info, 'yellow')}")
            ReportGenerator._print_error_analysis(error_info)
        elif error_msg:
            print(f"{indent} {EMOJI_ERROR} 原因: {colored(error_msg, 'red')}")
            ReportGenerator._print_error_analysis(error_msg)
        elif not success:
            print(f"{indent} {EMOJI_ERROR} 原因: {colored('未知の原因による失敗', 'red')}")
        print( "  └" + "─" * 30)


    @staticmethod
    def _extract_success_info(service: str, platform: str, response_data: Optional[Any]) -> str:
        """成功したAPI応答データから簡潔な要約を抽出します。"""
        if response_data is None: return ""
        try:
            if isinstance(response_data, dict) and response_data.get('content_type', '').startswith('image/'):
                return f"画像を取得成功 ({response_data['content_type']})"
            elif isinstance(response_data, dict):
                if service == "IP位置情報":
                    city = "未知"
                    if platform == 'amap': city = response_data.get('city') if isinstance(response_data.get('city'), str) and response_data.get('city') else response_data.get('adcode', 'N/A')
                    elif platform == 'baidu': city = response_data.get('content', {}).get('address_detail', {}).get('city', 'N/A')
                    return f"位置情報の都市: {city}" if city and city != 'N/A' else "位置情報取得成功 (都市情報なし)"
                elif service == "地理コーディング":
                    loc = "未知"
                    if platform == 'amap': loc = response_data.get('geocodes', [{}])[0].get('location', 'N/A')
                    elif platform == 'baidu': loc_dict = response_data.get('result', {}).get('location', {}); loc = f"{loc_dict.get('lat', 'N/A')},{loc_dict.get('lng', 'N/A')}" if loc_dict else 'N/A'
                    return f"座標を取得: {loc}"
                elif service == "逆地理コーディング":
                    addr = "未知"
                    if platform == 'amap': addr = response_data.get('regeocode', {}).get('formatted_address', 'N/A')
                    elif platform == 'baidu': addr = response_data.get('result', {}).get('formatted_address', 'N/A')
                    return f"住所を取得: {addr[:40]}{'...' if len(addr)>40 else ''}"
                elif service == "ルート計画":
                    dist, dur = "N/A", "N/A"
                    try:
                        if platform == 'amap': path = response_data.get('route', {}).get('paths', [{}])[0]; dist, dur = path.get('distance'), path.get('duration')
                        elif platform == 'baidu': route = response_data.get('result', {}).get('routes', [{}])[0]; dist, dur = route.get('distance'), route.get('duration')
                        return f"距離: {dist}メートル, 時間: {dur}秒"
                    except (IndexError, KeyError, TypeError): return "ルート計画成功"
                else: return "呼び出し成功"
            return ""
        except Exception as e:
            return colored(f"結果解析エラー: {e}", 'magenta')

    @staticmethod
    def _print_error_analysis(error_text: str):
        """エラーテキストのキーワードに基づいて特定の警告アイコンを印刷します。"""
        indent = "  │  "
        error_lower = error_text.lower()
        analysis_printed = False
        if 'quota' in error_lower or '配額' in error_lower or '並行' in error_lower or 'qps' in error_lower or 'limit' in error_lower:
            print(f"{indent}{EMOJI_QUOTA} {colored('[疑似配額/並行制限]', 'yellow')}")
            analysis_printed = True
        if 'invalid user key' in error_lower or 'key不正' in error_lower or 'ak存在しない' in error_lower or '権限確認失敗' in error_lower or 'permission denied' in error_lower or 'key status' in error_lower or '無効' in error_lower :
            print(f"{indent}{EMOJI_PERM} {colored('[キー無効またはサービス未開通/権限不足]', 'red')}")
            analysis_printed = True
        if 'domain' in error_lower or 'ipホワイトリスト' in error_lower or 'referer' in error_lower or 'sn確認失敗' in error_lower:
             print(f"{indent}{EMOJI_PERM} {colored('[IP/ドメイン/Referer/SNホワイトリスト確認失敗]', 'magenta')}")
             analysis_printed = True
        if 'timeout' in error_lower or '超時' in error_lower:
            print(f"{indent}{EMOJI_NETWORK} {colored('[リクエストタイムアウト]', 'blue')}")
            analysis_printed = True
        if 'json解析失敗' in error_lower or '非予期応答タイプ' in error_lower:
            print(f"{indent}{EMOJI_NETWORK} {colored('[応答フォーマットエラーまたは非予期]', 'magenta')}")
            analysis_printed = True
        if 'リクエストエラー' in error_text or 'http' in error_lower or 'connection' in error_lower or 'ssl' in error_lower:
            print(f"{indent}{EMOJI_NETWORK} {colored('[ネットワーク/接続/HTTPエラー]', 'blue')}")
            analysis_printed = True


# --- Main Execution ---

def parse_args():
    """コマンドライン引数を解析します。"""
    parser = argparse.ArgumentParser(
        description=f"{EMOJI_TOOL} 地図API密钥全自动检测工具 (v3.6)", # Version bump for banner
        formatter_class=argparse.RawTextHelpFormatter
        )
    parser.add_argument('-k', '--keys', nargs='+', help="コマンドラインで1つまたは複数のキーを指定します")
    parser.add_argument('-f', '--file', help="キーを含むファイルパス (1行に1つのキー)")
    parser.add_argument('-t', '--threads', type=int, default=5, help="並行テストスレッド数 (デフォルト: 5)")
    parser.add_argument('-o', '--output', help="成功した検出結果を指定のJSONファイルに出力します")
    parser.add_argument('--skip-static', action='store_true', help="静的地図サービスの検出をスキップします")

    temp_args, _ = parser.parse_known_args()
    if not temp_args.keys and not temp_args.file:
         # エラー時にヘルプを表示する前にバナーを印刷します
         print(colored(BANNER, 'cyan'))
         parser.print_help()
         print(colored("\nエラー: -k または -f を通じて少なくとも1つのキーを提供する必要があります。", "red"))
         exit(1)

    return parser.parse_args()

def main(keys_to_test: List[str], num_threads: int, output_file: Optional[str], skip_static: bool):
    """メイン検出ワークフロー。"""
    tester = APITester()
    futures_map = {}
    results_for_output = []

    print(f"{EMOJI_DETECT} {len(keys_to_test)} 個のキーの検出を開始します。スレッド数: {num_threads}...")
    if skip_static: print(f"{EMOJI_INFO} 静的地図検出をスキップするように設定されました。")

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        for key in keys_to_test:
            key = key.strip()
            if not key: continue

            platform, key_format_desc = KeyValidator.detect_platform(key)
            masked_key = f"{key[:6]}...{key[-4:]}" if len(key) > 10 else key

            if platform == 'unknown':
                print(colored(f"\n{EMOJI_FAIL} 無効なキー形式: {key}", "red"))
                continue
            if platform == 'ambiguous':
                 print(colored(f"\n{EMOJI_WARNING} 曖昧なキーが検出されました: {masked_key}", "yellow"))
                 print(colored(f"  フォーマット ({key_format_desc}) は高德または百度の可能性があり、両方を試みます。", "yellow"))
                 platforms_to_try = ['amap', 'baidu']
            else:
                 platforms_to_try = [platform]

            for p in platforms_to_try:
                print(colored(f"\n{EMOJI_DETECT} {p.upper()} サービスを検出中 - キー: {masked_key}", "blue", attrs=['bold']))
                print(f"  {EMOJI_PLATFORM} プラットフォーム識別: {p.upper()}")
                print(f"  {EMOJI_KEY_FORMAT} キー形式: {key_format_desc}")

                platform_services = tester._generate_service_configs(p)

                for service_name, service_config in platform_services.items():
                    if skip_static and service_config.get('is_static', False):
                        continue

                    future = executor.submit(tester.test_service, p, service_name, key, service_config)
                    futures_map[future] = {'key': key, 'platform': p, 'service_name': service_name}

        print(colored("\n--- 検出結果 ---", attrs=['bold']))
        processed_count = 0
        total_tasks = len(futures_map)

        for future in as_completed(futures_map):
            processed_count += 1
            context = futures_map[future]
            key = context['key']
            platform = context['platform']
            service_name = context['service_name']
            masked_key = f"{key[:6]}...{key[-4:]}" if len(key) > 10 else key

            try:
                success, detail = future.result()
                ReportGenerator.print_result(service_name, platform, success, detail)

                if success and output_file:
                    result_item = {
                        'key': key,
                        'platform': platform,
                        'service_name': service_name,
                        'request_params': detail.get('params'),
                        'response_summary': ReportGenerator._extract_success_info(service_name, platform, detail.get('response')),
                        'latency_ms': detail.get('latency'),
                    }
                    results_for_output.append(result_item)

            except Exception as exc:
                 print(colored(f"{EMOJI_FAIL} {service_name.ljust(7)}", 'red'))
                 print(f"  └─ {colored(f'実行中に内部エラーが発生しました {service_name} (プラットフォーム: {platform}, キー: {masked_key}): {exc}', 'red')}")

    if output_file:
        print(colored(f"\n{EMOJI_WRITE} {len(results_for_output)} 件の成功結果を {output_file} に書き込んでいます", "blue"))
        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(results_for_output, f, ensure_ascii=False, indent=4)
            print(colored(f"{EMOJI_SUCCESS} JSONファイルに正常に書き込みました。", "green"))
        except IOError as e:
            print(colored(f"{EMOJI_ERROR} ファイル書き込み失敗: {e}", "red"))
        except TypeError as e:
             print(colored(f"{EMOJI_ERROR} 結果をJSONにシリアライズする際に失敗しました (シリアライズできないデータが含まれている可能性があります): {e}", "red"))


if __name__ == "__main__":
    # --- バナーを最初に印刷 ---
    print(colored(BANNER, 'cyan')) # バナーの色を選択

    args = parse_args() # バナーを印刷した後に引数を解析
    keys = []

    if args.keys:
        keys.extend(args.keys)
    if args.file:
        try:
            with open(args.file, 'r', encoding='utf-8') as f:
                keys.extend([line.strip() for line in f if line.strip()])
        except FileNotFoundError:
             print(colored(f"{EMOJI_ERROR} エラー: キーファイル '{args.file}' が見つかりません。", "red"))
             exit(1)
        except Exception as e:
             print(colored(f"{EMOJI_ERROR} エラー: キーファイル '{args.file}' を読み込む際にエラーが発生しました: {e}", "red"))
             exit(1)

    unique_keys = sorted(list(set(keys)))

    # 入力検証は現在parse_args内で処理されています

    main(unique_keys, args.threads, args.output, args.skip_static)
    print(colored(f"\n{EMOJI_TOOL} 検出完了。", attrs=['bold']))

地図 API 密钥全自動検出ツール

概要

地図 API 密钥全自動検出ツール は、ペネトレーションテストプロセスにおける地図 API キーの管理とリスク評価作業を簡素化することを目的とした効率的で包括的なソリューションです。 ペネトレーションテスト担当者やセキュリティ研究者は、アプリケーションやシステムが地図 API に依存している際に、キーの有効性検証、サービス権限の検出、潜在的なセキュリティリスク評価などの複数の課題に直面することがよくあります。 このツールは、これらの重要なタスクを自動化することで、作業効率を大幅に向上させ、地図サービスに関連する潜在的なセキュリティ脆弱性を特定するのに役立ちます。

主な機能

  • キーの有効性検証: 複数のインターフェースとサービス呼び出しを自動で検証し、地図 API キーの有効状態(高德 / 百度)を迅速に識別し、悪用される可能性のあるキーや漏洩したキーを特定します。
  • バッチキーテスト: 複数の地図 API キーを一度にテストすることをサポートし、ペネトレーションテストの効率を向上させ、大量のキーのリスクを迅速に評価します。

技術的特徴

  • Python 開発: Python 言語で開発されており、優れたクロスプラットフォーム性と可読性を持っています。
  • Requests ライブラリ: Requests ライブラリを使用して HTTP リクエストを送信し、API 呼び出しプロセスを簡素化します。

クイックスタート

  1. スクリプトを実行:
    python your_script_name.py -k your_amap_key

出力は以下の通りです:

image

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。