"""Download Amazon order invoices for a date range with a saved browser session.

Two sub-commands are provided:

* ``configure`` stores settings and an interactive login session on disk.
* ``download`` scans the order overview pages and saves invoice PDFs.
"""

import argparse
import json
import os
import re
import smtplib
import time
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from email.message import EmailMessage
from html import unescape
from pathlib import Path
from typing import NoReturn, Optional
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse

import dateparser
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright

APP_DIR = Path.home() / ".amazon_invoice_downloader"
CONFIG_PATH = APP_DIR / "config.json"
STORAGE_STATE_PATH = APP_DIR / "storage_state.json"
DEFAULT_DEBUG_JSON_PATH = APP_DIR / "debug_last_run.json"

# Fallback recipient used by both the CLI default and the settings lookup.
DEFAULT_NOTIFY_EMAIL = "stefan.heyn@googlemail.com"

# Substrings (lower case) that mark a link text as invoice related.
INVOICE_KEYWORDS = [
    "invoice",
    "rechnung",
    "faktura",
    "vat",
    "steuer",
    "bill",
    "beleg",
]

# Substrings (lower case) that mark a link URL as invoice related.
INVOICE_URL_HINTS = ["invoice", "rechnung", "tax", "bill", "summary", "print", "beleg"]


@dataclass
class OrderInvoice:
    """One order together with the invoice candidate URLs found for it."""

    order_date: date
    order_id: str
    invoice_links: list[str]


def ensure_app_dir() -> None:
    """Create the application directory if it does not exist yet."""
    APP_DIR.mkdir(parents=True, exist_ok=True)


def load_config() -> dict:
    """Read the stored configuration.

    Raises:
        SystemExit: if no configuration file has been written yet.
    """
    if not CONFIG_PATH.exists():
        raise SystemExit("Konfiguration fehlt. Bitte zuerst 'configure' ausfuehren.")
    return json.loads(CONFIG_PATH.read_text(encoding="utf-8"))


def save_config(config: dict) -> None:
    """Write ``config`` as JSON to ``CONFIG_PATH``, creating the directory."""
    ensure_app_dir()
    CONFIG_PATH.write_text(json.dumps(config, indent=2), encoding="utf-8")


def build_context_options(config: dict) -> dict:
    """Return keyword arguments for ``browser.new_context`` based on ``config``."""
    locale = config.get("locale", "de-DE")
    timezone = config.get("timezone", "Europe/Berlin")
    return {
        "locale": locale,
        "timezone_id": timezone,
        "extra_http_headers": {
            "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
        },
    }


