# Listing metadata (from the file viewer): 553 lines, 21 KiB, Python
import argparse
|
|
import json
|
|
import re
|
|
import time
|
|
from dataclasses import dataclass
|
|
from datetime import date, datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse
|
|
|
|
import dateparser
|
|
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
# Per-user directory that holds the tool's configuration and session files.
APP_DIR = Path.home().joinpath(".amazon_invoice_downloader")
# JSON file written by save_config() and read by load_config().
CONFIG_PATH = APP_DIR.joinpath("config.json")
# Playwright storage state (saved browser session) written by `configure`.
STORAGE_STATE_PATH = APP_DIR.joinpath("storage_state.json")
# Default target for the debug report of a download run.
DEFAULT_DEBUG_JSON_PATH = APP_DIR.joinpath("debug_last_run.json")

# Lower-case substrings that mark a link text as invoice-related.
INVOICE_KEYWORDS = ["invoice", "rechnung", "faktura", "vat", "steuer", "bill", "beleg"]
|
|
|
|
|
|
@dataclass
class OrderInvoice:
    """An order together with the invoice link candidates found for it."""

    # Date the order was placed, as parsed from the page text.
    order_date: date
    # Amazon order number; callers fall back to "UNKNOWN-<iso date>".
    order_id: str
    # Absolute URLs that may lead to an invoice document.
    invoice_links: list[str]
|
|
|
|
|
|
def ensure_app_dir() -> None:
    """Create the application data directory, including parents, if missing."""
    if not APP_DIR.is_dir():
        APP_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def load_config() -> dict:
    """Read the saved configuration from CONFIG_PATH.

    Returns:
        The parsed JSON content of the configuration file.

    Raises:
        SystemExit: If the configuration file does not exist yet.
    """
    if CONFIG_PATH.exists():
        raw = CONFIG_PATH.read_text(encoding="utf-8")
        return json.loads(raw)
    raise SystemExit("Konfiguration fehlt. Bitte zuerst 'configure' ausfuehren.")
|
|
|
|
|
|
def save_config(config: dict) -> None:
    """Write *config* as indented JSON to CONFIG_PATH.

    The application directory is created first so the write cannot fail on a
    missing parent directory.
    """
    ensure_app_dir()
    serialized = json.dumps(config, indent=2)
    CONFIG_PATH.write_text(serialized, encoding="utf-8")
|
|
|
|
|
|
def build_context_options(config: dict) -> dict:
    """Build the keyword arguments for a Playwright browser context.

    Args:
        config: Saved configuration; ``locale`` and ``timezone`` are read and
            default to ``de-DE`` / ``Europe/Berlin`` when absent.

    Returns:
        A dict with ``locale``, ``timezone_id`` and ``extra_http_headers``.
        The Accept-Language header is fixed and does not follow ``locale``.
    """
    options: dict = {
        "locale": config.get("locale", "de-DE"),
        "timezone_id": config.get("timezone", "Europe/Berlin"),
    }
    options["extra_http_headers"] = {"Accept-Language": "de-DE,de;q=0.9,en;q=0.8"}
    return options
|
|
|
|
|
|
def parse_iso_date(value: str) -> date:
    """Parse a ``YYYY-MM-DD`` string into a date.

    Raises:
        SystemExit: If *value* does not match the expected format; the
            original ValueError is chained as the cause.
    """
    try:
        parsed = datetime.strptime(value, "%Y-%m-%d")
    except ValueError as exc:
        raise SystemExit(f"Ungueltiges Datum '{value}'. Erwartet: YYYY-MM-DD") from exc
    return parsed.date()
