banner
毅种循环

毅种循环

请将那贻笑罪过以逐字吟咏 如对冰川投以游丝般倾诉

Leak Map AK One-Click Detection Utilization

Background#

During an APK decompilation, a KEY related to the map was found. Several interfaces were tried but failed to call successfully. It was unclear which corresponding interface and service were involved, so I collaborated with GPT to write this code, aiming to achieve automated AK utilization.

Code#

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import re
import argparse
import requests
import random
import time
import certifi
import json
import traceback
import os
from termcolor import colored
from typing import List, Dict, Tuple, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib3.exceptions import InsecureRequestWarning

# --- Banner ---
BANNER = """


                                                                                           
                            by: Automatic Detection Tool for Map API Keys
"""

# --- Configuration ---

# Emojis for output enhancement
EMOJI_DETECT = "🔍"
EMOJI_PLATFORM = "🏷️"
EMOJI_KEY_FORMAT = "🔑"
EMOJI_SERVICE = "▶️"
EMOJI_SUCCESS = "✅"
EMOJI_FAIL = "❌"
EMOJI_CLOCK = "⏱️"
EMOJI_INFO = "ℹ️"
EMOJI_ERROR = "❗"
EMOJI_WARNING = "⚠️"
EMOJI_NETWORK = "🌐"
EMOJI_QUOTA = "🚦"
EMOJI_PERM = "🔒"
EMOJI_WRITE = "💾"
EMOJI_TOOL = "🛠️"


# Disable SSL warnings (use cautiously)
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Safe coordinate bounds
SAFE_BOUNDS = {
    'global': {'lon_min': -180, 'lon_max': 180, 'lat_min': -90, 'lat_max': 90},
    'china': {'lon_min': 73.66, 'lon_max': 135.05, 'lat_min': 3.86, 'lat_max': 53.55}
}

# Test address pool
TEST_ADDRESS_POOL = [
    "Wangjing Soho, Chaoyang District, Beijing",
    "1288 Lujiazui Ring Road, Pudong New District, Shanghai",
    "5 Linjiang Avenue, Zhujiang New Town, Tianhe District, Guangzhou",
    "6001 Shennan Avenue, Futian District, Shenzhen",
    "1 Hongxing Road, Jinjiang District, Chengdu"
]

# --- Helper Classes ---

class GeoGenerator:
    """Generates random geographic data."""
    @staticmethod
    def random_coord(region='china'):
        bounds = SAFE_BOUNDS[region]
        return (
            round(random.uniform(bounds['lat_min'], bounds['lat_max']), 6),
            round(random.uniform(bounds['lon_min'], bounds['lon_max']), 6)
        )

    @staticmethod
    def random_ip():
        # Generate a public IP range more likely to be locatable
        while True:
            ip = f"{random.randint(1, 223)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}"
            parts = [int(p) for p in ip.split('.')]
            if parts[0] == 10: continue
            if parts[0] == 127: continue
            if parts[0] == 172 and 16 <= parts[1] <= 31: continue
            if parts[0] == 192 and parts[1] == 168: continue
            if parts[0] >= 224: continue
            return ip


class KeyValidator:
    """Validates key format and detects platform."""
    @staticmethod
    def detect_platform(key: str) -> Tuple[str, str]:
        """Detects platform and provides format description."""
        if re.match(r'^[0-9a-fA-F]{32}$', key):
            return 'amap', 'Amap (32-bit Hex)'
        if re.match(r'^[A-Za-z0-9]{32}$', key):
            if re.match(r'^[0-9a-fA-F]{32}$', key):
                 return 'ambiguous', 'Could be Amap or Baidu (pure Hex characters)'
            return 'baidu', 'Baidu (32-bit Alnum)'
        return 'unknown', 'Unknown format'


