Source code for scrapy_zyte_api._session

import json
from asyncio import Task, create_task, sleep
from collections import defaultdict, deque
from copy import deepcopy
from functools import partial
from logging import getLogger
from typing import (
    Any,
    DefaultDict,
    Deque,
    Dict,
    List,
    Optional,
    Set,
    Type,
    TypeVar,
    Union,
    cast,
)
from uuid import uuid4
from weakref import WeakKeyDictionary

from scrapy import Request, Spider
from scrapy.crawler import Crawler
from scrapy.exceptions import CloseSpider, IgnoreRequest
from scrapy.http import Response
from scrapy.utils.httpobj import urlparse_cached
from scrapy.utils.misc import create_instance, load_object
from scrapy.utils.python import global_object_name
from tenacity import stop_after_attempt
from zyte_api import RequestError, RetryFactory

from scrapy_zyte_api.utils import _DOWNLOAD_NEEDS_SPIDER

logger = getLogger(__name__)
SESSION_INIT_META_KEY = "_is_session_init_request"
ZYTE_API_META_KEYS = ("zyte_api", "zyte_api_automap", "zyte_api_provider")


def is_session_init_request(request):
    """Tell whether *request* is a :ref:`session initialization request
    <session-init>`.

    Return ``True`` only if the request metadata carries the session
    initialization marker set to ``True``; return ``False`` otherwise.
    """
    marker = request.meta.get(SESSION_INIT_META_KEY, False)
    return marker is True
class SessionRetryFactory(RetryFactory):
    """Retry policy factory used to build :data:`SESSION_DEFAULT_RETRY_POLICY`.

    It only overrides ``temporary_download_error_stop`` so that it stops after
    a single attempt.
    """

    # NOTE(review): presumably this prevents the Zyte API client from retrying
    # temporary download errors on its own, so that the session middleware
    # below (which handles 520/521 errors) can react to them — confirm against
    # the zyte-api RetryFactory documentation.
    temporary_download_error_stop = stop_after_attempt(1)


SESSION_DEFAULT_RETRY_POLICY = SessionRetryFactory().build()

# Build the aggressive counterpart only if the installed zyte-api provides the
# required names; otherwise fall back to the default session retry policy.
try:
    from zyte_api import AggressiveRetryFactory, stop_on_count
except ImportError:
    SESSION_AGGRESSIVE_RETRY_POLICY = SESSION_DEFAULT_RETRY_POLICY
else:

    class AggressiveSessionRetryFactory(AggressiveRetryFactory):
        """Aggressive retry policy factory whose ``download_error_stop`` is
        overridden with ``stop_on_count(1)``."""

        download_error_stop = stop_on_count(1)

    SESSION_AGGRESSIVE_RETRY_POLICY = AggressiveSessionRetryFactory().build()

# scrapy-poet is optional. Without it, define a placeholder class so that
# ``isinstance(response, DummyResponse)`` checks still work (and never match).
try:
    from scrapy_poet import DummyResponse
except ImportError:

    class DummyResponse:  # type: ignore[no-redef]
        """Placeholder used when scrapy-poet is not installed."""

        pass


# Use Scrapy's own ``get_retry_request`` when available; otherwise fall back to
# a vendored copy of it.
try:
    from scrapy.downloadermiddlewares.retry import get_retry_request
except ImportError:  # pragma: no cover
    # https://github.com/scrapy/scrapy/blob/b1fe97dc6c8509d58b29c61cf7801eeee1b409a9/scrapy/downloadermiddlewares/retry.py#L57-L142
    def get_retry_request(  # type: ignore[misc]
        request,
        *,
        spider,
        reason="unspecified",
        max_retry_times=None,
        priority_adjust=None,
        stats_base_key="retry",
    ):
        """Return a copy of *request* to be retried, or ``None`` if the
        maximum number of retries has been reached.

        *max_retry_times* defaults to the ``max_retry_times`` request metadata
        key and then to the ``RETRY_TIMES`` setting. *priority_adjust* defaults
        to the ``RETRY_PRIORITY_ADJUST`` setting. *reason* may be a string, an
        exception, or a callable returning either; it is used in log messages
        and in the ``{stats_base_key}/reason_count/{reason}`` stat.
        """
        settings = spider.crawler.settings
        assert spider.crawler.stats
        stats = spider.crawler.stats
        # Number of retries including the one being requested now.
        retry_times = request.meta.get("retry_times", 0) + 1
        if max_retry_times is None:
            max_retry_times = request.meta.get("max_retry_times")
            if max_retry_times is None:
                max_retry_times = settings.getint("RETRY_TIMES")
        if retry_times <= max_retry_times:
            logger.debug(
                "Retrying %(request)s (failed %(retry_times)d times): %(reason)s",
                {"request": request, "retry_times": retry_times, "reason": reason},
                extra={"spider": spider},
            )
            new_request: Request = request.copy()
            new_request.meta["retry_times"] = retry_times
            # The retry has the same URL as the original request.
            new_request.dont_filter = True
            if priority_adjust is None:
                priority_adjust = settings.getint("RETRY_PRIORITY_ADJUST")
            new_request.priority = request.priority + priority_adjust

            # Normalize the reason into something usable as a stat key suffix.
            if callable(reason):
                reason = reason()
            if isinstance(reason, Exception):
                reason = global_object_name(reason.__class__)

            stats.inc_value(f"{stats_base_key}/count")
            stats.inc_value(f"{stats_base_key}/reason_count/{reason}")
            return new_request
        stats.inc_value(f"{stats_base_key}/max_reached")
        logger.error(
            "Gave up retrying %(request)s (failed %(retry_times)d times): "
            "%(reason)s",
            {"request": request, "retry_times": retry_times, "reason": reason},
            extra={"spider": spider},
        )
        return None


