"""Download Amazon order invoices for a date range using a saved browser session.

Two sub-commands are provided:

* ``configure`` stores the marketplace / download directory and saves the
  Playwright storage state after a manual login in a visible browser.
* ``download`` walks the order overview pages of every year touched by the
  requested range and saves each invoice PDF it can find.
"""

import argparse
import json
import re
import time
from dataclasses import dataclass
from datetime import date, datetime
from html import unescape
from pathlib import Path
from typing import Optional
from urllib.parse import urljoin

import dateparser
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright

APP_DIR = Path.home() / ".amazon_invoice_downloader"
CONFIG_PATH = APP_DIR / "config.json"
STORAGE_STATE_PATH = APP_DIR / "storage_state.json"
DEFAULT_DEBUG_JSON_PATH = APP_DIR / "debug_last_run.json"

# Substrings that mark a link *text* as invoice-related.
INVOICE_KEYWORDS = [
    "invoice",
    "rechnung",
    "faktura",
    "vat",
    "steuer",
    "bill",
    "beleg",
]

# Substrings that mark a link *URL* as invoice-related.
_URL_INVOICE_KEYWORDS = ("invoice", "rechnung", "tax", "bill", "summary", "print", "beleg")


@dataclass
class OrderInvoice:
    """One order together with the invoice link candidates found for it."""

    order_date: date
    order_id: str
    invoice_links: list[str]


def ensure_app_dir() -> None:
    """Create the application directory (and parents) if it does not exist."""
    APP_DIR.mkdir(parents=True, exist_ok=True)


def load_config() -> dict:
    """Read the saved configuration.

    Raises:
        SystemExit: if the configuration file does not exist yet.
    """
    if not CONFIG_PATH.exists():
        raise SystemExit("Konfiguration fehlt. Bitte zuerst 'configure' ausfuehren.")
    return json.loads(CONFIG_PATH.read_text(encoding="utf-8"))


def save_config(config: dict) -> None:
    """Write *config* as indented JSON to ``CONFIG_PATH``."""
    ensure_app_dir()
    CONFIG_PATH.write_text(json.dumps(config, indent=2), encoding="utf-8")


def parse_iso_date(value: str) -> date:
    """Parse a ``YYYY-MM-DD`` string.

    Raises:
        SystemExit: if *value* is not a valid date in that format.
    """
    try:
        return datetime.strptime(value, "%Y-%m-%d").date()
    except ValueError as exc:
        raise SystemExit(f"Ungueltiges Datum '{value}'. Erwartet: YYYY-MM-DD") from exc


def parse_date_from_text(text: str) -> Optional[date]:
    """Return the first date recognised anywhere in *text*, or ``None``.

    The patterns are tried in order; the first regex match that dateparser
    can interpret (German or English, day-first) wins.
    """
    patterns = [
        r"\b\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4}\b",
        r"\b[A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4}\b",
        r"\b\d{1,2}/\d{1,2}/\d{4}\b",
        r"\b\d{4}-\d{2}-\d{2}\b",
    ]
    for pattern in patterns:
        m = re.search(pattern, text)
        if not m:
            continue
        parsed = dateparser.parse(
            m.group(0),
            languages=["de", "en"],
            settings={"DATE_ORDER": "DMY"},
        )
        if parsed:
            return parsed.date()
    return None


def parse_order_date_from_text(text: str) -> Optional[date]:
    """Return the order date found in *text*, or ``None``.

    Dates that directly follow an order-date label are preferred; if none is
    found, any date in the text is accepted via ``parse_date_from_text``.
    """
    focused_patterns = [
        r"(Bestellt am|Bestelldatum)\s*[:\-]?\s*(\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4})",
        r"(Order placed(?: on)?)\s*[:\-]?\s*([A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4})",
        r"(Order placed(?: on)?)\s*[:\-]?\s*(\d{1,2}/\d{1,2}/\d{4})",
    ]
    for pattern in focused_patterns:
        m = re.search(pattern, text, flags=re.IGNORECASE)
        if not m:
            continue
        parsed = dateparser.parse(
            m.group(2),
            languages=["de", "en"],
            settings={"DATE_ORDER": "DMY"},
        )
        if parsed:
            return parsed.date()
    return parse_date_from_text(text)