class APITester:
    """Executes API test calls."""
    def __init__(self):
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0'
        ]

    def _generate_service_configs(self, platform: str) -> Dict:
         """Generates service configs dynamically for fresh random data."""
         coord = GeoGenerator.random_coord()
         random_addr = random.choice(TEST_ADDRESS_POOL)
         random_ip = GeoGenerator.random_ip()

         if platform == 'amap':
            return {
                "Static Map": {
                    "url": "https://restapi.amap.com/v3/staticmap",
                    "params": {"location": f"{coord[1]},{coord[0]}", "zoom": 10, "size": "400*300", "markers": f"mid,,A:{coord[1]},{coord[0]}"},
                    "is_static": True
                },
                "Geocoding": {
                    "url": "https://restapi.amap.com/v3/geocode/geo",
                    "params": {"address": random_addr}
                },
                "Reverse Geocoding": {
                    "url": "https://restapi.amap.com/v3/geocode/regeo",
                    "params": {"location": f"{coord[1]},{coord[0]}"}
                },
                "Route Planning": {
                    "url": "https://restapi.amap.com/v3/direction/driving",
                    "params": {"origin": "116.481028,39.989643", "destination": "116.434446,39.90816"}
                },
                "IP Location": {
                    "url": "https://restapi.amap.com/v3/ip",
                    "params": {"ip": random_ip}
                }
            }
         elif platform == 'baidu':
            return {
                "Static Map": {
                    "url": "https://api.map.baidu.com/staticimage/v2",
                    "params": {"center": f"{coord[1]},{coord[0]}", "zoom": 10, "width": 400, "height": 300, "markers": f"{coord[1]},{coord[0]}"},
                    "is_static": True
                },
                "Geocoding": {
                    "url": "https://api.map.baidu.com/geocoding/v3/",
                    "params": {"address": random_addr, "output": "json"}
                },
                "Reverse Geocoding": {
                    "url": "https://api.map.baidu.com/reverse_geocoding/v3/",
                    "params": {"location": f"{coord[0]},{coord[1]}", "output": "json"}
                },
                "Route Planning": {
                    "url": "https://api.map.baidu.com/direction/v2/driving",
                    "params": {"origin": "40.01116,116.339303", "destination": "39.936404,116.452562"}
                },
                "IP Location": {
                    "url": "https://api.map.baidu.com/location/ip",
                    "params": {"ip": random_ip, "coor": "bd09ll"}
                }
            }
         return {}

    def _call_api(self, url: str, params: dict, platform: str, service_name: str, is_static: bool = False) -> Tuple[bool, dict]:
        """Executes a single API request with retries."""
        current_params = params.copy()
        retries = current_params.pop('retry', 2) + 1
        detail = {'params': params, 'latency': None, 'error': None, 'error_info': None, 'response': None}

        for attempt in range(retries):
            response = None
            try:
                headers = {'User-Agent': random.choice(self.user_agents)}
                start_time = time.time()
                response = requests.get(
                    url,
                    params=current_params,
                    headers=headers,
                    timeout=15,
                    verify=certifi.where(),
                    stream=is_static
                )
                latency = round((time.time() - start_time) * 1000, 2)
                detail['latency'] = latency

                content_type = response.headers.get('Content-Type', '').lower()

                if is_static and response.status_code == 200 and content_type.startswith('image/'):
                    detail['response'] = {'content_type': content_type, 'status_code': 200}
                    response.close()
                    return True, detail

                if response.status_code != 200:
                    error_msg = f"HTTP {response.status_code}"
                    try:
                        error_data = response.json()
                        error_msg += f". API Msg: {json.dumps(error_data, ensure_ascii=False)}"
                    except (json.JSONDecodeError, requests.exceptions.RequestException):
                        error_msg += f". Response: {response.text[:200]}..."
                    detail['error'] = error_msg
                    raise requests.HTTPError(error_msg, response=response)

                data = None
                try:
                    if 'application/json' in content_type or 'text/javascript' in content_type:
                         data = response.json()
                    elif platform == 'amap' and service_name == "IP Location" and 'text/html' in content_type and response.text.startswith('{') and response.text.endswith('}'):
                         data = json.loads(response.text)
                    else:
                         raise ValueError(f"Unexpected response type: {content_type}")

                    detail['response'] = data

                    success = False
                    error_info = ""
                    if platform == 'amap':
                        success = str(data.get('status')) == '1'
                        if not success: error_info = data.get('info', 'Unknown Amap error') + f" (infocode: {data.get('infocode', '')})"
                    elif platform == 'baidu':
                        success = data.get('status') == 0
                        if not success: error_info = data.get('message', 'Unknown Baidu error') + f" (status: {data.get('status', '')})"
                    else: error_info = "Unknown platform error"

                    detail['error_info'] = error_info if not success else None
                    return success, detail

                except json.JSONDecodeError as e:
                    detail['error'] = f"JSON parsing failed ({content_type}): {str(e)} | Response: {response.text[:200]}..."
                    return False, detail
                except ValueError as e:
                    detail['error'] = f"{str(e)} | Response: {response.text[:200]}..."
                    return False, detail

            except requests.exceptions.Timeout:
                detail['error'] = "Request timed out"
                if attempt == retries - 1: return False, detail
                time.sleep(1 + 1.5 * attempt)
            except requests.exceptions.RequestException as e:
                detail['error'] = f"Request error: {type(e).__name__}" + (f" (Status: {response.status_code})" if response else "")
                if attempt == retries - 1: return False, detail
                time.sleep(1 + 1.5 * attempt)
            except Exception as e:
                 detail['error'] = f"Unknown processing error: {type(e).__name__}: {str(e)}"
                 print(f"{EMOJI_ERROR} {colored('Internal script error', 'red')}: {traceback.format_exc()[:500]}...")
                 return False, detail
            finally:
                if response:
                    response.close()

        detail['error'] = f"Exceeded maximum retry attempts ({retries}). Last error: {detail['error']}"
        return False, detail

    def test_service(self, platform: str, service_name: str, key: str, service_config: dict) -> Tuple[bool, dict]:
        """Tests a single service with its config."""
        params = service_config['params'].copy()
        params['key' if platform == 'amap' else 'ak'] = key
        is_static = service_config.get('is_static', False)
        return self._call_api(service_config['url'], params, platform, service_name, is_static)


