amazon_invoice_downloader/main.py

670 lines
26 KiB
Python

import argparse
import json
import os
import re
import smtplib
import time
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from email.message import EmailMessage
from pathlib import Path
from typing import Optional
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse
import dateparser
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright
# Per-user application directory; holds the configuration, the saved browser
# session and the default debug report.
APP_DIR = Path.home() / ".amazon_invoice_downloader"
# JSON configuration written by save_config() and read by load_config().
CONFIG_PATH = APP_DIR / "config.json"
# Browser storage state saved by configure() and loaded again by download().
STORAGE_STATE_PATH = APP_DIR / "storage_state.json"
# Default target of the debug report written by download().
DEFAULT_DEBUG_JSON_PATH = APP_DIR / "debug_last_run.json"
# Lower-case substrings that mark a link text as invoice-related; they are
# matched as plain substrings by text_contains_invoice_hint().
INVOICE_KEYWORDS = [
"invoice",
"rechnung",
"faktura",
"vat",
"steuer",
"bill",
"beleg",
]
@dataclass
class OrderInvoice:
    """One order together with the invoice link candidates found for it."""

    # Date the order was placed, as parsed from the page text.
    order_date: date
    # Order number, or an "UNKNOWN-<date>" placeholder when none was found.
    order_id: str
    # Absolute URLs that may lead to an invoice document.
    invoice_links: list[str]
def ensure_app_dir() -> None:
    """Make sure the per-user application directory exists.

    Missing parent directories are created as well; an already existing
    directory is left untouched.
    """
    app_dir = APP_DIR
    app_dir.mkdir(parents=True, exist_ok=True)
def load_config() -> dict:
    """Load the saved configuration from ``CONFIG_PATH``.

    Returns:
        The configuration mapping stored in the JSON file.

    Raises:
        SystemExit: If the file is missing, is not valid JSON, or does not
            contain a JSON object. A readable message is used instead of a
            traceback, like the other user-facing errors in this module.
    """
    if not CONFIG_PATH.exists():
        raise SystemExit("Konfiguration fehlt. Bitte zuerst 'configure' ausfuehren.")
    try:
        config = json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        raise SystemExit(
            f"Konfiguration in {CONFIG_PATH} ist kein gueltiges JSON. "
            "Bitte 'configure' erneut ausfuehren."
        ) from exc
    if not isinstance(config, dict):
        # Callers use dict methods such as .get(); reject other JSON values early.
        raise SystemExit(
            f"Konfiguration in {CONFIG_PATH} hat ein unerwartetes Format. "
            "Bitte 'configure' erneut ausfuehren."
        )
    return config
def save_config(config: dict) -> None:
    """Write *config* as indented JSON to ``CONFIG_PATH``.

    The application directory is created first if necessary. Because the
    configuration can contain the SMTP password, access to the file is
    restricted to the current user where the platform supports it.
    """
    ensure_app_dir()
    CONFIG_PATH.write_text(json.dumps(config, indent=2), encoding="utf-8")
    try:
        CONFIG_PATH.chmod(0o600)
    except OSError:
        # Best effort only: some file systems do not support POSIX permissions.
        pass
def build_context_options(config: dict) -> dict:
    """Build the keyword arguments for ``browser.new_context``.

    Args:
        config: Configuration mapping; ``locale`` defaults to ``"de-DE"`` and
            ``timezone`` to ``"Europe/Berlin"`` when absent.

    Returns:
        A dict with ``locale``, ``timezone_id`` and an ``Accept-Language``
        header. The header is derived from the configured locale so that it
        no longer contradicts a non-German locale; for the default locale it
        is still ``"de-DE,de;q=0.9,en;q=0.8"``.
    """
    locale = config.get("locale", "de-DE")
    timezone = config.get("timezone", "Europe/Berlin")

    # Preference list: full locale, its bare language, then English as fallback.
    primary_language = locale.split("-", 1)[0]
    preferences = [locale]
    if primary_language != locale:
        preferences.append(f"{primary_language};q=0.9")
    if primary_language.lower() != "en":
        preferences.append("en;q=0.8")

    return {
        "locale": locale,
        "timezone_id": timezone,
        "extra_http_headers": {
            "Accept-Language": ",".join(preferences),
        },
    }
def strtobool(value: str) -> bool:
    """Interpret *value* as a boolean flag.

    Surrounding whitespace and letter case are ignored. Only ``1``, ``true``,
    ``yes`` and ``on`` count as true; every other string is false.
    """
    truthy = ("1", "true", "yes", "on")
    normalized = value.strip().lower()
    return normalized in truthy
