Files
flight-monitor/monitor.py
Otis 97ec0e5eec feat: flight price monitor with Kiwi adapter stub and cron setup
- config.json: BER→EZE route, €700 threshold, 14-28 nights, Dec-Mar preferred
- monitor.py: config-driven, multi-route, windowed date search, openclaw alerts
- adapters/base.py: abstract FlightAdapter interface
- adapters/kiwi.py: Kiwi/Tequila v2 adapter (stub until KIWI_API_KEY is set)
- requirements.txt, cron-install.sh (daily 08:00 UTC), README.md
2026-03-19 07:52:54 +00:00

226 lines
7.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Flight price monitor.
Reads routes from config.json, searches for round-trip flights via the
selected adapter, and fires an openclaw alert when prices drop below the
configured threshold.
Usage:
python monitor.py [--adapter kiwi] [--config config.json]
"""
import argparse
import importlib
import json
import logging
import subprocess
import sys
from datetime import date, timedelta
from pathlib import Path
# ---------------------------------------------------------------------------
# Logging — append to monitor.log in same directory as this script
# ---------------------------------------------------------------------------
LOG_FILE = Path(__file__).parent / "monitor.log"
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
handlers=[
logging.FileHandler(LOG_FILE, encoding="utf-8"),
logging.StreamHandler(sys.stdout),
],
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Adapter loader
# ---------------------------------------------------------------------------
ADAPTER_MAP = {
"kiwi": "adapters.kiwi.KiwiAdapter",
}
def load_adapter(name: str):
"""Dynamically load an adapter class by short name."""
if name not in ADAPTER_MAP:
raise ValueError(
f"Unknown adapter '{name}'. Available: {', '.join(ADAPTER_MAP)}"
)
module_path, class_name = ADAPTER_MAP[name].rsplit(".", 1)
module = importlib.import_module(module_path)
cls = getattr(module, class_name)
return cls()
# ---------------------------------------------------------------------------
# Date window generation
# ---------------------------------------------------------------------------
def generate_windows(route: dict, horizon_days: int = 365) -> list[tuple[str, str]]:
"""Generate (date_from, date_to) pairs to search.
Produces one window per fortnight for the next `horizon_days` days.
Each window spans 2 weeks of possible departure dates.
"""
today = date.today()
preferred = set(route.get("preferred_months", []))
monitor_year_round = route.get("monitor_year_round", True)
windows = []
step = timedelta(weeks=2)
window_span = timedelta(weeks=2)
cursor = today + timedelta(days=7) # start one week out
end_horizon = today + timedelta(days=horizon_days)
while cursor < end_horizon:
month = cursor.month
if monitor_year_round or month in preferred:
date_from = cursor.strftime("%Y-%m-%d")
date_to = (cursor + window_span).strftime("%Y-%m-%d")
windows.append((date_from, date_to))
cursor += step
return windows
# ---------------------------------------------------------------------------
# Alerting
# ---------------------------------------------------------------------------
def send_alert(route: dict, result: dict):
"""Fire an openclaw alert for a price hit."""
msg = (
f"✈️ Flight alert: {route['origin']}{route['destination']} "
f"{result['price_eur']:.0f} on "
f"{result['outbound_date']}{result['return_date']}! "
f"{result['deep_link']}"
)
cmd = ["openclaw", "send", "--text", msg]
try:
subprocess.run(cmd, check=True, timeout=30)
logger.info("ALERT SENT: %s", msg)
except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired) as exc:
logger.error("Failed to send alert: %s | command: %s", exc, " ".join(cmd))
# ---------------------------------------------------------------------------
# Main monitor loop
# ---------------------------------------------------------------------------
def check_route(route: dict, adapter) -> int:
"""Check a single route. Returns number of alerts fired."""
route_id = route["id"]
origin = route["origin"]
destination = route["destination"]
threshold = route["threshold_eur"]
min_nights = route["min_nights"]
max_nights = route["max_nights"]
logger.info(
"Checking %s: %s%s threshold=€%s min=%d max=%d nights",
route_id, origin, destination, threshold, min_nights, max_nights,
)
windows = generate_windows(route)
logger.info("Generated %d search windows for %s", len(windows), route_id)
all_results: list[dict] = []
for date_from, date_to in windows:
try:
results = adapter.search(
origin=origin,
destination=destination,
date_from=date_from,
date_to=date_to,
min_nights=min_nights,
max_nights=max_nights,
)
except Exception as exc:
logger.error(
"Adapter error for %s window %s%s: %s",
route_id, date_from, date_to, exc,
)
results = []
logger.debug(
"Window %s%s: %d results", date_from, date_to, len(results)
)
all_results.extend(results)
alerts_fired = 0
if not all_results:
logger.info("Route %s: no results returned (adapter stub or no flights found)", route_id)
return 0
# Sort by price
all_results.sort(key=lambda r: r["price_eur"])
best = all_results[0]
logger.info(
"Route %s: %d results, best price €%.0f on %s%s",
route_id, len(all_results), best["price_eur"], best["outbound_date"], best["return_date"],
)
for result in all_results:
if result["price_eur"] < threshold:
send_alert(route, result)
alerts_fired += 1
if alerts_fired == 0:
logger.info(
"Route %s: best price €%.0f is above threshold €%s — no alert",
route_id, best["price_eur"], threshold,
)
return alerts_fired
def main():
parser = argparse.ArgumentParser(description="Flight price monitor")
parser.add_argument(
"--adapter", default="kiwi",
choices=list(ADAPTER_MAP.keys()),
help="Price adapter to use (default: kiwi)",
)
parser.add_argument(
"--config", default=Path(__file__).parent / "config.json",
help="Path to config file (default: config.json)",
)
args = parser.parse_args()
config_path = Path(args.config)
if not config_path.exists():
logger.error("Config file not found: %s", config_path)
sys.exit(1)
with config_path.open() as f:
config = json.load(f)
routes = config.get("routes", [])
if not routes:
logger.warning("No routes defined in config — nothing to do.")
return
try:
adapter = load_adapter(args.adapter)
except ValueError as exc:
logger.error(str(exc))
sys.exit(1)
logger.info("=== Flight monitor run start | adapter=%s | routes=%d ===", args.adapter, len(routes))
total_alerts = 0
for route in routes:
total_alerts += check_route(route, adapter)
logger.info("=== Run complete | total alerts=%d ===", total_alerts)
if __name__ == "__main__":
main()