feat: flight price monitor with Kiwi adapter stub and cron setup

- config.json: BER→EZE route, €700 threshold, 14-28 nights, Dec-Mar preferred
- monitor.py: config-driven, multi-route, windowed date search, openclaw alerts
- adapters/base.py: abstract FlightAdapter interface
- adapters/kiwi.py: Kiwi/Tequila v2 adapter (stub until KIWI_API_KEY is set)
- requirements.txt, cron-install.sh (daily 08:00 UTC), README.md
This commit is contained in:
Otis
2026-03-19 07:52:54 +00:00
commit 97ec0e5eec
8 changed files with 571 additions and 0 deletions

225
monitor.py Normal file
View File

@@ -0,0 +1,225 @@
#!/usr/bin/env python3
"""Flight price monitor.
Reads routes from config.json, searches for round-trip flights via the
selected adapter, and fires an openclaw alert when prices drop below the
configured threshold.
Usage:
python monitor.py [--adapter kiwi] [--config config.json]
"""
import argparse
import importlib
import json
import logging
import subprocess
import sys
from datetime import date, timedelta
from pathlib import Path
# ---------------------------------------------------------------------------
# Logging — append to monitor.log in same directory as this script
# ---------------------------------------------------------------------------
LOG_FILE = Path(__file__).parent / "monitor.log"
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
handlers=[
logging.FileHandler(LOG_FILE, encoding="utf-8"),
logging.StreamHandler(sys.stdout),
],
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Adapter loader
# ---------------------------------------------------------------------------
ADAPTER_MAP = {
"kiwi": "adapters.kiwi.KiwiAdapter",
}
def load_adapter(name: str):
"""Dynamically load an adapter class by short name."""
if name not in ADAPTER_MAP:
raise ValueError(
f"Unknown adapter '{name}'. Available: {', '.join(ADAPTER_MAP)}"
)
module_path, class_name = ADAPTER_MAP[name].rsplit(".", 1)
module = importlib.import_module(module_path)
cls = getattr(module, class_name)
return cls()
# ---------------------------------------------------------------------------
# Date window generation
# ---------------------------------------------------------------------------
def generate_windows(route: dict, horizon_days: int = 365) -> list[tuple[str, str]]:
"""Generate (date_from, date_to) pairs to search.
Produces one window per fortnight for the next `horizon_days` days.
Each window spans 2 weeks of possible departure dates.
"""
today = date.today()
preferred = set(route.get("preferred_months", []))
monitor_year_round = route.get("monitor_year_round", True)
windows = []
step = timedelta(weeks=2)
window_span = timedelta(weeks=2)
cursor = today + timedelta(days=7) # start one week out
end_horizon = today + timedelta(days=horizon_days)
while cursor < end_horizon:
month = cursor.month
if monitor_year_round or month in preferred:
date_from = cursor.strftime("%Y-%m-%d")
date_to = (cursor + window_span).strftime("%Y-%m-%d")
windows.append((date_from, date_to))
cursor += step
return windows
# ---------------------------------------------------------------------------
# Alerting
# ---------------------------------------------------------------------------
def send_alert(route: dict, result: dict):
"""Fire an openclaw alert for a price hit."""
msg = (
f"✈️ Flight alert: {route['origin']}{route['destination']} "
f"{result['price_eur']:.0f} on "
f"{result['outbound_date']}{result['return_date']}! "
f"{result['deep_link']}"
)
cmd = ["openclaw", "send", "--text", msg]
try:
subprocess.run(cmd, check=True, timeout=30)
logger.info("ALERT SENT: %s", msg)
except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired) as exc:
logger.error("Failed to send alert: %s | command: %s", exc, " ".join(cmd))
# ---------------------------------------------------------------------------
# Main monitor loop
# ---------------------------------------------------------------------------
def check_route(route: dict, adapter) -> int:
"""Check a single route. Returns number of alerts fired."""
route_id = route["id"]
origin = route["origin"]
destination = route["destination"]
threshold = route["threshold_eur"]
min_nights = route["min_nights"]
max_nights = route["max_nights"]
logger.info(
"Checking %s: %s%s threshold=€%s min=%d max=%d nights",
route_id, origin, destination, threshold, min_nights, max_nights,
)
windows = generate_windows(route)
logger.info("Generated %d search windows for %s", len(windows), route_id)
all_results: list[dict] = []
for date_from, date_to in windows:
try:
results = adapter.search(
origin=origin,
destination=destination,
date_from=date_from,
date_to=date_to,
min_nights=min_nights,
max_nights=max_nights,
)
except Exception as exc:
logger.error(
"Adapter error for %s window %s%s: %s",
route_id, date_from, date_to, exc,
)
results = []
logger.debug(
"Window %s%s: %d results", date_from, date_to, len(results)
)
all_results.extend(results)
alerts_fired = 0
if not all_results:
logger.info("Route %s: no results returned (adapter stub or no flights found)", route_id)
return 0
# Sort by price
all_results.sort(key=lambda r: r["price_eur"])
best = all_results[0]
logger.info(
"Route %s: %d results, best price €%.0f on %s%s",
route_id, len(all_results), best["price_eur"], best["outbound_date"], best["return_date"],
)
for result in all_results:
if result["price_eur"] < threshold:
send_alert(route, result)
alerts_fired += 1
if alerts_fired == 0:
logger.info(
"Route %s: best price €%.0f is above threshold €%s — no alert",
route_id, best["price_eur"], threshold,
)
return alerts_fired
def main():
parser = argparse.ArgumentParser(description="Flight price monitor")
parser.add_argument(
"--adapter", default="kiwi",
choices=list(ADAPTER_MAP.keys()),
help="Price adapter to use (default: kiwi)",
)
parser.add_argument(
"--config", default=Path(__file__).parent / "config.json",
help="Path to config file (default: config.json)",
)
args = parser.parse_args()
config_path = Path(args.config)
if not config_path.exists():
logger.error("Config file not found: %s", config_path)
sys.exit(1)
with config_path.open() as f:
config = json.load(f)
routes = config.get("routes", [])
if not routes:
logger.warning("No routes defined in config — nothing to do.")
return
try:
adapter = load_adapter(args.adapter)
except ValueError as exc:
logger.error(str(exc))
sys.exit(1)
logger.info("=== Flight monitor run start | adapter=%s | routes=%d ===", args.adapter, len(routes))
total_alerts = 0
for route in routes:
total_alerts += check_route(route, adapter)
logger.info("=== Run complete | total alerts=%d ===", total_alerts)
if __name__ == "__main__":
main()