def get_notification_settings(config: dict) -> dict:
    """Resolve the e-mail notification settings.

    Each value is taken from an environment variable when it is set
    (``NOTIFY_EMAIL``, ``SMTP_HOST``, ``SMTP_PORT``, ``SMTP_USER``,
    ``SMTP_PASSWORD``, ``SMTP_FROM``, ``SMTP_STARTTLS``, ``SMTP_SSL``) and
    otherwise from *config* (``notify_email`` and the ``smtp`` section).

    Args:
        config: Configuration mapping; a missing or ``null`` ``smtp`` section
            is treated as empty.

    Returns:
        A dict with the keys ``recipient``, ``smtp_host``, ``smtp_port``
        (int), ``smtp_user``, ``smtp_password``, ``smtp_from``,
        ``smtp_starttls`` (bool) and ``smtp_ssl`` (bool).

    Raises:
        SystemExit: If the configured SMTP port is not an integer.
    """
    smtp_cfg = config.get("smtp") or {}

    raw_port = os.getenv("SMTP_PORT", str(smtp_cfg.get("port", 587)))
    try:
        smtp_port = int(raw_port)
    except ValueError as exc:
        raise SystemExit(
            f"Ungueltiger SMTP-Port '{raw_port}'. Erwartet: ganze Zahl."
        ) from exc

    return {
        # NOTE(review): the fallback recipient is a hard-coded address.
        "recipient": os.getenv("NOTIFY_EMAIL", config.get("notify_email", "stefan.heyn@googlemail.com")),
        "smtp_host": os.getenv("SMTP_HOST", smtp_cfg.get("host", "")),
        "smtp_port": smtp_port,
        "smtp_user": os.getenv("SMTP_USER", smtp_cfg.get("user", "")),
        "smtp_password": os.getenv("SMTP_PASSWORD", smtp_cfg.get("password", "")),
        "smtp_from": os.getenv("SMTP_FROM", smtp_cfg.get("from_addr", "")),
        "smtp_starttls": strtobool(os.getenv("SMTP_STARTTLS", str(smtp_cfg.get("starttls", True)))),
        "smtp_ssl": strtobool(os.getenv("SMTP_SSL", str(smtp_cfg.get("ssl", False)))),
    }
def send_notification(config: dict, subject: str, body: str) -> None:
    """Send a plain-text notification e-mail using the configured SMTP server.

    Nothing is sent (and no error is raised) when no SMTP host or no
    recipient is configured.

    Args:
        config: Configuration mapping passed to ``get_notification_settings``.
        subject: Subject line of the message.
        body: Plain-text message body.

    Raises:
        smtplib.SMTPException, OSError: Connection, TLS or authentication
            problems are propagated to the caller.
    """
    import ssl  # local import: only needed on this code path

    settings = get_notification_settings(config)
    recipient = settings["recipient"]
    host = settings["smtp_host"]
    if not host or not recipient:
        return

    # Sender falls back to the SMTP user and finally to the recipient itself.
    sender = settings["smtp_from"] or settings["smtp_user"] or recipient
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content(body)

    # Use an explicit default context so the server certificate and host name
    # are verified before the SMTP credentials are transmitted.
    tls_context = ssl.create_default_context()
    if settings["smtp_ssl"]:
        connection = smtplib.SMTP_SSL(host, settings["smtp_port"], timeout=20, context=tls_context)
    else:
        connection = smtplib.SMTP(host, settings["smtp_port"], timeout=20)

    with connection as smtp:
        # STARTTLS only applies to connections that are not already TLS-wrapped.
        if not settings["smtp_ssl"] and settings["smtp_starttls"]:
            smtp.starttls(context=tls_context)
        if settings["smtp_user"]:
            smtp.login(settings["smtp_user"], settings["smtp_password"])
        smtp.send_message(msg)
def is_login_page(page) -> bool:
    """Return True when *page* appears to show a sign-in form.

    The page counts as a login page when its URL contains one of the known
    sign-in markers, or when it contains an e-mail input field.
    """
    current_url = page.url.lower()
    for marker in ("/ap/signin", "/signin", "openid.oa"):
        if marker in current_url:
            return True
    # URL looked harmless; fall back to looking for an e-mail input.
    return page.locator('input[type="email"], input[name="email"], #ap_email').count() > 0
def parse_iso_date(value: str) -> date:
    """Convert a ``YYYY-MM-DD`` string into a ``date``.

    Raises:
        SystemExit: If *value* does not match the expected format.
    """
    try:
        parsed = datetime.strptime(value, "%Y-%m-%d")
    except ValueError as exc:
        raise SystemExit(f"Ungueltiges Datum '{value}'. Erwartet: YYYY-MM-DD") from exc
    return parsed.date()
def parse_date_from_text(text: str) -> Optional[date]:
    """Find the first recognisable date in free text.

    Several textual and numeric date shapes are tried in a fixed order. The
    first regex hit of each shape is handed to ``dateparser`` (German and
    English, day-first for ambiguous numeric dates).

    Returns:
        The parsed date, or ``None`` when no shape yields a parseable date.
    """
    candidate_patterns = (
        r"\b\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4}\b",
        r"\b[A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4}\b",
        r"\b\d{1,2}/\d{1,2}/\d{4}\b",
        r"\b\d{4}-\d{2}-\d{2}\b",
    )
    for regex in candidate_patterns:
        found = re.search(regex, text)
        if found is None:
            continue
        result = dateparser.parse(
            found.group(0),
            languages=["de", "en"],
            settings={"DATE_ORDER": "DMY"},
        )
        if result:
            return result.date()
    return None
