- config.json: BER→EZE route, €700 threshold, 14-28 nights, Dec-Mar preferred - monitor.py: config-driven, multi-route, windowed date search, openclaw alerts - adapters/base.py: abstract FlightAdapter interface - adapters/kiwi.py: Kiwi/Tequila v2 adapter (stub until KIWI_API_KEY is set) - requirements.txt, cron-install.sh (daily 08:00 UTC), README.md
226 lines
7.0 KiB
Python
226 lines
7.0 KiB
Python
#!/usr/bin/env python3
|
||
"""Flight price monitor.
|
||
|
||
Reads routes from config.json, searches for round-trip flights via the
|
||
selected adapter, and fires an openclaw alert when prices drop below the
|
||
configured threshold.
|
||
|
||
Usage:
|
||
python monitor.py [--adapter kiwi] [--config config.json]
|
||
"""
|
||
|
||
import argparse
|
||
import importlib
|
||
import json
|
||
import logging
|
||
import subprocess
|
||
import sys
|
||
from datetime import date, timedelta
|
||
from pathlib import Path
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Logging — append to monitor.log in same directory as this script
|
||
# ---------------------------------------------------------------------------
|
||
LOG_FILE = Path(__file__).parent / "monitor.log"
|
||
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s %(levelname)s %(message)s",
|
||
handlers=[
|
||
logging.FileHandler(LOG_FILE, encoding="utf-8"),
|
||
logging.StreamHandler(sys.stdout),
|
||
],
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Adapter loader
|
||
# ---------------------------------------------------------------------------
|
||
|
||
ADAPTER_MAP = {
|
||
"kiwi": "adapters.kiwi.KiwiAdapter",
|
||
}
|
||
|
||
|
||
def load_adapter(name: str):
|
||
"""Dynamically load an adapter class by short name."""
|
||
if name not in ADAPTER_MAP:
|
||
raise ValueError(
|
||
f"Unknown adapter '{name}'. Available: {', '.join(ADAPTER_MAP)}"
|
||
)
|
||
module_path, class_name = ADAPTER_MAP[name].rsplit(".", 1)
|
||
module = importlib.import_module(module_path)
|
||
cls = getattr(module, class_name)
|
||
return cls()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Date window generation
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def generate_windows(route: dict, horizon_days: int = 365) -> list[tuple[str, str]]:
|
||
"""Generate (date_from, date_to) pairs to search.
|
||
|
||
Produces one window per fortnight for the next `horizon_days` days.
|
||
Each window spans 2 weeks of possible departure dates.
|
||
"""
|
||
today = date.today()
|
||
preferred = set(route.get("preferred_months", []))
|
||
monitor_year_round = route.get("monitor_year_round", True)
|
||
|
||
windows = []
|
||
step = timedelta(weeks=2)
|
||
window_span = timedelta(weeks=2)
|
||
|
||
cursor = today + timedelta(days=7) # start one week out
|
||
end_horizon = today + timedelta(days=horizon_days)
|
||
|
||
while cursor < end_horizon:
|
||
month = cursor.month
|
||
if monitor_year_round or month in preferred:
|
||
date_from = cursor.strftime("%Y-%m-%d")
|
||
date_to = (cursor + window_span).strftime("%Y-%m-%d")
|
||
windows.append((date_from, date_to))
|
||
cursor += step
|
||
|
||
return windows
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Alerting
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def send_alert(route: dict, result: dict):
|
||
"""Fire an openclaw alert for a price hit."""
|
||
msg = (
|
||
f"✈️ Flight alert: {route['origin']}→{route['destination']} "
|
||
f"€{result['price_eur']:.0f} on "
|
||
f"{result['outbound_date']}–{result['return_date']}! "
|
||
f"{result['deep_link']}"
|
||
)
|
||
cmd = ["openclaw", "send", "--text", msg]
|
||
try:
|
||
subprocess.run(cmd, check=True, timeout=30)
|
||
logger.info("ALERT SENT: %s", msg)
|
||
except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired) as exc:
|
||
logger.error("Failed to send alert: %s | command: %s", exc, " ".join(cmd))
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main monitor loop
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def check_route(route: dict, adapter) -> int:
|
||
"""Check a single route. Returns number of alerts fired."""
|
||
route_id = route["id"]
|
||
origin = route["origin"]
|
||
destination = route["destination"]
|
||
threshold = route["threshold_eur"]
|
||
min_nights = route["min_nights"]
|
||
max_nights = route["max_nights"]
|
||
|
||
logger.info(
|
||
"Checking %s: %s→%s threshold=€%s min=%d max=%d nights",
|
||
route_id, origin, destination, threshold, min_nights, max_nights,
|
||
)
|
||
|
||
windows = generate_windows(route)
|
||
logger.info("Generated %d search windows for %s", len(windows), route_id)
|
||
|
||
all_results: list[dict] = []
|
||
|
||
for date_from, date_to in windows:
|
||
try:
|
||
results = adapter.search(
|
||
origin=origin,
|
||
destination=destination,
|
||
date_from=date_from,
|
||
date_to=date_to,
|
||
min_nights=min_nights,
|
||
max_nights=max_nights,
|
||
)
|
||
except Exception as exc:
|
||
logger.error(
|
||
"Adapter error for %s window %s–%s: %s",
|
||
route_id, date_from, date_to, exc,
|
||
)
|
||
results = []
|
||
|
||
logger.debug(
|
||
"Window %s–%s: %d results", date_from, date_to, len(results)
|
||
)
|
||
all_results.extend(results)
|
||
|
||
alerts_fired = 0
|
||
|
||
if not all_results:
|
||
logger.info("Route %s: no results returned (adapter stub or no flights found)", route_id)
|
||
return 0
|
||
|
||
# Sort by price
|
||
all_results.sort(key=lambda r: r["price_eur"])
|
||
best = all_results[0]
|
||
logger.info(
|
||
"Route %s: %d results, best price €%.0f on %s–%s",
|
||
route_id, len(all_results), best["price_eur"], best["outbound_date"], best["return_date"],
|
||
)
|
||
|
||
for result in all_results:
|
||
if result["price_eur"] < threshold:
|
||
send_alert(route, result)
|
||
alerts_fired += 1
|
||
|
||
if alerts_fired == 0:
|
||
logger.info(
|
||
"Route %s: best price €%.0f is above threshold €%s — no alert",
|
||
route_id, best["price_eur"], threshold,
|
||
)
|
||
|
||
return alerts_fired
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="Flight price monitor")
|
||
parser.add_argument(
|
||
"--adapter", default="kiwi",
|
||
choices=list(ADAPTER_MAP.keys()),
|
||
help="Price adapter to use (default: kiwi)",
|
||
)
|
||
parser.add_argument(
|
||
"--config", default=Path(__file__).parent / "config.json",
|
||
help="Path to config file (default: config.json)",
|
||
)
|
||
args = parser.parse_args()
|
||
|
||
config_path = Path(args.config)
|
||
if not config_path.exists():
|
||
logger.error("Config file not found: %s", config_path)
|
||
sys.exit(1)
|
||
|
||
with config_path.open() as f:
|
||
config = json.load(f)
|
||
|
||
routes = config.get("routes", [])
|
||
if not routes:
|
||
logger.warning("No routes defined in config — nothing to do.")
|
||
return
|
||
|
||
try:
|
||
adapter = load_adapter(args.adapter)
|
||
except ValueError as exc:
|
||
logger.error(str(exc))
|
||
sys.exit(1)
|
||
|
||
logger.info("=== Flight monitor run start | adapter=%s | routes=%d ===", args.adapter, len(routes))
|
||
|
||
total_alerts = 0
|
||
for route in routes:
|
||
total_alerts += check_route(route, adapter)
|
||
|
||
logger.info("=== Run complete | total alerts=%d ===", total_alerts)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|