"""Download Amazon order invoices for a date range via a saved browser session.

The ``configure`` sub-command stores settings and a Playwright storage state
below ``~/.amazon_invoice_downloader``; the ``download`` sub-command reuses that
state to walk the order overview pages and save every invoice PDF it finds.
"""

import argparse
import json
import re
import time
from dataclasses import dataclass
from datetime import date, datetime
from html import unescape as html_unescape
from pathlib import Path
from typing import Optional
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse

import dateparser
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright

APP_DIR = Path.home() / ".amazon_invoice_downloader"
CONFIG_PATH = APP_DIR / "config.json"
STORAGE_STATE_PATH = APP_DIR / "storage_state.json"
DEFAULT_DEBUG_JSON_PATH = APP_DIR / "debug_last_run.json"

# Lower-case substrings that mark a link text as invoice related.
INVOICE_KEYWORDS = [
    "invoice",
    "rechnung",
    "faktura",
    "vat",
    "steuer",
    "bill",
    "beleg",
]


@dataclass
class OrderInvoice:
    """Invoice link candidates that belong to a single order."""

    order_date: date
    order_id: str
    invoice_links: list[str]


def ensure_app_dir() -> None:
    """Create the application directory (and parents) if it is missing."""
    APP_DIR.mkdir(parents=True, exist_ok=True)


def load_config() -> dict:
    """Return the stored configuration.

    Raises:
        SystemExit: if the configuration file does not exist.
    """
    if not CONFIG_PATH.exists():
        raise SystemExit("Konfiguration fehlt. Bitte zuerst 'configure' ausfuehren.")
    return json.loads(CONFIG_PATH.read_text(encoding="utf-8"))


def save_config(config: dict) -> None:
    """Write ``config`` as indented JSON to ``CONFIG_PATH``."""
    ensure_app_dir()
    CONFIG_PATH.write_text(json.dumps(config, indent=2), encoding="utf-8")


def build_context_options(config: dict) -> dict:
    """Build keyword arguments for ``browser.new_context`` from ``config``.

    ``locale`` and ``timezone`` fall back to ``de-DE`` / ``Europe/Berlin``.
    The Accept-Language header is fixed and does not follow ``locale``.
    """
    locale = config.get("locale", "de-DE")
    timezone = config.get("timezone", "Europe/Berlin")
    return {
        "locale": locale,
        "timezone_id": timezone,
        "extra_http_headers": {
            "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
        },
    }


def parse_iso_date(value: str) -> date:
    """Parse a ``YYYY-MM-DD`` string.

    Raises:
        SystemExit: if ``value`` does not match the expected format.
    """
    try:
        return datetime.strptime(value, "%Y-%m-%d").date()
    except ValueError as exc:
        raise SystemExit(f"Ungueltiges Datum '{value}'. Erwartet: YYYY-MM-DD") from exc


def _parse_date_string(raw: str) -> Optional[date]:
    """Parse one isolated date string as German/English, day-first; None on failure."""
    parsed = dateparser.parse(
        raw,
        languages=["de", "en"],
        settings={"DATE_ORDER": "DMY"},
    )
    return parsed.date() if parsed else None


def parse_date_from_text(text: str) -> Optional[date]:
    """Return the first date found in ``text`` by a list of generic patterns.

    The patterns are tried in order; for each one only its first match is
    handed to the parser. Returns None when nothing could be parsed.
    """
    patterns = [
        r"\b\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4}\b",
        r"\b[A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4}\b",
        r"\b\d{1,2}/\d{1,2}/\d{4}\b",
        r"\b\d{4}-\d{2}-\d{2}\b",
    ]
    for pattern in patterns:
        m = re.search(pattern, text)
        if not m:
            continue
        found = _parse_date_string(m.group(0))
        if found is not None:
            return found
    return None


def parse_order_date_from_text(text: str) -> Optional[date]:
    """Return the order date found in ``text``.

    Dates that follow an explicit order-date label are preferred; if none of
    those parse, the generic ``parse_date_from_text`` search is used.
    """
    focused_patterns = [
        r"(Bestellt am|Bestelldatum)\s*[:\-]?\s*(\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4})",
        r"(Order placed(?: on)?)\s*[:\-]?\s*([A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4})",
        r"(Order placed(?: on)?)\s*[:\-]?\s*(\d{1,2}/\d{1,2}/\d{4})",
    ]
    for pattern in focused_patterns:
        m = re.search(pattern, text, flags=re.IGNORECASE)
        if not m:
            continue
        found = _parse_date_string(m.group(2))
        if found is not None:
            return found
    return parse_date_from_text(text)