# Scrapy versions that lack ``NO_CALLBACK`` get a no-op callback instead.
try:
    from scrapy.http.request import NO_CALLBACK
except ImportError:

    def NO_CALLBACK(response):  # type: ignore[misc]
        """No-op stand-in for Scrapy's ``NO_CALLBACK``."""
        pass  # pragma: no cover


# Use Scrapy's own ``deferred_to_future`` when available; otherwise fall back
# to vendored copies of it and of the helpers it needs.
try:
    from scrapy.utils.defer import deferred_to_future
except ImportError:  # pragma: no cover
    import asyncio
    from warnings import catch_warnings, filterwarnings

    # https://github.com/scrapy/scrapy/blob/b1fe97dc6c8509d58b29c61cf7801eeee1b409a9/scrapy/utils/reactor.py#L119-L147
    def set_asyncio_event_loop():
        """Return the current asyncio event loop, creating and installing a
        new one if there is none."""
        try:
            with catch_warnings():
                # In Python 3.10.9, 3.11.1, 3.12 and 3.13, a DeprecationWarning
                # is emitted about the lack of a current event loop, because in
                # Python 3.14 and later `get_event_loop` will raise a
                # RuntimeError in that event. Because our code is already
                # prepared for that future behavior, we ignore the deprecation
                # warning.
                filterwarnings(
                    "ignore",
                    message="There is no current event loop",
                    category=DeprecationWarning,
                )
                event_loop = asyncio.get_event_loop()
        except RuntimeError:
            # `get_event_loop` raises RuntimeError when called with no asyncio
            # event loop yet installed in the following scenarios:
            # - Presumably on Python 3.14 and later.
            #   https://github.com/python/cpython/issues/100160#issuecomment-1345581902
            event_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(event_loop)
        return event_loop

    # https://github.com/scrapy/scrapy/blob/b1fe97dc6c8509d58b29c61cf7801eeee1b409a9/scrapy/utils/reactor.py#L115-L116
    def _get_asyncio_event_loop():
        """Return the asyncio event loop, as given by
        :func:`set_asyncio_event_loop`."""
        return set_asyncio_event_loop()

    # https://github.com/scrapy/scrapy/blob/b1fe97dc6c8509d58b29c61cf7801eeee1b409a9/scrapy/utils/defer.py#L360-L379
    def deferred_to_future(d):  # type: ignore[misc]
        """Return the result of ``d.asFuture()`` for the asyncio event loop,
        so that the deferred *d* can be awaited."""
        return d.asFuture(_get_asyncio_event_loop())


# Scrapy versions that lack ``build_from_crawler`` get an equivalent built on
# top of ``create_instance``.
try:
    from scrapy.utils.misc import build_from_crawler
except ImportError:
    T = TypeVar("T")

    def build_from_crawler(
        objcls: Type[T], crawler: Crawler, /, *args: Any, **kwargs: Any
    ) -> T:
        """Build an instance of *objcls* for *crawler* through
        ``create_instance``, passing no explicit settings."""
        return create_instance(objcls, settings=None, crawler=crawler, *args, **kwargs)  # type: ignore[misc]


class PoolError(ValueError):
    """Raised when the session pool of a request cannot be determined, i.e.
    when :meth:`SessionConfig.pool` raises an exception."""

    pass


class TooManyBadSessionInits(RuntimeError):
    """Raised when the number of consecutive failed session initializations of
    a pool reaches its configured maximum."""

    pass