|
|
|
|
|
|
def parse_date_from_text(text: str) -> Optional[date]:
    """Find the first date-like fragment in *text* and parse it.

    The patterns are tried in a fixed priority order (day-month-name-year,
    month-name-day-year, slash-separated, ISO). For each pattern only its
    first match in the text is handed to ``dateparser``; if that fragment
    cannot be parsed the next pattern is tried.

    Returns:
        The parsed date, or None if no pattern yields a parseable fragment.
    """
    candidate_patterns = (
        r"\b\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4}\b",
        r"\b[A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4}\b",
        r"\b\d{1,2}/\d{1,2}/\d{4}\b",
        r"\b\d{4}-\d{2}-\d{2}\b",
    )
    for regex in candidate_patterns:
        hit = re.search(regex, text)
        if hit is None:
            continue
        result = dateparser.parse(
            hit.group(0),
            languages=["de", "en"],
            settings={"DATE_ORDER": "DMY"},
        )
        if result:
            return result.date()
    return None
|
|
|
|
|
|
def parse_order_date_from_text(text: str) -> Optional[date]:
    """Extract the order date, preferring dates that follow an order label.

    Labelled patterns ("Bestellt am", "Bestelldatum", "Order placed [on]")
    are matched case-insensitively first. If none of them produces a
    parseable date, the generic search in ``parse_date_from_text`` is used.

    Returns:
        The parsed date, or None if nothing parseable was found.
    """
    labelled_patterns = (
        r"(Bestellt am|Bestelldatum)\s*[:\-]?\s*(\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4})",
        r"(Order placed(?: on)?)\s*[:\-]?\s*([A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4})",
        r"(Order placed(?: on)?)\s*[:\-]?\s*(\d{1,2}/\d{1,2}/\d{4})",
    )
    for regex in labelled_patterns:
        hit = re.search(regex, text, flags=re.IGNORECASE)
        if hit:
            # Group 2 is the date fragment; group 1 is the label.
            result = dateparser.parse(
                hit.group(2),
                languages=["de", "en"],
                settings={"DATE_ORDER": "DMY"},
            )
            if result:
                return result.date()
    return parse_date_from_text(text)
|
|
|
|
|
|
def parse_order_id_from_text(text: str) -> Optional[str]:
    """Return the first ``NNN-NNNNNNN-NNNNNNN`` order number in *text*, or None."""
    found = re.search(r"\b\d{3}-\d{7}-\d{7}\b", text)
    return found.group(0) if found else None
|
|
|
|
|
|
def years_for_range(start_date: date, end_date: date) -> list[int]:
    """List every calendar year touched by the range, newest first.

    Returns an empty list when *end_date* lies in an earlier year than
    *start_date*.
    """
    ascending = range(start_date.year, end_date.year + 1)
    return sorted(ascending, reverse=True)
|
|
|
|
|
|
def with_amazon_language(url: str, amazon_language: str) -> str:
    """Return *url* with its ``language`` query parameter set.

    An existing ``language`` parameter is replaced in place (extra
    occurrences are dropped); otherwise the parameter is appended. All other
    parameters are kept in order, including repeated keys and blank values.
    Building the query from a dict would silently discard repeated keys.

    Args:
        url: Absolute or relative URL.
        amazon_language: Value for the ``language`` parameter, e.g. ``de_DE``.

    Returns:
        The rebuilt URL string.
    """
    parts = urlparse(url)
    pairs: list[tuple[str, str]] = []
    language_set = False
    for key, value in parse_qsl(parts.query, keep_blank_values=True):
        if key == "language":
            if language_set:
                continue
            value = amazon_language
            language_set = True
        pairs.append((key, value))
    if not language_set:
        pairs.append(("language", amazon_language))
    return urlunparse(parts._replace(query=urlencode(pairs)))
|
|
|
|
|
|
def build_orders_url(marketplace: str, year: int, amazon_language: str) -> str:
    """Build the order-history URL for one year on the given marketplace.

    Args:
        marketplace: Domain suffix after ``amazon.``, e.g. ``de`` or ``co.uk``.
        year: Year used for the ``timeFilter=year-<year>`` filter.
        amazon_language: Value passed on as the ``language`` query parameter.
    """
    return with_amazon_language(
        f"https://www.amazon.{marketplace}/your-orders/orders?timeFilter=year-{year}",
        amazon_language,
    )