def parse_order_id_from_text(text: str) -> Optional[str]:
    """Return the first ``NNN-NNNNNNN-NNNNNNN`` order id in *text*, or ``None``."""
    m = re.search(r"\b\d{3}-\d{7}-\d{7}\b", text)
    return m.group(0) if m else None


def years_for_range(start_date: date, end_date: date) -> list[int]:
    """Return every calendar year touched by the range, newest first."""
    return list(range(end_date.year, start_date.year - 1, -1))


def build_orders_url(marketplace: str, year: int) -> str:
    """Build the order overview URL filtered to *year* for *marketplace*."""
    return f"https://www.amazon.{marketplace}/your-orders/orders?timeFilter=year-{year}"


def text_contains_invoice_hint(text: str) -> bool:
    """Return True if *text* contains any of ``INVOICE_KEYWORDS`` (case-insensitive)."""
    t = text.lower()
    return any(k in t for k in INVOICE_KEYWORDS)


def collect_order_detail_links(page, base_url: str) -> list[str]:
    """Collect unique absolute order-detail URLs from all anchors on *page*.

    An anchor qualifies when its href contains ``order-details`` or
    ``orderid=`` (case-insensitive). Order of first appearance is kept.
    """
    links = page.locator("a")
    out: list[str] = []
    seen: set[str] = set()
    for i in range(links.count()):
        href = links.nth(i).get_attribute("href")
        if not href:
            continue
        lower = href.lower()
        # "/your-orders/order-details" is already covered by "order-details".
        if "order-details" not in lower and "orderid=" not in lower:
            continue
        absolute = urljoin(base_url, href)
        if absolute in seen:
            continue
        out.append(absolute)
        seen.add(absolute)
    return out


def find_next_page_url(page, base_url: str) -> Optional[str]:
    """Return the absolute URL of the next overview page, or ``None``.

    The pagination widget's "last" entry is tried first, then any anchor
    whose text looks like a "next" link.
    """
    selectors = (
        "ul.a-pagination li.a-last a",
        'a:has-text("Weiter"), a:has-text("Next"), a:has-text("Nächste"), a:has-text("Naechste")',
    )
    for selector in selectors:
        candidate = page.locator(selector).first
        if candidate.count() > 0:
            href = candidate.get_attribute("href")
            if href:
                return urljoin(base_url, href)
    return None


def extract_invoice_links_from_scope(scope, base_url: str) -> list[str]:
    """Return unique absolute URLs of anchors in *scope* that look like invoice links.

    An anchor qualifies when its URL contains one of the URL keywords or its
    visible text contains one of ``INVOICE_KEYWORDS``.
    """
    links = scope.locator("a")
    out: list[str] = []
    seen: set[str] = set()
    for i in range(links.count()):
        link = links.nth(i)
        href = (link.get_attribute("href") or "").strip()
        # Check the href first so anchors without a fetchable target are
        # skipped without paying for an inner_text() round-trip.
        # "javascript:" targets cannot be requested and "#" would resolve to
        # the listing page itself.
        if not href or href.startswith("#") or href.lower().startswith("javascript:"):
            continue
        absolute = urljoin(base_url, href)
        if absolute in seen:
            continue
        lower_href = absolute.lower()
        is_candidate = any(k in lower_href for k in _URL_INVOICE_KEYWORDS)
        if not is_candidate:
            try:
                text = (link.inner_text(timeout=1000) or "").strip()
            except PlaywrightTimeoutError:
                # A single slow/detached anchor must not abort the whole scan.
                text = ""
            is_candidate = text_contains_invoice_hint(text)
        if is_candidate:
            out.append(absolute)
            seen.add(absolute)
    return out


def extract_orders_from_overview(page, base_url: str, debug: bool = False) -> list[OrderInvoice]:
    """Build an ``OrderInvoice`` for every order card on *page* that has a date and links.

    Cards without a parseable date, without invoice link candidates, or whose
    text cannot be read in time are skipped.
    """
    cards = page.locator("div.order-card")
    results: list[OrderInvoice] = []
    for i in range(cards.count()):
        card = cards.nth(i)
        try:
            card_text = card.inner_text(timeout=2000)
        except PlaywrightTimeoutError:
            if debug:
                print(f"[debug] Timeout bei order-card (index={i + 1})")
            continue
        order_date = parse_order_date_from_text(card_text)
        if order_date is None:
            if debug:
                print(f"[debug] order-card ohne Datum (index={i + 1})")
            continue
        order_id = parse_order_id_from_text(card_text) or f"UNKNOWN-{order_date.isoformat()}"
        invoice_links = extract_invoice_links_from_scope(card, base_url)
        if not invoice_links:
            continue
        results.append(
            OrderInvoice(order_date=order_date, order_id=order_id, invoice_links=invoice_links)
        )
    return results


def extract_invoice_candidates_from_detail(
    context, detail_url: str, base_url: str, debug: bool = False
) -> Optional[OrderInvoice]:
    """Open an order detail page in a new tab and extract its invoice candidates.

    Returns ``None`` when the page has no parseable date, no invoice links, or
    a Playwright timeout occurs. The tab is always closed.
    """
    detail_page = context.new_page()
    try:
        detail_page.goto(detail_url, wait_until="domcontentloaded", timeout=15000)
        detail_page.wait_for_timeout(1200)
        body_text = detail_page.inner_text("body", timeout=4000)
        order_date = parse_order_date_from_text(body_text)
        if order_date is None:
            if debug:
                print(f"[debug] Kein Datum in Detailseite: {detail_url}")
            return None
        invoice_links = extract_invoice_links_from_scope(detail_page, base_url)
        if debug:
            print(f"[debug] Detailseite {detail_url} -> {len(invoice_links)} Rechnungskandidat(en)")
        if not invoice_links:
            return None
        order_id = parse_order_id_from_text(body_text) or f"UNKNOWN-{order_date.isoformat()}"
        return OrderInvoice(order_date=order_date, order_id=order_id, invoice_links=invoice_links)
    except PlaywrightTimeoutError:
        if debug:
            print(f"[debug] Timeout bei Detailseite: {detail_url}")
        return None
    finally:
        detail_page.close()


def extract_pdf_links_from_html(html: str, source_url: str) -> list[str]:
    """Return unique absolute URLs of hrefs in *html* that look like PDF/download links.

    Href values are HTML-unescaped before use (``&amp;`` -> ``&``) so that
    query strings with several parameters stay intact. The result keeps the
    order of first appearance.
    """
    found: dict[str, None] = {}
    for m in re.finditer(r'href=["\']([^"\']+)["\']', html, flags=re.IGNORECASE):
        absolute = urljoin(source_url, unescape(m.group(1)))
        low = absolute.lower()
        if ".pdf" in low or "download" in low:
            found.setdefault(absolute)
    return list(found)


def write_debug_json(debug_json_path: Path, payload: dict) -> None:
    """Write *payload* as indented UTF-8 JSON, creating parent directories."""
    debug_json_path.parent.mkdir(parents=True, exist_ok=True)
    debug_json_path.write_text(
        json.dumps(payload, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )


def looks_like_pdf(content: bytes) -> bool:
    """Return True if *content* starts with the ``%PDF-`` magic bytes."""
    return content.startswith(b"%PDF-")


def build_output_path(download_dir: Path, order_date: date, order_id: str, sequence_no: int) -> Path:
    """Return a not-yet-existing ``<date>_<order-id>_<seq>.pdf`` path in *download_dir*.

    Characters outside ``[A-Za-z0-9-]`` in *order_id* are replaced by ``_``;
    *sequence_no* is incremented until the file name is free.
    """
    safe_order_id = re.sub(r"[^A-Za-z0-9-]", "_", order_id)
    while True:
        out = download_dir / f"{order_date.isoformat()}_{safe_order_id}_{sequence_no:03d}.pdf"
        if not out.exists():
            return out
        sequence_no += 1


def configure(args) -> None:
    """Save the configuration and store the browser session after a manual login."""
    config = {
        "marketplace": args.marketplace,
        "download_dir": str(Path(args.download_dir).expanduser().resolve()),
        "headless": args.headless,
    }
    save_config(config)  # also creates APP_DIR

    with sync_playwright() as p:
        # Always visible: the user has to log in by hand.
        browser = p.chromium.launch(headless=False)
        try:
            context = browser.new_context()
            page = context.new_page()
            page.goto(
                f"https://www.amazon.{args.marketplace}/your-orders/orders",
                wait_until="domcontentloaded",
            )
            print("Bitte im Browser bei Amazon einloggen.")
            if args.login_wait_seconds > 0:
                print(
                    f"Warte {args.login_wait_seconds} Sekunden auf Login, "
                    "danach wird die Session automatisch gespeichert."
                )
                time.sleep(args.login_wait_seconds)
            else:
                print("Wenn Bestellungen sichtbar sind, Enter druecken.")
                input()
            context.storage_state(path=str(STORAGE_STATE_PATH))
        finally:
            browser.close()

    print(f"Konfiguration gespeichert: {CONFIG_PATH}")
    print(f"Session gespeichert: {STORAGE_STATE_PATH}")


def _collect_invoices(
    page,
    marketplace: str,
    years: list[int],
    start_date: date,
    end_date: date,
    max_pages: int,
    debug: bool,
    debug_payload: dict,
) -> list[OrderInvoice]:
    """Scan the overview pages of *years* and return in-range orders with new invoice links."""
    base_orders_url = f"https://www.amazon.{marketplace}/your-orders/orders"
    totals = debug_payload["totals"]
    invoices: list[OrderInvoice] = []
    seen_invoice_urls: set[str] = set()

    for year in years:
        filtered_url = build_orders_url(marketplace, year)
        if debug:
            print(f"[debug] Wechsle auf Jahresfilter {year}: {filtered_url}")
        try:
            page.goto(filtered_url, wait_until="domcontentloaded", timeout=15000)
        except PlaywrightTimeoutError:
            # Same policy as for the next-page navigation below: give up on
            # this year instead of aborting the whole run.
            if debug:
                print(f"[debug] Timeout beim Laden von Jahr {year}: {filtered_url}")
            continue

        visited_page_urls: set[str] = set()
        for page_idx in range(max_pages):
            if page.url in visited_page_urls:
                if debug:
                    print(f"[debug] Abbruch wegen wiederholter URL: {page.url}")
                break
            visited_page_urls.add(page.url)
            page.wait_for_timeout(1500)

            card_count = page.locator("div.order-card").count()
            overview_candidates = extract_orders_from_overview(page, base_orders_url, debug=debug)
            debug_payload["pages"].append(
                {
                    "year": year,
                    "page": page_idx + 1,
                    "url": page.url,
                    "order_cards": card_count,
                    "overview_candidates": len(overview_candidates),
                }
            )
            totals["order_cards"] += card_count
            if debug:
                print(
                    f"[debug] Jahr {year}, Seite {page_idx + 1}: {card_count} order-card(s), "
                    f"{len(overview_candidates)} Kandidat(en)"
                )

            for candidate in overview_candidates:
                totals["candidates_with_date"] += 1
                if not start_date <= candidate.order_date <= end_date:
                    continue
                filtered_links = [u for u in candidate.invoice_links if u not in seen_invoice_urls]
                if not filtered_links:
                    continue
                seen_invoice_urls.update(filtered_links)
                invoices.append(
                    OrderInvoice(
                        order_date=candidate.order_date,
                        order_id=candidate.order_id,
                        invoice_links=filtered_links,
                    )
                )
                totals["candidates_in_range"] += 1
                if debug:
                    print(
                        f"[debug] Treffer {candidate.order_date.isoformat()} "
                        f"mit {len(filtered_links)} Link(s)"
                    )

            next_page_url = find_next_page_url(page, base_orders_url)
            if not next_page_url:
                break
            try:
                page.goto(next_page_url, wait_until="domcontentloaded", timeout=15000)
            except PlaywrightTimeoutError:
                break
    return invoices


def _is_pdf_response(url: str, content_type: str, body: bytes) -> bool:
    """Decide whether a successfully fetched response should be stored as a PDF."""
    if looks_like_pdf(body):
        return True
    if "html" in content_type:
        # An HTML body (login/error/landing page) must never be written to a
        # ".pdf" file, even when the URL itself ends in ".pdf".
        return False
    return "pdf" in content_type or url.lower().endswith(".pdf")


def _save_pdf(download_dir: Path, order: OrderInvoice, body: bytes, counters: dict[str, int]) -> None:
    """Write *body* to the next free file name for *order* and report it."""
    counters[order.order_id] = counters.get(order.order_id, 0) + 1
    out = build_output_path(download_dir, order.order_date, order.order_id, counters[order.order_id])
    out.write_bytes(body)
    print(f"Gespeichert: {out}")


def _download_invoices(
    context, invoices: list[OrderInvoice], download_dir: Path, referer: str, debug: bool
) -> int:
    """Fetch every invoice link (following one level of HTML) and return the number of PDFs saved."""
    downloaded = 0
    seen_saved_urls: set[str] = set()
    counters: dict[str, int] = {}

    for order in invoices:
        for invoice_url in order.invoice_links:
            if invoice_url in seen_saved_urls:
                continue
            try:
                response = context.request.get(
                    invoice_url,
                    headers={"referer": referer},
                    timeout=20000,
                )
                content_type = (response.headers.get("content-type", "") or "").lower()
                if not response.ok:
                    # Error pages must neither be saved nor mined for links.
                    if debug:
                        print(
                            f"[debug] Uebersprungen {invoice_url} "
                            f"(status={response.status}, content-type={content_type})"
                        )
                    continue
                body = response.body()

                if _is_pdf_response(invoice_url, content_type, body):
                    _save_pdf(download_dir, order, body, counters)
                    downloaded += 1
                    seen_saved_urls.add(invoice_url)
                    continue

                if "html" in content_type:
                    nested_pdf_links = extract_pdf_links_from_html(response.text(), invoice_url)
                    if debug:
                        print(
                            f"[debug] HTML-Seite {invoice_url} -> "
                            f"{len(nested_pdf_links)} PDF-Link(s)"
                        )
                    for pdf_link in nested_pdf_links:
                        if pdf_link in seen_saved_urls:
                            continue
                        pdf_resp = context.request.get(
                            pdf_link,
                            headers={"referer": invoice_url},
                            timeout=20000,
                        )
                        pdf_type = (pdf_resp.headers.get("content-type", "") or "").lower()
                        if not pdf_resp.ok:
                            continue
                        pdf_body = pdf_resp.body()
                        if not _is_pdf_response(pdf_link, pdf_type, pdf_body):
                            continue
                        _save_pdf(download_dir, order, pdf_body, counters)
                        downloaded += 1
                        seen_saved_urls.add(pdf_link)
                elif debug:
                    print(
                        f"[debug] Uebersprungen {invoice_url} "
                        f"(status={response.status}, content-type={content_type})"
                    )
            except Exception as exc:  # deliberate best-effort: one bad link must not stop the run
                if debug:
                    print(f"[debug] Fehler bei {invoice_url}: {exc}")
    return downloaded


def download(args) -> None:
    """Download all invoices whose order date lies in ``[--from, --to]``.

    Raises:
        SystemExit: if configuration or session is missing, a date is
            invalid, or the start date lies after the end date.
    """
    config = load_config()
    if not STORAGE_STATE_PATH.exists():
        raise SystemExit("Session fehlt. Bitte zuerst 'configure' ausfuehren.")

    start_date = parse_iso_date(args.date_from)
    end_date = parse_iso_date(args.date_to)
    if start_date > end_date:
        raise SystemExit("'from' muss kleiner/gleich 'to' sein.")

    marketplace = config["marketplace"]
    download_dir = Path(args.output or config["download_dir"]).expanduser().resolve()
    download_dir.mkdir(parents=True, exist_ok=True)
    debug_json_target = args.debug_json or (str(DEFAULT_DEBUG_JSON_PATH) if args.debug else None)
    # An explicit --headless on the command line overrides the configured default.
    headless = args.headless if args.headless is not None else bool(config.get("headless", True))

    years = years_for_range(start_date, end_date)
    debug_payload = {
        "requested_range": {"from": start_date.isoformat(), "to": end_date.isoformat()},
        "years": years,
        "pages": [],
        "totals": {
            "order_cards": 0,
            "candidates_with_date": 0,
            "candidates_in_range": 0,
        },
    }

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=headless)
        try:
            context = browser.new_context(storage_state=str(STORAGE_STATE_PATH))
            page = context.new_page()
            invoices = _collect_invoices(
                page,
                marketplace,
                years,
                start_date,
                end_date,
                args.max_pages,
                args.debug,
                debug_payload,
            )
            downloaded = _download_invoices(context, invoices, download_dir, page.url, args.debug)
        finally:
            browser.close()

    debug_payload["totals"]["downloaded"] = downloaded
    if debug_json_target:
        debug_path = Path(debug_json_target).expanduser().resolve()
        write_debug_json(debug_path, debug_payload)
        print(f"Debug-JSON gespeichert: {debug_path}")
    print(f"Fertig. Heruntergeladene Rechnungen: {downloaded}")


def _parse_bool_flag(value: str) -> bool:
    """Interpret ``1`` / ``true`` / ``yes`` (any case) as True, everything else as False."""
    return value.lower() in {"1", "true", "yes"}


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with the ``configure`` and ``download`` sub-commands."""
    parser = argparse.ArgumentParser(description="Amazon-Rechnungen eines Zeitraums herunterladen")
    sub = parser.add_subparsers(dest="command", required=True)

    p_config = sub.add_parser(
        "configure", help="Marketplace/Download konfigurieren und Login-Session speichern"
    )
    p_config.add_argument("--marketplace", default="de", help="z. B. de, com, co.uk")
    p_config.add_argument("--download-dir", default="~/Downloads/amazon_rechnungen")
    p_config.add_argument(
        "--headless", action="store_true", help="Standard fuer Download-Lauf im Headless-Mode"
    )
    p_config.add_argument(
        "--login-wait-seconds",
        type=int,
        default=0,
        help="Optional: wartet X Sekunden vor Session-Speicherung (fuer noVNC/Serverbetrieb).",
    )
    p_config.set_defaults(func=configure)

    p_dl = sub.add_parser("download", help="Rechnungen nach Zeitraum herunterladen")
    p_dl.add_argument("--from", dest="date_from", required=True, help="Startdatum YYYY-MM-DD")
    p_dl.add_argument("--to", dest="date_to", required=True, help="Enddatum YYYY-MM-DD")
    p_dl.add_argument("--output", help="Optionales Zielverzeichnis")
    p_dl.add_argument("--max-pages", type=int, default=25, help="Maximal zu scannende Bestellseiten")
    p_dl.add_argument("--headless", type=_parse_bool_flag, nargs="?", const=True, default=None)
    p_dl.add_argument("--debug", action="store_true", help="Zeigt gefundene Detail- und Rechnungslinks")
    p_dl.add_argument(
        "--debug-json",
        nargs="?",
        const=str(DEFAULT_DEBUG_JSON_PATH),
        default=None,
        help="Schreibt Laufdetails als JSON (optional mit Pfad, sonst Standarddatei).",
    )
    p_dl.set_defaults(func=download)
    return parser


def main() -> None:
    """Parse the command line and dispatch to the selected sub-command."""
    parser = build_parser()
    args = parser.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()