class ReportGenerator:
    """Generates console output for test results."""
    @staticmethod
    def print_result(service: str, platform: str, success: bool, detail: dict):
        status_icon = EMOJI_SUCCESS if success else EMOJI_FAIL
        status_color = 'green' if success else 'red'
        service_status = colored("Success" if success else "Failed", status_color)

        print(f"{EMOJI_SERVICE} {service.ljust(7)} {status_icon} {service_status}")

        indent = "  │"

        latency = detail.get('latency')
        latency_str = f"{latency:.2f} ms" if latency is not None else "N/A"
        print(f"{indent} {EMOJI_CLOCK} Latency: {latency_str}")

        params_str = " | ".join([f"{k}={v:.4f}" if isinstance(v, float) else f"{k}={str(v)[:30]}{'...' if len(str(v))>30 else ''}"
                                 for k, v in detail.get('params', {}).items() if k not in ['key', 'ak']])
        if params_str:
             print(f"{indent} {EMOJI_INFO} Parameters: {colored(params_str, 'cyan')}")

        if success:
            success_info_str = ReportGenerator._extract_success_info(service, platform, detail.get('response'))
            if success_info_str:
                print(f"{indent} {EMOJI_INFO} Result: {colored(success_info_str, 'green')}")

        error_msg = detail.get('error')
        error_info = detail.get('error_info')

        if error_info:
            print(f"{indent} {EMOJI_WARNING} Reason: {colored(error_info, 'yellow')}")
            ReportGenerator._print_error_analysis(error_info)
        elif error_msg:
            print(f"{indent} {EMOJI_ERROR} Reason: {colored(error_msg, 'red')}")
            ReportGenerator._print_error_analysis(error_msg)
        elif not success:
            print(f"{indent} {EMOJI_ERROR} Reason: {colored('Failed for unknown reasons', 'red')}")
        print( "  └" + "─" * 30)


    @staticmethod
    def _extract_success_info(service: str, platform: str, response_data: Optional[Any]) -> str:
        """Extracts a brief summary from successful API response data."""
        if response_data is None: return ""
        try:
            if isinstance(response_data, dict) and response_data.get('content_type', '').startswith('image/'):
                return f"Successfully retrieved image ({response_data['content_type']})"
            elif isinstance(response_data, dict):
                if service == "IP Location":
                    city = "Unknown"
                    if platform == 'amap': city = response_data.get('city') if isinstance(response_data.get('city'), str) and response_data.get('city') else response_data.get('adcode', 'N/A')
                    elif platform == 'baidu': city = response_data.get('content', {}).get('address_detail', {}).get('city', 'N/A')
                    return f"Located city: {city}" if city and city != 'N/A' else "Location successful (no city information)"
                elif service == "Geocoding":
                    loc = "Unknown"
                    if platform == 'amap': loc = response_data.get('geocodes', [{}])[0].get('location', 'N/A')
                    elif platform == 'baidu': loc_dict = response_data.get('result', {}).get('location', {}); loc = f"{loc_dict.get('lat', 'N/A')},{loc_dict.get('lng', 'N/A')}" if loc_dict else 'N/A'
                    return f"Retrieved coordinates: {loc}"
                elif service == "Reverse Geocoding":
                    addr = "Unknown"
                    if platform == 'amap': addr = response_data.get('regeocode', {}).get('formatted_address', 'N/A')
                    elif platform == 'baidu': addr = response_data.get('result', {}).get('formatted_address', 'N/A')
                    return f"Retrieved address: {addr[:40]}{'...' if len(addr)>40 else ''}"
                elif service == "Route Planning":
                    dist, dur = "N/A", "N/A"
                    try:
                        if platform == 'amap': path = response_data.get('route', {}).get('paths', [{}])[0]; dist, dur = path.get('distance'), path.get('duration')
                        elif platform == 'baidu': route = response_data.get('result', {}).get('routes', [{}])[0]; dist, dur = route.get('distance'), route.get('duration')
                        return f"Distance: {dist} meters, Time: {dur} seconds"
                    except (IndexError, KeyError, TypeError): return "Route planning successful"
                else: return "Call successful"
            return ""
        except Exception as e:
            return colored(f"Result parsing error: {e}", 'magenta')

    @staticmethod
    def _print_error_analysis(error_text: str):
        """Prints specific warning icons based on error text keywords."""
        indent = "  │  "
        error_lower = error_text.lower()
        analysis_printed = False
        if 'quota' in error_lower or '配额' in error_lower or '并发' in error_lower or 'qps' in error_lower or 'limit' in error_lower:
            print(f"{indent}{EMOJI_QUOTA} {colored('[Suspected quota/concurrency limit]', 'yellow')}")
            analysis_printed = True
        if 'invalid user key' in error_lower or 'key不正确' in error_lower or 'ak不存在' in error_lower or '权限校验失败' in error_lower or 'permission denied' in error_lower or 'key status' in error_lower or '无效' in error_lower :
            print(f"{indent}{EMOJI_PERM} {colored('[Invalid key or service not enabled/insufficient permissions]', 'red')}")
            analysis_printed = True
        if 'domain' in error_lower or 'ip白名单' in error_lower or 'referer' in error_lower or 'sn校验失败' in error_lower:
             print(f"{indent}{EMOJI_PERM} {colored('[IP/domain/Referer/SN whitelist verification failed]', 'magenta')}")
             analysis_printed = True
        if 'timeout' in error_lower or '超时' in error_lower:
            print(f"{indent}{EMOJI_NETWORK} {colored('[Request timed out]', 'blue')}")
            analysis_printed = True
        if 'json解析失败' in error_lower or '非预期响应类型' in error_lower:
            print(f"{indent}{EMOJI_NETWORK} {colored('[Response format error or unexpected]', 'magenta')}")
            analysis_printed = True
        if '请求错误' in error_text or 'http' in error_lower or 'connection' in error_lower or 'ssl' in error_lower:
            print(f"{indent}{EMOJI_NETWORK} {colored('[Network/connection/HTTP error]', 'blue')}")
            analysis_printed = True