|
|
|
|
|
|
def text_contains_invoice_hint(text: str) -> bool:
    """Return True if *text* contains any invoice keyword (case-insensitive).

    This is a plain substring test, so a keyword also matches inside a
    longer word.
    """
    lowered = text.lower()
    for keyword in INVOICE_KEYWORDS:
        if keyword in lowered:
            return True
    return False
|
|
|
|
|
|
def collect_order_detail_links(page, base_url: str) -> list[str]:
    """Collect unique absolute URLs of order-detail links on *page*.

    Args:
        page: Object offering ``locator("a")`` whose result supports
            ``count()`` and ``nth(i).get_attribute("href")`` (a Playwright
            page in this script).
        base_url: Base used to resolve relative hrefs.

    Returns:
        Absolute URLs in document order, without duplicates.
    """
    # An href qualifies if it contains any of these (lower-cased) markers.
    markers = ("order-details", "orderid=", "/your-orders/order-details")
    anchors = page.locator("a")
    collected: dict[str, None] = {}  # insertion-ordered set
    for index in range(anchors.count()):
        href = anchors.nth(index).get_attribute("href")
        if not href:
            continue
        href_lower = href.lower()
        if not any(marker in href_lower for marker in markers):
            continue
        collected.setdefault(urljoin(base_url, href), None)
    return list(collected)
|
|
|
|
|
|
def find_next_page_url(page, base_url: str) -> Optional[str]:
    """Return the absolute URL of the next result page, if one is linked.

    The pagination "last" link is tried first; if it is missing or has no
    href, the first link whose text reads like "next" is used instead.

    Returns:
        The resolved URL, or None when neither selector yields an href.
    """
    selectors = (
        "ul.a-pagination li.a-last a",
        'a:has-text("Weiter"), a:has-text("Next"), a:has-text("Nächste"), a:has-text("Naechste")',
    )
    for selector in selectors:
        candidate = page.locator(selector).first
        if candidate.count() == 0:
            continue
        target = candidate.get_attribute("href")
        if target:
            return urljoin(base_url, target)
    return None
|
|
|
|
|
|
def extract_invoice_links_from_scope(scope, base_url: str) -> list[str]:
    """Collect links inside *scope* that look like invoice links.

    A link qualifies when its absolute URL contains one of the URL keywords
    or its visible text contains an invoice keyword. The cheap href checks
    run first; the link text is only fetched when the URL alone did not
    qualify. A timeout while reading one link's text is treated as empty
    text, so a single slow element cannot abort the whole scan.

    Args:
        scope: Object offering ``locator("a")`` (a Playwright page or
            locator in this script).
        base_url: Base used to resolve relative hrefs.

    Returns:
        Unique absolute URLs in document order.
    """
    href_keywords = ("invoice", "rechnung", "tax", "bill", "summary", "print", "beleg")
    links = scope.locator("a")
    out: list[str] = []
    seen: set[str] = set()
    for i in range(links.count()):
        link = links.nth(i)
        href = link.get_attribute("href")
        if not href:
            continue

        absolute = urljoin(base_url, href)
        if absolute in seen:
            continue

        lower_href = absolute.lower()
        if not any(k in lower_href for k in href_keywords):
            # Only now pay for the text lookup.
            try:
                text = (link.inner_text(timeout=1000) or "").strip()
            except PlaywrightTimeoutError:
                text = ""
            if not text_contains_invoice_hint(text):
                continue

        out.append(absolute)
        seen.add(absolute)
    return out