class SessionConfig:
    """Default session configuration for :ref:`scrapy-zyte-api sessions
    <session>`."""

    #: List of address fields to use when available, and their order, when
    #: :ref:`creating a pool ID for a request <session-pools>` based on the
    #: content of the :reqmeta:`zyte_api_session_location` metadata key. See
    #: :meth:`pool`.
    ADDRESS_FIELDS: List[str] = [
        "addressCountry",
        "addressRegion",
        "postalCode",
        "streetAddress",
    ]

    @classmethod
    def from_crawler(cls, crawler):
        """Build a session config for *crawler*."""
        return cls(crawler)

    def __init__(self, crawler):
        self.crawler = crawler

        settings = crawler.settings
        self._setting_location = settings.getdict("ZYTE_API_SESSION_LOCATION")
        self._setting_params = settings.getdict("ZYTE_API_SESSION_PARAMS")
        # Optional object, loaded from its import path, that takes over
        # check() when configured.
        checker_cls = settings.get("ZYTE_API_SESSION_CHECKER", None)
        if checker_cls:
            self._checker = build_from_crawler(load_object(checker_cls), crawler)
        else:
            self._checker = None
        self._enabled = crawler.settings.getbool("ZYTE_API_SESSION_ENABLED", False)

        # Next free pool index for each netloc.
        self._pool_counters: DefaultDict[str, int] = defaultdict(int)
        # For each netloc, maps serialized session initialization parameters
        # to the pool index assigned to them.
        self._param_pools: DefaultDict[str, Dict[str, int]] = defaultdict(dict)

    def enabled(self, request: Request) -> bool:
        """Return ``True`` if the request should use sessions from
        :ref:`session management <session>` or ``False`` otherwise.

        The default implementation is based on settings and request metadata
        keys as described in :ref:`enable-sessions`.
        """
        return request.meta.get("zyte_api_session_enabled", self._enabled)

    def pool(self, request: Request) -> str:
        """Return the ID of the session pool to use for *request*.

        The main aspects of the default implementation are described in
        :ref:`session-pools`.

        When the :reqmeta:`zyte_api_session_params` request metadata key is
        used, the pool ID is the target domain followed by an integer between
        brackets (e.g. ``example.com[0]``), and a log message indicates which
        session initialization parameters are associated with that pool ID.

        When the :reqmeta:`zyte_api_session_location` request metadata key is
        used, the pool ID is the target domain followed by an at sign and the
        comma-separated values of the non-empty fields from
        :data:`ADDRESS_FIELDS` (e.g. ``example.com@US,NY,10001``).
        """
        meta_pool = request.meta.get("zyte_api_session_pool", "")
        if meta_pool:
            return meta_pool
        netloc = urlparse_cached(request).netloc
        meta_params = request.meta.get("zyte_api_session_params", None)
        if meta_params:
            # Sorting keys makes equal parameter sets map to the same pool
            # regardless of the order of their keys.
            param_key = json.dumps(meta_params, sort_keys=True)
            try:
                index = self._param_pools[netloc][param_key]
            except KeyError:
                index = self._pool_counters[netloc]
                logger.info(
                    f"Session pool {netloc}[{index}] uses these session "
                    f"initialization parameters: {meta_params}"
                )
                self._pool_counters[netloc] += 1
                self._param_pools[netloc][param_key] = index
            return f"{netloc}[{index}]"
        meta_location = request.meta.get("zyte_api_session_location", None)
        if meta_location:
            # Only non-empty fields take part in the pool ID, as documented
            # above. Fields that are present but empty would otherwise yield
            # IDs like ``example.com@US,,10001`` (or make str.join() fail on
            # ``None``).
            location_id = ",".join(
                meta_location[k] for k in self.ADDRESS_FIELDS if meta_location.get(k)
            )
            return f"{netloc}@{location_id}"
        return netloc

    def location(self, request: Request) -> Dict[str, str]:
        """Return the address :class:`dict` to use for location-based session
        initialization for *request*.

        The default implementation is based on settings and request metadata
        keys as described in :ref:`session-init`.

        When overriding this method, you should only return a custom value if
        the default implementation returns an empty :class:`dict`, e.g.

        .. code-block:: python

            def location(self, request: Request) -> Dict[str, str]:
                fallback = {"addressCountry": "US", "addressRegion": "NY", "postalCode": "10001"}
                return super().location(request) or fallback

        .. note:: An implementation of
            :meth:`~scrapy_zyte_api.SessionConfig.location` can technically
            override :reqmeta:`zyte_api_session_location` or
            :setting:`ZYTE_API_SESSION_LOCATION`, but it is not recommended as
            it breaks the :ref:`precedence chain that users expect
            <session-init>`.

        You should only override this method if you need a location to be used
        even when no location is specified through request metadata or
        settings. It can be especially useful if you can determine the right
        location to use based on the request, e.g.

        .. code-block:: python

            def location(self, request: Request) -> Dict[str, str]:
                fallback = {}
                if postal_code := w3lib.url.url_query_parameter(request.url, "postalCode"):
                    fallback["postalCode"] = postal_code
                return super().location(request) or fallback

        Same as with :reqmeta:`zyte_api_session_location` and
        :setting:`ZYTE_API_SESSION_LOCATION`, the returned location fields
        should match those of the ``address`` parameter of the ``setLocation``
        :http:`action <request:actions>` where possible, even when using an
        implementation of :meth:`params` that does not rely on the
        ``setLocation`` action.
        """
        return request.meta.get("zyte_api_session_location", self._setting_location)

    def params(self, request: Request) -> Dict[str, Any]:
        """Return the Zyte API request parameters to use to initialize a
        session for *request*.

        The default implementation is based on settings and request metadata
        keys as described in :ref:`session-init`.

        When overriding this method, you should return parameters for the
        target location, i.e. the output of :meth:`location`, unless that
        output is an empty :class:`dict`, e.g.

        .. code-block:: python

            def params(self, request: Request) -> Dict[str, Any]:
                if location := self.location(request):
                    return {
                        "url": "https://example.com/new-session/for-country",
                        "httpResponseBody": True,
                        "httpRequestMethod": "POST",
                        "httpRequestText": location["addressCountry"],
                    }
                return {
                    "url": "https://example.com/new-session",
                    "httpResponseBody": True,
                }

        The returned parameters do not need to include :http:`request:url`. If
        missing, it is picked from the request :ref:`triggering a session
        initialization request <pool-size>`.

        .. seealso:: :class:`~scrapy_zyte_api.LocationSessionConfig`
        """
        if location := self.location(request):
            return {
                "browserHtml": True,
                "actions": [
                    {
                        "action": "setLocation",
                        "address": location,
                    }
                ],
            }
        return {"browserHtml": True}

    def check(self, response: Response, request: Request) -> bool:
        """Return ``True`` if the session used to fetch *response* should be
        kept, return ``False`` if it should be discarded, or raise
        :exc:`~scrapy.exceptions.CloseSpider` if the spider should be closed.

        The default implementation checks the outcome of the ``setLocation``
        action if a location was defined, as described in :ref:`session-check`.

        If you need to tell whether *request* is a :ref:`session initialization
        request <session-init>` or not, use
        :func:`~scrapy_zyte_api.is_session_init_request`.

        .. seealso:: :class:`~scrapy_zyte_api.LocationSessionConfig`
        """
        # A checker configured through settings takes precedence over the
        # default logic.
        if self._checker:
            return self._checker.check(response, request)
        location = self.location(request)
        if not location:
            return True
        # Only the first ``setLocation`` action found determines the outcome.
        for action in response.raw_api_response.get("actions", []):  # type: ignore[attr-defined]
            if action.get("action", None) != "setLocation":
                continue
            if action.get("error", "").startswith("Action setLocation not supported "):
                logger.error(
                    f"Stopping the spider, tried to use the setLocation "
                    f"action on an unsupported website "
                    f"({urlparse_cached(request).netloc})."
                )
                raise CloseSpider("unsupported_set_location")
            return action.get("status", None) == "success"
        return True
