feat: initial flight monitor
- Python flight price monitor for BER↔EZE and other routes - Adapter pattern for extensibility (Kiwi/Tequila implemented) - OpenClaw alerting integration - Cron-friendly one-shot execution - Full logging of all checks - Graceful handling when API key not set Implements monkey-island/flight-monitor#2
This commit is contained in:
14
.gitignore
vendored
Normal file
14
.gitignore
vendored
Normal file
@@ -0,0 +1,14 @@
|
||||
__pycache__/
|
||||
*.pyc
|
||||
*.pyo
|
||||
*.pyd
|
||||
.Python
|
||||
*.so
|
||||
*.egg
|
||||
*.egg-info/
|
||||
dist/
|
||||
build/
|
||||
.env
|
||||
.venv
|
||||
venv/
|
||||
monitor.log
|
||||
206
README.md
206
README.md
@@ -1,3 +1,205 @@
|
||||
# flight-monitor
|
||||
# Flight Price Monitor
|
||||
|
||||
Flight price monitor — alerts when BER→EZE drops below threshold
|
||||
Daily flight price monitor that checks configured routes and alerts when prices drop below threshold.
|
||||
|
||||
## Features
|
||||
|
||||
- ✈️ Monitor multiple flight routes (BER→EZE, etc.)
|
||||
- 🔔 OpenClaw alerts when prices drop below threshold
|
||||
- 📊 Extensible adapter pattern for multiple price sources
|
||||
- 📝 Full logging of all checks (hits and misses)
|
||||
- 🔧 Cron-friendly one-shot execution
|
||||
|
||||
## Setup
|
||||
|
||||
### 1. Install Dependencies
|
||||
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
### 2. Get a Kiwi API Key
|
||||
|
||||
1. Sign up at https://tequila.kiwi.com/portal/login
|
||||
2. Create an API key
|
||||
3. Set environment variable:
|
||||
|
||||
```bash
|
||||
export KIWI_API_KEY="your-api-key-here"
|
||||
```
|
||||
|
||||
Or create a `.env` file:
|
||||
|
||||
```bash
|
||||
KIWI_API_KEY=your-api-key-here
|
||||
```
|
||||
|
||||
### 3. Configure Routes
|
||||
|
||||
Edit `config.json`:
|
||||
|
||||
```json
|
||||
{
|
||||
"routes": [
|
||||
{
|
||||
"origin": "BER",
|
||||
"destination": "EZE",
|
||||
"trip_type": "round_trip",
|
||||
"threshold_eur": 700,
|
||||
"duration_weeks": [2, 3, 4],
|
||||
"preferred_months": [12, 1, 2, 3],
|
||||
"monitor_year_round": true
|
||||
}
|
||||
],
|
||||
"check_interval": "daily"
|
||||
}
|
||||
```
|
||||
|
||||
**Config Fields:**
|
||||
|
||||
- `origin`, `destination`: IATA airport codes
|
||||
- `threshold_eur`: Alert if price drops below this (in EUR)
|
||||
- `duration_weeks`: Trip lengths to search (in weeks)
|
||||
- `preferred_months`: Months to prioritize (1-12)
|
||||
- `monitor_year_round`: `true` to search all months, `false` to only search `preferred_months`
|
||||
|
||||
You can add multiple routes — the monitor will check them all.
|
||||
|
||||
### 4. Run
|
||||
|
||||
```bash
|
||||
python monitor.py
|
||||
```
|
||||
|
||||
Logs are written to `monitor.log` and stdout.
|
||||
|
||||
## Scheduling (Cron)
|
||||
|
||||
To run daily at 6 AM:
|
||||
|
||||
```bash
|
||||
crontab -e
|
||||
```
|
||||
|
||||
Add:
|
||||
|
||||
```
|
||||
0 6 * * * cd /path/to/flight-monitor && /usr/bin/python3 monitor.py
|
||||
```
|
||||
|
||||
Or use OpenClaw cron (recommended):
|
||||
|
||||
```bash
|
||||
openclaw cron add \
|
||||
--schedule '0 6 * * *' \
|
||||
--command 'cd /path/to/flight-monitor && python monitor.py' \
|
||||
--name 'Flight Price Monitor'
|
||||
```
|
||||
|
||||
## Architecture
|
||||
|
||||
### Adapter Pattern
|
||||
|
||||
The monitor uses an adapter pattern to support multiple price sources:
|
||||
|
||||
```
|
||||
adapters/
|
||||
base.py # Abstract FlightAdapter interface
|
||||
kiwi.py # Kiwi/Tequila API implementation
|
||||
```
|
||||
|
||||
Each adapter implements:
|
||||
|
||||
- `search_round_trip()` — returns normalized results
|
||||
- `get_name()` — adapter name
|
||||
|
||||
All results follow this schema:
|
||||
|
||||
```python
|
||||
{
|
||||
"price_eur": float,
|
||||
"airline": str,
|
||||
"departure": datetime,
|
||||
"return": datetime,
|
||||
"link": str, # Booking URL
|
||||
"source": str # Adapter name
|
||||
}
|
||||
```
|
||||
|
||||
### Adding a New Adapter
|
||||
|
||||
1. Create `adapters/my_adapter.py`:
|
||||
|
||||
```python
|
||||
from adapters.base import FlightAdapter
|
||||
|
||||
class MyAdapter(FlightAdapter):
|
||||
def get_name(self) -> str:
|
||||
return "MySource"
|
||||
|
||||
def search_round_trip(self, origin, destination, departure_date, return_date, **kwargs):
|
||||
# Your implementation here
|
||||
# Must return list of normalized results (see schema above)
|
||||
pass
|
||||
```
|
||||
|
||||
2. Register in `adapters/__init__.py`:
|
||||
|
||||
```python
|
||||
from adapters.my_adapter import MyAdapter
|
||||
__all__ = [..., "MyAdapter"]
|
||||
```
|
||||
|
||||
3. Update `monitor.py` to use your adapter:
|
||||
|
||||
```python
|
||||
adapter = MyAdapter()
|
||||
```
|
||||
|
||||
You can also make the adapter configurable via `config.json` if needed.
|
||||
|
||||
## Alerting
|
||||
|
||||
When a price drop is detected, the monitor sends an alert via OpenClaw:
|
||||
|
||||
```bash
|
||||
openclaw system event --text "✈️ BER→EZE €650 on 2026-04-15 → 2026-04-29 (Lufthansa) — below €700! https://..." --mode now
|
||||
```
|
||||
|
||||
This routes the alert to your configured OpenClaw channels (Telegram, Discord, Signal, etc.).
|
||||
|
||||
## Logs
|
||||
|
||||
All checks are logged to `monitor.log` with timestamps:
|
||||
|
||||
```
|
||||
2026-03-21 18:00:00 [INFO] === Flight Monitor Starting ===
|
||||
2026-03-21 18:00:00 [INFO] Checking route BER→EZE (threshold: €700)
|
||||
2026-03-21 18:00:01 [INFO] 2026-04-15 → 2026-04-29: €850 (Lufthansa)
|
||||
2026-03-21 18:00:02 [INFO] 2026-05-01 → 2026-05-15: €620 (Air France)
|
||||
2026-03-21 18:00:02 [INFO] Alert sent via OpenClaw
|
||||
2026-03-21 18:00:03 [INFO] === Flight Monitor Complete ===
|
||||
```
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
**"KIWI_API_KEY environment variable not set"**
|
||||
|
||||
- Set the `KIWI_API_KEY` env var (see Setup step 2)
|
||||
- The monitor will exit gracefully (not crash) if the key is missing
|
||||
|
||||
**No results returned**
|
||||
|
||||
- Check `monitor.log` for API errors
|
||||
- Verify your IATA codes are correct (e.g., "BER" not "Berlin")
|
||||
- Try a broader date range (increase `duration_weeks` or `preferred_months`)
|
||||
|
||||
**OpenClaw alert not sent**
|
||||
|
||||
- Verify `openclaw` CLI is in PATH
|
||||
- Check OpenClaw is running: `openclaw status`
|
||||
- Look for error messages in `monitor.log`
|
||||
|
||||
## License
|
||||
|
||||
MIT
|
||||
|
||||
7
adapters/__init__.py
Normal file
7
adapters/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
||||
"""
|
||||
Flight price adapters package.
|
||||
"""
|
||||
from adapters.base import FlightAdapter
|
||||
from adapters.kiwi import KiwiAdapter
|
||||
|
||||
__all__ = ["FlightAdapter", "KiwiAdapter"]
|
||||
BIN
adapters/__pycache__/__init__.cpython-312.pyc
Normal file
BIN
adapters/__pycache__/__init__.cpython-312.pyc
Normal file
Binary file not shown.
BIN
adapters/__pycache__/base.cpython-312.pyc
Normal file
BIN
adapters/__pycache__/base.cpython-312.pyc
Normal file
Binary file not shown.
BIN
adapters/__pycache__/kiwi.cpython-312.pyc
Normal file
BIN
adapters/__pycache__/kiwi.cpython-312.pyc
Normal file
Binary file not shown.
54
adapters/base.py
Normal file
54
adapters/base.py
Normal file
@@ -0,0 +1,54 @@
|
||||
"""
|
||||
Base adapter interface for flight price sources.
|
||||
"""
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import date
|
||||
from typing import List, Dict, Any
|
||||
|
||||
|
||||
class FlightAdapter(ABC):
|
||||
"""
|
||||
Abstract base class for flight price adapters.
|
||||
|
||||
Each adapter implements a specific price source (Kiwi, Google Flights, etc.)
|
||||
and returns results in a normalized format.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def search_round_trip(
|
||||
self,
|
||||
origin: str,
|
||||
destination: str,
|
||||
departure_date: date,
|
||||
return_date: date,
|
||||
**kwargs
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Search for round-trip flights.
|
||||
|
||||
Args:
|
||||
origin: IATA code of origin airport (e.g., "BER")
|
||||
destination: IATA code of destination airport (e.g., "EZE")
|
||||
departure_date: Departure date
|
||||
return_date: Return date
|
||||
**kwargs: Adapter-specific parameters
|
||||
|
||||
Returns:
|
||||
List of results, each with:
|
||||
{
|
||||
"price_eur": float,
|
||||
"airline": str,
|
||||
"departure": datetime,
|
||||
"return": datetime,
|
||||
"link": str (booking URL),
|
||||
"source": str (adapter name)
|
||||
}
|
||||
|
||||
Returns empty list if no results found.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_name(self) -> str:
|
||||
"""Return the name of this adapter."""
|
||||
pass
|
||||
120
adapters/kiwi.py
Normal file
120
adapters/kiwi.py
Normal file
@@ -0,0 +1,120 @@
|
||||
"""
|
||||
Kiwi.com / Tequila API adapter.
|
||||
"""
|
||||
import os
|
||||
import logging
|
||||
from datetime import date, datetime
|
||||
from typing import List, Dict, Any
|
||||
import requests
|
||||
|
||||
from adapters.base import FlightAdapter
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class KiwiAdapter(FlightAdapter):
|
||||
"""
|
||||
Adapter for Kiwi.com Tequila API.
|
||||
|
||||
API docs: https://tequila.kiwi.com/docs/search/
|
||||
"""
|
||||
|
||||
API_BASE = "https://api.tequila.kiwi.com"
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the adapter. Requires KIWI_API_KEY environment variable."""
|
||||
self.api_key = os.getenv("KIWI_API_KEY")
|
||||
if not self.api_key:
|
||||
raise ValueError(
|
||||
"KIWI_API_KEY environment variable not set. "
|
||||
"Get your API key from https://tequila.kiwi.com/portal/login"
|
||||
)
|
||||
logger.info("Kiwi adapter initialized")
|
||||
|
||||
def get_name(self) -> str:
|
||||
return "Kiwi/Tequila"
|
||||
|
||||
def search_round_trip(
|
||||
self,
|
||||
origin: str,
|
||||
destination: str,
|
||||
departure_date: date,
|
||||
return_date: date,
|
||||
**kwargs
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Search for round-trip flights via Kiwi API.
|
||||
|
||||
Args:
|
||||
origin: IATA code (e.g., "BER")
|
||||
destination: IATA code (e.g., "EZE")
|
||||
departure_date: Departure date
|
||||
return_date: Return date
|
||||
|
||||
Returns:
|
||||
List of normalized flight results.
|
||||
"""
|
||||
url = f"{self.API_BASE}/v2/search"
|
||||
|
||||
params = {
|
||||
"fly_from": origin,
|
||||
"fly_to": destination,
|
||||
"date_from": departure_date.strftime("%d/%m/%Y"),
|
||||
"date_to": departure_date.strftime("%d/%m/%Y"),
|
||||
"return_from": return_date.strftime("%d/%m/%Y"),
|
||||
"return_to": return_date.strftime("%d/%m/%Y"),
|
||||
"flight_type": "round",
|
||||
"adults": 1,
|
||||
"curr": "EUR",
|
||||
"limit": 5,
|
||||
"sort": "price"
|
||||
}
|
||||
|
||||
headers = {
|
||||
"apikey": self.api_key,
|
||||
"accept": "application/json"
|
||||
}
|
||||
|
||||
try:
|
||||
logger.debug(f"Kiwi API request: {origin}→{destination} {departure_date} → {return_date}")
|
||||
response = requests.get(url, params=params, headers=headers, timeout=30)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json()
|
||||
results = []
|
||||
|
||||
for item in data.get("data", []):
|
||||
# Extract airline(s) - may be multiple for multi-leg flights
|
||||
airlines = set()
|
||||
for route in item.get("route", []):
|
||||
airline = route.get("airline")
|
||||
if airline:
|
||||
airlines.add(airline)
|
||||
airline_str = ", ".join(sorted(airlines)) if airlines else "Unknown"
|
||||
|
||||
# Parse timestamps
|
||||
departure_time = datetime.fromtimestamp(item.get("dTime", 0))
|
||||
return_time = datetime.fromtimestamp(item.get("aTime", 0))
|
||||
|
||||
# Build deep link
|
||||
deep_link = item.get("deep_link", "")
|
||||
|
||||
results.append({
|
||||
"price_eur": float(item.get("price", 0)),
|
||||
"airline": airline_str,
|
||||
"departure": departure_time,
|
||||
"return": return_time,
|
||||
"link": deep_link,
|
||||
"source": self.get_name()
|
||||
})
|
||||
|
||||
logger.debug(f"Kiwi API returned {len(results)} results")
|
||||
return results
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"Kiwi API request failed: {e}")
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing Kiwi API response: {e}")
|
||||
return []
|
||||
14
config.json
Normal file
14
config.json
Normal file
@@ -0,0 +1,14 @@
|
||||
{
|
||||
"routes": [
|
||||
{
|
||||
"origin": "BER",
|
||||
"destination": "EZE",
|
||||
"trip_type": "round_trip",
|
||||
"threshold_eur": 700,
|
||||
"duration_weeks": [2, 3, 4],
|
||||
"preferred_months": [12, 1, 2, 3],
|
||||
"monitor_year_round": true
|
||||
}
|
||||
],
|
||||
"check_interval": "daily"
|
||||
}
|
||||
3
monitor.log
Normal file
3
monitor.log
Normal file
@@ -0,0 +1,3 @@
|
||||
2026-03-21 23:10:16,010 [INFO] === Flight Monitor Starting ===
|
||||
2026-03-21 23:10:16,011 [ERROR] Adapter initialization failed: KIWI_API_KEY environment variable not set. Get your API key from https://tequila.kiwi.com/portal/login
|
||||
2026-03-21 23:10:16,011 [INFO] Set KIWI_API_KEY environment variable to enable monitoring
|
||||
170
monitor.py
Normal file
170
monitor.py
Normal file
@@ -0,0 +1,170 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Flight Price Monitor
|
||||
One-shot CLI tool to check configured flight routes and alert on price drops.
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any
|
||||
|
||||
from adapters.base import FlightAdapter
|
||||
from adapters.kiwi import KiwiAdapter
|
||||
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s [%(levelname)s] %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler('monitor.log'),
|
||||
logging.StreamHandler()
|
||||
]
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def load_config(config_path: str = "config.json") -> Dict[str, Any]:
|
||||
"""Load configuration from JSON file."""
|
||||
try:
|
||||
with open(config_path, 'r') as f:
|
||||
return json.load(f)
|
||||
except FileNotFoundError:
|
||||
logger.error(f"Config file not found: {config_path}")
|
||||
sys.exit(1)
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Invalid JSON in config file: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def get_search_dates(route: Dict[str, Any]) -> List[tuple]:
|
||||
"""Generate search date ranges based on route configuration."""
|
||||
dates = []
|
||||
today = datetime.now().date()
|
||||
|
||||
# Look ahead for the next 6 months
|
||||
for offset_months in range(6):
|
||||
# Calculate target month
|
||||
target_date = today + timedelta(days=offset_months * 30)
|
||||
target_month = target_date.month
|
||||
|
||||
# Check if this month should be searched
|
||||
if route.get("monitor_year_round") or target_month in route.get("preferred_months", []):
|
||||
for weeks in route.get("duration_weeks", [2]):
|
||||
departure = target_date
|
||||
return_date = departure + timedelta(weeks=weeks)
|
||||
dates.append((departure, return_date))
|
||||
|
||||
# Limit to 10 searches to avoid API spam
|
||||
return dates[:10]
|
||||
|
||||
|
||||
def send_openclaw_alert(message: str) -> bool:
|
||||
"""Send alert via OpenClaw system event."""
|
||||
try:
|
||||
import subprocess
|
||||
result = subprocess.run(
|
||||
["openclaw", "system", "event", "--text", message, "--mode", "now"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10
|
||||
)
|
||||
if result.returncode == 0:
|
||||
logger.info("Alert sent via OpenClaw")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"OpenClaw alert failed: {result.stderr}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to send OpenClaw alert: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def check_route(adapter: FlightAdapter, route: Dict[str, Any]) -> None:
|
||||
"""Check a single route for price drops."""
|
||||
origin = route["origin"]
|
||||
destination = route["destination"]
|
||||
threshold = route["threshold_eur"]
|
||||
|
||||
logger.info(f"Checking route {origin}→{destination} (threshold: €{threshold})")
|
||||
|
||||
search_dates = get_search_dates(route)
|
||||
if not search_dates:
|
||||
logger.warning(f"No search dates generated for {origin}→{destination}")
|
||||
return
|
||||
|
||||
alerts_sent = 0
|
||||
|
||||
for departure, return_date in search_dates:
|
||||
try:
|
||||
results = adapter.search_round_trip(
|
||||
origin=origin,
|
||||
destination=destination,
|
||||
departure_date=departure,
|
||||
return_date=return_date
|
||||
)
|
||||
|
||||
if not results:
|
||||
logger.info(f" {departure} → {return_date}: No results")
|
||||
continue
|
||||
|
||||
# Check if any result is below threshold
|
||||
for result in results[:3]: # Check top 3 results
|
||||
price = result["price_eur"]
|
||||
airline = result.get("airline", "Unknown")
|
||||
link = result.get("link", "")
|
||||
|
||||
logger.info(f" {departure} → {return_date}: €{price} ({airline})")
|
||||
|
||||
if price < threshold:
|
||||
message = (
|
||||
f"✈️ {origin}→{destination} €{price} on "
|
||||
f"{departure.strftime('%Y-%m-%d')} → {return_date.strftime('%Y-%m-%d')} "
|
||||
f"({airline}) — below €{threshold}! {link}"
|
||||
)
|
||||
send_openclaw_alert(message)
|
||||
alerts_sent += 1
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f" Error searching {departure} → {return_date}: {e}")
|
||||
|
||||
if alerts_sent == 0:
|
||||
logger.info(f"No prices below threshold for {origin}→{destination}")
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point."""
|
||||
logger.info("=== Flight Monitor Starting ===")
|
||||
|
||||
# Load configuration
|
||||
config = load_config()
|
||||
|
||||
# Initialize adapter
|
||||
# For now, hardcoded to Kiwi. Could be made configurable.
|
||||
try:
|
||||
adapter = KiwiAdapter()
|
||||
except ValueError as e:
|
||||
logger.error(f"Adapter initialization failed: {e}")
|
||||
logger.info("Set KIWI_API_KEY environment variable to enable monitoring")
|
||||
sys.exit(0) # Graceful exit, not a crash
|
||||
|
||||
# Check each route
|
||||
routes = config.get("routes", [])
|
||||
if not routes:
|
||||
logger.warning("No routes configured")
|
||||
sys.exit(0)
|
||||
|
||||
for route in routes:
|
||||
try:
|
||||
check_route(adapter, route)
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking route: {e}")
|
||||
|
||||
logger.info("=== Flight Monitor Complete ===")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
2
requirements.txt
Normal file
2
requirements.txt
Normal file
@@ -0,0 +1,2 @@
|
||||
requests>=2.31.0
|
||||
python-dotenv>=1.0.0
|
||||
Reference in New Issue
Block a user