def parse_order_date_from_text(text: str) -> Optional[date]:
    """Extract the order date from page text.

    A date that directly follows an order label ("Bestellt am",
    "Bestelldatum", "Order placed [on]") is preferred. If no labelled date
    can be parsed, the first date found anywhere in the text is used.

    Returns:
        The order date, or ``None`` when the text contains no parseable date.
    """
    labelled_patterns = (
        r"(Bestellt am|Bestelldatum)\s*[:\-]?\s*(\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4})",
        r"(Order placed(?: on)?)\s*[:\-]?\s*([A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4})",
        r"(Order placed(?: on)?)\s*[:\-]?\s*(\d{1,2}/\d{1,2}/\d{4})",
    )
    for regex in labelled_patterns:
        hit = re.search(regex, text, flags=re.IGNORECASE)
        if hit is None:
            continue
        # Group 2 holds the date portion that follows the label.
        when = dateparser.parse(
            hit.group(2),
            languages=["de", "en"],
            settings={"DATE_ORDER": "DMY"},
        )
        if when:
            return when.date()
    return parse_date_from_text(text)
def parse_order_id_from_text(text: str) -> Optional[str]:
    """Return the first order number of the form ``NNN-NNNNNNN-NNNNNNN``.

    Returns ``None`` when *text* contains no such number.
    """
    found = re.search(r"\b\d{3}-\d{7}-\d{7}\b", text)
    return found.group(0) if found else None
def years_for_range(start_date: date, end_date: date) -> list[int]:
    """List every calendar year touched by the range, newest first.

    The result is empty when *end_date* lies in an earlier year than
    *start_date*.
    """
    newest = end_date.year
    span = newest - start_date.year + 1
    return [newest - offset for offset in range(span)]
def with_amazon_language(url: str, amazon_language: str) -> str:
    """Return *url* with its ``language`` query parameter set.

    An existing ``language`` parameter is replaced in place (further
    duplicates of it are dropped); otherwise the parameter is appended.
    All other query parameters are kept, including repeated keys and blank
    values, which a plain ``dict`` round trip would silently collapse.

    Args:
        url: Absolute or relative URL.
        amazon_language: Language code to set, e.g. ``"de_DE"``.
    """
    parsed = urlparse(url)
    updated: list[tuple[str, str]] = []
    language_set = False
    for key, value in parse_qsl(parsed.query, keep_blank_values=True):
        if key == "language":
            if not language_set:
                updated.append((key, amazon_language))
                language_set = True
            continue
        updated.append((key, value))
    if not language_set:
        updated.append(("language", amazon_language))
    return urlunparse(parsed._replace(query=urlencode(updated)))
def build_orders_url(marketplace: str, year: int, amazon_language: str) -> str:
    """Build the order-history URL for one year on the given marketplace.

    Args:
        marketplace: Domain suffix after ``amazon.``, e.g. ``"de"``.
        year: Calendar year used for the ``timeFilter`` parameter.
        amazon_language: Language code added as ``language`` parameter.
    """
    site = f"https://www.amazon.{marketplace}"
    orders_url = f"{site}/your-orders/orders?timeFilter=year-{year}"
    return with_amazon_language(orders_url, amazon_language)
def text_contains_invoice_hint(text: str) -> bool:
    """Tell whether *text* contains any invoice keyword (case-insensitive).

    Keywords are matched as plain substrings, not as whole words.
    """
    lowered = text.lower()
    for keyword in INVOICE_KEYWORDS:
        if keyword in lowered:
            return True
    return False
def collect_order_detail_links(page, base_url: str) -> list[str]:
    """Collect the order-detail links of a page.

    Every anchor whose href mentions ``order-details`` or ``orderid=``
    (case-insensitive) is resolved against *base_url*.

    Returns:
        Absolute URLs in document order, without duplicates.
    """
    anchors = page.locator("a")
    # A dict keeps insertion order and de-duplicates in one structure.
    unique_links: dict[str, None] = {}
    for index in range(anchors.count()):
        href = anchors.nth(index).get_attribute("href")
        if not href:
            continue
        href_lower = href.lower()
        is_detail_link = (
            "order-details" in href_lower
            or "orderid=" in href_lower
            or "/your-orders/order-details" in href_lower
        )
        if not is_detail_link:
            continue
        unique_links.setdefault(urljoin(base_url, href), None)
    return list(unique_links)