|
|
|
|
|
|
def extract_orders_from_overview(page, base_url: str, debug: bool = False) -> list[OrderInvoice]:
    """Turn the order cards on an overview page into OrderInvoice records.

    Cards without a parseable date are skipped (reported when *debug* is
    set); cards without any invoice link candidate are skipped silently.

    Args:
        page: Page object queried for ``div.order-card`` elements.
        base_url: Base used to resolve relative invoice links.
        debug: Print a line for each card that has no date.

    Returns:
        One record per card that has both a date and at least one link.
    """
    order_cards = page.locator("div.order-card")
    found: list[OrderInvoice] = []
    for i in range(order_cards.count()):
        current = order_cards.nth(i)
        text = current.inner_text(timeout=2000)
        order_date = parse_order_date_from_text(text)
        if order_date is None:
            if debug:
                print(f"[debug] order-card ohne Datum (index={i + 1})")
            continue
        # Fall back to a date-based placeholder when no order number is visible.
        order_id = parse_order_id_from_text(text) or f"UNKNOWN-{order_date.isoformat()}"

        links = extract_invoice_links_from_scope(current, base_url)
        if links:
            found.append(OrderInvoice(order_date=order_date, order_id=order_id, invoice_links=links))
    return found
|
|
|
|
|
|
def extract_invoice_candidates_from_detail(
    context,
    detail_url: str,
    base_url: str,
    amazon_language: str,
    debug: bool = False,
) -> Optional[OrderInvoice]:
    """Open an order detail page in a new tab and collect invoice candidates.

    Args:
        context: Browser context used to open the temporary page.
        detail_url: URL of the order detail page.
        base_url: Base used to resolve relative invoice links.
        amazon_language: Value for the ``language`` query parameter.
        debug: Print diagnostic lines.

    Returns:
        An OrderInvoice, or None if the page timed out, had no parseable
        date, or contained no invoice link candidate. The temporary page is
        always closed.
    """
    detail_page = context.new_page()
    try:
        localized_url = with_amazon_language(detail_url, amazon_language)
        detail_page.goto(localized_url, wait_until="domcontentloaded", timeout=15000)
        # Fixed pause before the page text is read.
        detail_page.wait_for_timeout(1200)
        page_text = detail_page.inner_text("body", timeout=4000)

        order_date = parse_order_date_from_text(page_text)
        if order_date is None:
            if debug:
                print(f"[debug] Kein Datum in Detailseite: {detail_url}")
            return None

        invoice_links = extract_invoice_links_from_scope(detail_page, base_url)
        if debug:
            print(f"[debug] Detailseite {detail_url} -> {len(invoice_links)} Rechnungskandidat(en)")
        if not invoice_links:
            return None

        return OrderInvoice(
            order_date=order_date,
            order_id=parse_order_id_from_text(page_text) or f"UNKNOWN-{order_date.isoformat()}",
            invoice_links=invoice_links,
        )
    except PlaywrightTimeoutError:
        if debug:
            print(f"[debug] Timeout bei Detailseite: {detail_url}")
        return None
    finally:
        detail_page.close()
|
|
|
|
|
|
def extract_pdf_links_from_html(html: str, source_url: str) -> list[str]:
    """Extract links from raw HTML that look like PDF or download URLs.

    Every quoted ``href`` value is entity-decoded (``&amp;`` -> ``&``),
    resolved against *source_url*, and kept when the absolute URL contains
    ``.pdf`` or ``download`` (case-insensitive).

    Args:
        html: Raw HTML source text.
        source_url: URL the HTML was fetched from; base for relative hrefs.

    Returns:
        Unique absolute URLs in order of first appearance, so repeated runs
        process (and number) the files in a stable order.
    """
    # Local import: the parameter name `html` shadows the stdlib module here.
    from html import unescape

    found: dict[str, None] = {}  # insertion-ordered set
    for m in re.finditer(r'href=["\']([^"\']+)["\']', html, flags=re.IGNORECASE):
        # Attribute values in HTML source are entity-encoded; decode them
        # before using them as URLs.
        href = unescape(m.group(1))
        absolute = urljoin(source_url, href)
        low = absolute.lower()
        if ".pdf" in low or "download" in low:
            found.setdefault(absolute, None)
    return list(found)