def parse_order_id_from_text(text: str) -> Optional[str]:
    """Return the first ``NNN-NNNNNNN-NNNNNNN`` token in ``text``, or None."""
    m = re.search(r"\b\d{3}-\d{7}-\d{7}\b", text)
    if m:
        return m.group(0)
    return None


def years_for_range(start_date: date, end_date: date) -> list[int]:
    """Return every year from ``end_date`` back to ``start_date``, newest first."""
    return list(range(end_date.year, start_date.year - 1, -1))


def with_amazon_language(url: str, amazon_language: str) -> str:
    """Return ``url`` with its ``language`` query parameter set to ``amazon_language``.

    An existing ``language`` parameter is replaced where it stands (extra
    occurrences are dropped); otherwise the parameter is appended. All other
    parameters, including repeated keys, are kept in their original order.
    """
    parsed = urlparse(url)
    pairs: list[tuple[str, str]] = []
    language_set = False
    for key, value in parse_qsl(parsed.query, keep_blank_values=True):
        if key != "language":
            pairs.append((key, value))
        elif not language_set:
            pairs.append((key, amazon_language))
            language_set = True
    if not language_set:
        pairs.append(("language", amazon_language))
    return urlunparse(parsed._replace(query=urlencode(pairs)))


def build_orders_url(marketplace: str, year: int, amazon_language: str) -> str:
    """Return the order-overview URL filtered to ``year`` for ``marketplace``."""
    base = f"https://www.amazon.{marketplace}/your-orders/orders?timeFilter=year-{year}"
    return with_amazon_language(base, amazon_language)


def text_contains_invoice_hint(text: str) -> bool:
    """Return True if ``text`` contains any ``INVOICE_KEYWORDS`` entry (case-insensitive)."""
    t = text.lower()
    return any(k in t for k in INVOICE_KEYWORDS)


def collect_order_detail_links(page, base_url: str) -> list[str]:
    """Collect unique absolute URLs of anchors on ``page`` that look like order-detail links.

    A link qualifies when its href contains ``order-details`` or ``orderid=``
    (case-insensitive). Document order is preserved.
    """
    links = page.locator("a")
    out: list[str] = []
    seen = set()
    for i in range(links.count()):
        href = links.nth(i).get_attribute("href")
        if not href:
            continue
        lower = href.lower()
        if "order-details" not in lower and "orderid=" not in lower:
            continue
        absolute = urljoin(base_url, href)
        if absolute in seen:
            continue
        out.append(absolute)
        seen.add(absolute)
    return out


def find_next_page_url(page, base_url: str) -> Optional[str]:
    """Return the absolute URL of the "next page" link on ``page``, or None.

    The pagination widget is checked first, then any anchor whose text looks
    like a next-page label.
    """
    selectors = (
        "ul.a-pagination li.a-last a",
        'a:has-text("Weiter"), a:has-text("Next"), a:has-text("Nächste"), a:has-text("Naechste")',
    )
    for selector in selectors:
        candidate = page.locator(selector).first
        if candidate.count() > 0:
            href = candidate.get_attribute("href")
            if href:
                return urljoin(base_url, href)
    return None


def extract_invoice_links_from_scope(scope, base_url: str) -> list[str]:
    """Return unique absolute invoice-like links found below ``scope``.

    A link qualifies when its URL contains one of the URL keywords below or
    its visible text contains an invoice keyword. ``scope`` only needs a
    ``locator`` method (both pages and element locators are passed in).
    """
    href_keywords = ("invoice", "rechnung", "tax", "bill", "summary", "print", "beleg")
    links = scope.locator("a")
    out: list[str] = []
    seen = set()
    for i in range(links.count()):
        link = links.nth(i)
        href = link.get_attribute("href")
        if not href:
            continue
        absolute = urljoin(base_url, href)
        if absolute in seen:
            continue
        lower_href = absolute.lower()
        # The text lookup has its own timeout, so it is only performed when the
        # URL alone does not already identify the link.
        if any(k in lower_href for k in href_keywords) or text_contains_invoice_hint(
            (link.inner_text(timeout=1000) or "").strip()
        ):
            out.append(absolute)
            seen.add(absolute)
    return out