try:
    from web_poet import RulesRegistry
except ImportError:

    class SessionConfigRulesRegistry:
        """Fallback registry used when web-poet is not installed: every
        request gets :class:`SessionConfig`, and the decorator is
        unavailable."""

        def session_config_cls(self, request: Request) -> Type[SessionConfig]:
            """Return the session config class to use for *request*, which is
            always :class:`SessionConfig`."""
            return SessionConfig

        def session_config(
            self,
            include,
            *,
            instead_of: Optional[Type] = SessionConfig,
            exclude=None,
            priority: int = 500,
            **kwargs,
        ):
            """Mark the decorated :class:`SessionConfig` subclass as the
            :ref:`session config <session-configs>` to use for the specified
            URL patterns.

            Usage example:

            .. code-block:: python

                from typing import Any, Dict

                from scrapy import Request
                from scrapy.http.response import Response
                from scrapy_zyte_api import SessionConfig, session_config


                @session_config(["ecommerce.de.example", "ecommerce.us.example"])
                class EcommerceExampleSessionConfig(SessionConfig):

                    def pool(self, request: Request) -> str:
                        return "ecommerce.example"

                    def params(self, request: Request) -> Dict[str, Any]:
                        if location := self.location(request):
                            return {
                                "url": request.url,
                                "browserHtml": True,
                                "actions": [
                                    {
                                        "action": "type",
                                        "selector": {"type": "css", "value": ".zipcode"},
                                        "text": location["postalCode"],
                                    },
                                    {
                                        "action": "click",
                                        "selector": {"type": "css", "value": "[type='submit']"},
                                    },
                                ],
                            }
                        return super().params(request)

                    def check(self, response: Response, request: Request) -> bool:
                        if location := self.location(request):
                            return response.css(".zipcode::text").get() == location["postalCode"]
                        return super().check(response, request)

            Your :class:`~scrapy_zyte_api.SessionConfig` subclass must be
            defined in a module that gets imported at run time. See
            ``SCRAPY_POET_DISCOVER`` in the :ref:`scrapy-poet setting reference
            <scrapy-poet:settings>`.

            The parameters of this decorator are those of
            :func:`web_poet.handle_urls`, only *instead_of* is
            :class:`SessionConfig` by default, *to_return* is not supported,
            and session configs are registered in their own rule registry.
            """
            raise RuntimeError(
                "To use the @session_config decorator you first must install "
                "web-poet."
            )

else:
    from url_matcher import Patterns
    from web_poet import ApplyRule
    from web_poet.rules import Strings

    class SessionConfigRulesRegistry(RulesRegistry):  # type: ignore[no-redef]
        """Rule registry that maps URL patterns to :class:`SessionConfig`
        subclasses."""

        def __init__(self):
            # Base rule: SessionConfig is used for the catch-all "" pattern.
            rules = [ApplyRule(for_patterns=Patterns(include=[""]), use=SessionConfig)]  # type: ignore[arg-type]
            super().__init__(rules=rules)

        def session_config_cls(self, request: Request) -> Type[SessionConfig]:
            """Return the session config class to use for *request*, following
            the chain of overrides registered for its URL, starting from
            :class:`SessionConfig`."""
            cls = SessionConfig
            overrides: Dict[Type[SessionConfig], Type[SessionConfig]] = self.overrides_for(request.url)  # type: ignore[assignment]
            # An override may itself be overridden, so follow the chain until
            # reaching a class that nothing replaces.
            while cls in overrides:
                cls = overrides[cls]
            return cls

        def session_config(
            self,
            include: Strings,
            *,
            instead_of: Optional[Type[SessionConfig]] = SessionConfig,
            exclude: Optional[Strings] = None,
            priority: int = 500,
            **kwargs,
        ):
            """Mark the decorated :class:`SessionConfig` subclass as the
            session config to use for the specified URL patterns.

            All parameters are forwarded to ``handle_urls``; *instead_of*
            defaults to :class:`SessionConfig`.
            """
            return self.handle_urls(
                include=include,
                instead_of=instead_of,  # type: ignore[arg-type]
                exclude=exclude,
                priority=priority,
                **kwargs,
            )