|
|
|
|
|
|
def write_debug_json(debug_json_path: Path, payload: dict) -> None:
    """Write *payload* as indented UTF-8 JSON, creating parent directories.

    Non-ASCII characters are written as-is rather than escaped.
    """
    target_dir = debug_json_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    rendered = json.dumps(payload, indent=2, ensure_ascii=False)
    debug_json_path.write_text(rendered, encoding="utf-8")
|
|
|
|
|
|
def looks_like_pdf(content: bytes) -> bool:
    """Return True if *content* begins with the ``%PDF-`` file signature."""
    pdf_magic = b"%PDF-"
    return content.startswith(pdf_magic)
|
|
|
|
|
|
def build_output_path(download_dir: Path, order_date: date, order_id: str, sequence_no: int) -> Path:
    """Return a not-yet-existing PDF path for an invoice of the given order.

    The file name is ``<iso date>_<sanitized order id>_<NNN>.pdf``. Every
    character of the order id other than ASCII letters, digits and ``-`` is
    replaced by ``_``. Starting at *sequence_no*, the number is increased
    until no file of that name exists in *download_dir*.
    """
    safe_order_id = re.sub(r"[^A-Za-z0-9-]", "_", order_id)
    while True:
        candidate = download_dir / f"{order_date.isoformat()}_{safe_order_id}_{sequence_no:03d}.pdf"
        if not candidate.exists():
            return candidate
        sequence_no += 1
|
|
|
|
|
|
def configure(args) -> None:
    """Save the configuration and capture a logged-in browser session.

    The settings from *args* are written to CONFIG_PATH. A visible Chromium
    window is then opened on the order page so the user can log in. The
    session is saved to STORAGE_STATE_PATH either after
    ``args.login_wait_seconds`` seconds or, when that value is not positive,
    after the user presses Enter.

    Args:
        args: Parsed arguments of the ``configure`` sub-command
            (marketplace, download_dir, headless, locale, timezone,
            currency, amazon_language, login_wait_seconds).
    """
    config = {
        "marketplace": args.marketplace,
        "download_dir": str(Path(args.download_dir).expanduser().resolve()),
        "headless": args.headless,
        "locale": args.locale,
        "timezone": args.timezone,
        "currency": args.currency,
        "amazon_language": args.amazon_language,
    }
    save_config(config)
    ensure_app_dir()

    with sync_playwright() as p:
        # Always visible here: the user has to log in interactively. The
        # configured "headless" value only applies to later download runs.
        browser = p.chromium.launch(headless=False)
        try:
            context = browser.new_context(**build_context_options(config))
            page = context.new_page()
            page.goto(
                with_amazon_language(
                    f"https://www.amazon.{args.marketplace}/your-orders/orders",
                    args.amazon_language,
                ),
                wait_until="domcontentloaded",
            )
            print("Bitte im Browser bei Amazon einloggen.")
            if args.login_wait_seconds > 0:
                print(
                    f"Warte {args.login_wait_seconds} Sekunden auf Login, "
                    "danach wird die Session automatisch gespeichert."
                )
                time.sleep(args.login_wait_seconds)
            else:
                print("Wenn Bestellungen sichtbar sind, Enter druecken.")
                input()
            context.storage_state(path=str(STORAGE_STATE_PATH))
        finally:
            # Close the browser even if navigation or saving failed.
            browser.close()

    # The storage state contains login cookies; restrict it to the current
    # user where the platform supports it.
    try:
        STORAGE_STATE_PATH.chmod(0o600)
    except OSError as exc:
        print(f"Warnung: Dateirechte fuer {STORAGE_STATE_PATH} konnten nicht eingeschraenkt werden: {exc}")

    print(f"Konfiguration gespeichert: {CONFIG_PATH}")
    print(f"Session gespeichert: {STORAGE_STATE_PATH}")