def extract_orders_from_overview(page, base_url: str, debug: bool = False) -> list[OrderInvoice]:
    """Build one ``OrderInvoice`` per ``div.order-card`` on ``page`` that has a date and invoice links.

    Cards without a parsable date or without invoice links are skipped. If no
    order id is found the id becomes ``UNKNOWN-<date>``.
    """
    cards = page.locator("div.order-card")
    results: list[OrderInvoice] = []
    for i in range(cards.count()):
        card = cards.nth(i)
        card_text = card.inner_text(timeout=2000)
        order_date = parse_order_date_from_text(card_text)
        if order_date is None:
            if debug:
                print(f"[debug] order-card ohne Datum (index={i + 1})")
            continue
        order_id = parse_order_id_from_text(card_text) or f"UNKNOWN-{order_date.isoformat()}"
        invoice_links = extract_invoice_links_from_scope(card, base_url)
        if not invoice_links:
            continue
        results.append(OrderInvoice(order_date=order_date, order_id=order_id, invoice_links=invoice_links))
    return results


def extract_invoice_candidates_from_detail(
    context,
    detail_url: str,
    base_url: str,
    amazon_language: str,
    debug: bool = False,
) -> Optional[OrderInvoice]:
    """Open ``detail_url`` in a new page and extract its invoice candidates.

    Returns None when the page has no parsable date, no invoice links, or a
    Playwright timeout occurs. The temporary page is always closed.
    """
    detail_page = context.new_page()
    try:
        detail_page.goto(
            with_amazon_language(detail_url, amazon_language),
            wait_until="domcontentloaded",
            timeout=15000,
        )
        detail_page.wait_for_timeout(1200)
        body_text = detail_page.inner_text("body", timeout=4000)
        order_date = parse_order_date_from_text(body_text)
        if order_date is None:
            if debug:
                print(f"[debug] Kein Datum in Detailseite: {detail_url}")
            return None
        invoice_links = extract_invoice_links_from_scope(detail_page, base_url)
        if debug:
            print(f"[debug] Detailseite {detail_url} -> {len(invoice_links)} Rechnungskandidat(en)")
        if not invoice_links:
            return None
        order_id = parse_order_id_from_text(body_text) or f"UNKNOWN-{order_date.isoformat()}"
        return OrderInvoice(order_date=order_date, order_id=order_id, invoice_links=invoice_links)
    except PlaywrightTimeoutError:
        if debug:
            print(f"[debug] Timeout bei Detailseite: {detail_url}")
        return None
    finally:
        detail_page.close()


def extract_pdf_links_from_html(html: str, source_url: str) -> list[str]:
    """Return unique absolute URLs from ``href`` attributes that mention ``.pdf`` or ``download``.

    Attribute values are HTML-unescaped (``&amp;`` -> ``&``) before being
    resolved against ``source_url``. The result keeps document order so that
    downstream file numbering is deterministic.
    """
    pdfs: list[str] = []
    seen = set()
    for m in re.finditer(r'href=["\']([^"\']+)["\']', html, flags=re.IGNORECASE):
        absolute = urljoin(source_url, html_unescape(m.group(1)))
        if absolute in seen:
            continue
        low = absolute.lower()
        if ".pdf" in low or "download" in low:
            seen.add(absolute)
            pdfs.append(absolute)
    return pdfs