def find_next_page_url(page, base_url: str) -> Optional[str]:
    """Locate the link to the next result page.

    The pagination widget's "last" entry is tried first; if it yields no
    href, the first anchor labelled "Weiter"/"Next"/"Nächste"/"Naechste" is
    used instead.

    Returns:
        The absolute URL of the next page, or ``None`` if there is none.
    """
    selectors = (
        "ul.a-pagination li.a-last a",
        'a:has-text("Weiter"), a:has-text("Next"), a:has-text("Nächste"), a:has-text("Naechste")',
    )
    for selector in selectors:
        candidate = page.locator(selector).first
        if candidate.count() == 0:
            continue
        href = candidate.get_attribute("href")
        if href:
            return urljoin(base_url, href)
    return None
def extract_invoice_links_from_scope(scope, base_url: str) -> list[str]:
    """Collect links inside *scope* that look like invoice links.

    A link qualifies when its absolute URL contains one of the URL hints or
    when its visible text contains an invoice keyword.

    Args:
        scope: Object exposing ``locator("a")`` (a page or an element locator).
        base_url: Base used to resolve relative hrefs.

    Returns:
        Absolute URLs in document order, without duplicates.
    """
    href_hints = ("invoice", "rechnung", "tax", "bill", "summary", "print", "beleg")
    links = scope.locator("a")
    out: list[str] = []
    seen = set()
    for i in range(links.count()):
        link = links.nth(i)
        # Check the href first: anchors without one can never qualify, so
        # the (potentially slow) text lookup is skipped for them.
        href = link.get_attribute("href")
        if not href:
            continue
        absolute = urljoin(base_url, href)
        if absolute in seen:
            continue
        lower_href = absolute.lower()
        matched = any(hint in lower_href for hint in href_hints)
        if not matched:
            try:
                text = (link.inner_text(timeout=1000) or "").strip()
            except PlaywrightTimeoutError:
                # One unreadable link must not abort the scan of the others.
                text = ""
            matched = text_contains_invoice_hint(text)
        if matched:
            out.append(absolute)
            seen.add(absolute)
    return out
def extract_orders_from_overview(page, base_url: str, debug: bool = False) -> list[OrderInvoice]:
    """Read the order cards of an overview page.

    Cards without a parseable date or without any invoice link candidate are
    skipped. A card without an order number gets an ``UNKNOWN-<date>`` id.

    Args:
        page: Page showing the order overview.
        base_url: Base used to resolve relative invoice links.
        debug: Print a note for every card that has no date.

    Returns:
        One ``OrderInvoice`` per usable card, in page order.
    """
    order_cards = page.locator("div.order-card")
    found: list[OrderInvoice] = []
    for position in range(order_cards.count()):
        order_card = order_cards.nth(position)
        text = order_card.inner_text(timeout=2000)
        placed_on = parse_order_date_from_text(text)
        if placed_on is None:
            if debug:
                print(f"[debug] order-card ohne Datum (index={position + 1})")
            continue
        order_number = parse_order_id_from_text(text) or f"UNKNOWN-{placed_on.isoformat()}"
        links = extract_invoice_links_from_scope(order_card, base_url)
        if links:
            found.append(
                OrderInvoice(order_date=placed_on, order_id=order_number, invoice_links=links)
            )
    return found
def extract_invoice_candidates_from_detail(
    context,
    detail_url: str,
    base_url: str,
    amazon_language: str,
    debug: bool = False,
) -> Optional[OrderInvoice]:
    """Open an order detail page in a new tab and extract invoice candidates.

    Args:
        context: Browser context used to open the temporary tab.
        detail_url: URL of the order detail page.
        base_url: Base used to resolve relative invoice links.
        amazon_language: Language code added to the detail URL.
        debug: Print progress and failure notes.

    Returns:
        The order with its invoice link candidates, or ``None`` when the page
        timed out, showed no date, or contained no candidate link. The tab is
        closed in every case.
    """
    tab = context.new_page()
    try:
        localized_url = with_amazon_language(detail_url, amazon_language)
        tab.goto(localized_url, wait_until="domcontentloaded", timeout=15000)
        # Short fixed pause after the DOM is loaded, before reading the text.
        tab.wait_for_timeout(1200)
        page_text = tab.inner_text("body", timeout=4000)

        placed_on = parse_order_date_from_text(page_text)
        if placed_on is None:
            if debug:
                print(f"[debug] Kein Datum in Detailseite: {detail_url}")
            return None

        candidates = extract_invoice_links_from_scope(tab, base_url)
        if debug:
            print(f"[debug] Detailseite {detail_url} -> {len(candidates)} Rechnungskandidat(en)")
        if not candidates:
            return None

        order_number = parse_order_id_from_text(page_text) or f"UNKNOWN-{placed_on.isoformat()}"
        return OrderInvoice(order_date=placed_on, order_id=order_number, invoice_links=candidates)
    except PlaywrightTimeoutError:
        if debug:
            print(f"[debug] Timeout bei Detailseite: {detail_url}")
        return None
    finally:
        tab.close()