|
|
|
|
|
|
def _is_pdf_payload(url: str, content_type: str, body: bytes) -> bool:
    """Return True if the content type, URL suffix or magic bytes indicate a PDF."""
    return "pdf" in content_type or url.lower().endswith(".pdf") or looks_like_pdf(body)


def _save_invoice_pdf(download_dir: Path, order: "OrderInvoice", body: bytes, counters: dict) -> Path:
    """Write *body* to the next free file name for *order* and return the path."""
    counters[order.order_id] = counters.get(order.order_id, 0) + 1
    out = build_output_path(download_dir, order.order_date, order.order_id, counters[order.order_id])
    out.write_bytes(body)
    return out


def download(args) -> None:
    """Download all invoices for orders placed within a date range.

    For every year touched by the range, the order overview pages are
    walked (at most ``args.max_pages`` pages per year). Invoice link
    candidates of orders inside the range are collected and then fetched
    through the browser context. A response that looks like a PDF is saved
    directly; an HTML response is scanned for nested PDF/download links,
    which are fetched in turn.

    Args:
        args: Parsed arguments of the ``download`` sub-command (date_from,
            date_to, output, max_pages, headless, debug, debug_json).

    Raises:
        SystemExit: If the configuration or session is missing, a date is
            invalid, or the start date lies after the end date.
    """
    config = load_config()
    if not STORAGE_STATE_PATH.exists():
        raise SystemExit("Session fehlt. Bitte zuerst 'configure' ausfuehren.")

    start_date = parse_iso_date(args.date_from)
    end_date = parse_iso_date(args.date_to)
    if start_date > end_date:
        raise SystemExit("'from' muss kleiner/gleich 'to' sein.")

    marketplace = config["marketplace"]
    amazon_language = config.get("amazon_language", "de_DE")
    download_dir = Path(args.output or config["download_dir"]).expanduser().resolve()
    download_dir.mkdir(parents=True, exist_ok=True)
    debug_json_target = args.debug_json or (str(DEFAULT_DEBUG_JSON_PATH) if args.debug else None)

    with sync_playwright() as p:
        # An explicit --headless on the command line wins over the saved setting.
        headless = args.headless if args.headless is not None else bool(config.get("headless", True))
        browser = p.chromium.launch(headless=headless)
        try:
            context_options = build_context_options(config)
            context_options["storage_state"] = str(STORAGE_STATE_PATH)
            context = browser.new_context(**context_options)
            page = context.new_page()

            base_orders_url = f"https://www.amazon.{marketplace}/your-orders/orders"
            invoices: list[OrderInvoice] = []
            seen_invoice_urls = set()
            years = years_for_range(start_date, end_date)
            debug_payload = {
                "requested_range": {"from": start_date.isoformat(), "to": end_date.isoformat()},
                "years": years,
                "pages": [],
                "totals": {
                    "order_cards": 0,
                    "candidates_with_date": 0,
                    "candidates_in_range": 0,
                },
            }

            # Phase 1: walk the overview pages and collect invoice candidates.
            for year in years:
                filtered_url = build_orders_url(marketplace, year, amazon_language)
                if args.debug:
                    print(f"[debug] Wechsle auf Jahresfilter {year}: {filtered_url}")
                try:
                    page.goto(filtered_url, wait_until="domcontentloaded", timeout=15000)
                except PlaywrightTimeoutError:
                    # Same policy as for next-page navigation below: give up
                    # on this year instead of aborting the whole run.
                    print(f"Warnung: Timeout beim Laden der Bestelluebersicht {year}; Jahr wird uebersprungen.")
                    continue
                visited_page_urls = set()

                for page_idx in range(args.max_pages):
                    # Guard against pagination loops.
                    if page.url in visited_page_urls:
                        if args.debug:
                            print(f"[debug] Abbruch wegen wiederholter URL: {page.url}")
                        break
                    visited_page_urls.add(page.url)
                    page.wait_for_timeout(1500)
                    card_count = page.locator("div.order-card").count()
                    overview_candidates = extract_orders_from_overview(page, base_orders_url, debug=args.debug)
                    debug_payload["pages"].append(
                        {
                            "year": year,
                            "page": page_idx + 1,
                            "url": page.url,
                            "order_cards": card_count,
                            "overview_candidates": len(overview_candidates),
                        }
                    )
                    debug_payload["totals"]["order_cards"] += card_count

                    if args.debug:
                        print(
                            f"[debug] Jahr {year}, Seite {page_idx + 1}: "
                            f"{card_count} order-card(s), {len(overview_candidates)} Kandidat(en)"
                        )

                    for candidate in overview_candidates:
                        debug_payload["totals"]["candidates_with_date"] += 1
                        if not (start_date <= candidate.order_date <= end_date):
                            continue
                        # Drop links already queued for an earlier order.
                        filtered_links = [u for u in candidate.invoice_links if u not in seen_invoice_urls]
                        if not filtered_links:
                            continue
                        seen_invoice_urls.update(filtered_links)
                        invoices.append(
                            OrderInvoice(
                                order_date=candidate.order_date,
                                order_id=candidate.order_id,
                                invoice_links=filtered_links,
                            )
                        )
                        debug_payload["totals"]["candidates_in_range"] += 1
                        if args.debug:
                            print(
                                f"[debug] Treffer {candidate.order_date.isoformat()} "
                                f"mit {len(filtered_links)} Link(s)"
                            )

                    next_page_url = find_next_page_url(page, base_orders_url)
                    if not next_page_url:
                        break
                    try:
                        page.goto(
                            with_amazon_language(next_page_url, amazon_language),
                            wait_until="domcontentloaded",
                            timeout=15000,
                        )
                    except PlaywrightTimeoutError:
                        break

            # Phase 2: fetch the candidates and store everything that is a PDF.
            downloaded = 0
            failed = 0
            seen_saved_urls = set()
            order_file_counters: dict[str, int] = {}
            for order in invoices:
                for invoice_url in order.invoice_links:
                    if invoice_url in seen_saved_urls:
                        continue
                    try:
                        response = context.request.get(
                            invoice_url,
                            headers={"referer": page.url},
                            timeout=20000,
                        )
                        content_type = (response.headers.get("content-type", "") or "").lower()
                        body = response.body()

                        if _is_pdf_payload(invoice_url, content_type, body):
                            out = _save_invoice_pdf(download_dir, order, body, order_file_counters)
                            downloaded += 1
                            seen_saved_urls.add(invoice_url)
                            print(f"Gespeichert: {out}")
                            continue

                        if "html" in content_type:
                            # The candidate is an HTML page; look one level
                            # deeper for the actual document links.
                            nested_pdf_links = extract_pdf_links_from_html(response.text(), invoice_url)
                            if args.debug:
                                print(f"[debug] HTML-Seite {invoice_url} -> {len(nested_pdf_links)} PDF-Link(s)")
                            for pdf_link in nested_pdf_links:
                                if pdf_link in seen_saved_urls:
                                    continue
                                pdf_resp = context.request.get(
                                    pdf_link,
                                    headers={"referer": invoice_url},
                                    timeout=20000,
                                )
                                pdf_type = (pdf_resp.headers.get("content-type", "") or "").lower()
                                pdf_body = pdf_resp.body()
                                if not _is_pdf_payload(pdf_link, pdf_type, pdf_body):
                                    continue
                                out = _save_invoice_pdf(download_dir, order, pdf_body, order_file_counters)
                                downloaded += 1
                                seen_saved_urls.add(pdf_link)
                                print(f"Gespeichert: {out}")
                        elif args.debug:
                            print(
                                f"[debug] Uebersprungen {invoice_url} "
                                f"(status={response.status}, content-type={content_type})"
                            )
                    except Exception as exc:
                        # Best effort per link: one failing URL must not stop
                        # the remaining downloads, but it is counted.
                        failed += 1
                        if args.debug:
                            print(f"[debug] Fehler bei {invoice_url}: {exc}")
        finally:
            browser.close()

    debug_payload["totals"]["downloaded"] = downloaded
    debug_payload["totals"]["failed"] = failed
    if debug_json_target:
        debug_path = Path(debug_json_target).expanduser().resolve()
        write_debug_json(debug_path, debug_payload)
        print(f"Debug-JSON gespeichert: {debug_path}")

    if failed:
        print(f"Fehlgeschlagene Abrufe: {failed}")
    print(f"Fertig. Heruntergeladene Rechnungen: {downloaded}")
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with its two sub-commands.

    ``configure`` stores settings and captures the login session;
    ``download`` fetches invoices for a date range. Each sub-parser sets
    ``func`` to the function that implements it.
    """
    root = argparse.ArgumentParser(description="Amazon-Rechnungen eines Zeitraums herunterladen")
    commands = root.add_subparsers(dest="command", required=True)

    # --- configure -------------------------------------------------------
    configure_cmd = commands.add_parser(
        "configure",
        help="Marketplace/Download konfigurieren und Login-Session speichern",
    )
    configure_cmd.add_argument("--marketplace", default="de", help="z. B. de, com, co.uk")
    configure_cmd.add_argument("--download-dir", default="~/Downloads/amazon_rechnungen")
    configure_cmd.add_argument(
        "--headless",
        action="store_true",
        help="Standard fuer Download-Lauf im Headless-Mode",
    )
    configure_cmd.add_argument("--locale", default="de-DE", help="Browser-Locale, z. B. de-DE")
    configure_cmd.add_argument("--timezone", default="Europe/Berlin", help="Zeitzone, z. B. Europe/Berlin")
    configure_cmd.add_argument("--currency", default="EUR", help="Waehrungshinweis fuer Konfiguration")
    configure_cmd.add_argument("--amazon-language", default="de_DE", help="Amazon URL-Sprache, z. B. de_DE")
    configure_cmd.add_argument(
        "--login-wait-seconds",
        type=int,
        default=60,
        help="Optional: wartet X Sekunden vor Session-Speicherung (fuer noVNC/Serverbetrieb).",
    )
    configure_cmd.set_defaults(func=configure)

    # --- download --------------------------------------------------------
    download_cmd = commands.add_parser("download", help="Rechnungen nach Zeitraum herunterladen")
    download_cmd.add_argument("--from", dest="date_from", required=True, help="Startdatum YYYY-MM-DD")
    download_cmd.add_argument("--to", dest="date_to", required=True, help="Enddatum YYYY-MM-DD")
    download_cmd.add_argument("--output", help="Optionales Zielverzeichnis")
    download_cmd.add_argument("--max-pages", type=int, default=25, help="Maximal zu scannende Bestellseiten")
    # Tri-state flag: absent -> None (use saved setting), bare flag -> True,
    # explicit value -> True only for "1", "true" or "yes".
    download_cmd.add_argument(
        "--headless",
        type=lambda s: s.lower() in {"1", "true", "yes"},
        nargs="?",
        const=True,
        default=None,
    )
    download_cmd.add_argument("--debug", action="store_true", help="Zeigt gefundene Detail- und Rechnungslinks")
    download_cmd.add_argument(
        "--debug-json",
        nargs="?",
        const=str(DEFAULT_DEBUG_JSON_PATH),
        default=None,
        help="Schreibt Laufdetails als JSON (optional mit Pfad, sonst Standarddatei).",
    )
    download_cmd.set_defaults(func=download)

    return root
|
|
|
|
|
|
def main() -> None:
    """Parse the command line and run the selected sub-command."""
    args = build_parser().parse_args()
    # Each sub-parser stored its handler in ``func``.
    args.func(args)


if __name__ == "__main__":
    main()
|