def write_debug_json(debug_json_path: Path, payload: dict) -> None:
    """Write ``payload`` as indented UTF-8 JSON, creating parent directories."""
    debug_json_path.parent.mkdir(parents=True, exist_ok=True)
    debug_json_path.write_text(
        json.dumps(payload, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )


def looks_like_pdf(content: bytes) -> bool:
    """Return True if ``content`` starts with the ``%PDF-`` signature."""
    return content.startswith(b"%PDF-")


def build_output_path(download_dir: Path, order_date: date, order_id: str, sequence_no: int) -> Path:
    """Return a non-existing ``<date>_<order id>_<NNN>.pdf`` path inside ``download_dir``.

    Characters in ``order_id`` other than ASCII letters, digits and ``-`` are
    replaced by ``_``. ``sequence_no`` is incremented until the path is free.
    """
    safe_order_id = re.sub(r"[^A-Za-z0-9-]", "_", order_id)
    filename = f"{order_date.isoformat()}_{safe_order_id}_{sequence_no:03d}.pdf"
    out = download_dir / filename
    while out.exists():
        sequence_no += 1
        filename = f"{order_date.isoformat()}_{safe_order_id}_{sequence_no:03d}.pdf"
        out = download_dir / filename
    return out


def configure(args) -> None:
    """Persist the configuration and capture a logged-in browser session.

    Opens a visible Chromium window on the order page, waits for the user to
    log in (either ``args.login_wait_seconds`` seconds or until Enter is
    pressed) and stores the context's storage state at ``STORAGE_STATE_PATH``.
    """
    config = {
        "marketplace": args.marketplace,
        "download_dir": str(Path(args.download_dir).expanduser().resolve()),
        "headless": args.headless,
        "locale": args.locale,
        "timezone": args.timezone,
        "currency": args.currency,
        "amazon_language": args.amazon_language,
    }
    save_config(config)
    ensure_app_dir()

    with sync_playwright() as p:
        # Login needs a visible window regardless of the stored headless flag.
        browser = p.chromium.launch(headless=False)
        try:
            context = browser.new_context(**build_context_options(config))
            page = context.new_page()
            page.goto(
                with_amazon_language(
                    f"https://www.amazon.{args.marketplace}/your-orders/orders",
                    args.amazon_language,
                ),
                wait_until="domcontentloaded",
            )
            print("Bitte im Browser bei Amazon einloggen.")
            if args.login_wait_seconds > 0:
                print(
                    f"Warte {args.login_wait_seconds} Sekunden auf Login, "
                    "danach wird die Session automatisch gespeichert."
                )
                time.sleep(args.login_wait_seconds)
            else:
                print("Wenn Bestellungen sichtbar sind, Enter druecken.")
                input()
            context.storage_state(path=str(STORAGE_STATE_PATH))
        finally:
            # Close the browser even if navigation or the wait is interrupted.
            browser.close()

    print(f"Konfiguration gespeichert: {CONFIG_PATH}")
    print(f"Session gespeichert: {STORAGE_STATE_PATH}")


def _is_pdf_payload(content_type: str, url: str, body: bytes) -> bool:
    """Return True if header, URL suffix or body signature indicate a PDF."""
    return "pdf" in content_type or url.lower().endswith(".pdf") or looks_like_pdf(body)


def _save_invoice_pdf(
    download_dir: Path,
    order: OrderInvoice,
    body: bytes,
    order_file_counters: dict[str, int],
) -> Path:
    """Write ``body`` to the next free file for ``order`` and return its path."""
    order_file_counters[order.order_id] = order_file_counters.get(order.order_id, 0) + 1
    out = build_output_path(
        download_dir,
        order.order_date,
        order.order_id,
        order_file_counters[order.order_id],
    )
    out.write_bytes(body)
    return out


def download(args) -> None:
    """Download all invoices whose order date lies in ``[args.date_from, args.date_to]``.

    Walks the yearly order overview pages (at most ``args.max_pages`` per
    year), collects invoice links from the order cards and fetches each one
    with the session's request context. Direct PDFs are saved as-is; HTML
    responses are scanned for nested PDF/download links. Optionally writes a
    JSON summary of the run.

    Raises:
        SystemExit: if configuration or session is missing, a date is
            malformed, or the start date is after the end date.
    """
    config = load_config()
    if not STORAGE_STATE_PATH.exists():
        raise SystemExit("Session fehlt. Bitte zuerst 'configure' ausfuehren.")

    start_date = parse_iso_date(args.date_from)
    end_date = parse_iso_date(args.date_to)
    if start_date > end_date:
        raise SystemExit("'from' muss kleiner/gleich 'to' sein.")

    marketplace = config["marketplace"]
    amazon_language = config.get("amazon_language", "de_DE")
    download_dir = Path(args.output or config["download_dir"]).expanduser().resolve()
    download_dir.mkdir(parents=True, exist_ok=True)
    debug_json_target = args.debug_json or (str(DEFAULT_DEBUG_JSON_PATH) if args.debug else None)
    # An explicit command-line flag wins over the stored default.
    headless = args.headless if args.headless is not None else bool(config.get("headless", True))

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=headless)
        try:
            context_options = build_context_options(config)
            context_options["storage_state"] = str(STORAGE_STATE_PATH)
            context = browser.new_context(**context_options)
            page = context.new_page()

            base_orders_url = f"https://www.amazon.{marketplace}/your-orders/orders"
            invoices: list[OrderInvoice] = []
            seen_invoice_urls = set()
            years = years_for_range(start_date, end_date)
            debug_payload = {
                "requested_range": {"from": start_date.isoformat(), "to": end_date.isoformat()},
                "years": years,
                "pages": [],
                "totals": {
                    "order_cards": 0,
                    "candidates_with_date": 0,
                    "candidates_in_range": 0,
                },
            }

            # Phase 1: collect invoice link candidates from the overview pages.
            for year in years:
                filtered_url = build_orders_url(marketplace, year, amazon_language)
                if args.debug:
                    print(f"[debug] Wechsle auf Jahresfilter {year}: {filtered_url}")
                page.goto(filtered_url, wait_until="domcontentloaded", timeout=15000)

                visited_page_urls = set()
                for page_idx in range(args.max_pages):
                    # A repeated URL means pagination is looping.
                    if page.url in visited_page_urls:
                        if args.debug:
                            print(f"[debug] Abbruch wegen wiederholter URL: {page.url}")
                        break
                    visited_page_urls.add(page.url)

                    page.wait_for_timeout(1500)
                    page_cards = page.locator("div.order-card")
                    card_count = page_cards.count()
                    overview_candidates = extract_orders_from_overview(page, base_orders_url, debug=args.debug)
                    debug_payload["pages"].append(
                        {
                            "year": year,
                            "page": page_idx + 1,
                            "url": page.url,
                            "order_cards": card_count,
                            "overview_candidates": len(overview_candidates),
                        }
                    )
                    debug_payload["totals"]["order_cards"] += card_count
                    if args.debug:
                        print(f"[debug] Jahr {year}, Seite {page_idx + 1}: {card_count} order-card(s), {len(overview_candidates)} Kandidat(en)")

                    for candidate in overview_candidates:
                        debug_payload["totals"]["candidates_with_date"] += 1
                        if not (start_date <= candidate.order_date <= end_date):
                            continue
                        filtered_links = [u for u in candidate.invoice_links if u not in seen_invoice_urls]
                        if not filtered_links:
                            continue
                        seen_invoice_urls.update(filtered_links)
                        invoices.append(
                            OrderInvoice(
                                order_date=candidate.order_date,
                                order_id=candidate.order_id,
                                invoice_links=filtered_links,
                            )
                        )
                        debug_payload["totals"]["candidates_in_range"] += 1
                        if args.debug:
                            print(
                                f"[debug] Treffer {candidate.order_date.isoformat()} mit {len(filtered_links)} Link(s)"
                            )

                    next_page_url = find_next_page_url(page, base_orders_url)
                    if not next_page_url:
                        break
                    try:
                        page.goto(
                            with_amazon_language(next_page_url, amazon_language),
                            wait_until="domcontentloaded",
                            timeout=15000,
                        )
                    except PlaywrightTimeoutError:
                        break

            # Phase 2: fetch every candidate and store what turns out to be a PDF.
            downloaded = 0
            seen_saved_urls = set()
            order_file_counters: dict[str, int] = {}
            for order in invoices:
                for invoice_url in order.invoice_links:
                    if invoice_url in seen_saved_urls:
                        continue
                    try:
                        response = context.request.get(
                            invoice_url,
                            headers={"referer": page.url},
                            timeout=20000,
                        )
                        content_type = (response.headers.get("content-type", "") or "").lower()
                        if not response.ok:
                            # Never store an error page under a ".pdf" name.
                            if args.debug:
                                print(
                                    f"[debug] Uebersprungen {invoice_url} "
                                    f"(status={response.status}, content-type={content_type})"
                                )
                            continue
                        body = response.body()

                        if _is_pdf_payload(content_type, invoice_url, body):
                            out = _save_invoice_pdf(download_dir, order, body, order_file_counters)
                            downloaded += 1
                            seen_saved_urls.add(invoice_url)
                            print(f"Gespeichert: {out}")
                            continue

                        if "html" in content_type:
                            nested_pdf_links = extract_pdf_links_from_html(response.text(), invoice_url)
                            if args.debug:
                                print(f"[debug] HTML-Seite {invoice_url} -> {len(nested_pdf_links)} PDF-Link(s)")
                            for pdf_link in nested_pdf_links:
                                if pdf_link in seen_saved_urls:
                                    continue
                                pdf_resp = context.request.get(
                                    pdf_link,
                                    headers={"referer": invoice_url},
                                    timeout=20000,
                                )
                                if not pdf_resp.ok:
                                    continue
                                pdf_type = (pdf_resp.headers.get("content-type", "") or "").lower()
                                pdf_body = pdf_resp.body()
                                if not _is_pdf_payload(pdf_type, pdf_link, pdf_body):
                                    continue
                                out = _save_invoice_pdf(download_dir, order, pdf_body, order_file_counters)
                                downloaded += 1
                                seen_saved_urls.add(pdf_link)
                                print(f"Gespeichert: {out}")
                        elif args.debug:
                            print(
                                f"[debug] Uebersprungen {invoice_url} "
                                f"(status={response.status}, content-type={content_type})"
                            )
                    except Exception as exc:
                        # Deliberate best effort: one failing link must not
                        # abort the remaining downloads.
                        if args.debug:
                            print(f"[debug] Fehler bei {invoice_url}: {exc}")
        finally:
            # Close the browser even when scanning or downloading fails.
            browser.close()

    debug_payload["totals"]["downloaded"] = downloaded
    if debug_json_target:
        debug_path = Path(debug_json_target).expanduser().resolve()
        write_debug_json(debug_path, debug_payload)
        print(f"Debug-JSON gespeichert: {debug_path}")

    print(f"Fertig. Heruntergeladene Rechnungen: {downloaded}")


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with the ``configure`` and ``download`` sub-commands."""
    parser = argparse.ArgumentParser(description="Amazon-Rechnungen eines Zeitraums herunterladen")
    sub = parser.add_subparsers(dest="command", required=True)

    p_config = sub.add_parser("configure", help="Marketplace/Download konfigurieren und Login-Session speichern")
    p_config.add_argument("--marketplace", default="de", help="z. B. de, com, co.uk")
    p_config.add_argument("--download-dir", default="~/Downloads/amazon_rechnungen")
    p_config.add_argument("--headless", action="store_true", help="Standard fuer Download-Lauf im Headless-Mode")
    p_config.add_argument("--locale", default="de-DE", help="Browser-Locale, z. B. de-DE")
    p_config.add_argument("--timezone", default="Europe/Berlin", help="Zeitzone, z. B. Europe/Berlin")
    p_config.add_argument("--currency", default="EUR", help="Waehrungshinweis fuer Konfiguration")
    p_config.add_argument("--amazon-language", default="de_DE", help="Amazon URL-Sprache, z. B. de_DE")
    p_config.add_argument(
        "--login-wait-seconds",
        type=int,
        default=60,
        help="Optional: wartet X Sekunden vor Session-Speicherung (fuer noVNC/Serverbetrieb).",
    )
    p_config.set_defaults(func=configure)

    p_dl = sub.add_parser("download", help="Rechnungen nach Zeitraum herunterladen")
    p_dl.add_argument("--from", dest="date_from", required=True, help="Startdatum YYYY-MM-DD")
    p_dl.add_argument("--to", dest="date_to", required=True, help="Enddatum YYYY-MM-DD")
    p_dl.add_argument("--output", help="Optionales Zielverzeichnis")
    p_dl.add_argument("--max-pages", type=int, default=25, help="Maximal zu scannende Bestellseiten")
    # Bare "--headless" means True; "--headless false" overrides a stored True.
    p_dl.add_argument("--headless", type=lambda s: s.lower() in {"1", "true", "yes"}, nargs="?", const=True, default=None)
    p_dl.add_argument("--debug", action="store_true", help="Zeigt gefundene Detail- und Rechnungslinks")
    p_dl.add_argument(
        "--debug-json",
        nargs="?",
        const=str(DEFAULT_DEBUG_JSON_PATH),
        default=None,
        help="Schreibt Laufdetails als JSON (optional mit Pfad, sonst Standarddatei).",
    )
    p_dl.set_defaults(func=download)
    return parser


def main() -> None:
    """Parse the command line and dispatch to the selected sub-command."""
    parser = build_parser()
    args = parser.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()