def extract_pdf_links_from_html(html: str, source_url: str) -> list[str]:
    """Extract likely PDF/download links from raw HTML.

    Every quoted ``href`` value is entity-decoded (``&amp;`` becomes ``&``),
    resolved against *source_url* and kept when the absolute URL contains
    ``.pdf`` or ``download`` (case-insensitive).

    Args:
        html: HTML source of the page.
        source_url: URL the HTML was loaded from; base for relative links.

    Returns:
        Absolute URLs in document order, without duplicates.
    """
    # Imported locally under its own name because the ``html`` parameter
    # shadows the standard-library module of the same name.
    from html import unescape

    found: dict[str, None] = {}
    for match in re.finditer(r'href=["\']([^"\']+)["\']', html, flags=re.IGNORECASE):
        # Attribute values in HTML source are entity-encoded; decode before use.
        absolute = urljoin(source_url, unescape(match.group(1)))
        low = absolute.lower()
        if ".pdf" in low or "download" in low:
            found.setdefault(absolute, None)
    return list(found)
def write_debug_json(debug_json_path: Path, payload: dict) -> None:
    """Write *payload* as indented UTF-8 JSON to *debug_json_path*.

    Missing parent directories are created. Non-ASCII characters are written
    unescaped.
    """
    target_dir = debug_json_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2, ensure_ascii=False)
    debug_json_path.write_text(serialized, encoding="utf-8")
def looks_like_pdf(content: bytes) -> bool:
    """Tell whether *content* begins with the ``%PDF-`` file signature."""
    pdf_magic = b"%PDF-"
    return content.startswith(pdf_magic)
def build_output_path(download_dir: Path, order_date: date, order_id: str, sequence_no: int) -> Path:
    """Choose a file path for an invoice PDF that does not exist yet.

    The name has the form ``<date>_<order id>_<NNN>.pdf``. Characters of the
    order id other than ASCII letters, digits and ``-`` are replaced by ``_``.
    Starting at *sequence_no*, the number is increased until the path is free.
    """
    sanitized_id = re.sub(r"[^A-Za-z0-9-]", "_", order_id)
    prefix = f"{order_date.isoformat()}_{sanitized_id}"
    number = sequence_no
    while True:
        candidate = download_dir / f"{prefix}_{number:03d}.pdf"
        if not candidate.exists():
            return candidate
        number += 1
def configure(args) -> None:
    """Store the configuration and capture a logged-in browser session.

    The settings from *args* are written via ``save_config`` (which also
    creates the application directory). A visible browser is then opened on
    the order overview so the user can log in; afterwards the browser storage
    state is saved to ``STORAGE_STATE_PATH``.

    Args:
        args: Parsed arguments of the ``configure`` sub-command. If
            ``login_wait_seconds`` is positive the session is saved after
            that many seconds, otherwise after the user presses Enter.
    """
    config = {
        "marketplace": args.marketplace,
        "download_dir": str(Path(args.download_dir).expanduser().resolve()),
        "headless": args.headless,
        "locale": args.locale,
        "timezone": args.timezone,
        "currency": args.currency,
        "amazon_language": args.amazon_language,
        "notify_email": args.notify_email,
        "smtp": {
            "host": args.smtp_host,
            "port": args.smtp_port,
            "user": args.smtp_user,
            "password": args.smtp_password,
            "from_addr": args.smtp_from,
            "starttls": not args.smtp_no_starttls,
            "ssl": args.smtp_ssl,
        },
    }
    save_config(config)

    start_url = with_amazon_language(
        f"https://www.amazon.{args.marketplace}/your-orders/orders",
        args.amazon_language,
    )
    with sync_playwright() as p:
        # Always headed here: the user has to type the credentials.
        browser = p.chromium.launch(headless=False)
        try:
            context = browser.new_context(**build_context_options(config))
            page = context.new_page()
            page.goto(start_url, wait_until="domcontentloaded")
            print("Bitte im Browser bei Amazon einloggen.")
            if args.login_wait_seconds > 0:
                print(
                    f"Warte {args.login_wait_seconds} Sekunden auf Login, "
                    "danach wird die Session automatisch gespeichert."
                )
                time.sleep(args.login_wait_seconds)
            else:
                print("Wenn Bestellungen sichtbar sind, Enter druecken.")
                input()
            context.storage_state(path=str(STORAGE_STATE_PATH))
        finally:
            # Close the browser even if navigation or saving failed.
            browser.close()

    try:
        # The storage state holds session cookies; keep it private.
        STORAGE_STATE_PATH.chmod(0o600)
    except OSError:
        # Best effort only: some file systems do not support POSIX permissions.
        pass
    print(f"Konfiguration gespeichert: {CONFIG_PATH}")
    print(f"Session gespeichert: {STORAGE_STATE_PATH}")
