Initial commit: amazon invoice downloader

This commit is contained in:
Stefan Heyn 2026-03-04 14:02:29 +01:00
commit cafa2f7e9b
5 changed files with 590 additions and 0 deletions

19
.gitignore vendored Normal file
View file

@ -0,0 +1,19 @@
# Python
__pycache__/
*.py[cod]
*.pyo
# Virtual environments
.venv/
venv/
# Local debug and runtime artifacts
debug-run.json
*.log
# IDE
.vscode/*.code-workspace
# OS
.DS_Store
Thumbs.db

27
.vscode/launch.json vendored Normal file
View file

@ -0,0 +1,27 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Amazon Downloader: Januar 2025 Debug",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/main.py",
"cwd": "${workspaceFolder}",
"console": "integratedTerminal",
"args": [
"download",
"--from",
"2025-01-01",
"--to",
"2025-01-31",
"--debug",
"--headless",
"false"
]
}
]
}

53
README.md Normal file
View file

@ -0,0 +1,53 @@
# Amazon Invoice Downloader
Laedt Amazon-Rechnungen fuer einen konfigurierbaren Zeitraum herunter.
## Voraussetzungen
- Windows/macOS/Linux
- Python 3.10+
- Ein Amazon-Konto
## Installation
```powershell
cd c:\projekte\amazon_invoice_downloader
python -m venv .venv
.\.venv\Scripts\Activate.ps1
pip install -r requirements.txt
python -m playwright install chromium
```
## Einmalige Konfiguration
```powershell
python main.py configure --marketplace de --download-dir "C:\Users\<USER>\Downloads\amazon_rechnungen"
```
Danach oeffnet sich ein Browser. Dort bei Amazon anmelden und, sobald die Bestellungen sichtbar sind, im Terminal Enter druecken.
Die Session wird lokal gespeichert in:
- `~/.amazon_invoice_downloader/config.json`
- `~/.amazon_invoice_downloader/storage_state.json`
## Rechnungen herunterladen
```powershell
python main.py download --from 2025-01-01 --to 2025-12-31
```
Optionen:
- `--output <pfad>`: anderes Zielverzeichnis
- `--max-pages 25`: maximale Anzahl Bestellseiten, die pro Jahr durchsucht werden (Standard: 25)
- `--headless true|false`: Browser sichtbar oder unsichtbar
- `--debug`: zeigt pro Seite, wie viele `order-card`-Elemente und Rechnungskandidaten gefunden werden, und schreibt zusaetzlich die Standard-Debug-JSON-Datei
- `--debug-json [pfad]`: schreibt Laufdetails als JSON (ohne Pfad: Standarddatei)
## Hinweise
- Das Skript setzt pro Jahr den Amazon-Filter `timeFilter=year-YYYY` auf `your-orders/orders`, damit Zeitraeume wie Januar 2025 korrekt durchlaufen werden.
- Die Datumspruefung erfolgt direkt auf der Uebersichtsseite in den `order-card`-Elementen; Detailseiten sind nicht mehr der Primaerpfad.
- Amazon-HTML kann sich aendern. Falls keine Rechnungen gefunden werden, `--debug --debug-json` nutzen.
- Der Login ist absichtlich sessionbasiert; Passwoerter werden nicht gespeichert.
- Bitte nur gemaess Amazon-AGB nutzen.

488
main.py Normal file
View file

@ -0,0 +1,488 @@
import argparse
import json
import re
from dataclasses import dataclass
from datetime import date, datetime
from pathlib import Path
from typing import Optional
from urllib.parse import urljoin
import dateparser
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright
# Per-user application directory; the files below are all stored inside it.
APP_DIR = Path.home().joinpath(".amazon_invoice_downloader")
# JSON file holding the settings written by the "configure" command.
CONFIG_PATH = APP_DIR.joinpath("config.json")
# Browser storage state (the saved login session) written by "configure".
STORAGE_STATE_PATH = APP_DIR.joinpath("storage_state.json")
# Default target of the debug JSON report when no explicit path is given.
DEFAULT_DEBUG_JSON_PATH = APP_DIR.joinpath("debug_last_run.json")

# Lower-case substrings that mark a link text as invoice-related.
INVOICE_KEYWORDS = ["invoice", "rechnung", "faktura", "vat", "steuer", "bill", "beleg"]
@dataclass
class OrderInvoice:
    """An order together with the invoice link candidates found for it."""

    # Date the order was placed, as parsed from the page text.
    order_date: date
    # Order number as shown by the shop (or a placeholder chosen by the caller).
    order_id: str
    # Absolute URLs that may lead to an invoice document.
    invoice_links: list[str]
def ensure_app_dir() -> None:
    """Create the application directory, including missing parents, if absent."""
    APP_DIR.mkdir(exist_ok=True, parents=True)
def load_config() -> dict:
    """Read and return the stored configuration.

    Raises:
        SystemExit: if the configuration file does not exist yet.
    """
    if CONFIG_PATH.exists():
        raw = CONFIG_PATH.read_text(encoding="utf-8")
        return json.loads(raw)
    raise SystemExit("Konfiguration fehlt. Bitte zuerst 'configure' ausfuehren.")
def save_config(config: dict) -> None:
    """Persist ``config`` as indented JSON in the application directory."""
    ensure_app_dir()
    serialized = json.dumps(config, indent=2)
    CONFIG_PATH.write_text(serialized, encoding="utf-8")
def parse_iso_date(value: str) -> date:
    """Parse a ``YYYY-MM-DD`` string into a ``date``.

    Raises:
        SystemExit: with a user-facing message if ``value`` does not match
            the expected format.
    """
    try:
        parsed = datetime.strptime(value, "%Y-%m-%d")
    except ValueError as exc:
        raise SystemExit(f"Ungueltiges Datum '{value}'. Erwartet: YYYY-MM-DD") from exc
    return parsed.date()
def parse_date_from_text(text: str) -> Optional[date]:
    """Return the first date that can be parsed out of ``text``.

    The patterns are tried in a fixed order; for each pattern only its first
    match in ``text`` is handed to ``dateparser``. Returns ``None`` when no
    pattern yields a parseable date.
    """
    candidate_patterns = (
        # Day, dot, month name, year -- e.g. "12. Januar 2025"
        r"\b\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4}\b",
        # Month name, day, comma, year -- e.g. "January 12, 2025"
        r"\b[A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4}\b",
        # Slash-separated numeric date -- e.g. "12/01/2025"
        r"\b\d{1,2}/\d{1,2}/\d{4}\b",
        # ISO date -- e.g. "2025-01-12"
        r"\b\d{4}-\d{2}-\d{2}\b",
    )
    for candidate_pattern in candidate_patterns:
        found = re.search(candidate_pattern, text)
        if found is None:
            continue
        result = dateparser.parse(
            found.group(0),
            languages=["de", "en"],
            settings={"DATE_ORDER": "DMY"},
        )
        if result:
            return result.date()
    return None
def parse_order_date_from_text(text: str) -> Optional[date]:
    """Extract the order date from page text.

    Dates that directly follow an order-date label ("Bestellt am",
    "Bestelldatum", "Order placed [on]") are preferred. If none of those can
    be parsed, the result of ``parse_date_from_text`` on the whole text is
    returned instead.
    """
    labelled_patterns = (
        r"(Bestellt am|Bestelldatum)\s*[:\-]?\s*(\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4})",
        r"(Order placed(?: on)?)\s*[:\-]?\s*([A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4})",
        r"(Order placed(?: on)?)\s*[:\-]?\s*(\d{1,2}/\d{1,2}/\d{4})",
    )
    for labelled_pattern in labelled_patterns:
        found = re.search(labelled_pattern, text, flags=re.IGNORECASE)
        if found is None:
            continue
        # Group 1 is the label, group 2 the date string itself.
        result = dateparser.parse(
            found.group(2),
            languages=["de", "en"],
            settings={"DATE_ORDER": "DMY"},
        )
        if result:
            return result.date()
    return parse_date_from_text(text)
def parse_order_id_from_text(text: str) -> Optional[str]:
    """Return the first ``NNN-NNNNNNN-NNNNNNN`` order number in ``text``, else ``None``."""
    found = re.search(r"\b\d{3}-\d{7}-\d{7}\b", text)
    return found.group(0) if found else None
def years_for_range(start_date: date, end_date: date) -> list[int]:
    """List every calendar year touched by the range, newest year first."""
    ascending = range(start_date.year, end_date.year + 1)
    return list(reversed(ascending))
def build_orders_url(marketplace: str, year: int) -> str:
    """Build the order-overview URL for ``marketplace`` filtered to ``year``."""
    host = f"https://www.amazon.{marketplace}"
    return f"{host}/your-orders/orders?timeFilter=year-{year}"
def text_contains_invoice_hint(text: str) -> bool:
    """Return True if ``text`` contains any invoice keyword (case-insensitive)."""
    lowered = text.lower()
    for keyword in INVOICE_KEYWORDS:
        if keyword in lowered:
            return True
    return False
def collect_order_detail_links(page, base_url: str) -> list[str]:
    """Collect absolute URLs of all order-detail links on ``page``.

    A link qualifies when its href (case-insensitively) contains one of the
    order-detail markers. Relative hrefs are resolved against ``base_url``;
    duplicates are dropped while the first-seen order is kept.
    """
    markers = ("order-details", "orderid=", "/your-orders/order-details")
    anchors = page.locator("a")
    collected: list[str] = []
    already_seen = set()
    for index in range(anchors.count()):
        href = anchors.nth(index).get_attribute("href")
        if not href:
            continue
        href_lower = href.lower()
        if not any(marker in href_lower for marker in markers):
            continue
        resolved = urljoin(base_url, href)
        if resolved not in already_seen:
            already_seen.add(resolved)
            collected.append(resolved)
    return collected
def find_next_page_url(page, base_url: str) -> Optional[str]:
    """Return the absolute URL of the next overview page, or ``None``.

    The pagination widget's "last" entry is tried first; if it is missing or
    has no href, a link labelled "Weiter"/"Next"/"Nächste"/"Naechste" is used
    as a fallback.
    """
    selectors = (
        "ul.a-pagination li.a-last a",
        'a:has-text("Weiter"), a:has-text("Next"), a:has-text("Nächste"), a:has-text("Naechste")',
    )
    for selector in selectors:
        candidate = page.locator(selector).first
        if candidate.count() == 0:
            continue
        target = candidate.get_attribute("href")
        if target:
            return urljoin(base_url, target)
    return None
def extract_invoice_links_from_scope(scope, base_url: str) -> list[str]:
    """Collect absolute URLs of invoice-like links inside ``scope``.

    A link counts as an invoice candidate when its resolved URL contains one
    of the href hints, or when its visible text contains an invoice keyword.

    Args:
        scope: Object offering ``locator("a")`` (a page or an element locator).
        base_url: Base against which relative hrefs are resolved.

    Returns:
        Matching absolute URLs, de-duplicated, in document order.
    """
    href_hints = ("invoice", "rechnung", "tax", "bill", "summary", "print", "beleg")
    links = scope.locator("a")
    out: list[str] = []
    seen: set[str] = set()
    for i in range(links.count()):
        link = links.nth(i)
        # Check the href first so links without a target (or duplicates) never
        # cost an ``inner_text`` round trip.
        href = link.get_attribute("href")
        if not href:
            continue
        absolute = urljoin(base_url, href)
        if absolute in seen:
            continue
        lower_href = absolute.lower()
        matches = any(hint in lower_href for hint in href_hints)
        if not matches:
            # The link text is only needed when the URL itself is inconclusive.
            try:
                text = (link.inner_text(timeout=1000) or "").strip()
            except PlaywrightTimeoutError:
                # A single unreadable link must not abort the whole scan;
                # treat it as having no text.
                text = ""
            matches = text_contains_invoice_hint(text)
        if matches:
            out.append(absolute)
            seen.add(absolute)
    return out
def extract_orders_from_overview(page, base_url: str, debug: bool = False) -> list[OrderInvoice]:
    """Turn every ``div.order-card`` on the overview page into an ``OrderInvoice``.

    Cards without a parseable order date are skipped (and reported when
    ``debug`` is set); cards without any invoice link candidate are skipped
    silently. A card without a recognisable order number gets the placeholder
    id ``UNKNOWN-<order date>``.
    """
    order_cards = page.locator("div.order-card")
    found: list[OrderInvoice] = []
    for position in range(order_cards.count()):
        card = order_cards.nth(position)
        card_text = card.inner_text(timeout=2000)
        placed_on = parse_order_date_from_text(card_text)
        if placed_on is None:
            if debug:
                print(f"[debug] order-card ohne Datum (index={position + 1})")
            continue
        order_id = parse_order_id_from_text(card_text) or f"UNKNOWN-{placed_on.isoformat()}"
        candidate_links = extract_invoice_links_from_scope(card, base_url)
        if candidate_links:
            found.append(
                OrderInvoice(order_date=placed_on, order_id=order_id, invoice_links=candidate_links)
            )
    return found
def extract_invoice_candidates_from_detail(context, detail_url: str, base_url: str, debug: bool = False) -> Optional[OrderInvoice]:
    """Open an order detail page in a new tab and extract its invoice candidates.

    Returns ``None`` when the page has no parseable order date, no invoice
    link candidates, or when a Playwright timeout occurs. The temporary page
    is always closed again.
    """
    tab = context.new_page()
    try:
        tab.goto(detail_url, wait_until="domcontentloaded", timeout=15000)
        # Short fixed pause after navigation before reading the page body.
        tab.wait_for_timeout(1200)
        page_text = tab.inner_text("body", timeout=4000)
        placed_on = parse_order_date_from_text(page_text)
        if placed_on is None:
            if debug:
                print(f"[debug] Kein Datum in Detailseite: {detail_url}")
            return None
        candidate_links = extract_invoice_links_from_scope(tab, base_url)
        if debug:
            print(f"[debug] Detailseite {detail_url} -> {len(candidate_links)} Rechnungskandidat(en)")
        if not candidate_links:
            return None
        order_id = parse_order_id_from_text(page_text) or f"UNKNOWN-{placed_on.isoformat()}"
        return OrderInvoice(order_date=placed_on, order_id=order_id, invoice_links=candidate_links)
    except PlaywrightTimeoutError:
        if debug:
            print(f"[debug] Timeout bei Detailseite: {detail_url}")
        return None
    finally:
        tab.close()
def extract_pdf_links_from_html(html: str, source_url: str) -> list[str]:
    """Find links in an HTML document that look like PDF/download targets.

    Every quoted ``href`` attribute value is HTML-unescaped, resolved against
    ``source_url`` and kept if the resulting URL contains ``.pdf`` or
    ``download`` (case-insensitive).

    Args:
        html: Raw HTML source.
        source_url: URL the HTML was fetched from; base for relative hrefs.

    Returns:
        Matching absolute URLs without duplicates, in document order.
    """
    # The ``html`` parameter shadows the stdlib module of the same name, so
    # the needed function is imported directly.
    from html import unescape

    # A dict keeps insertion order, giving stable de-duplicated output.
    found: dict[str, None] = {}
    for m in re.finditer(r'href=["\']([^"\']+)["\']', html, flags=re.IGNORECASE):
        # Attribute values in raw HTML carry entities such as "&amp;"; without
        # unescaping, multi-parameter query strings would be requested broken.
        href = unescape(m.group(1))
        absolute = urljoin(source_url, href)
        low = absolute.lower()
        if ".pdf" in low or "download" in low:
            found.setdefault(absolute, None)
    return list(found)
def write_debug_json(debug_json_path: Path, payload: dict) -> None:
    """Write ``payload`` as indented UTF-8 JSON, creating parent directories first."""
    target_dir = debug_json_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    # ensure_ascii=False keeps non-ASCII characters readable in the file.
    rendered = json.dumps(payload, indent=2, ensure_ascii=False)
    debug_json_path.write_text(rendered, encoding="utf-8")
def looks_like_pdf(content: bytes) -> bool:
    """Return True if ``content`` begins with the ``%PDF-`` magic bytes."""
    magic = b"%PDF-"
    return content[: len(magic)] == magic
def build_output_path(download_dir: Path, order_date: date, order_id: str, sequence_no: int) -> Path:
    """Return a not-yet-existing target path for an invoice PDF.

    The file name is ``<order date>_<sanitised order id>_<NNN>.pdf``. Every
    character of ``order_id`` that is not a letter, digit or hyphen becomes an
    underscore. Starting at ``sequence_no``, the number is increased until the
    path does not exist.
    """
    safe_order_id = re.sub(r"[^A-Za-z0-9-]", "_", order_id)
    stem_prefix = f"{order_date.isoformat()}_{safe_order_id}"
    current_no = sequence_no
    while True:
        target = download_dir / f"{stem_prefix}_{current_no:03d}.pdf"
        if not target.exists():
            return target
        current_no += 1
def configure(args) -> None:
    """Store the settings and capture a logged-in browser session.

    Reads ``args.marketplace``, ``args.download_dir`` and ``args.headless``,
    writes them to the configuration file, then opens a visible Chromium
    window on the order overview. After the user confirms the login with
    Enter, the browser storage state is saved to ``STORAGE_STATE_PATH``.
    """
    resolved_download_dir = Path(args.download_dir).expanduser().resolve()
    save_config(
        {
            "marketplace": args.marketplace,
            "download_dir": str(resolved_download_dir),
            "headless": args.headless,
        }
    )
    ensure_app_dir()
    orders_url = f"https://www.amazon.{args.marketplace}/your-orders/orders"
    with sync_playwright() as playwright:
        # The login is interactive, so this browser is always visible.
        browser = playwright.chromium.launch(headless=False)
        context = browser.new_context()
        login_page = context.new_page()
        login_page.goto(orders_url, wait_until="domcontentloaded")
        print("Bitte im Browser bei Amazon einloggen.")
        print("Wenn Bestellungen sichtbar sind, Enter druecken.")
        input()
        context.storage_state(path=str(STORAGE_STATE_PATH))
        browser.close()
    print(f"Konfiguration gespeichert: {CONFIG_PATH}")
    print(f"Session gespeichert: {STORAGE_STATE_PATH}")
def _is_pdf_payload(url: str, content_type: str, body: bytes) -> bool:
    """Return True if the response looks like a PDF by header, URL suffix or magic bytes."""
    return "pdf" in content_type or url.lower().endswith(".pdf") or looks_like_pdf(body)


def _store_invoice_pdf(download_dir: Path, order: OrderInvoice, counters: dict[str, int], body: bytes) -> Path:
    """Write ``body`` as the next numbered PDF of ``order`` and return the path."""
    counters[order.order_id] = counters.get(order.order_id, 0) + 1
    out = build_output_path(download_dir, order.order_date, order.order_id, counters[order.order_id])
    out.write_bytes(body)
    print(f"Gespeichert: {out}")
    return out


def _collect_invoices(page, marketplace: str, years: list[int], start_date: date, end_date: date,
                      max_pages: int, debug: bool, debug_payload: dict) -> list[OrderInvoice]:
    """Scan the yearly order overviews and return in-range orders with unseen invoice links.

    Per-page statistics and totals are recorded in ``debug_payload`` in place.
    """
    base_orders_url = f"https://www.amazon.{marketplace}/your-orders/orders"
    invoices: list[OrderInvoice] = []
    seen_invoice_urls: set[str] = set()
    totals = debug_payload["totals"]
    for year in years:
        filtered_url = build_orders_url(marketplace, year)
        if debug:
            print(f"[debug] Wechsle auf Jahresfilter {year}: {filtered_url}")
        try:
            page.goto(filtered_url, wait_until="domcontentloaded", timeout=15000)
        except PlaywrightTimeoutError:
            # One slow year page should not abort the whole run.
            print(f"Warnung: Zeitueberschreitung beim Laden von {filtered_url}; Jahr {year} wird uebersprungen.")
            continue
        visited_page_urls: set[str] = set()
        for page_idx in range(max_pages):
            # Guard against pagination that loops back to a page already scanned.
            if page.url in visited_page_urls:
                if debug:
                    print(f"[debug] Abbruch wegen wiederholter URL: {page.url}")
                break
            visited_page_urls.add(page.url)
            page.wait_for_timeout(1500)
            card_count = page.locator("div.order-card").count()
            overview_candidates = extract_orders_from_overview(page, base_orders_url, debug=debug)
            debug_payload["pages"].append(
                {
                    "year": year,
                    "page": page_idx + 1,
                    "url": page.url,
                    "order_cards": card_count,
                    "overview_candidates": len(overview_candidates),
                }
            )
            totals["order_cards"] += card_count
            if debug:
                print(f"[debug] Jahr {year}, Seite {page_idx + 1}: {card_count} order-card(s), {len(overview_candidates)} Kandidat(en)")
            for candidate in overview_candidates:
                totals["candidates_with_date"] += 1
                if not (start_date <= candidate.order_date <= end_date):
                    continue
                filtered_links = [u for u in candidate.invoice_links if u not in seen_invoice_urls]
                if not filtered_links:
                    continue
                seen_invoice_urls.update(filtered_links)
                invoices.append(
                    OrderInvoice(
                        order_date=candidate.order_date,
                        order_id=candidate.order_id,
                        invoice_links=filtered_links,
                    )
                )
                totals["candidates_in_range"] += 1
                if debug:
                    print(
                        f"[debug] Treffer {candidate.order_date.isoformat()} mit {len(filtered_links)} Link(s)"
                    )
            next_page_url = find_next_page_url(page, base_orders_url)
            if not next_page_url:
                break
            try:
                page.goto(next_page_url, wait_until="domcontentloaded", timeout=15000)
            except PlaywrightTimeoutError:
                break
    return invoices


def _download_invoices(context, invoices: list[OrderInvoice], download_dir: Path, referer: str, debug: bool) -> int:
    """Fetch every invoice link and store the PDFs; return the number of files written.

    A link that returns HTML instead of a PDF is searched for nested
    PDF/download links, which are then fetched in turn. Failures on a single
    link are reported in debug mode and do not stop the remaining downloads.
    """
    downloaded = 0
    seen_saved_urls: set[str] = set()
    order_file_counters: dict[str, int] = {}
    for order in invoices:
        for invoice_url in order.invoice_links:
            if invoice_url in seen_saved_urls:
                continue
            try:
                response = context.request.get(
                    invoice_url,
                    headers={"referer": referer},
                    timeout=20000,
                )
                content_type = (response.headers.get("content-type", "") or "").lower()
                # An error response must never be stored as an invoice, even
                # if the URL happens to end in ".pdf".
                if not response.ok:
                    if debug:
                        print(
                            f"[debug] Uebersprungen {invoice_url} "
                            f"(status={response.status}, content-type={content_type})"
                        )
                    continue
                body = response.body()
                if _is_pdf_payload(invoice_url, content_type, body):
                    _store_invoice_pdf(download_dir, order, order_file_counters, body)
                    downloaded += 1
                    seen_saved_urls.add(invoice_url)
                    continue
                if "html" not in content_type:
                    if debug:
                        print(
                            f"[debug] Uebersprungen {invoice_url} "
                            f"(status={response.status}, content-type={content_type})"
                        )
                    continue
                nested_pdf_links = extract_pdf_links_from_html(response.text(), invoice_url)
                if debug:
                    print(f"[debug] HTML-Seite {invoice_url} -> {len(nested_pdf_links)} PDF-Link(s)")
                for pdf_link in nested_pdf_links:
                    if pdf_link in seen_saved_urls:
                        continue
                    pdf_resp = context.request.get(
                        pdf_link,
                        headers={"referer": invoice_url},
                        timeout=20000,
                    )
                    if not pdf_resp.ok:
                        continue
                    pdf_type = (pdf_resp.headers.get("content-type", "") or "").lower()
                    pdf_body = pdf_resp.body()
                    if not _is_pdf_payload(pdf_link, pdf_type, pdf_body):
                        continue
                    _store_invoice_pdf(download_dir, order, order_file_counters, pdf_body)
                    downloaded += 1
                    seen_saved_urls.add(pdf_link)
            except Exception as exc:
                # Deliberately best effort: one failing link must not abort
                # the remaining downloads.
                if debug:
                    print(f"[debug] Fehler bei {invoice_url}: {exc}")
    return downloaded


def download(args) -> None:
    """Download all invoices whose order date lies in the requested range.

    Reads ``args.date_from``, ``args.date_to``, ``args.output``,
    ``args.max_pages``, ``args.headless``, ``args.debug`` and
    ``args.debug_json``. The stored session is used to scan the order
    overview year by year (newest first); matching invoice links are then
    fetched and written to the download directory. With ``--debug`` or
    ``--debug-json`` a JSON report of the run is written afterwards.

    Raises:
        SystemExit: if configuration or session is missing, a date is
            malformed, or the start date lies after the end date.
    """
    config = load_config()
    if not STORAGE_STATE_PATH.exists():
        raise SystemExit("Session fehlt. Bitte zuerst 'configure' ausfuehren.")
    start_date = parse_iso_date(args.date_from)
    end_date = parse_iso_date(args.date_to)
    if start_date > end_date:
        raise SystemExit("'from' muss kleiner/gleich 'to' sein.")
    marketplace = config["marketplace"]
    download_dir = Path(args.output or config["download_dir"]).expanduser().resolve()
    download_dir.mkdir(parents=True, exist_ok=True)
    # --debug alone also produces the report, at the default location.
    debug_json_target = args.debug_json or (str(DEFAULT_DEBUG_JSON_PATH) if args.debug else None)
    # An explicit command-line value wins over the configured default.
    headless = args.headless if args.headless is not None else bool(config.get("headless", True))
    years = years_for_range(start_date, end_date)
    debug_payload = {
        "requested_range": {"from": start_date.isoformat(), "to": end_date.isoformat()},
        "years": years,
        "pages": [],
        "totals": {
            "order_cards": 0,
            "candidates_with_date": 0,
            "candidates_in_range": 0,
        },
    }
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=headless)
        try:
            context = browser.new_context(storage_state=str(STORAGE_STATE_PATH))
            page = context.new_page()
            invoices = _collect_invoices(
                page, marketplace, years, start_date, end_date, args.max_pages, args.debug, debug_payload
            )
            downloaded = _download_invoices(context, invoices, download_dir, page.url, args.debug)
        finally:
            # Close the browser even when scanning or downloading fails.
            browser.close()
    debug_payload["totals"]["downloaded"] = downloaded
    if debug_json_target:
        debug_path = Path(debug_json_target).expanduser().resolve()
        write_debug_json(debug_path, debug_payload)
        print(f"Debug-JSON gespeichert: {debug_path}")
    print(f"Fertig. Heruntergeladene Rechnungen: {downloaded}")
def _parse_bool_flag(value: str) -> bool:
    """Convert a command-line truth value to ``bool``.

    Accepts ``1``/``true``/``yes`` and ``0``/``false``/``no`` in any case.
    Anything else is rejected so that a typo does not silently count as False.

    Raises:
        argparse.ArgumentTypeError: for unrecognised values.
    """
    normalized = value.lower()
    if normalized in {"1", "true", "yes"}:
        return True
    if normalized in {"0", "false", "no"}:
        return False
    raise argparse.ArgumentTypeError(
        f"Ungueltiger Wahrheitswert '{value}'. Erwartet: true oder false"
    )


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with the ``configure`` and ``download`` sub-commands.

    Each sub-parser stores its handler function as ``func`` so the caller can
    dispatch with ``args.func(args)``.
    """
    parser = argparse.ArgumentParser(description="Amazon-Rechnungen eines Zeitraums herunterladen")
    sub = parser.add_subparsers(dest="command", required=True)

    p_config = sub.add_parser("configure", help="Marketplace/Download konfigurieren und Login-Session speichern")
    p_config.add_argument("--marketplace", default="de", help="z. B. de, com, co.uk")
    p_config.add_argument("--download-dir", default="~/Downloads/amazon_rechnungen")
    p_config.add_argument("--headless", action="store_true", help="Standard fuer Download-Lauf im Headless-Mode")
    p_config.set_defaults(func=configure)

    p_dl = sub.add_parser("download", help="Rechnungen nach Zeitraum herunterladen")
    p_dl.add_argument("--from", dest="date_from", required=True, help="Startdatum YYYY-MM-DD")
    p_dl.add_argument("--to", dest="date_to", required=True, help="Enddatum YYYY-MM-DD")
    p_dl.add_argument("--output", help="Optionales Zielverzeichnis")
    p_dl.add_argument("--max-pages", type=int, default=25, help="Maximal zu scannende Bestellseiten")
    # Bare "--headless" means True; when the option is absent the value stays
    # None so the configured default can apply.
    p_dl.add_argument(
        "--headless",
        type=_parse_bool_flag,
        nargs="?",
        const=True,
        default=None,
        help="Browser unsichtbar (true) oder sichtbar (false); ohne Wert: true",
    )
    p_dl.add_argument("--debug", action="store_true", help="Zeigt gefundene Detail- und Rechnungslinks")
    p_dl.add_argument(
        "--debug-json",
        nargs="?",
        const=str(DEFAULT_DEBUG_JSON_PATH),
        default=None,
        help="Schreibt Laufdetails als JSON (optional mit Pfad, sonst Standarddatei).",
    )
    p_dl.set_defaults(func=download)
    return parser
def main() -> None:
    """Parse the command line and run the handler of the chosen sub-command."""
    args = build_parser().parse_args()
    handler = args.func
    handler(args)


if __name__ == "__main__":
    main()

3
requirements.txt Normal file
View file

@ -0,0 +1,3 @@
playwright>=1.52.0
requests>=2.32.0
dateparser>=1.2.0