class FatalErrorHandler:
    """Context manager that schedules the spider to be closed when specific
    exceptions are raised within its body.

    Exceptions are never suppressed: they continue to propagate after the
    closing of the spider has been scheduled.
    """

    def __init__(self, crawler):
        self.crawler = crawler

    def __enter__(self):
        return None

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            return
        # Imported here rather than at module level. NOTE(review): presumably
        # to avoid installing the default Twisted reactor on import — confirm.
        from twisted.internet import reactor
        from twisted.internet.interfaces import IReactorCore

        reactor = cast(IReactorCore, reactor)
        # close(reason) schedules engine.close_spider(spider, reason) to be
        # called by the reactor with a delay of 0.
        close = partial(
            reactor.callLater, 0, self.crawler.engine.close_spider, self.crawler.spider
        )
        if issubclass(exc_type, TooManyBadSessionInits):
            close("bad_session_inits")
        elif issubclass(exc_type, PoolError):
            close("pool_error")
        elif issubclass(exc_type, CloseSpider):
            close(exc.reason)


session_config_registry = SessionConfigRulesRegistry()
session_config = session_config_registry.session_config


class _SessionManager:
    """Keep track of session pools and assign sessions to requests.

    For each pool it creates the initial sessions, rotates initialized
    sessions among requests, and replaces sessions that expire, fail their
    check, or accumulate too many errors.
    """

    def __init__(self, crawler: Crawler):
        self._crawler = crawler

        settings = crawler.settings

        # Number of initial sessions still to be created for each pool.
        pool_size = settings.getint("ZYTE_API_SESSION_POOL_SIZE", 8)
        self._pending_initial_sessions: Dict[str, int] = defaultdict(lambda: pool_size)
        pool_sizes = settings.getdict("ZYTE_API_SESSION_POOL_SIZES", {})
        for pool, size in pool_sizes.items():
            self._pending_initial_sessions[pool] = size

        # Number of errors that a session can cause before it is replaced,
        # and per-session error counters.
        self._max_errors = settings.getint("ZYTE_API_SESSION_MAX_ERRORS", 1)
        self._errors: Dict[str, int] = defaultdict(int)

        # Number of consecutive failed session initializations allowed per
        # pool, and per-pool counters.
        max_bad_inits = settings.getint("ZYTE_API_SESSION_MAX_BAD_INITS", 8)
        self._max_bad_inits: Dict[str, int] = defaultdict(lambda: max_bad_inits)
        max_bad_inits_per_pool = settings.getdict(
            "ZYTE_API_SESSION_MAX_BAD_INITS_PER_POOL", {}
        )
        for pool, pool_max_bad_inits in max_bad_inits_per_pool.items():
            self._max_bad_inits[pool] = pool_max_bad_inits
        self._bad_inits: Dict[str, int] = defaultdict(int)

        # Transparent mode, needed to determine whether to set the session
        # using ``zyte_api`` or ``zyte_api_automap``.
        self._transparent_mode: bool = settings.getbool(
            "ZYTE_API_TRANSPARENT_MODE", False
        )

        # Each pool contains the IDs of sessions that have not expired yet.
        #
        # While the initial sessions of a pool have not all been started, for
        # every request needing a session, a new session is initialized and
        # then added to the pool.
        #
        # Once a pool is full, sessions are picked from the pool queue, which
        # should contain all pool sessions that have been initialized.
        #
        # As soon as a session expires, it is removed from its pool, and a task
        # to initialize that new session is started.
        self._pools: Dict[str, Set[str]] = defaultdict(set)
        self._pool_cache: WeakKeyDictionary[Request, str] = WeakKeyDictionary()

        # The queue is a rotating list of session IDs to use.
        #
        # The way to use the queue is to get a session ID with popleft(), and
        # put it back to the end of the queue with append().
        #
        # The queue may contain session IDs from expired sessions. If the
        # popped session ID cannot be found in the pool, then it should be
        # discarded instead of being put back in the queue.
        #
        # When a new session ID is added to the pool, it is still not added to
        # the queue until the session is actually initialized, when it is
        # appended to the queue.
        #
        # If the queue is empty, sleep and try again. Sessions from the pool
        # will be appended to the queue as they are initialized and ready to
        # use.
        self._queues: Dict[str, Deque[str]] = defaultdict(deque)
        self._queue_max_attempts = settings.getint(
            "ZYTE_API_SESSION_QUEUE_MAX_ATTEMPTS", 60
        )
        self._queue_wait_time = settings.getfloat(
            "ZYTE_API_SESSION_QUEUE_WAIT_TIME", 1.0
        )

        # Contains the on-going tasks to create new sessions.
        #
        # Keeping a reference to those tasks until they are done is necessary
        # to prevent garbage collection to remove the tasks.
        self._init_tasks: Set[Task] = set()

        self._session_config_cache: WeakKeyDictionary[Request, SessionConfig] = (
            WeakKeyDictionary()
        )
        # A single session config instance is shared per session config class.
        self._session_config_map: Dict[Type[SessionConfig], SessionConfig] = {}

        self._setting_params = settings.getdict("ZYTE_API_SESSION_PARAMS")

        self._fatal_error_handler = FatalErrorHandler(crawler)

    def _get_session_config(self, request: Request) -> SessionConfig:
        """Return the session config object to use for *request*, building it
        on first use of its class and caching the result per request."""
        try:
            return self._session_config_cache[request]
        except KeyError:
            cls = session_config_registry.session_config_cls(request)
            if cls not in self._session_config_map:
                self._session_config_map[cls] = build_from_crawler(cls, self._crawler)
            self._session_config_cache[request] = self._session_config_map[cls]
            return self._session_config_map[cls]

    def _get_pool(self, request):
        """Return the ID of the session pool of *request*, cached per request.

        Raises :exc:`PoolError` if the session config fails to provide a pool
        ID.
        """
        try:
            return self._pool_cache[request]
        except KeyError:
            session_config = self._get_session_config(request)
            try:
                pool = session_config.pool(request)
            except Exception as exc:
                # Chain explicitly so the traceback shows the real cause.
                raise PoolError from exc
            self._pool_cache[request] = pool
            return pool

    async def _init_session(self, session_id: str, request: Request, pool: str) -> bool:
        """Send a session initialization request for *session_id* and return
        ``True`` if the session was initialized and passed its check, or
        ``False`` otherwise.

        Session initialization parameters come, in order of precedence, from
        the ``zyte_api_session_params`` request metadata key, from the
        ``ZYTE_API_SESSION_PARAMS`` setting (only if the request does not set
        ``zyte_api_session_location``), or from the session config.

        :exc:`~scrapy.exceptions.CloseSpider` exceptions raised by the session
        check are propagated.
        """
        assert self._crawler.engine
        assert self._crawler.stats
        session_config = self._get_session_config(request)
        if meta_params := request.meta.get("zyte_api_session_params", None):
            session_params = meta_params
        elif (
            not request.meta.get("zyte_api_session_location", None)
            and self._setting_params
        ):
            session_params = self._setting_params
        else:
            try:
                session_params = session_config.params(request)
            except Exception:
                self._crawler.stats.inc_value(
                    f"scrapy-zyte-api/sessions/pools/{pool}/init/param-error"
                )
                logger.exception(
                    f"Unexpected exception raised while obtaining session "
                    f"initialization parameters for request {request}."
                )
                return False
        # Work on a copy: "url" is popped below, and the parameters may be
        # shared (request metadata, settings).
        session_params = deepcopy(session_params)
        session_init_url = session_params.pop("url", request.url)
        spider = self._crawler.spider
        session_init_request = Request(
            session_init_url,
            meta={
                SESSION_INIT_META_KEY: True,
                "dont_merge_cookies": True,
                "zyte_api": {**session_params, "session": {"id": session_id}},
                # Carry over the session-related metadata of the triggering
                # request, which session config methods may read.
                **{
                    k: v
                    for k, v in request.meta.items()
                    if k in {"zyte_api_session_location", "zyte_api_session_params"}
                },
            },
            callback=NO_CALLBACK,
        )
        if _DOWNLOAD_NEEDS_SPIDER:
            deferred = self._crawler.engine.download(  # type: ignore[call-arg]
                session_init_request, spider=spider
            )
        else:
            deferred = self._crawler.engine.download(session_init_request)
        try:
            response = await deferred_to_future(deferred)
        except Exception:
            self._crawler.stats.inc_value(
                f"scrapy-zyte-api/sessions/pools/{pool}/init/failed"
            )
            return False
        else:
            try:
                result = session_config.check(response, session_init_request)
            except CloseSpider:
                raise
            except Exception:
                self._crawler.stats.inc_value(
                    f"scrapy-zyte-api/sessions/pools/{pool}/init/check-error"
                )
                logger.exception(
                    f"Unexpected exception raised while checking session "
                    f"validity on response {response}."
                )
                return False
            outcome = "passed" if result else "failed"
            self._crawler.stats.inc_value(
                f"scrapy-zyte-api/sessions/pools/{pool}/init/check-{outcome}"
            )
            return result

    async def _create_session(self, request: Request, pool: str) -> str:
        """Initialize a new session for *pool*, retrying with new session IDs
        until one succeeds, and return its ID.

        Raises :exc:`TooManyBadSessionInits` once the number of consecutive
        failed initializations of the pool reaches its maximum.
        """
        with self._fatal_error_handler:
            while True:
                session_id = str(uuid4())
                session_init_succeeded = await self._init_session(
                    session_id, request, pool
                )
                if session_init_succeeded:
                    self._pools[pool].add(session_id)
                    self._bad_inits[pool] = 0
                    break
                self._bad_inits[pool] += 1
                if self._bad_inits[pool] >= self._max_bad_inits[pool]:
                    raise TooManyBadSessionInits
            # The session is ready, so make it available for rotation.
            self._queues[pool].append(session_id)
            return session_id

    async def _next_from_queue(self, request: Request, pool: str) -> str:
        """Return the ID of the next valid session in the rotation queue of
        *pool*, waiting for one to become available if needed.

        Raises :exc:`RuntimeError` if the queue is found empty
        ``ZYTE_API_SESSION_QUEUE_MAX_ATTEMPTS`` times.
        """
        session_id = None
        attempts = 0
        while session_id not in self._pools[pool]:  # After 1st loop: invalid session.
            try:
                session_id = self._queues[pool].popleft()
            except IndexError:  # No ready-to-use session available.
                attempts += 1
                if attempts >= self._queue_max_attempts:
                    raise RuntimeError(
                        f"Could not get a session ID from the session "
                        f"rotation queue after {attempts} attempts, waiting "
                        f"at least {self._queue_wait_time} seconds between "
                        f"attempts. Either the values of the "
                        f"ZYTE_API_SESSION_QUEUE_MAX_ATTEMPTS and "
                        f"ZYTE_API_SESSION_QUEUE_WAIT_TIME settings are too "
                        f"low for your scenario, in which case you can modify "
                        f"them accordingly, or there might be a bug with "
                        f"scrapy-zyte-api session management. If you think it "
                        f"could be the later, please report the issue at "
                        f"https://github.com/scrapy-plugins/scrapy-zyte-api/issues/new "
                        f"providing a minimal reproducible example if "
                        f"possible, or debug logs and stats otherwise."
                    )
                await sleep(self._queue_wait_time)
        assert session_id is not None
        # Put the session back at the end of the queue to keep it rotating.
        self._queues[pool].append(session_id)
        return session_id

    async def _next(self, request) -> str:
        """Return the ID of the next working session in the session pool
        rotation.

        *request* is needed to determine the URL to use for request
        initialization.
        """
        pool = self._get_pool(request)
        if self._pending_initial_sessions[pool] >= 1:
            # The pool is not full yet: create a new session for this request.
            self._pending_initial_sessions[pool] -= 1
            session_id = await self._create_session(request, pool)
        else:
            session_id = await self._next_from_queue(request, pool)
        return session_id

    def is_init_request(self, request: Request) -> bool:
        """Return ``True`` if the request is one of the requests being used
        to initialize a session, or ``False`` otherwise.

        If ``True`` is returned for a request, the session ID of that request
        should not be modified, or it will break the session management logic.
        """
        return request.meta.get(SESSION_INIT_META_KEY, False)

    def _get_request_session_id(self, request: Request) -> Optional[str]:
        """Return the session ID set in the Zyte API parameters of *request*,
        or log a warning and return ``None`` if there is none."""
        for meta_key in ZYTE_API_META_KEYS:
            params = request.meta.get(meta_key)
            # These metadata keys do not always hold a dict (e.g.
            # ``zyte_api_automap`` can be ``False``), and only a dict can
            # carry a session.
            if not isinstance(params, dict):
                continue
            session = params.get("session")
            if not isinstance(session, dict):
                continue
            session_id = session.get("id")
            if session_id:
                return session_id
        logger.warning(
            f"Request {request} had no session ID assigned, unexpectedly. "
            f"If you are sure this issue is not caused by your own code, "
            f"please report this at "
            f"https://github.com/scrapy-plugins/scrapy-zyte-api/issues/new "
            f"providing a minimal, reproducible example."
        )
        return None

    def _start_session_refresh(self, session_id: str, request: Request, pool: str):
        """Remove *session_id* from *pool* and start a background task to
        create a replacement session."""
        try:
            self._pools[pool].remove(session_id)
        except KeyError:
            # More than 1 request was using the same session concurrently. Do
            # not refresh the session again.
            pass
        else:
            task = create_task(self._create_session(request, pool))
            self._init_tasks.add(task)
            task.add_done_callback(self._init_tasks.discard)
        # The session is gone, so its error counter is no longer needed.
        self._errors.pop(session_id, None)

    def _start_request_session_refresh(self, request: Request, pool: str):
        """Start replacing the session used by *request*, if the request has
        a session ID."""
        session_id = self._get_request_session_id(request)
        if session_id is None:
            return
        self._start_session_refresh(session_id, request, pool)

    async def check(self, response: Response, request: Request) -> bool:
        """Check the response for signs of session expiration, update the
        internal session pool accordingly, and return ``False`` if the session
        has expired or ``True`` if the session passed validation."""
        assert self._crawler.stats
        with self._fatal_error_handler:
            if self.is_init_request(request):
                return True
            session_config = self._get_session_config(request)
            if not session_config.enabled(request):
                return True
            pool = self._get_pool(request)
            try:
                passed = session_config.check(response, request)
            except CloseSpider:
                raise
            except Exception:
                # An unexpected exception is handled like a failed check: the
                # session is replaced below.
                self._crawler.stats.inc_value(
                    f"scrapy-zyte-api/sessions/pools/{pool}/use/check-error"
                )
                logger.exception(
                    f"Unexpected exception raised while checking session "
                    f"validity on response {response}."
                )
            else:
                outcome = "passed" if passed else "failed"
                self._crawler.stats.inc_value(
                    f"scrapy-zyte-api/sessions/pools/{pool}/use/check-{outcome}"
                )
                if passed:
                    return True
            self._start_request_session_refresh(request, pool)
        return False

    async def assign(self, request: Request):
        """Assign a working session to *request*."""
        assert self._crawler.stats
        with self._fatal_error_handler:
            if self.is_init_request(request):
                return
            session_config = self._get_session_config(request)
            if not session_config.enabled(request):
                self._crawler.stats.inc_value("scrapy-zyte-api/sessions/use/disabled")
                return
            session_id = await self._next(request)
            # Note: If there is a session set already (e.g. a request being
            # retried), it is overridden.
            request.meta.setdefault("zyte_api_provider", {})["session"] = {
                "id": session_id
            }
            # Pick the metadata key through which Zyte API parameters are
            # defined for this request.
            if (
                "zyte_api" in request.meta
                or request.meta.get("zyte_api_automap", None) is False
                or (
                    "zyte_api_automap" not in request.meta
                    and self._transparent_mode is False
                )
            ):
                meta_key = "zyte_api"
            else:
                meta_key = "zyte_api_automap"
            request.meta.setdefault(meta_key, {})
            # The key may hold a non-dict value (e.g. ``True``); a dict is
            # needed to set the session.
            if not isinstance(request.meta[meta_key], dict):
                request.meta[meta_key] = {}
            request.meta[meta_key]["session"] = {"id": session_id}
            request.meta.setdefault("dont_merge_cookies", True)

    def is_enabled(self, request: Request) -> bool:
        """Return whether session management is enabled for *request*
        according to its session config."""
        session_config = self._get_session_config(request)
        return session_config.enabled(request)

    def handle_error(self, request: Request):
        """Record an error for the session of *request*, and start replacing
        that session once its error count reaches the configured maximum."""
        assert self._crawler.stats
        with self._fatal_error_handler:
            pool = self._get_pool(request)
            self._crawler.stats.inc_value(
                f"scrapy-zyte-api/sessions/pools/{pool}/use/failed"
            )
            # Look the session ID up only once, so that the warning about a
            # missing session ID is not logged twice.
            session_id = self._get_request_session_id(request)
            if session_id is None:
                return
            self._errors[session_id] += 1
            if self._errors[session_id] < self._max_errors:
                return
            self._start_session_refresh(session_id, request, pool)

    def handle_expiration(self, request: Request):
        """Record that the session of *request* expired, and start replacing
        it."""
        assert self._crawler.stats
        with self._fatal_error_handler:
            pool = self._get_pool(request)
            self._crawler.stats.inc_value(
                f"scrapy-zyte-api/sessions/pools/{pool}/use/expired"
            )
            self._start_request_session_refresh(request, pool)


