Background#
During an APK decompilation, a KEY related to the map was found. Several interfaces were tried but failed to call successfully. It was unclear which corresponding interface and service were involved, so I collaborated with GPT to write this code, aiming to achieve automated AK utilization.
Code#
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import argparse
import requests
import random
import time
import certifi
import json
import traceback
import os
from termcolor import colored
from typing import List, Dict, Tuple, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib3.exceptions import InsecureRequestWarning
# --- Banner ---
BANNER = """
by: Automatic Detection Tool for Map API Keys
"""
# --- Configuration ---
# Emojis for output enhancement
EMOJI_DETECT = "🔍"
EMOJI_PLATFORM = "🏷️"
EMOJI_KEY_FORMAT = "🔑"
EMOJI_SERVICE = "▶️"
EMOJI_SUCCESS = "✅"
EMOJI_FAIL = "❌"
EMOJI_CLOCK = "⏱️"
EMOJI_INFO = "ℹ️"
EMOJI_ERROR = "❗"
EMOJI_WARNING = "⚠️"
EMOJI_NETWORK = "🌐"
EMOJI_QUOTA = "🚦"
EMOJI_PERM = "🔒"
EMOJI_WRITE = "💾"
EMOJI_TOOL = "🛠️"
# Disable SSL warnings (use cautiously)
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# Safe coordinate bounds
SAFE_BOUNDS = {
'global': {'lon_min': -180, 'lon_max': 180, 'lat_min': -90, 'lat_max': 90},
'china': {'lon_min': 73.66, 'lon_max': 135.05, 'lat_min': 3.86, 'lat_max': 53.55}
}
# Test address pool
TEST_ADDRESS_POOL = [
"Wangjing Soho, Chaoyang District, Beijing",
"1288 Lujiazui Ring Road, Pudong New District, Shanghai",
"5 Linjiang Avenue, Zhujiang New Town, Tianhe District, Guangzhou",
"6001 Shennan Avenue, Futian District, Shenzhen",
"1 Hongxing Road, Jinjiang District, Chengdu"
]
# --- Helper Classes ---
class GeoGenerator:
"""Generates random geographic data."""
@staticmethod
def random_coord(region='china'):
bounds = SAFE_BOUNDS[region]
return (
round(random.uniform(bounds['lat_min'], bounds['lat_max']), 6),
round(random.uniform(bounds['lon_min'], bounds['lon_max']), 6)
)
@staticmethod
def random_ip():
# Generate a public IP range more likely to be locatable
while True:
ip = f"{random.randint(1, 223)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}"
parts = [int(p) for p in ip.split('.')]
if parts[0] == 10: continue
if parts[0] == 127: continue
if parts[0] == 172 and 16 <= parts[1] <= 31: continue
if parts[0] == 192 and parts[1] == 168: continue
if parts[0] >= 224: continue
return ip
class KeyValidator:
"""Validates key format and detects platform."""
@staticmethod
def detect_platform(key: str) -> Tuple[str, str]:
"""Detects platform and provides format description."""
if re.match(r'^[0-9a-fA-F]{32}$', key):
return 'amap', 'Amap (32-bit Hex)'
if re.match(r'^[A-Za-z0-9]{32}$', key):
if re.match(r'^[0-9a-fA-F]{32}$', key):
return 'ambiguous', 'Could be Amap or Baidu (pure Hex characters)'
return 'baidu', 'Baidu (32-bit Alnum)'
return 'unknown', 'Unknown format'
class APITester:
"""Executes API test calls."""
def __init__(self):
self.user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0'
]
def _generate_service_configs(self, platform: str) -> Dict:
"""Generates service configs dynamically for fresh random data."""
coord = GeoGenerator.random_coord()
random_addr = random.choice(TEST_ADDRESS_POOL)
random_ip = GeoGenerator.random_ip()
if platform == 'amap':
return {
"Static Map": {
"url": "https://restapi.amap.com/v3/staticmap",
"params": {"location": f"{coord[1]},{coord[0]}", "zoom": 10, "size": "400*300", "markers": f"mid,,A:{coord[1]},{coord[0]}"},
"is_static": True
},
"Geocoding": {
"url": "https://restapi.amap.com/v3/geocode/geo",
"params": {"address": random_addr}
},
"Reverse Geocoding": {
"url": "https://restapi.amap.com/v3/geocode/regeo",
"params": {"location": f"{coord[1]},{coord[0]}"}
},
"Route Planning": {
"url": "https://restapi.amap.com/v3/direction/driving",
"params": {"origin": "116.481028,39.989643", "destination": "116.434446,39.90816"}
},
"IP Location": {
"url": "https://restapi.amap.com/v3/ip",
"params": {"ip": random_ip}
}
}
elif platform == 'baidu':
return {
"Static Map": {
"url": "https://api.map.baidu.com/staticimage/v2",
"params": {"center": f"{coord[1]},{coord[0]}", "zoom": 10, "width": 400, "height": 300, "markers": f"{coord[1]},{coord[0]}"},
"is_static": True
},
"Geocoding": {
"url": "https://api.map.baidu.com/geocoding/v3/",
"params": {"address": random_addr, "output": "json"}
},
"Reverse Geocoding": {
"url": "https://api.map.baidu.com/reverse_geocoding/v3/",
"params": {"location": f"{coord[0]},{coord[1]}", "output": "json"}
},
"Route Planning": {
"url": "https://api.map.baidu.com/direction/v2/driving",
"params": {"origin": "40.01116,116.339303", "destination": "39.936404,116.452562"}
},
"IP Location": {
"url": "https://api.map.baidu.com/location/ip",
"params": {"ip": random_ip, "coor": "bd09ll"}
}
}
return {}
def _call_api(self, url: str, params: dict, platform: str, service_name: str, is_static: bool = False) -> Tuple[bool, dict]:
"""Executes a single API request with retries."""
current_params = params.copy()
retries = current_params.pop('retry', 2) + 1
detail = {'params': params, 'latency': None, 'error': None, 'error_info': None, 'response': None}
for attempt in range(retries):
response = None
try:
headers = {'User-Agent': random.choice(self.user_agents)}
start_time = time.time()
response = requests.get(
url,
params=current_params,
headers=headers,
timeout=15,
verify=certifi.where(),
stream=is_static
)
latency = round((time.time() - start_time) * 1000, 2)
detail['latency'] = latency
content_type = response.headers.get('Content-Type', '').lower()
if is_static and response.status_code == 200 and content_type.startswith('image/'):
detail['response'] = {'content_type': content_type, 'status_code': 200}
response.close()
return True, detail
if response.status_code != 200:
error_msg = f"HTTP {response.status_code}"
try:
error_data = response.json()
error_msg += f". API Msg: {json.dumps(error_data, ensure_ascii=False)}"
except (json.JSONDecodeError, requests.exceptions.RequestException):
error_msg += f". Response: {response.text[:200]}..."
detail['error'] = error_msg
raise requests.HTTPError(error_msg, response=response)
data = None
try:
if 'application/json' in content_type or 'text/javascript' in content_type:
data = response.json()
elif platform == 'amap' and service_name == "IP Location" and 'text/html' in content_type and response.text.startswith('{') and response.text.endswith('}'):
data = json.loads(response.text)
else:
raise ValueError(f"Unexpected response type: {content_type}")
detail['response'] = data
success = False
error_info = ""
if platform == 'amap':
success = str(data.get('status')) == '1'
if not success: error_info = data.get('info', 'Unknown Amap error') + f" (infocode: {data.get('infocode', '')})"
elif platform == 'baidu':
success = data.get('status') == 0
if not success: error_info = data.get('message', 'Unknown Baidu error') + f" (status: {data.get('status', '')})"
else: error_info = "Unknown platform error"
detail['error_info'] = error_info if not success else None
return success, detail
except json.JSONDecodeError as e:
detail['error'] = f"JSON parsing failed ({content_type}): {str(e)} | Response: {response.text[:200]}..."
return False, detail
except ValueError as e:
detail['error'] = f"{str(e)} | Response: {response.text[:200]}..."
return False, detail
except requests.exceptions.Timeout:
detail['error'] = "Request timed out"
if attempt == retries - 1: return False, detail
time.sleep(1 + 1.5 * attempt)
except requests.exceptions.RequestException as e:
detail['error'] = f"Request error: {type(e).__name__}" + (f" (Status: {response.status_code})" if response else "")
if attempt == retries - 1: return False, detail
time.sleep(1 + 1.5 * attempt)
except Exception as e:
detail['error'] = f"Unknown processing error: {type(e).__name__}: {str(e)}"
print(f"{EMOJI_ERROR} {colored('Internal script error', 'red')}: {traceback.format_exc()[:500]}...")
return False, detail
finally:
if response:
response.close()
detail['error'] = f"Exceeded maximum retry attempts ({retries}). Last error: {detail['error']}"
return False, detail
def test_service(self, platform: str, service_name: str, key: str, service_config: dict) -> Tuple[bool, dict]:
"""Tests a single service with its config."""
params = service_config['params'].copy()
params['key' if platform == 'amap' else 'ak'] = key
is_static = service_config.get('is_static', False)
return self._call_api(service_config['url'], params, platform, service_name, is_static)
class ReportGenerator:
"""Generates console output for test results."""
@staticmethod
def print_result(service: str, platform: str, success: bool, detail: dict):
status_icon = EMOJI_SUCCESS if success else EMOJI_FAIL
status_color = 'green' if success else 'red'
service_status = colored("Success" if success else "Failed", status_color)
print(f"{EMOJI_SERVICE} {service.ljust(7)} {status_icon} {service_status}")
indent = " │"
latency = detail.get('latency')
latency_str = f"{latency:.2f} ms" if latency is not None else "N/A"
print(f"{indent} {EMOJI_CLOCK} Latency: {latency_str}")
params_str = " | ".join([f"{k}={v:.4f}" if isinstance(v, float) else f"{k}={str(v)[:30]}{'...' if len(str(v))>30 else ''}"
for k, v in detail.get('params', {}).items() if k not in ['key', 'ak']])
if params_str:
print(f"{indent} {EMOJI_INFO} Parameters: {colored(params_str, 'cyan')}")
if success:
success_info_str = ReportGenerator._extract_success_info(service, platform, detail.get('response'))
if success_info_str:
print(f"{indent} {EMOJI_INFO} Result: {colored(success_info_str, 'green')}")
error_msg = detail.get('error')
error_info = detail.get('error_info')
if error_info:
print(f"{indent} {EMOJI_WARNING} Reason: {colored(error_info, 'yellow')}")
ReportGenerator._print_error_analysis(error_info)
elif error_msg:
print(f"{indent} {EMOJI_ERROR} Reason: {colored(error_msg, 'red')}")
ReportGenerator._print_error_analysis(error_msg)
elif not success:
print(f"{indent} {EMOJI_ERROR} Reason: {colored('Failed for unknown reasons', 'red')}")
print( " └" + "─" * 30)
@staticmethod
def _extract_success_info(service: str, platform: str, response_data: Optional[Any]) -> str:
"""Extracts a brief summary from successful API response data."""
if response_data is None: return ""
try:
if isinstance(response_data, dict) and response_data.get('content_type', '').startswith('image/'):
return f"Successfully retrieved image ({response_data['content_type']})"
elif isinstance(response_data, dict):
if service == "IP Location":
city = "Unknown"
if platform == 'amap': city = response_data.get('city') if isinstance(response_data.get('city'), str) and response_data.get('city') else response_data.get('adcode', 'N/A')
elif platform == 'baidu': city = response_data.get('content', {}).get('address_detail', {}).get('city', 'N/A')
return f"Located city: {city}" if city and city != 'N/A' else "Location successful (no city information)"
elif service == "Geocoding":
loc = "Unknown"
if platform == 'amap': loc = response_data.get('geocodes', [{}])[0].get('location', 'N/A')
elif platform == 'baidu': loc_dict = response_data.get('result', {}).get('location', {}); loc = f"{loc_dict.get('lat', 'N/A')},{loc_dict.get('lng', 'N/A')}" if loc_dict else 'N/A'
return f"Retrieved coordinates: {loc}"
elif service == "Reverse Geocoding":
addr = "Unknown"
if platform == 'amap': addr = response_data.get('regeocode', {}).get('formatted_address', 'N/A')
elif platform == 'baidu': addr = response_data.get('result', {}).get('formatted_address', 'N/A')
return f"Retrieved address: {addr[:40]}{'...' if len(addr)>40 else ''}"
elif service == "Route Planning":
dist, dur = "N/A", "N/A"
try:
if platform == 'amap': path = response_data.get('route', {}).get('paths', [{}])[0]; dist, dur = path.get('distance'), path.get('duration')
elif platform == 'baidu': route = response_data.get('result', {}).get('routes', [{}])[0]; dist, dur = route.get('distance'), route.get('duration')
return f"Distance: {dist} meters, Time: {dur} seconds"
except (IndexError, KeyError, TypeError): return "Route planning successful"
else: return "Call successful"
return ""
except Exception as e:
return colored(f"Result parsing error: {e}", 'magenta')
@staticmethod
def _print_error_analysis(error_text: str):
"""Prints specific warning icons based on error text keywords."""
indent = " │ "
error_lower = error_text.lower()
analysis_printed = False
if 'quota' in error_lower or '配额' in error_lower or '并发' in error_lower or 'qps' in error_lower or 'limit' in error_lower:
print(f"{indent}{EMOJI_QUOTA} {colored('[Suspected quota/concurrency limit]', 'yellow')}")
analysis_printed = True
if 'invalid user key' in error_lower or 'key不正确' in error_lower or 'ak不存在' in error_lower or '权限校验失败' in error_lower or 'permission denied' in error_lower or 'key status' in error_lower or '无效' in error_lower :
print(f"{indent}{EMOJI_PERM} {colored('[Invalid key or service not enabled/insufficient permissions]', 'red')}")
analysis_printed = True
if 'domain' in error_lower or 'ip白名单' in error_lower or 'referer' in error_lower or 'sn校验失败' in error_lower:
print(f"{indent}{EMOJI_PERM} {colored('[IP/domain/Referer/SN whitelist verification failed]', 'magenta')}")
analysis_printed = True
if 'timeout' in error_lower or '超时' in error_lower:
print(f"{indent}{EMOJI_NETWORK} {colored('[Request timed out]', 'blue')}")
analysis_printed = True
if 'json解析失败' in error_lower or '非预期响应类型' in error_lower:
print(f"{indent}{EMOJI_NETWORK} {colored('[Response format error or unexpected]', 'magenta')}")
analysis_printed = True
if '请求错误' in error_text or 'http' in error_lower or 'connection' in error_lower or 'ssl' in error_lower:
print(f"{indent}{EMOJI_NETWORK} {colored('[Network/connection/HTTP error]', 'blue')}")
analysis_printed = True
# --- Main Execution ---
def parse_args():
"""Parses command line arguments."""
parser = argparse.ArgumentParser(
description=f"{EMOJI_TOOL} Automatic Detection Tool for Map API Keys (v3.6)", # Version bump for banner
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument('-k', '--keys', nargs='+', help="Specify one or more keys directly in the command line")
parser.add_argument('-f', '--file', help="Path to the file containing keys (one key per line)")
parser.add_argument('-t', '--threads', type=int, default=5, help="Number of concurrent testing threads (default: 5)")
parser.add_argument('-o', '--output', help="Output successful detection results to the specified JSON file")
parser.add_argument('--skip-static', action='store_true', help="Skip detection of static map services")
temp_args, _ = parser.parse_known_args()
if not temp_args.keys and not temp_args.file:
# Print banner before showing help on error
print(colored(BANNER, 'cyan'))
parser.print_help()
print(colored("\nError: At least one key must be provided via -k or -f.", "red"))
exit(1)
return parser.parse_args()
def main(keys_to_test: List[str], num_threads: int, output_file: Optional[str], skip_static: bool):
"""Main detection workflow."""
tester = APITester()
futures_map = {}
results_for_output = []
print(f"{EMOJI_DETECT} Starting detection of {len(keys_to_test)} keys using {num_threads} threads...")
if skip_static: print(f"{EMOJI_INFO} Skipping static map detection.")
with ThreadPoolExecutor(max_workers=num_threads) as executor:
for key in keys_to_test:
key = key.strip()
if not key: continue
platform, key_format_desc = KeyValidator.detect_platform(key)
masked_key = f"{key[:6]}...{key[-4:]}" if len(key) > 10 else key
if platform == 'unknown':
print(colored(f"\n{EMOJI_FAIL} Invalid key format: {key}", "red"))
continue
if platform == 'ambiguous':
print(colored(f"\n{EMOJI_WARNING} Detected ambiguous key: {masked_key}", "yellow"))
print(colored(f" Format ({key_format_desc}) could belong to Amap or Baidu, will attempt both.", "yellow"))
platforms_to_try = ['amap', 'baidu']
else:
platforms_to_try = [platform]
for p in platforms_to_try:
print(colored(f"\n{EMOJI_DETECT} Testing {p.upper()} service - Key: {masked_key}", "blue", attrs=['bold']))
print(f" {EMOJI_PLATFORM} Platform identified: {p.upper()}")
print(f" {EMOJI_KEY_FORMAT} Key format: {key_format_desc}")
platform_services = tester._generate_service_configs(p)
for service_name, service_config in platform_services.items():
if skip_static and service_config.get('is_static', False):
continue
future = executor.submit(tester.test_service, p, service_name, key, service_config)
futures_map[future] = {'key': key, 'platform': p, 'service_name': service_name}
print(colored("\n--- Detection Results ---", attrs=['bold']))
processed_count = 0
total_tasks = len(futures_map)
for future in as_completed(futures_map):
processed_count += 1
context = futures_map[future]
key = context['key']
platform = context['platform']
service_name = context['service_name']
masked_key = f"{key[:6]}...{key[-4:]}" if len(key) > 10 else key
try:
success, detail = future.result()
ReportGenerator.print_result(service_name, platform, success, detail)
if success and output_file:
result_item = {
'key': key,
'platform': platform,
'service_name': service_name,
'request_params': detail.get('params'),
'response_summary': ReportGenerator._extract_success_info(service_name, platform, detail.get('response')),
'latency_ms': detail.get('latency'),
}
results_for_output.append(result_item)
except Exception as exc:
print(colored(f"{EMOJI_FAIL} {service_name.ljust(7)}", 'red'))
print(f" └─ {colored(f'An internal error occurred while executing {service_name} (Platform: {platform}, Key: {masked_key}): {exc}', 'red')}")
if output_file:
print(colored(f"\n{EMOJI_WRITE} Writing {len(results_for_output)} successful results to: {output_file}", "blue"))
try:
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(results_for_output, f, ensure_ascii=False, indent=4)
print(colored(f"{EMOJI_SUCCESS} Successfully written to JSON file.", "green"))
except IOError as e:
print(colored(f"{EMOJI_ERROR} Failed to write to file: {e}", "red"))
except TypeError as e:
print(colored(f"{EMOJI_ERROR} Failed to serialize results to JSON (may contain non-serializable data): {e}", "red"))
if __name__ == "__main__":
# --- Print Banner First ---
print(colored(BANNER, 'cyan')) # Choose a color for the banner
args = parse_args() # Parse arguments after printing banner
keys = []
if args.keys:
keys.extend(args.keys)
if args.file:
try:
with open(args.file, 'r', encoding='utf-8') as f:
keys.extend([line.strip() for line in f if line.strip()])
except FileNotFoundError:
print(colored(f"{EMOJI_ERROR} Error: Key file '{args.file}' not found.", "red"))
exit(1)
except Exception as e:
print(colored(f"{EMOJI_ERROR} Error: An error occurred while reading key file '{args.file}': {e}", "red"))
exit(1)
unique_keys = sorted(list(set(keys)))
# Input validation is handled within parse_args now
main(unique_keys, args.threads, args.output, args.skip_static)
print(colored(f"\n{EMOJI_TOOL} Detection completed.", attrs=['bold']))
Automatic Detection Tool for Map API Keys
Introduction
Automatic Detection Tool for Map API Keys is an efficient and comprehensive solution designed to simplify the management and risk assessment of map API keys during penetration testing. Whether for penetration testers or security researchers, analyzing an application or system's reliance on map APIs often presents multiple challenges, including key validity verification, service permission detection, and potential security risk assessment. This tool automates these critical tasks, significantly enhancing efficiency and helping identify potential security vulnerabilities related to map services.
Main Features
- Key Validity Verification: Automatically verifies the validity status of map API keys (Amap/Baidu) through multiple interfaces and service calls, quickly identifying keys that may be abused or have been leaked.
- Batch Key Testing: Supports batch testing of multiple map API keys, improving penetration testing efficiency and quickly assessing the risks of a large number of keys.
Technical Characteristics
- Python Development: Developed using Python, offering good cross-platform compatibility and readability.
- Requests Library: Utilizes the Requests library to send HTTP requests, simplifying the API calling process.
Quick Start
- Run the script:
python your_script_name.py -k your_amap_key
Output as follows: