commit cafa2f7e9bdbdc923cc820ad175c2f39d9cce683
Author: Stefan Heyn
Date:   Wed Mar 4 14:02:29 2026 +0100

    Initial commit: amazon invoice downloader

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f03db00
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,19 @@
+# Python
+__pycache__/
+*.py[cod]
+*.pyo
+
+# Virtual environments
+.venv/
+venv/
+
+# Local debug and runtime artifacts
+debug-run.json
+*.log
+
+# IDE
+.vscode/*.code-workspace
+
+# OS
+.DS_Store
+Thumbs.db
diff --git a/.vscode/launch.json b/.vscode/launch.json
new file mode 100644
index 0000000..5a82bbd
--- /dev/null
+++ b/.vscode/launch.json
@@ -0,0 +1,27 @@
+{
+    // Use IntelliSense to learn about possible attributes.
+    // Hover to view descriptions of existing attributes.
+    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
+    "version": "0.2.0",
+    "configurations": [
+
+        {
+            "name": "Amazon Downloader: Januar 2025 Debug",
+            "type": "debugpy",
+            "request": "launch",
+            "program": "${workspaceFolder}/main.py",
+            "cwd": "${workspaceFolder}",
+            "console": "integratedTerminal",
+            "args": [
+                "download",
+                "--from",
+                "2025-01-01",
+                "--to",
+                "2025-01-31",
+                "--debug",
+                "--headless",
+                "false"
+            ]
+        }
+    ]
+}
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..1734a3c
--- /dev/null
+++ b/README.md
@@ -0,0 +1,53 @@
+# Amazon Invoice Downloader
+
+Laedt Amazon-Rechnungen fuer einen konfigurierbaren Zeitraum herunter.
+
+## Voraussetzungen
+
+- Windows/macOS/Linux
+- Python 3.10+
+- Ein Amazon-Konto
+
+## Installation
+
+```powershell
+cd c:\projekte\amazon_invoice_downloader
+python -m venv .venv
+.\.venv\Scripts\Activate.ps1
+pip install -r requirements.txt
+python -m playwright install chromium
+```
+
+## Einmalige Konfiguration
+
+```powershell
+python main.py configure --marketplace de --download-dir "C:\Users\<benutzer>\Downloads\amazon_rechnungen"
+```
+
+Dann oeffnet sich ein Browser. Dort bei Amazon anmelden und danach im Terminal Enter druecken.
+Die Session wird lokal gespeichert in:
+
+- `~/.amazon_invoice_downloader/config.json`
+- `~/.amazon_invoice_downloader/storage_state.json`
+
+## Rechnungen herunterladen
+
+```powershell
+python main.py download --from 2025-01-01 --to 2025-12-31
+```
+
+Optionen:
+
+- `--output <pfad>`: anderes Zielverzeichnis
+- `--max-pages 25`: Anzahl Bestellseiten, die durchsucht werden
+- `--headless true|false`: Browser sichtbar oder unsichtbar
+- `--debug`: zeigt, wie viele Detailseiten und Rechnungslinks gefunden werden
+- `--debug-json [pfad]`: schreibt Laufdetails als JSON (ohne Pfad: Standarddatei)
+
+## Hinweise
+
+- Das Skript setzt pro Jahr den Amazon-Filter `timeFilter=year-YYYY` auf `your-orders/orders`, damit Zeitraeume wie Januar 2025 korrekt durchlaufen werden.
+- Die Datumspruefung erfolgt direkt auf der Uebersichtsseite in den `order-card`-Elementen; Detailseiten sind nicht mehr der Primaerpfad.
+- Amazon-HTML kann sich aendern. Falls keine Rechnungen gefunden werden, `--debug --debug-json` nutzen.
+- Der Login ist absichtlich sessionbasiert; Passwoerter werden nicht gespeichert.
+- Bitte nur gemaess Amazon-AGB nutzen.
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..f786666
--- /dev/null
+++ b/main.py
@@ -0,0 +1,653 @@
+"""Download Amazon invoice PDFs for a configurable date range.
+
+The ``configure`` command stores the settings and a logged-in browser session;
+the ``download`` command reuses that session to scan the order overview pages
+and fetch the invoices of all orders inside the requested date range.
+"""
+
+import argparse
+import json
+import re
+from dataclasses import dataclass
+from datetime import date, datetime
+from html import unescape
+from pathlib import Path
+from typing import Optional
+from urllib.parse import urljoin
+
+import dateparser
+from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
+from playwright.sync_api import sync_playwright
+
+APP_DIR = Path.home() / ".amazon_invoice_downloader"
+CONFIG_PATH = APP_DIR / "config.json"
+STORAGE_STATE_PATH = APP_DIR / "storage_state.json"
+DEFAULT_DEBUG_JSON_PATH = APP_DIR / "debug_last_run.json"
+
+# Lower-case substrings that mark a link text as invoice-related.
+INVOICE_KEYWORDS = [
+    "invoice",
+    "rechnung",
+    "faktura",
+    "vat",
+    "steuer",
+    "bill",
+    "beleg",
+]
+
+# Lower-case substrings that mark a link URL as invoice-related.
+INVOICE_URL_HINTS = ("invoice", "rechnung", "tax", "bill", "summary", "print", "beleg")
+
+
+@dataclass
+class OrderInvoice:
+    """An order together with the invoice link candidates found for it."""
+
+    order_date: date
+    order_id: str
+    invoice_links: list[str]
+
+
+def ensure_app_dir() -> None:
+    """Create the per-user application directory if it does not exist yet."""
+    APP_DIR.mkdir(parents=True, exist_ok=True)
+
+
+def load_config() -> dict:
+    """Return the configuration saved by ``configure``.
+
+    Raises:
+        SystemExit: If the configuration file is missing or is not valid JSON.
+    """
+    if not CONFIG_PATH.exists():
+        raise SystemExit("Konfiguration fehlt. Bitte zuerst 'configure' ausfuehren.")
+    try:
+        return json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
+    except json.JSONDecodeError as exc:
+        raise SystemExit(
+            f"Konfiguration ist beschaedigt ({CONFIG_PATH}). Bitte 'configure' erneut ausfuehren."
+        ) from exc
+
+
+def save_config(config: dict) -> None:
+    """Write *config* as JSON into the application directory, creating it if needed."""
+    ensure_app_dir()
+    CONFIG_PATH.write_text(json.dumps(config, indent=2), encoding="utf-8")
+
+
+def parse_iso_date(value: str) -> date:
+    """Parse a ``YYYY-MM-DD`` string into a date.
+
+    Raises:
+        SystemExit: If *value* is not a valid date in that format.
+    """
+    try:
+        return datetime.strptime(value, "%Y-%m-%d").date()
+    except ValueError as exc:
+        raise SystemExit(f"Ungueltiges Datum '{value}'. Erwartet: YYYY-MM-DD") from exc
+
+
+def _parse_date_fragment(fragment: str) -> Optional[date]:
+    """Parse an isolated date string with dateparser (German/English, day first)."""
+    parsed = dateparser.parse(
+        fragment,
+        languages=["de", "en"],
+        settings={"DATE_ORDER": "DMY"},
+    )
+    return parsed.date() if parsed else None
+
+
+def parse_date_from_text(text: str) -> Optional[date]:
+    """Return the first parsable date found anywhere in *text*, or ``None``.
+
+    The patterns are tried in the listed order, not in text order.
+    """
+    patterns = [
+        r"\b\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4}\b",  # 15. Januar 2025
+        r"\b[A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4}\b",  # January 15, 2025
+        r"\b\d{1,2}/\d{1,2}/\d{4}\b",  # 15/01/2025
+        r"\b\d{4}-\d{2}-\d{2}\b",  # 2025-01-15
+    ]
+    for pattern in patterns:
+        m = re.search(pattern, text)
+        if not m:
+            continue
+        parsed = _parse_date_fragment(m.group(0))
+        if parsed:
+            return parsed
+    return None
+
+
+def parse_order_date_from_text(text: str) -> Optional[date]:
+    """Return the order date from *text*, or ``None`` if no date is found.
+
+    A date that directly follows an order-date label is preferred; otherwise
+    the result of ``parse_date_from_text`` for the whole text is returned.
+    """
+    focused_patterns = [
+        r"(Bestellt am|Bestelldatum)\s*[:\-]?\s*(\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4})",
+        r"(Order placed(?: on)?)\s*[:\-]?\s*([A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4})",
+        r"(Order placed(?: on)?)\s*[:\-]?\s*(\d{1,2}/\d{1,2}/\d{4})",
+    ]
+    for pattern in focused_patterns:
+        m = re.search(pattern, text, flags=re.IGNORECASE)
+        if not m:
+            continue
+        parsed = _parse_date_fragment(m.group(2))
+        if parsed:
+            return parsed
+    return parse_date_from_text(text)
+
+
+def parse_order_id_from_text(text: str) -> Optional[str]:
+    """Return the first ``NNN-NNNNNNN-NNNNNNN`` order number in *text*, or ``None``."""
+    m = re.search(r"\b\d{3}-\d{7}-\d{7}\b", text)
+    return m.group(0) if m else None
+
+
+def years_for_range(start_date: date, end_date: date) -> list[int]:
+    """Return every calendar year touched by the range, newest first."""
+    return list(range(end_date.year, start_date.year - 1, -1))
+
+
+def build_orders_url(marketplace: str, year: int) -> str:
+    """Return the order overview URL of *marketplace* filtered to *year*."""
+    return f"https://www.amazon.{marketplace}/your-orders/orders?timeFilter=year-{year}"
+
+
+def text_contains_invoice_hint(text: str) -> bool:
+    """Return whether *text* contains any invoice keyword (case-insensitive)."""
+    t = text.lower()
+    return any(k in t for k in INVOICE_KEYWORDS)
+
+
+def collect_order_detail_links(page, base_url: str) -> list[str]:
+    """Return the absolute, de-duplicated order-detail URLs linked from *page*."""
+    links = page.locator("a")
+    out: list[str] = []
+    seen = set()
+    for i in range(links.count()):
+        href = links.nth(i).get_attribute("href")
+        if not href:
+            continue
+        lower = href.lower()
+        if "order-details" not in lower and "orderid=" not in lower:
+            continue
+        absolute = urljoin(base_url, href)
+        if absolute in seen:
+            continue
+        out.append(absolute)
+        seen.add(absolute)
+    return out
+
+
+def find_next_page_url(page, base_url: str) -> Optional[str]:
+    """Return the absolute URL of the next overview page, or ``None`` if no link to it is found."""
+    a_last = page.locator("ul.a-pagination li.a-last a").first
+    if a_last.count() > 0:
+        href = a_last.get_attribute("href")
+        if href:
+            return urljoin(base_url, href)
+
+    # Fall back to the link caption when the pagination markup is not found.
+    fallback = page.locator(
+        'a:has-text("Weiter"), a:has-text("Next"), a:has-text("Nächste"), a:has-text("Naechste")'
+    ).first
+    if fallback.count() > 0:
+        href = fallback.get_attribute("href")
+        if href:
+            return urljoin(base_url, href)
+    return None
+
+
+def extract_invoice_links_from_scope(scope, base_url: str) -> list[str]:
+    """Return the absolute, de-duplicated invoice link candidates inside *scope*.
+
+    A link is a candidate when its text contains an invoice keyword or its URL
+    contains one of ``INVOICE_URL_HINTS``. *scope* is a Playwright page or locator.
+    """
+    links = scope.locator("a")
+    out: list[str] = []
+    seen = set()
+    for i in range(links.count()):
+        link = links.nth(i)
+        href = link.get_attribute("href")
+        if not href:
+            continue
+
+        absolute = urljoin(base_url, href)
+        if absolute in seen:
+            continue
+
+        try:
+            text = (link.inner_text(timeout=1000) or "").strip()
+        except PlaywrightTimeoutError:
+            # An unreadable link text must not abort the run; judge by URL only.
+            text = ""
+
+        lower_href = absolute.lower()
+        if text_contains_invoice_hint(text) or any(k in lower_href for k in INVOICE_URL_HINTS):
+            out.append(absolute)
+            seen.add(absolute)
+    return out
+
+
+def extract_orders_from_overview(page, base_url: str, debug: bool = False) -> list[OrderInvoice]:
+    """Return one ``OrderInvoice`` per order card on *page* that has a date and invoice links.
+
+    Cards without a recognizable order date or without invoice link candidates
+    are skipped. An order without a recognizable order number gets the
+    placeholder id ``UNKNOWN-<date>``.
+    """
+    cards = page.locator("div.order-card")
+    results: list[OrderInvoice] = []
+    for i in range(cards.count()):
+        card = cards.nth(i)
+        try:
+            card_text = card.inner_text(timeout=2000)
+        except PlaywrightTimeoutError:
+            # Skip a card that cannot be read instead of aborting the whole run.
+            if debug:
+                print(f"[debug] Timeout bei order-card (index={i + 1})")
+            continue
+        order_date = parse_order_date_from_text(card_text)
+        if order_date is None:
+            if debug:
+                print(f"[debug] order-card ohne Datum (index={i + 1})")
+            continue
+        order_id = parse_order_id_from_text(card_text) or f"UNKNOWN-{order_date.isoformat()}"
+
+        invoice_links = extract_invoice_links_from_scope(card, base_url)
+        if not invoice_links:
+            continue
+        results.append(OrderInvoice(order_date=order_date, order_id=order_id, invoice_links=invoice_links))
+    return results
+
+
+def extract_invoice_candidates_from_detail(
+    context, detail_url: str, base_url: str, debug: bool = False
+) -> Optional[OrderInvoice]:
+    """Open *detail_url* in a new tab and return its order with invoice candidates.
+
+    Returns ``None`` when the page times out, shows no order date or contains no
+    invoice link candidates. The tab is always closed again.
+    """
+    detail_page = context.new_page()
+    try:
+        detail_page.goto(detail_url, wait_until="domcontentloaded", timeout=15000)
+        detail_page.wait_for_timeout(1200)
+        body_text = detail_page.inner_text("body", timeout=4000)
+        order_date = parse_order_date_from_text(body_text)
+        if order_date is None:
+            if debug:
+                print(f"[debug] Kein Datum in Detailseite: {detail_url}")
+            return None
+
+        invoice_links = extract_invoice_links_from_scope(detail_page, base_url)
+        if debug:
+            print(f"[debug] Detailseite {detail_url} -> {len(invoice_links)} Rechnungskandidat(en)")
+        if not invoice_links:
+            return None
+
+        order_id = parse_order_id_from_text(body_text) or f"UNKNOWN-{order_date.isoformat()}"
+        return OrderInvoice(order_date=order_date, order_id=order_id, invoice_links=invoice_links)
+    except PlaywrightTimeoutError:
+        if debug:
+            print(f"[debug] Timeout bei Detailseite: {detail_url}")
+        return None
+    finally:
+        detail_page.close()
+
+
+def extract_pdf_links_from_html(html: str, source_url: str) -> list[str]:
+    """Return the absolute URLs of all ``href`` targets in *html* that look like downloads.
+
+    A target qualifies when its URL contains ``.pdf`` or ``download``. The result
+    is de-duplicated and keeps document order.
+    """
+    pdfs: dict[str, None] = {}
+    for m in re.finditer(r'href=["\']([^"\']+)["\']', html, flags=re.IGNORECASE):
+        # Attribute values are HTML-escaped ("&amp;" for "&"); decode them first,
+        # otherwise the query string of the resulting URL is wrong.
+        absolute = urljoin(source_url, unescape(m.group(1)))
+        low = absolute.lower()
+        if ".pdf" in low or "download" in low:
+            pdfs[absolute] = None
+    return list(pdfs)
+
+
+def write_debug_json(debug_json_path: Path, payload: dict) -> None:
+    """Write *payload* as pretty-printed UTF-8 JSON, creating parent directories."""
+    debug_json_path.parent.mkdir(parents=True, exist_ok=True)
+    debug_json_path.write_text(
+        json.dumps(payload, indent=2, ensure_ascii=False),
+        encoding="utf-8",
+    )
+
+
+def looks_like_pdf(content: bytes) -> bool:
+    """Return whether *content* starts with the PDF magic bytes."""
+    return content.startswith(b"%PDF-")
+
+
+def build_output_path(download_dir: Path, order_date: date, order_id: str, sequence_no: int) -> Path:
+    """Return a not yet existing path ``<date>_<order id>_<NNN>.pdf`` in *download_dir*.
+
+    Characters outside ``[A-Za-z0-9-]`` in *order_id* are replaced by ``_``;
+    *sequence_no* is increased until the file name is free.
+    """
+    safe_order_id = re.sub(r"[^A-Za-z0-9-]", "_", order_id)
+    out = download_dir / f"{order_date.isoformat()}_{safe_order_id}_{sequence_no:03d}.pdf"
+    while out.exists():
+        sequence_no += 1
+        out = download_dir / f"{order_date.isoformat()}_{safe_order_id}_{sequence_no:03d}.pdf"
+    return out
+
+
+def configure(args) -> None:
+    """Save the configuration and store a logged-in Amazon browser session.
+
+    Opens a visible browser on the order overview, waits until the user has
+    logged in and pressed Enter, then writes the session to ``STORAGE_STATE_PATH``.
+    """
+    config = {
+        "marketplace": args.marketplace,
+        "download_dir": str(Path(args.download_dir).expanduser().resolve()),
+        "headless": args.headless,
+    }
+    save_config(config)  # also creates APP_DIR, where the session is stored
+
+    with sync_playwright() as p:
+        browser = p.chromium.launch(headless=False)
+        try:
+            context = browser.new_context()
+            page = context.new_page()
+            page.goto(
+                f"https://www.amazon.{args.marketplace}/your-orders/orders",
+                wait_until="domcontentloaded",
+            )
+            print("Bitte im Browser bei Amazon einloggen.")
+            print("Wenn Bestellungen sichtbar sind, Enter druecken.")
+            input()
+            context.storage_state(path=str(STORAGE_STATE_PATH))
+        finally:
+            browser.close()
+
+    print(f"Konfiguration gespeichert: {CONFIG_PATH}")
+    print(f"Session gespeichert: {STORAGE_STATE_PATH}")
+
+
+def _scan_orders(
+    page,
+    marketplace: str,
+    start_date: date,
+    end_date: date,
+    max_pages: int,
+    debug: bool,
+    debug_payload: dict,
+) -> list[OrderInvoice]:
+    """Walk the yearly order overview pages and collect the orders inside the range.
+
+    Each invoice URL is reported for one order only. Page and candidate counts
+    are added to *debug_payload* in place.
+    """
+    base_orders_url = f"https://www.amazon.{marketplace}/your-orders/orders"
+    invoices: list[OrderInvoice] = []
+    seen_invoice_urls = set()
+
+    for year in years_for_range(start_date, end_date):
+        filtered_url = build_orders_url(marketplace, year)
+        if debug:
+            print(f"[debug] Wechsle auf Jahresfilter {year}: {filtered_url}")
+        try:
+            page.goto(filtered_url, wait_until="domcontentloaded", timeout=15000)
+        except PlaywrightTimeoutError:
+            # Same best-effort handling as for the follow-up pages below, but
+            # reported, because a whole year is missing from the result.
+            print(f"Warnung: Timeout beim Laden von {filtered_url}")
+            continue
+        visited_page_urls = set()
+
+        for page_idx in range(max_pages):
+            if page.url in visited_page_urls:
+                if debug:
+                    print(f"[debug] Abbruch wegen wiederholter URL: {page.url}")
+                break
+            visited_page_urls.add(page.url)
+            page.wait_for_timeout(1500)
+            card_count = page.locator("div.order-card").count()
+            overview_candidates = extract_orders_from_overview(page, base_orders_url, debug=debug)
+            debug_payload["pages"].append(
+                {
+                    "year": year,
+                    "page": page_idx + 1,
+                    "url": page.url,
+                    "order_cards": card_count,
+                    "overview_candidates": len(overview_candidates),
+                }
+            )
+            debug_payload["totals"]["order_cards"] += card_count
+
+            if debug:
+                print(
+                    f"[debug] Jahr {year}, Seite {page_idx + 1}: "
+                    f"{card_count} order-card(s), {len(overview_candidates)} Kandidat(en)"
+                )
+
+            for candidate in overview_candidates:
+                debug_payload["totals"]["candidates_with_date"] += 1
+                if not (start_date <= candidate.order_date <= end_date):
+                    continue
+                filtered_links = [u for u in candidate.invoice_links if u not in seen_invoice_urls]
+                if not filtered_links:
+                    continue
+                seen_invoice_urls.update(filtered_links)
+                invoices.append(
+                    OrderInvoice(
+                        order_date=candidate.order_date,
+                        order_id=candidate.order_id,
+                        invoice_links=filtered_links,
+                    )
+                )
+                debug_payload["totals"]["candidates_in_range"] += 1
+                if debug:
+                    print(f"[debug] Treffer {candidate.order_date.isoformat()} mit {len(filtered_links)} Link(s)")
+
+            next_page_url = find_next_page_url(page, base_orders_url)
+            if not next_page_url:
+                break
+            try:
+                page.goto(next_page_url, wait_until="domcontentloaded", timeout=15000)
+            except PlaywrightTimeoutError:
+                break
+    return invoices
+
+
+def _is_pdf_response(response, body: bytes) -> bool:
+    """Return whether *response* succeeded and actually carries a PDF.
+
+    The URL suffix is deliberately ignored: an error or login page served for a
+    ``.pdf`` URL must not be stored as an invoice.
+    """
+    content_type = (response.headers.get("content-type", "") or "").lower()
+    return response.ok and ("pdf" in content_type or looks_like_pdf(body))
+
+
+def _save_pdf(download_dir: Path, order: OrderInvoice, counters: dict[str, int], body: bytes) -> None:
+    """Write *body* to the next free file of *order* and advance its counter."""
+    counters[order.order_id] = counters.get(order.order_id, 0) + 1
+    out = build_output_path(download_dir, order.order_date, order.order_id, counters[order.order_id])
+    out.write_bytes(body)
+    print(f"Gespeichert: {out}")
+
+
+def _download_invoices(
+    context, invoices: list[OrderInvoice], download_dir: Path, referer: str, debug: bool
+) -> int:
+    """Fetch all invoice links of *invoices* and return the number of PDFs saved.
+
+    A link that returns HTML instead of a PDF is searched for nested download
+    links, which are then fetched as well.
+    """
+    downloaded = 0
+    seen_saved_urls = set()
+    counters: dict[str, int] = {}
+    for order in invoices:
+        for invoice_url in order.invoice_links:
+            if invoice_url in seen_saved_urls:
+                continue
+            try:
+                response = context.request.get(
+                    invoice_url,
+                    headers={"referer": referer},
+                    timeout=20000,
+                )
+                content_type = (response.headers.get("content-type", "") or "").lower()
+                body = response.body()
+
+                if _is_pdf_response(response, body):
+                    _save_pdf(download_dir, order, counters, body)
+                    downloaded += 1
+                    seen_saved_urls.add(invoice_url)
+                    continue
+
+                if "html" in content_type:
+                    nested_pdf_links = extract_pdf_links_from_html(response.text(), invoice_url)
+                    if debug:
+                        print(f"[debug] HTML-Seite {invoice_url} -> {len(nested_pdf_links)} PDF-Link(s)")
+                    for pdf_link in nested_pdf_links:
+                        if pdf_link in seen_saved_urls:
+                            continue
+                        pdf_resp = context.request.get(
+                            pdf_link,
+                            headers={"referer": invoice_url},
+                            timeout=20000,
+                        )
+                        pdf_body = pdf_resp.body()
+                        if not _is_pdf_response(pdf_resp, pdf_body):
+                            continue
+                        _save_pdf(download_dir, order, counters, pdf_body)
+                        downloaded += 1
+                        seen_saved_urls.add(pdf_link)
+                elif debug:
+                    print(
+                        f"[debug] Uebersprungen {invoice_url} "
+                        f"(status={response.status}, content-type={content_type})"
+                    )
+            except Exception as exc:
+                # Best effort: one failing link must not stop the remaining
+                # downloads, but the failure is always reported.
+                print(f"Warnung: Fehler bei {invoice_url}: {exc}")
+    return downloaded
+
+
+def download(args) -> None:
+    """Download the invoices of all orders between ``--from`` and ``--to``.
+
+    Raises:
+        SystemExit: If the configuration or session is missing, a date is
+            invalid, or the start date lies after the end date.
+    """
+    config = load_config()
+    if not STORAGE_STATE_PATH.exists():
+        raise SystemExit("Session fehlt. Bitte zuerst 'configure' ausfuehren.")
+
+    start_date = parse_iso_date(args.date_from)
+    end_date = parse_iso_date(args.date_to)
+    if start_date > end_date:
+        raise SystemExit("'from' muss kleiner/gleich 'to' sein.")
+
+    marketplace = config["marketplace"]
+    download_dir = Path(args.output or config["download_dir"]).expanduser().resolve()
+    download_dir.mkdir(parents=True, exist_ok=True)
+    # --debug alone also writes the debug JSON, to the default path.
+    debug_json_target = args.debug_json or (str(DEFAULT_DEBUG_JSON_PATH) if args.debug else None)
+    # An explicit --headless wins over the configured default.
+    headless = args.headless if args.headless is not None else bool(config.get("headless", True))
+
+    debug_payload = {
+        "requested_range": {"from": start_date.isoformat(), "to": end_date.isoformat()},
+        "years": years_for_range(start_date, end_date),
+        "pages": [],
+        "totals": {
+            "order_cards": 0,
+            "candidates_with_date": 0,
+            "candidates_in_range": 0,
+        },
+    }
+
+    with sync_playwright() as p:
+        browser = p.chromium.launch(headless=headless)
+        try:
+            context = browser.new_context(storage_state=str(STORAGE_STATE_PATH))
+            page = context.new_page()
+            invoices = _scan_orders(
+                page, marketplace, start_date, end_date, args.max_pages, args.debug, debug_payload
+            )
+            downloaded = _download_invoices(context, invoices, download_dir, page.url, args.debug)
+        finally:
+            browser.close()
+
+    debug_payload["totals"]["downloaded"] = downloaded
+    if debug_json_target:
+        debug_path = Path(debug_json_target).expanduser().resolve()
+        write_debug_json(debug_path, debug_payload)
+        print(f"Debug-JSON gespeichert: {debug_path}")
+
+    print(f"Fertig. Heruntergeladene Rechnungen: {downloaded}")
+
+
+def _parse_bool(value: str) -> bool:
+    """Convert a command-line truth value to ``bool``.
+
+    Raises:
+        argparse.ArgumentTypeError: If *value* is none of 1/true/yes/0/false/no.
+    """
+    lowered = value.strip().lower()
+    if lowered in {"1", "true", "yes"}:
+        return True
+    if lowered in {"0", "false", "no"}:
+        return False
+    raise argparse.ArgumentTypeError(f"Ungueltiger Wert '{value}'. Erwartet: true oder false")
+
+
+def build_parser() -> argparse.ArgumentParser:
+    """Build the command-line parser with the ``configure`` and ``download`` commands."""
+    parser = argparse.ArgumentParser(description="Amazon-Rechnungen eines Zeitraums herunterladen")
+    sub = parser.add_subparsers(dest="command", required=True)
+
+    p_config = sub.add_parser("configure", help="Marketplace/Download konfigurieren und Login-Session speichern")
+    p_config.add_argument("--marketplace", default="de", help="z. B. de, com, co.uk")
+    p_config.add_argument("--download-dir", default="~/Downloads/amazon_rechnungen")
+    p_config.add_argument("--headless", action="store_true", help="Standard fuer Download-Lauf im Headless-Mode")
+    p_config.set_defaults(func=configure)
+
+    p_dl = sub.add_parser("download", help="Rechnungen nach Zeitraum herunterladen")
+    p_dl.add_argument("--from", dest="date_from", required=True, help="Startdatum YYYY-MM-DD")
+    p_dl.add_argument("--to", dest="date_to", required=True, help="Enddatum YYYY-MM-DD")
+    p_dl.add_argument("--output", help="Optionales Zielverzeichnis")
+    p_dl.add_argument("--max-pages", type=int, default=25, help="Maximal zu scannende Bestellseiten")
+    # A bare --headless means true; a following true/false value overrides that.
+    p_dl.add_argument("--headless", type=_parse_bool, nargs="?", const=True, default=None)
+    p_dl.add_argument("--debug", action="store_true", help="Zeigt gefundene Detail- und Rechnungslinks")
+    p_dl.add_argument(
+        "--debug-json",
+        nargs="?",
+        const=str(DEFAULT_DEBUG_JSON_PATH),
+        default=None,
+        help="Schreibt Laufdetails als JSON (optional mit Pfad, sonst Standarddatei).",
+    )
+    p_dl.set_defaults(func=download)
+
+    return parser
+
+
+def main() -> None:
+    """Parse the command line and run the selected command."""
+    parser = build_parser()
+    args = parser.parse_args()
+    args.func(args)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..facef3c
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+playwright>=1.52.0
+requests>=2.32.0
+dateparser>=1.2.0