def _resolve_date_range(args) -> tuple[date, date]:
    """Return the inclusive ``(start, end)`` range selected on the command line."""
    if args.yesterday:
        yesterday = date.today() - timedelta(days=1)
        return yesterday, yesterday
    if not args.date_from or not args.date_to:
        raise SystemExit("Bitte entweder --yesterday oder --from und --to angeben.")
    start_date = parse_iso_date(args.date_from)
    end_date = parse_iso_date(args.date_to)
    if start_date > end_date:
        raise SystemExit("'from' muss kleiner/gleich 'to' sein.")
    return start_date, end_date


def _abort_session_expired(
    config: dict,
    settings: dict,
    *,
    subject: str,
    intro: str,
    url: str,
    start_date: date,
    end_date: date,
) -> None:
    """Send a best-effort notification, then exit because a login was requested.

    This function never returns; it always raises ``SystemExit``. The exit
    message mentions the notification only if one was actually sent.
    """
    msg = (
        f"{intro}\n"
        f"URL: {url}\n"
        f"Zeitraum: {start_date.isoformat()} bis {end_date.isoformat()}\n"
        "Bitte 'configure' erneut ausfuehren."
    )
    recipient = settings.get("recipient", "")
    notified = False
    # send_notification() silently does nothing without host or recipient,
    # so only attempt (and later report) a mail when both are configured.
    if settings.get("smtp_host") and recipient:
        try:
            send_notification(config, subject=subject, body=msg)
            notified = True
        except Exception as notify_exc:  # notification is best effort
            print(f"[warn] E-Mail-Benachrichtigung fehlgeschlagen: {notify_exc}")
    raise SystemExit(
        "Session abgelaufen. Bitte 'configure' erneut ausfuehren."
        + (f" Benachrichtigung an {recipient} gesendet." if notified else "")
    )


def _is_pdf_response(content_type: str, url: str, body: bytes) -> bool:
    """Tell whether a response should be stored as a PDF file."""
    return "pdf" in content_type or url.lower().endswith(".pdf") or looks_like_pdf(body)


def _save_pdf(download_dir: Path, order: OrderInvoice, body: bytes, counters: dict[str, int]) -> Path:
    """Write *body* to the next free file name of *order* and return the path."""
    sequence_no = counters.get(order.order_id, 0) + 1
    counters[order.order_id] = sequence_no
    out = build_output_path(download_dir, order.order_date, order.order_id, sequence_no)
    out.write_bytes(body)
    print(f"Gespeichert: {out}")
    return out


def _download_invoice_link(
    context,
    order: OrderInvoice,
    invoice_url: str,
    *,
    referer: str,
    download_dir: Path,
    counters: dict[str, int],
    seen_saved_urls: set,
    debug: bool,
) -> int:
    """Download one invoice link and return the number of files saved.

    A PDF response is stored directly. For an HTML response the PDF/download
    links found in the page are fetched instead. Responses with an error
    status are skipped so that error pages are never stored as ``.pdf``.
    """
    response = context.request.get(
        invoice_url,
        headers={"referer": referer},
        timeout=20000,
    )
    content_type = (response.headers.get("content-type", "") or "").lower()
    if not response.ok:
        if debug:
            print(
                f"[debug] Uebersprungen {invoice_url} "
                f"(status={response.status}, content-type={content_type})"
            )
        return 0

    body = response.body()
    if _is_pdf_response(content_type, invoice_url, body):
        _save_pdf(download_dir, order, body, counters)
        seen_saved_urls.add(invoice_url)
        return 1

    if "html" not in content_type:
        if debug:
            print(
                f"[debug] Uebersprungen {invoice_url} "
                f"(status={response.status}, content-type={content_type})"
            )
        return 0

    nested_pdf_links = extract_pdf_links_from_html(response.text(), invoice_url)
    if debug:
        print(f"[debug] HTML-Seite {invoice_url} -> {len(nested_pdf_links)} PDF-Link(s)")
    saved = 0
    for pdf_link in nested_pdf_links:
        if pdf_link in seen_saved_urls:
            continue
        try:
            pdf_resp = context.request.get(
                pdf_link,
                headers={"referer": invoice_url},
                timeout=20000,
            )
            if not pdf_resp.ok:
                continue
            pdf_type = (pdf_resp.headers.get("content-type", "") or "").lower()
            pdf_body = pdf_resp.body()
            if not _is_pdf_response(pdf_type, pdf_link, pdf_body):
                continue
            _save_pdf(download_dir, order, pdf_body, counters)
        except Exception as exc:  # one broken link must not stop the others
            print(f"[warn] Fehler bei {pdf_link}: {exc}")
            continue
        seen_saved_urls.add(pdf_link)
        saved += 1
    return saved