# --- Main Execution ---

def parse_args():
    """Parses command line arguments."""
    parser = argparse.ArgumentParser(
        description=f"{EMOJI_TOOL} Automatic Detection Tool for Map API Keys (v3.6)", # Version bump for banner
        formatter_class=argparse.RawTextHelpFormatter
        )
    parser.add_argument('-k', '--keys', nargs='+', help="Specify one or more keys directly in the command line")
    parser.add_argument('-f', '--file', help="Path to the file containing keys (one key per line)")
    parser.add_argument('-t', '--threads', type=int, default=5, help="Number of concurrent testing threads (default: 5)")
    parser.add_argument('-o', '--output', help="Output successful detection results to the specified JSON file")
    parser.add_argument('--skip-static', action='store_true', help="Skip detection of static map services")

    temp_args, _ = parser.parse_known_args()
    if not temp_args.keys and not temp_args.file:
         # Print banner before showing help on error
         print(colored(BANNER, 'cyan'))
         parser.print_help()
         print(colored("\nError: At least one key must be provided via -k or -f.", "red"))
         exit(1)

    return parser.parse_args()

def main(keys_to_test: List[str], num_threads: int, output_file: Optional[str], skip_static: bool):
    """Main detection workflow."""
    tester = APITester()
    futures_map = {}
    results_for_output = []

    print(f"{EMOJI_DETECT} Starting detection of {len(keys_to_test)} keys using {num_threads} threads...")
    if skip_static: print(f"{EMOJI_INFO} Skipping static map detection.")

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        for key in keys_to_test:
            key = key.strip()
            if not key: continue

            platform, key_format_desc = KeyValidator.detect_platform(key)
            masked_key = f"{key[:6]}...{key[-4:]}" if len(key) > 10 else key

            if platform == 'unknown':
                print(colored(f"\n{EMOJI_FAIL} Invalid key format: {key}", "red"))
                continue
            if platform == 'ambiguous':
                 print(colored(f"\n{EMOJI_WARNING} Detected ambiguous key: {masked_key}", "yellow"))
                 print(colored(f"  Format ({key_format_desc}) could belong to Amap or Baidu, will attempt both.", "yellow"))
                 platforms_to_try = ['amap', 'baidu']
            else:
                 platforms_to_try = [platform]

            for p in platforms_to_try:
                print(colored(f"\n{EMOJI_DETECT} Testing {p.upper()} service - Key: {masked_key}", "blue", attrs=['bold']))
                print(f"  {EMOJI_PLATFORM} Platform identified: {p.upper()}")
                print(f"  {EMOJI_KEY_FORMAT} Key format: {key_format_desc}")

                platform_services = tester._generate_service_configs(p)

                for service_name, service_config in platform_services.items():
                    if skip_static and service_config.get('is_static', False):
                        continue

                    future = executor.submit(tester.test_service, p, service_name, key, service_config)
                    futures_map[future] = {'key': key, 'platform': p, 'service_name': service_name}

        print(colored("\n--- Detection Results ---", attrs=['bold']))
        processed_count = 0
        total_tasks = len(futures_map)

        for future in as_completed(futures_map):
            processed_count += 1
            context = futures_map[future]
            key = context['key']
            platform = context['platform']
            service_name = context['service_name']
            masked_key = f"{key[:6]}...{key[-4:]}" if len(key) > 10 else key

            try:
                success, detail = future.result()
                ReportGenerator.print_result(service_name, platform, success, detail)

                if success and output_file:
                    result_item = {
                        'key': key,
                        'platform': platform,
                        'service_name': service_name,
                        'request_params': detail.get('params'),
                        'response_summary': ReportGenerator._extract_success_info(service_name, platform, detail.get('response')),
                        'latency_ms': detail.get('latency'),
                    }
                    results_for_output.append(result_item)

            except Exception as exc:
                 print(colored(f"{EMOJI_FAIL} {service_name.ljust(7)}", 'red'))
                 print(f"  └─ {colored(f'An internal error occurred while executing {service_name} (Platform: {platform}, Key: {masked_key}): {exc}', 'red')}")

    if output_file:
        print(colored(f"\n{EMOJI_WRITE} Writing {len(results_for_output)} successful results to: {output_file}", "blue"))
        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(results_for_output, f, ensure_ascii=False, indent=4)
            print(colored(f"{EMOJI_SUCCESS} Successfully written to JSON file.", "green"))
        except IOError as e:
            print(colored(f"{EMOJI_ERROR} Failed to write to file: {e}", "red"))
        except TypeError as e:
             print(colored(f"{EMOJI_ERROR} Failed to serialize results to JSON (may contain non-serializable data): {e}", "red"))