class ScrapyZyteAPISessionDownloaderMiddleware:
    """Downloader middleware that assigns sessions to requests and retries
    requests whose session is found to be expired or failing."""

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        return cls(crawler)

    def __init__(self, crawler: Crawler):
        self._crawler = crawler
        self._sessions = _SessionManager(crawler)

    async def process_request(self, request: Request, spider: Spider) -> None:
        """Assign a session to *request* where session management applies."""
        await self._sessions.assign(request)

    async def process_response(
        self, request: Request, response: Response, spider: Spider
    ) -> Union[Request, Response, None]:
        """Return *response* if its session passes the session check, or a
        retry request otherwise.

        ``DummyResponse`` objects are returned as is, without any check.

        Raises :exc:`~scrapy.exceptions.IgnoreRequest` if the session check
        fails and the request cannot be retried any more.
        """
        if isinstance(response, DummyResponse):
            return response

        passed = await self._sessions.check(response, request)
        if not passed:
            new_request_or_none = get_retry_request(
                request,
                spider=spider,
                reason="session_expired",
            )
            if not new_request_or_none:
                raise IgnoreRequest
            return new_request_or_none
        return response

    async def process_exception(
        self, request: Request, exception: Exception, spider: Spider
    ) -> Union[Request, None]:
        """Handle Zyte API request errors that affect sessions.

        Session expiration errors and errors with status 520 or 521 are
        reported to the session manager, and a retry request (or ``None`` if
        retries are exhausted) is returned. ``None`` is returned for any other
        exception, for session initialization requests, and for requests that
        do not use session management.
        """
        if (
            not isinstance(exception, RequestError)
            or self._sessions.is_init_request(request)
            or not self._sessions.is_enabled(request)
        ):
            return None

        if exception.parsed.type == "/problem/session-expired":
            self._sessions.handle_expiration(request)
            reason = "session_expired"
        elif exception.status in {520, 521}:
            self._sessions.handle_error(request)
            reason = "download_error"
        else:
            return None

        return get_retry_request(
            request,
            spider=spider,
            reason=reason,
        )