def download(args) -> None:
    """Download the invoices of all orders placed within the requested range.

    The order overview is scanned year by year (newest first) and page by
    page, up to ``args.max_pages`` pages per year. Orders whose date lies in
    the range contribute their invoice links, which are then fetched with the
    saved browser session and written to the download directory.

    Args:
        args: Parsed arguments of the ``download`` sub-command.

    Raises:
        SystemExit: If configuration or session are missing, the date
            arguments are invalid, or Amazon asks for a login during the run.
    """
    config = load_config()
    if not STORAGE_STATE_PATH.exists():
        raise SystemExit("Session fehlt. Bitte zuerst 'configure' ausfuehren.")
    start_date, end_date = _resolve_date_range(args)

    marketplace = config["marketplace"]
    amazon_language = config.get("amazon_language", "de_DE")
    download_dir = Path(args.output or config["download_dir"]).expanduser().resolve()
    download_dir.mkdir(parents=True, exist_ok=True)
    debug_json_target = args.debug_json or (str(DEFAULT_DEBUG_JSON_PATH) if args.debug else None)
    settings = get_notification_settings(config)
    # An explicit --headless value overrides the configured default.
    headless = args.headless if args.headless is not None else bool(config.get("headless", True))

    base_orders_url = f"https://www.amazon.{marketplace}/your-orders/orders"
    years = years_for_range(start_date, end_date)
    invoices: list[OrderInvoice] = []
    seen_invoice_urls = set()
    debug_payload = {
        "requested_range": {"from": start_date.isoformat(), "to": end_date.isoformat()},
        "years": years,
        "pages": [],
        "totals": {
            "order_cards": 0,
            "candidates_with_date": 0,
            "candidates_in_range": 0,
        },
    }
    totals = debug_payload["totals"]
    downloaded = 0

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=headless)
        try:
            context_options = build_context_options(config)
            context_options["storage_state"] = str(STORAGE_STATE_PATH)
            context = browser.new_context(**context_options)
            page = context.new_page()

            # Phase 1: scan the overview pages and collect invoice links.
            for year in years:
                filtered_url = build_orders_url(marketplace, year, amazon_language)
                if args.debug:
                    print(f"[debug] Wechsle auf Jahresfilter {year}: {filtered_url}")
                page.goto(filtered_url, wait_until="domcontentloaded", timeout=15000)
                if is_login_page(page):
                    _abort_session_expired(
                        config,
                        settings,
                        subject="Amazon Invoice Downloader: Session abgelaufen",
                        intro="Amazon-Session ist abgelaufen oder Login wurde angefordert.",
                        url=page.url,
                        start_date=start_date,
                        end_date=end_date,
                    )

                visited_page_urls = set()
                for page_idx in range(args.max_pages):
                    # Guard against pagination links that lead back to a seen page.
                    if page.url in visited_page_urls:
                        if args.debug:
                            print(f"[debug] Abbruch wegen wiederholter URL: {page.url}")
                        break
                    visited_page_urls.add(page.url)
                    page.wait_for_timeout(1500)

                    card_count = page.locator("div.order-card").count()
                    overview_candidates = extract_orders_from_overview(page, base_orders_url, debug=args.debug)
                    debug_payload["pages"].append(
                        {
                            "year": year,
                            "page": page_idx + 1,
                            "url": page.url,
                            "order_cards": card_count,
                            "overview_candidates": len(overview_candidates),
                        }
                    )
                    totals["order_cards"] += card_count
                    if args.debug:
                        print(f"[debug] Jahr {year}, Seite {page_idx + 1}: {card_count} order-card(s), {len(overview_candidates)} Kandidat(en)")

                    for candidate in overview_candidates:
                        totals["candidates_with_date"] += 1
                        if not (start_date <= candidate.order_date <= end_date):
                            continue
                        filtered_links = [u for u in candidate.invoice_links if u not in seen_invoice_urls]
                        if not filtered_links:
                            continue
                        seen_invoice_urls.update(filtered_links)
                        invoices.append(
                            OrderInvoice(
                                order_date=candidate.order_date,
                                order_id=candidate.order_id,
                                invoice_links=filtered_links,
                            )
                        )
                        totals["candidates_in_range"] += 1
                        if args.debug:
                            print(
                                f"[debug] Treffer {candidate.order_date.isoformat()} mit {len(filtered_links)} Link(s)"
                            )

                    next_page_url = find_next_page_url(page, base_orders_url)
                    if not next_page_url:
                        break
                    try:
                        page.goto(
                            with_amazon_language(next_page_url, amazon_language),
                            wait_until="domcontentloaded",
                            timeout=15000,
                        )
                    except PlaywrightTimeoutError:
                        # Results for this year may be incomplete; say so.
                        print(f"[warn] Timeout beim Laden der naechsten Seite (Jahr {year}); Ergebnis evtl. unvollstaendig.")
                        break
                    if is_login_page(page):
                        _abort_session_expired(
                            config,
                            settings,
                            subject="Amazon Invoice Downloader: Session waehrend Download abgelaufen",
                            intro="Amazon-Session ist waehrend der Pagination abgelaufen.",
                            url=page.url,
                            start_date=start_date,
                            end_date=end_date,
                        )

            # Phase 2: fetch the collected links with the session of the context.
            seen_saved_urls = set()
            order_file_counters: dict[str, int] = {}
            for order in invoices:
                for invoice_url in order.invoice_links:
                    if invoice_url in seen_saved_urls:
                        continue
                    try:
                        downloaded += _download_invoice_link(
                            context,
                            order,
                            invoice_url,
                            referer=page.url,
                            download_dir=download_dir,
                            counters=order_file_counters,
                            seen_saved_urls=seen_saved_urls,
                            debug=args.debug,
                        )
                    except Exception as exc:  # keep going with the remaining links
                        print(f"[warn] Fehler bei {invoice_url}: {exc}")
        finally:
            # Close the browser on success, on errors and on session-expiry exits.
            browser.close()

    totals["downloaded"] = downloaded
    if debug_json_target:
        debug_path = Path(debug_json_target).expanduser().resolve()
        write_debug_json(debug_path, debug_payload)
        print(f"Debug-JSON gespeichert: {debug_path}")
    print(f"Fertig. Heruntergeladene Rechnungen: {downloaded}")
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with the ``configure`` and ``download`` sub-commands.

    Each sub-parser stores its handler function as ``func`` so the caller can
    dispatch with ``args.func(args)``.
    """
    parser = argparse.ArgumentParser(description="Amazon-Rechnungen eines Zeitraums herunterladen")
    sub = parser.add_subparsers(dest="command", required=True)

    p_config = sub.add_parser("configure", help="Marketplace/Download konfigurieren und Login-Session speichern")
    p_config.add_argument("--marketplace", default="de", help="z. B. de, com, co.uk")
    p_config.add_argument("--download-dir", default="~/Downloads/amazon_rechnungen")
    p_config.add_argument("--headless", action="store_true", help="Standard fuer Download-Lauf im Headless-Mode")
    p_config.add_argument("--locale", default="de-DE", help="Browser-Locale, z. B. de-DE")
    p_config.add_argument("--timezone", default="Europe/Berlin", help="Zeitzone, z. B. Europe/Berlin")
    p_config.add_argument("--currency", default="EUR", help="Waehrungshinweis fuer Konfiguration")
    p_config.add_argument("--amazon-language", default="de_DE", help="Amazon URL-Sprache, z. B. de_DE")
    # NOTE(review): the default recipient is a hard-coded address.
    p_config.add_argument("--notify-email", default="stefan.heyn@googlemail.com", help="Empfaenger fuer Ablauf-Benachrichtigungen")
    p_config.add_argument("--smtp-host", default="", help="SMTP-Server, z. B. smtp.gmail.com")
    p_config.add_argument("--smtp-port", type=int, default=587, help="SMTP-Port, Standard 587")
    p_config.add_argument("--smtp-user", default="", help="SMTP-Benutzer")
    p_config.add_argument("--smtp-password", default="", help="SMTP-Passwort oder App-Passwort")
    p_config.add_argument("--smtp-from", default="", help="Absenderadresse (optional)")
    p_config.add_argument("--smtp-ssl", action="store_true", help="SMTP ueber SSL (typisch Port 465)")
    p_config.add_argument("--smtp-no-starttls", action="store_true", help="STARTTLS deaktivieren")
    p_config.add_argument(
        "--login-wait-seconds",
        type=int,
        default=60,
        help="Optional: wartet X Sekunden vor Session-Speicherung (fuer noVNC/Serverbetrieb).",
    )
    p_config.set_defaults(func=configure)

    p_dl = sub.add_parser("download", help="Rechnungen nach Zeitraum herunterladen")
    p_dl.add_argument("--from", dest="date_from", help="Startdatum YYYY-MM-DD")
    p_dl.add_argument("--to", dest="date_to", help="Enddatum YYYY-MM-DD")
    p_dl.add_argument("--yesterday", action="store_true", help="Laedt nur Rechnungen von gestern")
    p_dl.add_argument("--output", help="Optionales Zielverzeichnis")
    p_dl.add_argument("--max-pages", type=int, default=25, help="Maximal zu scannende Bestellseiten")
    # Same boolean parsing as for the SMTP_* environment flags (strtobool).
    # None means "not given", so the configured default applies.
    p_dl.add_argument(
        "--headless",
        type=strtobool,
        nargs="?",
        const=True,
        default=None,
        help="Headless-Mode fuer diesen Lauf ueberschreiben (z. B. --headless oder --headless false)",
    )
    p_dl.add_argument("--debug", action="store_true", help="Zeigt gefundene Detail- und Rechnungslinks")
    p_dl.add_argument(
        "--debug-json",
        nargs="?",
        const=str(DEFAULT_DEBUG_JSON_PATH),
        default=None,
        help="Schreibt Laufdetails als JSON (optional mit Pfad, sonst Standarddatei).",
    )
    p_dl.set_defaults(func=download)
    return parser
def main() -> None:
    """Parse the command line and run the handler of the chosen sub-command."""
    cli_args = build_parser().parse_args()
    handler = cli_args.func
    handler(cli_args)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()