def strtobool(value: str) -> bool:
    """Return True for ``1``, ``true``, ``yes`` or ``on`` (case-insensitive)."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


def get_notification_settings(config: dict) -> dict:
    """Resolve mail settings; environment variables override the config file.

    Raises:
        ValueError: if the resolved SMTP port is not an integer.
    """
    smtp_cfg = config.get("smtp", {})
    return {
        "recipient": os.getenv("NOTIFY_EMAIL", config.get("notify_email", DEFAULT_NOTIFY_EMAIL)),
        "smtp_host": os.getenv("SMTP_HOST", smtp_cfg.get("host", "")),
        "smtp_port": int(os.getenv("SMTP_PORT", str(smtp_cfg.get("port", 587)))),
        "smtp_user": os.getenv("SMTP_USER", smtp_cfg.get("user", "")),
        "smtp_password": os.getenv("SMTP_PASSWORD", smtp_cfg.get("password", "")),
        "smtp_from": os.getenv("SMTP_FROM", smtp_cfg.get("from_addr", "")),
        "smtp_starttls": strtobool(os.getenv("SMTP_STARTTLS", str(smtp_cfg.get("starttls", True)))),
        "smtp_ssl": strtobool(os.getenv("SMTP_SSL", str(smtp_cfg.get("ssl", False)))),
    }


def send_notification(config: dict, subject: str, body: str) -> bool:
    """Send a plain-text e-mail using the configured SMTP settings.

    Returns:
        True when the message was handed to the SMTP server, False when
        sending was skipped because no SMTP host or recipient is configured.

    Raises:
        Exception: SMTP and network errors are propagated to the caller.
    """
    settings = get_notification_settings(config)
    recipient = settings["recipient"]
    host = settings["smtp_host"]
    if not host or not recipient:
        return False

    sender = settings["smtp_from"] or settings["smtp_user"] or recipient
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content(body)

    smtp_cls = smtplib.SMTP_SSL if settings["smtp_ssl"] else smtplib.SMTP
    with smtp_cls(host, settings["smtp_port"], timeout=20) as smtp:
        # STARTTLS only makes sense on a connection that is not already TLS.
        if not settings["smtp_ssl"] and settings["smtp_starttls"]:
            smtp.starttls()
        if settings["smtp_user"]:
            smtp.login(settings["smtp_user"], settings["smtp_password"])
        smtp.send_message(msg)
    return True


def is_login_page(page) -> bool:
    """Return True if ``page`` shows a sign-in URL or an e-mail input field."""
    url = page.url.lower()
    if any(part in url for part in ["/ap/signin", "/signin", "openid.oa"]):
        return True
    email_fields = page.locator('input[type="email"], input[name="email"], #ap_email')
    return email_fields.count() > 0


def parse_iso_date(value: str) -> date:
    """Parse ``YYYY-MM-DD``.

    Raises:
        SystemExit: if ``value`` does not match that format.
    """
    try:
        return datetime.strptime(value, "%Y-%m-%d").date()
    except ValueError as exc:
        raise SystemExit(f"Ungueltiges Datum '{value}'. Erwartet: YYYY-MM-DD") from exc


def parse_date_from_text(text: str) -> Optional[date]:
    """Return the first date found in ``text`` by a list of generic patterns.

    Patterns are tried in order; the first match that ``dateparser`` can
    parse wins. Returns None when nothing parses.
    """
    patterns = [
        r"\b\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4}\b",
        r"\b[A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4}\b",
        r"\b\d{1,2}/\d{1,2}/\d{4}\b",
        r"\b\d{4}-\d{2}-\d{2}\b",
    ]
    for pattern in patterns:
        m = re.search(pattern, text)
        if not m:
            continue
        parsed = dateparser.parse(
            m.group(0),
            languages=["de", "en"],
            settings={"DATE_ORDER": "DMY"},
        )
        if parsed:
            return parsed.date()
    return None


def parse_order_date_from_text(text: str) -> Optional[date]:
    """Return the order date from ``text``.

    Dates that directly follow an order-date label are preferred; if none
    is found the generic search of ``parse_date_from_text`` is used.
    """
    focused_patterns = [
        r"(Bestellt am|Bestelldatum)\s*[:\-]?\s*(\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4})",
        r"(Order placed(?: on)?)\s*[:\-]?\s*([A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4})",
        r"(Order placed(?: on)?)\s*[:\-]?\s*(\d{1,2}/\d{1,2}/\d{4})",
    ]
    for pattern in focused_patterns:
        m = re.search(pattern, text, flags=re.IGNORECASE)
        if not m:
            continue
        parsed = dateparser.parse(
            m.group(2),
            languages=["de", "en"],
            settings={"DATE_ORDER": "DMY"},
        )
        if parsed:
            return parsed.date()
    return parse_date_from_text(text)


def parse_order_id_from_text(text: str) -> Optional[str]:
    """Return the first ``NNN-NNNNNNN-NNNNNNN`` order id in ``text``, if any."""
    m = re.search(r"\b\d{3}-\d{7}-\d{7}\b", text)
    return m.group(0) if m else None


def years_for_range(start_date: date, end_date: date) -> list[int]:
    """Return all years touched by the range, newest first."""
    return list(range(end_date.year, start_date.year - 1, -1))


def with_amazon_language(url: str, amazon_language: str) -> str:
    """Return ``url`` with its ``language`` query parameter set.

    The query is rebuilt from a dict, so a key that occurs several times in
    ``url`` keeps only its last value.
    """
    parsed = urlparse(url)
    query = dict(parse_qsl(parsed.query, keep_blank_values=True))
    query["language"] = amazon_language
    return urlunparse(parsed._replace(query=urlencode(query)))


def build_orders_url(marketplace: str, year: int, amazon_language: str) -> str:
    """Return the order overview URL filtered to ``year``."""
    base = f"https://www.amazon.{marketplace}/your-orders/orders?timeFilter=year-{year}"
    return with_amazon_language(base, amazon_language)


def text_contains_invoice_hint(text: str) -> bool:
    """Return True if ``text`` contains any of ``INVOICE_KEYWORDS``."""
    t = text.lower()
    return any(k in t for k in INVOICE_KEYWORDS)


def collect_order_detail_links(page, base_url: str) -> list[str]:
    """Return unique absolute order-detail URLs from the anchors on ``page``."""
    links = page.locator("a")
    out: list[str] = []
    seen = set()
    for i in range(links.count()):
        href = links.nth(i).get_attribute("href")
        if not href:
            continue
        lower = href.lower()
        if "order-details" not in lower and "orderid=" not in lower:
            continue
        absolute = urljoin(base_url, href)
        if absolute in seen:
            continue
        out.append(absolute)
        seen.add(absolute)
    return out


def find_next_page_url(page, base_url: str) -> Optional[str]:
    """Return the absolute URL of the next overview page, or None.

    The pagination widget is checked first, then any link labelled
    "Weiter"/"Next"/"Nächste"/"Naechste".
    """
    selectors = [
        "ul.a-pagination li.a-last a",
        'a:has-text("Weiter"), a:has-text("Next"), a:has-text("Nächste"), a:has-text("Naechste")',
    ]
    for selector in selectors:
        candidate = page.locator(selector).first
        if candidate.count() > 0:
            href = candidate.get_attribute("href")
            if href:
                return urljoin(base_url, href)
    return None


def extract_invoice_links_from_scope(scope, base_url: str) -> list[str]:
    """Return unique absolute URLs of invoice-like links inside ``scope``.

    A link qualifies when its text contains an invoice keyword or its URL
    contains one of ``INVOICE_URL_HINTS``.
    """
    links = scope.locator("a")
    out: list[str] = []
    seen = set()
    for i in range(links.count()):
        link = links.nth(i)
        href = link.get_attribute("href")
        if not href:
            continue
        absolute = urljoin(base_url, href)
        if absolute in seen:
            continue
        try:
            text = (link.inner_text(timeout=1000) or "").strip()
        except PlaywrightTimeoutError:
            # One unreadable link must not abort the scan; fall back to the
            # URL-based check below.
            text = ""
        lower_href = absolute.lower()
        if text_contains_invoice_hint(text) or any(k in lower_href for k in INVOICE_URL_HINTS):
            out.append(absolute)
            seen.add(absolute)
    return out


def extract_orders_from_overview(page, base_url: str, debug: bool = False) -> list[OrderInvoice]:
    """Return one ``OrderInvoice`` per order card that has a date and links.

    Cards without a parseable date or without invoice links are skipped.
    """
    cards = page.locator("div.order-card")
    results: list[OrderInvoice] = []
    for i in range(cards.count()):
        card = cards.nth(i)
        card_text = card.inner_text(timeout=2000)
        order_date = parse_order_date_from_text(card_text)
        if order_date is None:
            if debug:
                print(f"[debug] order-card ohne Datum (index={i + 1})")
            continue
        order_id = parse_order_id_from_text(card_text) or f"UNKNOWN-{order_date.isoformat()}"
        invoice_links = extract_invoice_links_from_scope(card, base_url)
        if not invoice_links:
            continue
        results.append(OrderInvoice(order_date=order_date, order_id=order_id, invoice_links=invoice_links))
    return results


def extract_invoice_candidates_from_detail(
    context,
    detail_url: str,
    base_url: str,
    amazon_language: str,
    debug: bool = False,
) -> Optional[OrderInvoice]:
    """Open an order detail page in a new tab and collect invoice links.

    Returns None when the page times out, has no parseable date, or has no
    invoice links. The tab is always closed again.
    """
    detail_page = context.new_page()
    try:
        detail_page.goto(
            with_amazon_language(detail_url, amazon_language),
            wait_until="domcontentloaded",
            timeout=15000,
        )
        detail_page.wait_for_timeout(1200)
        body_text = detail_page.inner_text("body", timeout=4000)
        order_date = parse_order_date_from_text(body_text)
        if order_date is None:
            if debug:
                print(f"[debug] Kein Datum in Detailseite: {detail_url}")
            return None
        invoice_links = extract_invoice_links_from_scope(detail_page, base_url)
        if debug:
            print(f"[debug] Detailseite {detail_url} -> {len(invoice_links)} Rechnungskandidat(en)")
        if not invoice_links:
            return None
        order_id = parse_order_id_from_text(body_text) or f"UNKNOWN-{order_date.isoformat()}"
        return OrderInvoice(order_date=order_date, order_id=order_id, invoice_links=invoice_links)
    except PlaywrightTimeoutError:
        if debug:
            print(f"[debug] Timeout bei Detailseite: {detail_url}")
        return None
    finally:
        detail_page.close()


def extract_pdf_links_from_html(html: str, source_url: str) -> list[str]:
    """Return absolute URLs of ``href`` values that look like PDF downloads.

    A URL qualifies when it contains ``.pdf`` or ``download``. Entities such
    as ``&amp;`` are decoded because raw markup stores attribute values
    escaped. Results are unique and in document order, so the numbering of
    saved files is reproducible.
    """
    found: dict[str, None] = {}
    for m in re.finditer(r'href=["\']([^"\']+)["\']', html, flags=re.IGNORECASE):
        absolute = urljoin(source_url, unescape(m.group(1)))
        low = absolute.lower()
        if ".pdf" in low or "download" in low:
            found[absolute] = None
    return list(found)


def write_debug_json(debug_json_path: Path, payload: dict) -> None:
    """Write ``payload`` as indented UTF-8 JSON, creating parent directories."""
    debug_json_path.parent.mkdir(parents=True, exist_ok=True)
    debug_json_path.write_text(
        json.dumps(payload, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )


def looks_like_pdf(content: bytes) -> bool:
    """Return True if ``content`` starts with the ``%PDF-`` signature."""
    return content.startswith(b"%PDF-")


def build_output_path(download_dir: Path, order_date: date, order_id: str, sequence_no: int) -> Path:
    """Return a not-yet-existing ``<date>_<order>_<seq>.pdf`` path.

    Characters other than letters, digits and ``-`` in ``order_id`` become
    ``_``. ``sequence_no`` is incremented until the file name is free.
    """
    safe_order_id = re.sub(r"[^A-Za-z0-9-]", "_", order_id)
    while True:
        out = download_dir / f"{order_date.isoformat()}_{safe_order_id}_{sequence_no:03d}.pdf"
        if not out.exists():
            return out
        sequence_no += 1


def configure(args) -> None:
    """Store the configuration and capture a logged-in browser session.

    A visible browser is opened on the order page. The session is saved
    after ``args.login_wait_seconds`` seconds, or after Enter is pressed
    when that value is not positive.
    """
    config = {
        "marketplace": args.marketplace,
        "download_dir": str(Path(args.download_dir).expanduser().resolve()),
        "headless": args.headless,
        "locale": args.locale,
        "timezone": args.timezone,
        "currency": args.currency,
        "amazon_language": args.amazon_language,
        "notify_email": args.notify_email,
        "smtp": {
            "host": args.smtp_host,
            "port": args.smtp_port,
            "user": args.smtp_user,
            "password": args.smtp_password,
            "from_addr": args.smtp_from,
            "starttls": not args.smtp_no_starttls,
            "ssl": args.smtp_ssl,
        },
    }
    # save_config also creates APP_DIR, which the session file needs.
    save_config(config)

    with sync_playwright() as p:
        # Always visible: the user has to log in by hand.
        browser = p.chromium.launch(headless=False)
        try:
            context = browser.new_context(**build_context_options(config))
            page = context.new_page()
            page.goto(
                with_amazon_language(
                    f"https://www.amazon.{args.marketplace}/your-orders/orders",
                    args.amazon_language,
                ),
                wait_until="domcontentloaded",
            )
            print("Bitte im Browser bei Amazon einloggen.")
            if args.login_wait_seconds > 0:
                print(
                    f"Warte {args.login_wait_seconds} Sekunden auf Login, "
                    "danach wird die Session automatisch gespeichert."
                )
                time.sleep(args.login_wait_seconds)
            else:
                print("Wenn Bestellungen sichtbar sind, Enter druecken.")
                input()
            context.storage_state(path=str(STORAGE_STATE_PATH))
        finally:
            browser.close()

    print(f"Konfiguration gespeichert: {CONFIG_PATH}")
    print(f"Session gespeichert: {STORAGE_STATE_PATH}")


def _resolve_date_range(args) -> tuple[date, date]:
    """Return the inclusive (start, end) range requested on the command line."""
    if args.yesterday:
        yesterday = date.today() - timedelta(days=1)
        return yesterday, yesterday
    if not args.date_from or not args.date_to:
        raise SystemExit("Bitte entweder --yesterday oder --from und --to angeben.")
    start_date = parse_iso_date(args.date_from)
    end_date = parse_iso_date(args.date_to)
    if start_date > end_date:
        raise SystemExit("'from' muss kleiner/gleich 'to' sein.")
    return start_date, end_date


def _abort_session_expired(
    config: dict,
    subject: str,
    reason: str,
    page_url: str,
    start_date: date,
    end_date: date,
) -> NoReturn:
    """Try to notify by e-mail, then exit with a session-expired message.

    The exit message mentions the notification only if it was actually sent.
    """
    body = (
        f"{reason}\n"
        f"URL: {page_url}\n"
        f"Zeitraum: {start_date.isoformat()} bis {end_date.isoformat()}\n"
        "Bitte 'configure' erneut ausfuehren."
    )
    sent = False
    try:
        sent = send_notification(config, subject=subject, body=body)
    except Exception as notify_exc:  # best effort: a mail failure must not mask the real problem
        print(f"[warn] E-Mail-Benachrichtigung fehlgeschlagen: {notify_exc}")

    message = "Session abgelaufen. Bitte 'configure' erneut ausfuehren."
    if sent:
        recipient = get_notification_settings(config)["recipient"]
        message += f" Benachrichtigung an {recipient} gesendet."
    raise SystemExit(message)


def _collect_invoices(
    page,
    config: dict,
    args,
    years: list[int],
    start_date: date,
    end_date: date,
    debug_payload: dict,
) -> list[OrderInvoice]:
    """Walk the overview pages of every year and return in-range invoices.

    Page and total counters in ``debug_payload`` are updated in place.
    """
    marketplace = config["marketplace"]
    amazon_language = config.get("amazon_language", "de_DE")
    base_orders_url = f"https://www.amazon.{marketplace}/your-orders/orders"
    totals = debug_payload["totals"]
    invoices: list[OrderInvoice] = []
    seen_invoice_urls = set()

    for year in years:
        filtered_url = build_orders_url(marketplace, year, amazon_language)
        if args.debug:
            print(f"[debug] Wechsle auf Jahresfilter {year}: {filtered_url}")
        page.goto(filtered_url, wait_until="domcontentloaded", timeout=15000)
        if is_login_page(page):
            _abort_session_expired(
                config,
                subject="Amazon Invoice Downloader: Session abgelaufen",
                reason="Amazon-Session ist abgelaufen oder Login wurde angefordert.",
                page_url=page.url,
                start_date=start_date,
                end_date=end_date,
            )

        visited_page_urls = set()
        for page_idx in range(args.max_pages):
            # A "next" link that leads back to a known page would loop forever.
            if page.url in visited_page_urls:
                if args.debug:
                    print(f"[debug] Abbruch wegen wiederholter URL: {page.url}")
                break
            visited_page_urls.add(page.url)
            page.wait_for_timeout(1500)

            card_count = page.locator("div.order-card").count()
            overview_candidates = extract_orders_from_overview(page, base_orders_url, debug=args.debug)
            debug_payload["pages"].append(
                {
                    "year": year,
                    "page": page_idx + 1,
                    "url": page.url,
                    "order_cards": card_count,
                    "overview_candidates": len(overview_candidates),
                }
            )
            totals["order_cards"] += card_count
            if args.debug:
                print(
                    f"[debug] Jahr {year}, Seite {page_idx + 1}: {card_count} order-card(s), "
                    f"{len(overview_candidates)} Kandidat(en)"
                )

            for candidate in overview_candidates:
                totals["candidates_with_date"] += 1
                if not (start_date <= candidate.order_date <= end_date):
                    continue
                filtered_links = [u for u in candidate.invoice_links if u not in seen_invoice_urls]
                if not filtered_links:
                    continue
                seen_invoice_urls.update(filtered_links)
                invoices.append(
                    OrderInvoice(
                        order_date=candidate.order_date,
                        order_id=candidate.order_id,
                        invoice_links=filtered_links,
                    )
                )
                totals["candidates_in_range"] += 1
                if args.debug:
                    print(
                        f"[debug] Treffer {candidate.order_date.isoformat()} "
                        f"mit {len(filtered_links)} Link(s)"
                    )

            next_page_url = find_next_page_url(page, base_orders_url)
            if not next_page_url:
                break
            try:
                page.goto(
                    with_amazon_language(next_page_url, amazon_language),
                    wait_until="domcontentloaded",
                    timeout=15000,
                )
                if is_login_page(page):
                    _abort_session_expired(
                        config,
                        subject="Amazon Invoice Downloader: Session waehrend Download abgelaufen",
                        reason="Amazon-Session ist waehrend der Pagination abgelaufen.",
                        page_url=page.url,
                        start_date=start_date,
                        end_date=end_date,
                    )
            except PlaywrightTimeoutError:
                break
    return invoices


def _is_success(status: int) -> bool:
    """Return True for 2xx HTTP status codes."""
    return 200 <= status < 300


def _is_pdf_payload(url: str, content_type: str, body: bytes) -> bool:
    """Return True if content type, URL suffix or file signature indicates a PDF."""
    return "pdf" in content_type or url.lower().endswith(".pdf") or looks_like_pdf(body)


def _store_pdf(download_dir: Path, order: OrderInvoice, counters: dict[str, int], body: bytes) -> Path:
    """Write ``body`` to the next free file name for ``order`` and return the path."""
    counters[order.order_id] = counters.get(order.order_id, 0) + 1
    out = build_output_path(download_dir, order.order_date, order.order_id, counters[order.order_id])
    out.write_bytes(body)
    return out


def _download_invoice_files(
    context,
    invoices: list[OrderInvoice],
    download_dir: Path,
    referer: str,
    debug: bool,
) -> int:
    """Fetch every invoice link and save the PDFs; return the number saved.

    A link that answers with HTML is scanned for nested PDF links. Responses
    with a non-2xx status are never saved. An error on one link is reported
    and does not stop the remaining links.
    """
    downloaded = 0
    seen_saved_urls = set()
    counters: dict[str, int] = {}

    for order in invoices:
        for invoice_url in order.invoice_links:
            if invoice_url in seen_saved_urls:
                continue
            try:
                response = context.request.get(
                    invoice_url,
                    headers={"referer": referer},
                    timeout=20000,
                )
                content_type = (response.headers.get("content-type", "") or "").lower()
                if not _is_success(response.status):
                    # An error page must not end up on disk as a ".pdf" file.
                    if debug:
                        print(
                            f"[debug] Uebersprungen {invoice_url} "
                            f"(status={response.status}, content-type={content_type})"
                        )
                    continue

                body = response.body()
                if _is_pdf_payload(invoice_url, content_type, body):
                    out = _store_pdf(download_dir, order, counters, body)
                    downloaded += 1
                    seen_saved_urls.add(invoice_url)
                    print(f"Gespeichert: {out}")
                    continue

                if "html" in content_type:
                    nested_pdf_links = extract_pdf_links_from_html(response.text(), invoice_url)
                    if debug:
                        print(f"[debug] HTML-Seite {invoice_url} -> {len(nested_pdf_links)} PDF-Link(s)")
                    for pdf_link in nested_pdf_links:
                        if pdf_link in seen_saved_urls:
                            continue
                        pdf_resp = context.request.get(
                            pdf_link,
                            headers={"referer": invoice_url},
                            timeout=20000,
                        )
                        pdf_type = (pdf_resp.headers.get("content-type", "") or "").lower()
                        pdf_body = pdf_resp.body()
                        if not _is_success(pdf_resp.status):
                            continue
                        if not _is_pdf_payload(pdf_link, pdf_type, pdf_body):
                            continue
                        out = _store_pdf(download_dir, order, counters, pdf_body)
                        downloaded += 1
                        seen_saved_urls.add(pdf_link)
                        print(f"Gespeichert: {out}")
                elif debug:
                    print(
                        f"[debug] Uebersprungen {invoice_url} "
                        f"(status={response.status}, content-type={content_type})"
                    )
            except Exception as exc:  # best effort per link, but never silent
                print(f"[warn] Fehler bei {invoice_url}: {exc}")
    return downloaded


def download(args) -> None:
    """Download all invoices whose order date lies in the requested range.

    Raises:
        SystemExit: if configuration or session is missing, the date
            arguments are invalid, or Amazon asks for a new login.
    """
    config = load_config()
    if not STORAGE_STATE_PATH.exists():
        raise SystemExit("Session fehlt. Bitte zuerst 'configure' ausfuehren.")

    start_date, end_date = _resolve_date_range(args)

    download_dir = Path(args.output or config["download_dir"]).expanduser().resolve()
    download_dir.mkdir(parents=True, exist_ok=True)
    debug_json_target = args.debug_json or (str(DEFAULT_DEBUG_JSON_PATH) if args.debug else None)

    years = years_for_range(start_date, end_date)
    debug_payload = {
        "requested_range": {"from": start_date.isoformat(), "to": end_date.isoformat()},
        "years": years,
        "pages": [],
        "totals": {
            "order_cards": 0,
            "candidates_with_date": 0,
            "candidates_in_range": 0,
        },
    }

    # The command-line flag wins; otherwise the configured default applies.
    headless = args.headless if args.headless is not None else bool(config.get("headless", True))

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=headless)
        try:
            context_options = build_context_options(config)
            context_options["storage_state"] = str(STORAGE_STATE_PATH)
            context = browser.new_context(**context_options)
            page = context.new_page()

            invoices = _collect_invoices(page, config, args, years, start_date, end_date, debug_payload)
            downloaded = _download_invoice_files(context, invoices, download_dir, page.url, args.debug)
        finally:
            browser.close()

    debug_payload["totals"]["downloaded"] = downloaded
    if debug_json_target:
        debug_path = Path(debug_json_target).expanduser().resolve()
        write_debug_json(debug_path, debug_payload)
        print(f"Debug-JSON gespeichert: {debug_path}")
    print(f"Fertig. Heruntergeladene Rechnungen: {downloaded}")


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with ``configure`` and ``download``."""
    parser = argparse.ArgumentParser(description="Amazon-Rechnungen eines Zeitraums herunterladen")
    sub = parser.add_subparsers(dest="command", required=True)

    p_config = sub.add_parser("configure", help="Marketplace/Download konfigurieren und Login-Session speichern")
    p_config.add_argument("--marketplace", default="de", help="z. B. de, com, co.uk")
    p_config.add_argument("--download-dir", default="~/Downloads/amazon_rechnungen")
    p_config.add_argument("--headless", action="store_true", help="Standard fuer Download-Lauf im Headless-Mode")
    p_config.add_argument("--locale", default="de-DE", help="Browser-Locale, z. B. de-DE")
    p_config.add_argument("--timezone", default="Europe/Berlin", help="Zeitzone, z. B. Europe/Berlin")
    p_config.add_argument("--currency", default="EUR", help="Waehrungshinweis fuer Konfiguration")
    p_config.add_argument("--amazon-language", default="de_DE", help="Amazon URL-Sprache, z. B. de_DE")
    p_config.add_argument(
        "--notify-email",
        default=DEFAULT_NOTIFY_EMAIL,
        help="Empfaenger fuer Ablauf-Benachrichtigungen",
    )
    p_config.add_argument("--smtp-host", default="", help="SMTP-Server, z. B. smtp.gmail.com")
    p_config.add_argument("--smtp-port", type=int, default=587, help="SMTP-Port, Standard 587")
    p_config.add_argument("--smtp-user", default="", help="SMTP-Benutzer")
    p_config.add_argument("--smtp-password", default="", help="SMTP-Passwort oder App-Passwort")
    p_config.add_argument("--smtp-from", default="", help="Absenderadresse (optional)")
    p_config.add_argument("--smtp-ssl", action="store_true", help="SMTP ueber SSL (typisch Port 465)")
    p_config.add_argument("--smtp-no-starttls", action="store_true", help="STARTTLS deaktivieren")
    p_config.add_argument(
        "--login-wait-seconds",
        type=int,
        default=60,
        help="Optional: wartet X Sekunden vor Session-Speicherung (fuer noVNC/Serverbetrieb).",
    )
    p_config.set_defaults(func=configure)

    p_dl = sub.add_parser("download", help="Rechnungen nach Zeitraum herunterladen")
    p_dl.add_argument("--from", dest="date_from", help="Startdatum YYYY-MM-DD")
    p_dl.add_argument("--to", dest="date_to", help="Enddatum YYYY-MM-DD")
    p_dl.add_argument("--yesterday", action="store_true", help="Laedt nur Rechnungen von gestern")
    p_dl.add_argument("--output", help="Optionales Zielverzeichnis")
    p_dl.add_argument("--max-pages", type=int, default=25, help="Maximal zu scannende Bestellseiten")
    # Same truthy spellings as every other boolean setting in this tool.
    p_dl.add_argument("--headless", type=strtobool, nargs="?", const=True, default=None)
    p_dl.add_argument("--debug", action="store_true", help="Zeigt gefundene Detail- und Rechnungslinks")
    p_dl.add_argument(
        "--debug-json",
        nargs="?",
        const=str(DEFAULT_DEBUG_JSON_PATH),
        default=None,
        help="Schreibt Laufdetails als JSON (optional mit Pfad, sonst Standarddatei).",
    )
    p_dl.set_defaults(func=download)
    return parser


def main() -> None:
    """Parse the command line and run the selected sub-command."""
    parser = build_parser()
    args = parser.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()