class LocationSessionConfig(SessionConfig):
    """:class:`~scrapy_zyte_api.SessionConfig` subclass that reduces the
    boilerplate of location-specific session configs, i.e. session configs
    that should stick to the default behavior unless a location is set.

    It offers counterparts to some :class:`~scrapy_zyte_api.SessionConfig`
    methods which are called only when a location is set, and which receive
    that location as a parameter.
    """

    def params(self, request: Request) -> Dict[str, Any]:
        location = self.location(request)
        if location:
            return self.location_params(request, location)
        return super().params(request)

    def check(self, response: Response, request: Request) -> bool:
        location = self.location(request)
        if location:
            return self.location_check(response, request, location)
        return super().check(response, request)

    def location_params(
        self, request: Request, location: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Counterpart of :meth:`SessionConfig.params
        <scrapy_zyte_api.SessionConfig.params>` that is called only when a
        location is set, and which receives that *location* as a parameter."""
        default_params = super().params(request)
        return default_params

    def location_check(
        self, response: Response, request: Request, location: Dict[str, Any]
    ) -> bool:
        """Counterpart of :meth:`SessionConfig.check
        <scrapy_zyte_api.SessionConfig.check>` that is called only when a
        location is set, and which receives that *location* as a parameter."""
        default_outcome = super().check(response, request)
        return default_outcome