if __name__ == "__main__":
    # --- Print Banner First ---
    print(colored(BANNER, 'cyan')) # Choose a color for the banner

    args = parse_args() # Parse arguments after printing banner
    keys = []

    if args.keys:
        keys.extend(args.keys)
    if args.file:
        try:
            with open(args.file, 'r', encoding='utf-8') as f:
                keys.extend([line.strip() for line in f if line.strip()])
        except FileNotFoundError:
             print(colored(f"{EMOJI_ERROR} Error: Key file '{args.file}' not found.", "red"))
             exit(1)
        except Exception as e:
             print(colored(f"{EMOJI_ERROR} Error: An error occurred while reading key file '{args.file}': {e}", "red"))
             exit(1)

    unique_keys = sorted(list(set(keys)))

    # Input validation is handled within parse_args now

    main(unique_keys, args.threads, args.output, args.skip_static)
    print(colored(f"\n{EMOJI_TOOL} Detection completed.", attrs=['bold']))

Automatic Detection Tool for Map API Keys

Introduction

Automatic Detection Tool for Map API Keys is an efficient and comprehensive solution designed to simplify the management and risk assessment of map API keys during penetration testing. Whether for penetration testers or security researchers, analyzing an application or system's reliance on map APIs often presents multiple challenges, including key validity verification, service permission detection, and potential security risk assessment. This tool automates these critical tasks, significantly enhancing efficiency and helping identify potential security vulnerabilities related to map services.

Main Features

  • Key Validity Verification: Automatically verifies the validity status of map API keys (Amap/Baidu) through multiple interfaces and service calls, quickly identifying keys that may be abused or have been leaked.
  • Batch Key Testing: Supports batch testing of multiple map API keys, improving penetration testing efficiency and quickly assessing the risks of a large number of keys.

Technical Characteristics

  • Python Development: Developed using Python, offering good cross-platform compatibility and readability.
  • Requests Library: Utilizes the Requests library to send HTTP requests, simplifying the API calling process.

Quick Start

  1. Run the script:
    python your_script_name.py -k your_amap_key

Output as follows:

image

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.