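"""Command-line tool that downloads Amazon invoice PDFs for a date range.

Provides a ``configure`` sub-command (stores the settings and a logged-in
browser session) and a ``download`` sub-command (collects invoice links from
the order history via Playwright and saves the PDFs).
"""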
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import smtplib
|
|
import time
|
|
from dataclasses import dataclass
|
|
from datetime import date, datetime
|
|
from email.message import EmailMessage
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse
|
|
|
|
import dateparser
|
|
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
# Per-user application directory; it holds the configuration, the saved
# browser session and the default debug dump.
APP_DIR = Path.home().joinpath(".amazon_invoice_downloader")
CONFIG_PATH = APP_DIR.joinpath("config.json")
STORAGE_STATE_PATH = APP_DIR.joinpath("storage_state.json")
DEFAULT_DEBUG_JSON_PATH = APP_DIR.joinpath("debug_last_run.json")

# Lower-case substrings that mark a link text as invoice-related.
INVOICE_KEYWORDS = ["invoice", "rechnung", "faktura", "vat", "steuer", "bill", "beleg"]
|
|
|
|
|
|
@dataclass
class OrderInvoice:
    """One order together with the invoice link candidates found for it."""

    # Date attributed to the order, as parsed from the page text.
    order_date: date
    # Order number found in the page text, or an "UNKNOWN-<date>" placeholder.
    order_id: str
    # Absolute URLs that may lead to an invoice document.
    invoice_links: list[str]
|
|
|
|
|
|
def ensure_app_dir() -> None:
    """Create the application directory, including parents, if it is missing."""
    APP_DIR.mkdir(exist_ok=True, parents=True)
|
|
|
|
|
|
def load_config() -> dict:
    """Read and decode the JSON configuration file.

    Returns:
        The decoded configuration.

    Raises:
        SystemExit: If the configuration file does not exist or does not
            contain valid JSON.
    """
    # EAFP: read directly instead of exists()-then-read, which could race with
    # a concurrent delete.
    try:
        raw = CONFIG_PATH.read_text(encoding="utf-8")
    except FileNotFoundError:
        raise SystemExit("Konfiguration fehlt. Bitte zuerst 'configure' ausfuehren.") from None

    # A damaged file should produce the same kind of actionable message as a
    # missing one rather than a raw traceback.
    try:
        return json.loads(raw)
    except json.JSONDecodeError as exc:
        raise SystemExit(
            f"Konfiguration {CONFIG_PATH} ist kein gueltiges JSON ({exc}). "
            "Bitte 'configure' erneut ausfuehren."
        ) from exc
|
|
|
|
|
|
def save_config(config: dict) -> None:
    """Write the configuration as indented JSON to the config file.

    The configuration may contain the SMTP password in plain text, so the
    file is restricted to the owning user afterwards.

    Args:
        config: JSON-serialisable configuration mapping.
    """
    ensure_app_dir()
    CONFIG_PATH.write_text(json.dumps(config, indent=2), encoding="utf-8")
    try:
        # Owner read/write only; the file can hold credentials.
        CONFIG_PATH.chmod(0o600)
    except OSError:
        # Best effort: some platforms/filesystems do not support POSIX modes,
        # and a failed chmod must not prevent saving the configuration.
        pass
|
|
|
|
|
|
def build_context_options(config: dict) -> dict:
    """Build the keyword arguments for creating a browser context.

    Args:
        config: Configuration mapping; ``locale`` and ``timezone`` are
            optional and default to ``de-DE`` / ``Europe/Berlin``.

    Returns:
        A dict with ``locale``, ``timezone_id`` and ``extra_http_headers``.
    """
    options = {
        "locale": config.get("locale", "de-DE"),
        "timezone_id": config.get("timezone", "Europe/Berlin"),
    }
    # The Accept-Language header is fixed and does not follow the locale.
    options["extra_http_headers"] = {"Accept-Language": "de-DE,de;q=0.9,en;q=0.8"}
    return options
|
|
|
|
def strtobool(value: str) -> bool:
    """Return True when *value* is one of 1/true/yes/on (case-insensitive)."""
    normalized = value.strip().lower()
    return normalized in ("1", "true", "yes", "on")
|
|
|
|
|
|
def get_notification_settings(config: dict) -> dict:
    """Resolve the e-mail notification settings.

    Environment variables (``NOTIFY_EMAIL``, ``SMTP_HOST``, ``SMTP_PORT``,
    ``SMTP_USER``, ``SMTP_PASSWORD``, ``SMTP_FROM``, ``SMTP_STARTTLS``,
    ``SMTP_SSL``) take precedence over the values stored in *config*.

    Args:
        config: Configuration mapping with optional ``notify_email`` and
            ``smtp`` entries.

    Returns:
        A dict with the keys ``recipient``, ``smtp_host``, ``smtp_port``,
        ``smtp_user``, ``smtp_password``, ``smtp_from``, ``smtp_starttls``
        and ``smtp_ssl``.
    """
    # Tolerate an explicit null for the "smtp" section.
    smtp_cfg = config.get("smtp") or {}

    def _parse_port(raw, fallback: int) -> int:
        """Convert *raw* to an int port, returning *fallback* if unusable."""
        try:
            return int(str(raw).strip())
        except (TypeError, ValueError):
            return fallback

    # An empty or malformed SMTP_PORT (or config port) must not crash the
    # run; fall back to the configured port, then to 587.
    configured_port = _parse_port(smtp_cfg.get("port", 587), 587)
    smtp_port = _parse_port(os.getenv("SMTP_PORT", ""), configured_port)

    return {
        "recipient": os.getenv("NOTIFY_EMAIL", config.get("notify_email", "stefan.heyn@googlemail.com")),
        "smtp_host": os.getenv("SMTP_HOST", smtp_cfg.get("host", "")),
        "smtp_port": smtp_port,
        "smtp_user": os.getenv("SMTP_USER", smtp_cfg.get("user", "")),
        "smtp_password": os.getenv("SMTP_PASSWORD", smtp_cfg.get("password", "")),
        "smtp_from": os.getenv("SMTP_FROM", smtp_cfg.get("from_addr", "")),
        "smtp_starttls": strtobool(os.getenv("SMTP_STARTTLS", str(smtp_cfg.get("starttls", True)))),
        "smtp_ssl": strtobool(os.getenv("SMTP_SSL", str(smtp_cfg.get("ssl", False)))),
    }
|
|
|
|
|
|
def send_notification(config: dict, subject: str, body: str) -> None:
    """Send a plain-text e-mail using the resolved notification settings.

    Returns without doing anything when no SMTP host or no recipient is
    configured.

    Args:
        config: Configuration mapping passed to the settings resolver.
        subject: Subject line of the message.
        body: Plain-text message body.
    """
    settings = get_notification_settings(config)
    to_addr = settings["recipient"]
    smtp_host = settings["smtp_host"]
    if not (smtp_host and to_addr):
        return

    message = EmailMessage()
    message["Subject"] = subject
    # Sender falls back from the explicit from-address to the SMTP user and
    # finally to the recipient itself.
    message["From"] = settings["smtp_from"] or settings["smtp_user"] or to_addr
    message["To"] = to_addr
    message.set_content(body)

    use_ssl = settings["smtp_ssl"]
    if use_ssl:
        connection = smtplib.SMTP_SSL(smtp_host, settings["smtp_port"], timeout=20)
    else:
        connection = smtplib.SMTP(smtp_host, settings["smtp_port"], timeout=20)

    with connection as smtp:
        # STARTTLS only applies to a connection that is not already SSL.
        if settings["smtp_starttls"] and not use_ssl:
            smtp.starttls()
        if settings["smtp_user"]:
            smtp.login(settings["smtp_user"], settings["smtp_password"])
        smtp.send_message(message)
|
|
|
|
|
|
def is_login_page(page) -> bool:
    """Tell whether *page* currently shows a sign-in form.

    The check first looks for sign-in markers in the lower-cased URL and
    otherwise for an e-mail input field on the page.
    """
    current_url = page.url.lower()
    for marker in ("/ap/signin", "/signin", "openid.oa"):
        if marker in current_url:
            return True
    return page.locator('input[type="email"], input[name="email"], #ap_email').count() > 0
|
|
|
|
def parse_iso_date(value: str) -> date:
    """Parse a ``YYYY-MM-DD`` string into a date.

    Raises:
        SystemExit: If *value* does not match the expected format.
    """
    try:
        moment = datetime.strptime(value, "%Y-%m-%d")
    except ValueError as exc:
        raise SystemExit(f"Ungueltiges Datum '{value}'. Erwartet: YYYY-MM-DD") from exc
    return moment.date()
|
|
|
|
|
|
def parse_date_from_text(text: str) -> Optional[date]:
    """Find the first parseable date in *text*.

    Several textual and numeric date layouts are tried in a fixed order; the
    first regex hit that ``dateparser`` can interpret (German or English,
    day-month-year order) wins.

    Returns:
        The parsed date, or None when nothing parseable was found.
    """
    candidate_patterns = (
        r"\b\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4}\b",
        r"\b[A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4}\b",
        r"\b\d{1,2}/\d{1,2}/\d{4}\b",
        r"\b\d{4}-\d{2}-\d{2}\b",
    )
    for regex in candidate_patterns:
        hit = re.search(regex, text)
        if hit is None:
            continue
        moment = dateparser.parse(
            hit.group(0),
            languages=["de", "en"],
            settings={"DATE_ORDER": "DMY"},
        )
        if moment:
            return moment.date()
    return None
|
|
|
|
|
|
def parse_order_date_from_text(text: str) -> Optional[date]:
    """Extract the order date from page text.

    Dates that directly follow an order-date label ("Bestellt am",
    "Bestelldatum", "Order placed [on]") are preferred; if none of those can
    be parsed, any date found in the text is used.

    Returns:
        The parsed date, or None when no date could be determined.
    """
    labelled_patterns = (
        r"(Bestellt am|Bestelldatum)\s*[:\-]?\s*(\d{1,2}\.\s*[A-Za-zäöüÄÖÜ]+\.?\s*\d{4})",
        r"(Order placed(?: on)?)\s*[:\-]?\s*([A-Za-zäöüÄÖÜ]+\s+\d{1,2},\s*\d{4})",
        r"(Order placed(?: on)?)\s*[:\-]?\s*(\d{1,2}/\d{1,2}/\d{4})",
    )
    for regex in labelled_patterns:
        hit = re.search(regex, text, flags=re.IGNORECASE)
        if hit is None:
            continue
        # Group 2 is the date portion following the label.
        moment = dateparser.parse(
            hit.group(2),
            languages=["de", "en"],
            settings={"DATE_ORDER": "DMY"},
        )
        if moment:
            return moment.date()
    # No labelled date: fall back to the generic search.
    return parse_date_from_text(text)
|
|
|
|
|
|
def parse_order_id_from_text(text: str) -> Optional[str]:
    """Return the first ``NNN-NNNNNNN-NNNNNNN`` order number in *text*, if any."""
    hit = re.search(r"\b\d{3}-\d{7}-\d{7}\b", text)
    return hit.group(0) if hit else None
|
|
|
|
|
|
def years_for_range(start_date: date, end_date: date) -> list[int]:
    """List the calendar years touched by the range, newest year first."""
    ascending = range(start_date.year, end_date.year + 1)
    return list(reversed(ascending))
|
|
|
|
|
|
def with_amazon_language(url: str, amazon_language: str) -> str:
    """Return *url* with its ``language`` query parameter set to *amazon_language*.

    Existing query parameters are kept (blank values included); repeated
    keys collapse to their last value.
    """
    parts = urlparse(url)
    params = {key: val for key, val in parse_qsl(parts.query, keep_blank_values=True)}
    params["language"] = amazon_language
    new_query = urlencode(params)
    return urlunparse(parts._replace(query=new_query))
|
|
|
|
|
|
def build_orders_url(marketplace: str, year: int, amazon_language: str) -> str:
    """Build the order-history URL filtered to *year* for the given marketplace."""
    year_filtered = f"https://www.amazon.{marketplace}/your-orders/orders?timeFilter=year-{year}"
    return with_amazon_language(year_filtered, amazon_language)
|
|
|
|
|
|
def text_contains_invoice_hint(text: str) -> bool:
    """Return True if *text* contains any invoice keyword (case-insensitive substring)."""
    haystack = text.lower()
    for keyword in INVOICE_KEYWORDS:
        if keyword in haystack:
            return True
    return False
|
|
|
|
|
|
def collect_order_detail_links(page, base_url: str) -> list[str]:
    """Collect unique absolute URLs of order-detail links on *page*.

    A link qualifies when its lower-cased href contains ``order-details`` or
    ``orderid=``. Results keep their order of first appearance.

    Args:
        page: Object offering ``locator("a")`` with ``count``/``nth``.
        base_url: Base used to resolve relative hrefs.
    """
    anchors = page.locator("a")
    # Dict keys double as an insertion-ordered set of URLs.
    unique: dict[str, None] = {}
    for position in range(anchors.count()):
        href = anchors.nth(position).get_attribute("href")
        if not href:
            continue
        href_lc = href.lower()
        # "/your-orders/order-details" is already covered by "order-details".
        if not ("order-details" in href_lc or "orderid=" in href_lc):
            continue
        unique.setdefault(urljoin(base_url, href), None)
    return list(unique)
|
|
|
|
|
|
def find_next_page_url(page, base_url: str) -> Optional[str]:
    """Locate the absolute URL of the next result page.

    The pagination widget's "last" entry is tried first, then any link whose
    text reads Weiter/Next/Nächste/Naechste.

    Returns:
        The resolved URL, or None when no usable link was found.
    """
    selectors = (
        "ul.a-pagination li.a-last a",
        'a:has-text("Weiter"), a:has-text("Next"), a:has-text("Nächste"), a:has-text("Naechste")',
    )
    for selector in selectors:
        candidate = page.locator(selector).first
        if candidate.count() == 0:
            continue
        target = candidate.get_attribute("href")
        if target:
            return urljoin(base_url, target)
    return None
|
|
|
|
|
|
def extract_invoice_links_from_scope(scope, base_url: str) -> list[str]:
    """Collect unique absolute URLs of links that look invoice-related.

    A link qualifies when its resolved URL contains one of a fixed set of
    hint substrings, or when its visible text contains an invoice keyword.

    Args:
        scope: Page or locator offering ``locator("a")`` with ``count``/``nth``.
        base_url: Base used to resolve relative hrefs.

    Returns:
        Matching URLs in order of first appearance, without duplicates.
    """
    href_hints = ("invoice", "rechnung", "tax", "bill", "summary", "print", "beleg")
    links = scope.locator("a")
    out: list[str] = []
    seen = set()
    for i in range(links.count()):
        link = links.nth(i)
        # Check the href first: links without one can never qualify, so there
        # is no point in paying for a text lookup on them.
        href = link.get_attribute("href")
        if not href:
            continue

        absolute = urljoin(base_url, href)
        if absolute in seen:
            continue

        lower_href = absolute.lower()
        matches = any(hint in lower_href for hint in href_hints)
        if not matches:
            # Only fetch the text when the URL alone is inconclusive. A slow
            # or detached element must not abort the scan of the other links.
            try:
                text = (link.inner_text(timeout=1000) or "").strip()
            except PlaywrightTimeoutError:
                text = ""
            matches = text_contains_invoice_hint(text)

        if matches:
            out.append(absolute)
            seen.add(absolute)
    return out
|
|
|
|
|
|
def extract_orders_from_overview(page, base_url: str, debug: bool = False) -> list[OrderInvoice]:
    """Build invoice candidates from the order cards of an overview page.

    Cards without a parseable date or without any invoice-like link are
    skipped.

    Args:
        page: Page whose ``div.order-card`` elements are inspected.
        base_url: Base used to resolve relative invoice links.
        debug: When True, report cards that carry no date.
    """
    found: list[OrderInvoice] = []
    order_cards = page.locator("div.order-card")
    for position in range(order_cards.count()):
        card = order_cards.nth(position)
        text = card.inner_text(timeout=2000)

        placed_on = parse_order_date_from_text(text)
        if placed_on is None:
            if debug:
                print(f"[debug] order-card ohne Datum (index={position + 1})")
            continue

        # Cards without a recognisable order number get a date-based placeholder.
        order_id = parse_order_id_from_text(text) or f"UNKNOWN-{placed_on.isoformat()}"
        candidate_links = extract_invoice_links_from_scope(card, base_url)
        if candidate_links:
            found.append(
                OrderInvoice(order_date=placed_on, order_id=order_id, invoice_links=candidate_links)
            )
    return found
|
|
|
|
|
|
def extract_invoice_candidates_from_detail(
    context,
    detail_url: str,
    base_url: str,
    amazon_language: str,
    debug: bool = False,
) -> Optional[OrderInvoice]:
    """Open an order detail page in a new tab and collect invoice candidates.

    Args:
        context: Browser context used to open the temporary page.
        detail_url: URL of the order detail page.
        base_url: Base used to resolve relative invoice links.
        amazon_language: Value for the ``language`` query parameter.
        debug: When True, print progress information.

    Returns:
        An OrderInvoice, or None if the page has no parseable date, no
        invoice-like links, or a Playwright timeout occurred.
    """
    tab = context.new_page()
    try:
        target = with_amazon_language(detail_url, amazon_language)
        tab.goto(target, wait_until="domcontentloaded", timeout=15000)
        # Short fixed pause before reading the page text.
        tab.wait_for_timeout(1200)
        page_text = tab.inner_text("body", timeout=4000)

        placed_on = parse_order_date_from_text(page_text)
        if placed_on is None:
            if debug:
                print(f"[debug] Kein Datum in Detailseite: {detail_url}")
            return None

        candidate_links = extract_invoice_links_from_scope(tab, base_url)
        if debug:
            print(f"[debug] Detailseite {detail_url} -> {len(candidate_links)} Rechnungskandidat(en)")
        if not candidate_links:
            return None

        return OrderInvoice(
            order_date=placed_on,
            order_id=parse_order_id_from_text(page_text) or f"UNKNOWN-{placed_on.isoformat()}",
            invoice_links=candidate_links,
        )
    except PlaywrightTimeoutError:
        if debug:
            print(f"[debug] Timeout bei Detailseite: {detail_url}")
        return None
    finally:
        # The temporary tab is always closed, whatever the outcome.
        tab.close()
|
|
|
|
|
|
def extract_pdf_links_from_html(html: str, source_url: str) -> list[str]:
    """Extract candidate PDF/download URLs from raw HTML.

    Every ``href="..."`` attribute value is entity-decoded, resolved against
    *source_url* and kept when the resulting URL contains ``.pdf`` or
    ``download`` (case-insensitive).

    Args:
        html: Raw HTML markup.
        source_url: URL the markup was fetched from; base for relative links.

    Returns:
        Unique absolute URLs in order of first appearance.
    """
    # Imported locally by name because the ``html`` parameter shadows the module.
    from html import unescape

    # Dict keys act as an insertion-ordered set, giving a stable result order.
    found: dict[str, None] = {}
    for m in re.finditer(r'href=["\']([^"\']+)["\']', html, flags=re.IGNORECASE):
        # Attribute values in markup are entity-encoded ("&amp;" for "&");
        # decode them, otherwise query strings end up corrupted.
        href = unescape(m.group(1))
        absolute = urljoin(source_url, href)
        low = absolute.lower()
        if ".pdf" in low or "download" in low:
            found.setdefault(absolute, None)
    return list(found)
|
|
|
|
|
|
def write_debug_json(debug_json_path: Path, payload: dict) -> None:
    """Write *payload* as indented UTF-8 JSON, creating parent directories as needed."""
    target_dir = debug_json_path.parent
    target_dir.mkdir(exist_ok=True, parents=True)
    # Non-ASCII characters are written as-is rather than escaped.
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    debug_json_path.write_text(serialized, encoding="utf-8")
|
|
|
|
|
|
def looks_like_pdf(content: bytes) -> bool:
    """Return True if *content* begins with the ``%PDF-`` signature."""
    return content[:5] == b"%PDF-"
|
|
|
|
|
|
def build_output_path(download_dir: Path, order_date: date, order_id: str, sequence_no: int) -> Path:
    """Pick a not-yet-existing target path for an invoice PDF.

    The file name is ``<date>_<order id>_<NNN>.pdf``; characters of the order
    id other than ASCII letters, digits and ``-`` become ``_``. The sequence
    number is increased until the path does not exist.
    """
    prefix = f"{order_date.isoformat()}_" + re.sub(r"[^A-Za-z0-9-]", "_", order_id)
    while True:
        candidate = download_dir / f"{prefix}_{sequence_no:03d}.pdf"
        if not candidate.exists():
            return candidate
        sequence_no += 1
|
|
|
|
|
|
def configure(args) -> None:
    """Handle the ``configure`` sub-command.

    Stores the settings from *args* in the config file, then opens a visible
    Chromium window on the order history so the user can sign in, and finally
    saves the browser session (storage state) for later download runs.

    Args:
        args: Parsed arguments of the ``configure`` sub-command.
    """
    smtp_section = {
        "host": args.smtp_host,
        "port": args.smtp_port,
        "user": args.smtp_user,
        "password": args.smtp_password,
        "from_addr": args.smtp_from,
        "starttls": not args.smtp_no_starttls,
        "ssl": args.smtp_ssl,
    }
    config = {
        "marketplace": args.marketplace,
        "download_dir": str(Path(args.download_dir).expanduser().resolve()),
        "headless": args.headless,
        "locale": args.locale,
        "timezone": args.timezone,
        "currency": args.currency,
        "amazon_language": args.amazon_language,
        "notify_email": args.notify_email,
        "smtp": smtp_section,
    }
    save_config(config)
    ensure_app_dir()

    start_url = with_amazon_language(
        f"https://www.amazon.{args.marketplace}/your-orders/orders",
        args.amazon_language,
    )
    with sync_playwright() as playwright:
        # The login is interactive, so this browser is always visible.
        browser = playwright.chromium.launch(headless=False)
        context = browser.new_context(**build_context_options(config))
        login_page = context.new_page()
        login_page.goto(start_url, wait_until="domcontentloaded")

        print("Bitte im Browser bei Amazon einloggen.")
        wait_seconds = args.login_wait_seconds
        if wait_seconds > 0:
            # Timed wait instead of a prompt.
            print(
                f"Warte {args.login_wait_seconds} Sekunden auf Login, "
                "danach wird die Session automatisch gespeichert."
            )
            time.sleep(wait_seconds)
        else:
            print("Wenn Bestellungen sichtbar sind, Enter druecken.")
            input()

        context.storage_state(path=str(STORAGE_STATE_PATH))
        browser.close()

    print(f"Konfiguration gespeichert: {CONFIG_PATH}")
    print(f"Session gespeichert: {STORAGE_STATE_PATH}")
|
|
|
|
|
|
def _abort_session_expired(
    config: dict,
    subject: str,
    headline: str,
    page_url: str,
    start_date: date,
    end_date: date,
) -> None:
    """Send a best-effort notification and exit because a login was requested.

    Raises:
        SystemExit: Always. The message only claims that a notification was
            sent when one actually went out.
    """
    body = (
        f"{headline}\n"
        f"URL: {page_url}\n"
        f"Zeitraum: {start_date.isoformat()} bis {end_date.isoformat()}\n"
        "Bitte 'configure' erneut ausfuehren."
    )
    notified_recipient = ""
    try:
        settings = get_notification_settings(config)
        send_notification(config, subject=subject, body=body)
        # send_notification returns silently when host or recipient is
        # missing, so only report success if both were configured.
        if settings["smtp_host"] and settings["recipient"]:
            notified_recipient = settings["recipient"]
    except Exception as notify_exc:
        # Notification is best effort; the run is aborted either way.
        print(f"[warn] E-Mail-Benachrichtigung fehlgeschlagen: {notify_exc}")
    raise SystemExit(
        "Session abgelaufen. Bitte 'configure' erneut ausfuehren."
        + (f" Benachrichtigung an {notified_recipient} gesendet." if notified_recipient else "")
    )


def _is_pdf_response(response, url: str, content_type: str, body: bytes) -> bool:
    """Decide whether a fetched response should be stored as a PDF."""
    if not response.ok:
        # Error responses are never invoices, whatever the URL looks like.
        return False
    if "pdf" in content_type or looks_like_pdf(body):
        return True
    # A ".pdf" URL alone is not enough: a sign-in or error page delivered as
    # HTML under such a URL must not be written to disk as a PDF.
    return url.lower().endswith(".pdf") and "html" not in content_type


def _store_invoice_pdf(download_dir: Path, order: OrderInvoice, order_file_counters: dict, pdf_bytes: bytes) -> Path:
    """Write *pdf_bytes* to the next free file name for *order* and report it."""
    sequence_no = order_file_counters.get(order.order_id, 0) + 1
    order_file_counters[order.order_id] = sequence_no
    out = build_output_path(download_dir, order.order_date, order.order_id, sequence_no)
    out.write_bytes(pdf_bytes)
    print(f"Gespeichert: {out}")
    return out


def download(args) -> None:
    """Handle the ``download`` sub-command.

    Walks the order history year by year (newest first) using the saved
    browser session, collects invoice link candidates for orders whose date
    lies within the requested range, and stores every response that turns out
    to be a PDF in the download directory. HTML responses are scanned one
    level deep for PDF/download links.

    Args:
        args: Parsed arguments of the ``download`` sub-command.

    Raises:
        SystemExit: If the configuration or session is missing, the date
            range is invalid, or Amazon asks for a login during the run.
    """
    config = load_config()
    if not STORAGE_STATE_PATH.exists():
        raise SystemExit("Session fehlt. Bitte zuerst 'configure' ausfuehren.")

    start_date = parse_iso_date(args.date_from)
    end_date = parse_iso_date(args.date_to)
    if start_date > end_date:
        raise SystemExit("'from' muss kleiner/gleich 'to' sein.")

    marketplace = config["marketplace"]
    amazon_language = config.get("amazon_language", "de_DE")
    download_dir = Path(args.output or config["download_dir"]).expanduser().resolve()
    download_dir.mkdir(parents=True, exist_ok=True)
    debug_json_target = args.debug_json or (str(DEFAULT_DEBUG_JSON_PATH) if args.debug else None)

    with sync_playwright() as p:
        # An explicit command-line value wins over the configured default.
        headless = args.headless if args.headless is not None else bool(config.get("headless", True))
        browser = p.chromium.launch(headless=headless)
        context_options = build_context_options(config)
        context_options["storage_state"] = str(STORAGE_STATE_PATH)
        context = browser.new_context(**context_options)
        page = context.new_page()

        base_orders_url = f"https://www.amazon.{marketplace}/your-orders/orders"
        invoices: list[OrderInvoice] = []
        seen_invoice_urls = set()
        years = years_for_range(start_date, end_date)
        debug_payload = {
            "requested_range": {"from": start_date.isoformat(), "to": end_date.isoformat()},
            "years": years,
            "pages": [],
            "totals": {
                "order_cards": 0,
                "candidates_with_date": 0,
                "candidates_in_range": 0,
            },
        }

        for year in years:
            filtered_url = build_orders_url(marketplace, year, amazon_language)
            if args.debug:
                print(f"[debug] Wechsle auf Jahresfilter {year}: {filtered_url}")
            page.goto(filtered_url, wait_until="domcontentloaded", timeout=15000)
            if is_login_page(page):
                _abort_session_expired(
                    config,
                    subject="Amazon Invoice Downloader: Session abgelaufen",
                    headline="Amazon-Session ist abgelaufen oder Login wurde angefordert.",
                    page_url=page.url,
                    start_date=start_date,
                    end_date=end_date,
                )
            visited_page_urls = set()

            for page_idx in range(args.max_pages):
                # Guard against pagination that loops back to a seen page.
                if page.url in visited_page_urls:
                    if args.debug:
                        print(f"[debug] Abbruch wegen wiederholter URL: {page.url}")
                    break
                visited_page_urls.add(page.url)
                page.wait_for_timeout(1500)
                page_cards = page.locator("div.order-card")
                card_count = page_cards.count()
                overview_candidates = extract_orders_from_overview(page, base_orders_url, debug=args.debug)
                debug_payload["pages"].append(
                    {
                        "year": year,
                        "page": page_idx + 1,
                        "url": page.url,
                        "order_cards": card_count,
                        "overview_candidates": len(overview_candidates),
                    }
                )
                debug_payload["totals"]["order_cards"] += card_count

                if args.debug:
                    print(f"[debug] Jahr {year}, Seite {page_idx + 1}: {card_count} order-card(s), {len(overview_candidates)} Kandidat(en)")

                for candidate in overview_candidates:
                    debug_payload["totals"]["candidates_with_date"] += 1
                    if start_date <= candidate.order_date <= end_date:
                        # Drop links already queued for another order.
                        filtered_links = [u for u in candidate.invoice_links if u not in seen_invoice_urls]
                        if not filtered_links:
                            continue
                        seen_invoice_urls.update(filtered_links)
                        invoices.append(
                            OrderInvoice(
                                order_date=candidate.order_date,
                                order_id=candidate.order_id,
                                invoice_links=filtered_links,
                            )
                        )
                        debug_payload["totals"]["candidates_in_range"] += 1
                        if args.debug:
                            print(
                                f"[debug] Treffer {candidate.order_date.isoformat()} mit {len(filtered_links)} Link(s)"
                            )

                next_page_url = find_next_page_url(page, base_orders_url)
                if not next_page_url:
                    break
                try:
                    page.goto(
                        with_amazon_language(next_page_url, amazon_language),
                        wait_until="domcontentloaded",
                        timeout=15000,
                    )
                except PlaywrightTimeoutError:
                    # Stop paging this year; orders found so far are kept.
                    break
                if is_login_page(page):
                    _abort_session_expired(
                        config,
                        subject="Amazon Invoice Downloader: Session waehrend Download abgelaufen",
                        headline="Amazon-Session ist waehrend der Pagination abgelaufen.",
                        page_url=page.url,
                        start_date=start_date,
                        end_date=end_date,
                    )

        downloaded = 0
        seen_saved_urls = set()
        order_file_counters: dict[str, int] = {}
        for order in invoices:
            for invoice_url in order.invoice_links:
                if invoice_url in seen_saved_urls:
                    continue
                try:
                    response = context.request.get(
                        invoice_url,
                        headers={"referer": page.url},
                        timeout=20000,
                    )
                    content_type = (response.headers.get("content-type", "") or "").lower()
                    body = response.body()

                    if _is_pdf_response(response, invoice_url, content_type, body):
                        _store_invoice_pdf(download_dir, order, order_file_counters, body)
                        downloaded += 1
                        seen_saved_urls.add(invoice_url)
                        continue

                    if response.ok and "html" in content_type:
                        # The link led to an HTML page; look one level deeper.
                        nested_pdf_links = extract_pdf_links_from_html(response.text(), invoice_url)
                        if args.debug:
                            print(f"[debug] HTML-Seite {invoice_url} -> {len(nested_pdf_links)} PDF-Link(s)")
                        for pdf_link in nested_pdf_links:
                            if pdf_link in seen_saved_urls:
                                continue
                            pdf_resp = context.request.get(
                                pdf_link,
                                headers={"referer": invoice_url},
                                timeout=20000,
                            )
                            pdf_type = (pdf_resp.headers.get("content-type", "") or "").lower()
                            pdf_body = pdf_resp.body()
                            if not _is_pdf_response(pdf_resp, pdf_link, pdf_type, pdf_body):
                                continue
                            _store_invoice_pdf(download_dir, order, order_file_counters, pdf_body)
                            downloaded += 1
                            seen_saved_urls.add(pdf_link)
                    elif args.debug:
                        print(
                            f"[debug] Uebersprungen {invoice_url} "
                            f"(status={response.status}, content-type={content_type})"
                        )
                except Exception as exc:
                    # One failing link must not stop the run, but the failure
                    # is reported even without --debug.
                    print(f"[warn] Fehler bei {invoice_url}: {exc}")
                    continue

        browser.close()

    debug_payload["totals"]["downloaded"] = downloaded
    if debug_json_target:
        debug_path = Path(debug_json_target).expanduser().resolve()
        write_debug_json(debug_path, debug_payload)
        print(f"Debug-JSON gespeichert: {debug_path}")

    print(f"Fertig. Heruntergeladene Rechnungen: {downloaded}")
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with its two sub-commands.

    ``configure`` stores settings and the login session; ``download`` fetches
    invoices for a date range. Each sub-parser sets ``func`` to its handler.

    Returns:
        The fully configured argument parser.
    """
    parser = argparse.ArgumentParser(description="Amazon-Rechnungen eines Zeitraums herunterladen")
    sub = parser.add_subparsers(dest="command", required=True)

    p_config = sub.add_parser("configure", help="Marketplace/Download konfigurieren und Login-Session speichern")
    p_config.add_argument("--marketplace", default="de", help="z. B. de, com, co.uk")
    p_config.add_argument(
        "--download-dir",
        default="~/Downloads/amazon_rechnungen",
        help="Zielverzeichnis fuer heruntergeladene Rechnungen",
    )
    p_config.add_argument("--headless", action="store_true", help="Standard fuer Download-Lauf im Headless-Mode")
    p_config.add_argument("--locale", default="de-DE", help="Browser-Locale, z. B. de-DE")
    p_config.add_argument("--timezone", default="Europe/Berlin", help="Zeitzone, z. B. Europe/Berlin")
    p_config.add_argument("--currency", default="EUR", help="Waehrungshinweis fuer Konfiguration")
    p_config.add_argument("--amazon-language", default="de_DE", help="Amazon URL-Sprache, z. B. de_DE")
    p_config.add_argument("--notify-email", default="stefan.heyn@googlemail.com", help="Empfaenger fuer Ablauf-Benachrichtigungen")
    p_config.add_argument("--smtp-host", default="", help="SMTP-Server, z. B. smtp.gmail.com")
    p_config.add_argument("--smtp-port", type=int, default=587, help="SMTP-Port, Standard 587")
    p_config.add_argument("--smtp-user", default="", help="SMTP-Benutzer")
    p_config.add_argument("--smtp-password", default="", help="SMTP-Passwort oder App-Passwort")
    p_config.add_argument("--smtp-from", default="", help="Absenderadresse (optional)")
    p_config.add_argument("--smtp-ssl", action="store_true", help="SMTP ueber SSL (typisch Port 465)")
    p_config.add_argument("--smtp-no-starttls", action="store_true", help="STARTTLS deaktivieren")
    p_config.add_argument(
        "--login-wait-seconds",
        type=int,
        default=60,
        help="Optional: wartet X Sekunden vor Session-Speicherung (fuer noVNC/Serverbetrieb).",
    )
    p_config.set_defaults(func=configure)

    p_dl = sub.add_parser("download", help="Rechnungen nach Zeitraum herunterladen")
    p_dl.add_argument("--from", dest="date_from", required=True, help="Startdatum YYYY-MM-DD")
    p_dl.add_argument("--to", dest="date_to", required=True, help="Enddatum YYYY-MM-DD")
    p_dl.add_argument("--output", help="Optionales Zielverzeichnis")
    p_dl.add_argument("--max-pages", type=int, default=25, help="Maximal zu scannende Bestellseiten")
    # Same boolean parsing as the SMTP_* environment flags (strtobool) rather
    # than a second, slightly different inline rule. A bare "--headless"
    # means True; omitting it leaves the configured default in effect.
    p_dl.add_argument(
        "--headless",
        type=strtobool,
        nargs="?",
        const=True,
        default=None,
        help="Headless-Modus fuer diesen Lauf (true/false); Standard aus Konfiguration",
    )
    p_dl.add_argument("--debug", action="store_true", help="Zeigt gefundene Detail- und Rechnungslinks")
    p_dl.add_argument(
        "--debug-json",
        nargs="?",
        const=str(DEFAULT_DEBUG_JSON_PATH),
        default=None,
        help="Schreibt Laufdetails als JSON (optional mit Pfad, sonst Standarddatei).",
    )
    p_dl.set_defaults(func=download)

    return parser
|
|
|
|
|
|
def main() -> None:
    """Parse the command line and run the handler of the chosen sub-command."""
    namespace = build_parser().parse_args()
    handler = namespace.func
    handler(namespace)


if __name__ == "__main__":
    main()
|
|
|
|
|