import contextlib
import json
import random
import time
from asyncio import Task, create_task, sleep
from collections import defaultdict, deque
from collections.abc import Awaitable
from copy import deepcopy
from functools import partial
from logging import getLogger
from typing import Any, TypedDict, cast
from uuid import uuid4
from weakref import WeakKeyDictionary
from scrapy import Request, Spider, signals
from scrapy.crawler import Crawler
from scrapy.exceptions import CloseSpider, IgnoreRequest
from scrapy.http import Response
from scrapy.utils.httpobj import urlparse_cached
from scrapy.utils.misc import load_object
from scrapy.utils.python import global_object_name
from tenacity import stop_after_attempt
from zyte_api import RequestError, RetryFactory
from .utils import ( # type: ignore[attr-defined]
_DOWNLOAD_NEEDS_SPIDER,
_build_from_crawler,
_close_spider,
_ensure_awaitable,
deferred_to_future,
)
try:
from typing import NotRequired # Python 3.11+
except ImportError:
from typing_extensions import NotRequired # Python 3.10
logger = getLogger(__name__)
SESSION_INIT_META_KEY = "_is_session_init_request"
ZYTE_API_META_KEYS = ("zyte_api", "zyte_api_automap", "zyte_api_provider")
def _troubleshoot(slug):
return (
f"https://scrapy-zyte-api.readthedocs.io/en/latest/usage/session.html"
f"#session-troubleshooting-{slug}"
)
[docs]
def get_request_session_id(request: Request) -> str | None:
"""Return the session ID of *request*, or ``None`` if it does not have a
session ID assigned."""
for meta_key in ZYTE_API_META_KEYS:
if meta_key not in request.meta:
continue
session_id = request.meta[meta_key].get("session", {}).get("id", None)
if session_id:
return session_id
logger.warning(
f"Request {request} had no session ID assigned, unexpectedly. "
f"If you are sure this issue is not caused by your own code, "
f"please report this at "
f"https://github.com/scrapy-plugins/scrapy-zyte-api/issues/new "
f"providing a minimal, reproducible example."
)
return None
[docs]
def is_session_init_request(request):
"""Return ``True`` if the request is a :ref:`session initialization request
<session-init>` or ``False`` otherwise."""
return request.meta.get(SESSION_INIT_META_KEY, False) is True
class SessionRetryFactory(RetryFactory):
download_error_stop = stop_after_attempt(1) # python-zyte-api >= 0.7.0
temporary_download_error_stop = stop_after_attempt(1) # python-zyte-api < 0.7.0
SESSION_DEFAULT_RETRY_POLICY = SessionRetryFactory().build()
try:
from zyte_api import AggressiveRetryFactory, stop_on_count
except ImportError:
SESSION_AGGRESSIVE_RETRY_POLICY = SESSION_DEFAULT_RETRY_POLICY
else:
class AggressiveSessionRetryFactory(AggressiveRetryFactory):
download_error_stop = stop_on_count(1)
SESSION_AGGRESSIVE_RETRY_POLICY = AggressiveSessionRetryFactory().build()
try:
from scrapy_poet import DummyResponse
except ImportError:
class DummyResponse: # type: ignore[no-redef]
pass
try:
from scrapy.downloadermiddlewares.retry import get_retry_request
except ImportError: # pragma: no cover
# https://github.com/scrapy/scrapy/blob/b1fe97dc6c8509d58b29c61cf7801eeee1b409a9/scrapy/downloadermiddlewares/retry.py#L57-L142
def get_retry_request( # type: ignore[misc]
request,
*,
spider,
reason="unspecified",
max_retry_times=None,
priority_adjust=None,
stats_base_key="retry",
):
settings = spider.crawler.settings
assert spider.crawler.stats
stats = spider.crawler.stats
retry_times = request.meta.get("retry_times", 0) + 1
if max_retry_times is None:
max_retry_times = request.meta.get("max_retry_times")
if max_retry_times is None:
max_retry_times = settings.getint("RETRY_TIMES")
if retry_times <= max_retry_times:
logger.debug(
"Retrying %(request)s (failed %(retry_times)d times): %(reason)s",
{"request": request, "retry_times": retry_times, "reason": reason},
extra={"spider": spider},
)
new_request: Request = request.copy()
new_request.meta["retry_times"] = retry_times
new_request.dont_filter = True
if priority_adjust is None:
priority_adjust = settings.getint("RETRY_PRIORITY_ADJUST")
new_request.priority = request.priority + priority_adjust
if callable(reason):
reason = reason()
if isinstance(reason, Exception):
reason = global_object_name(reason.__class__)
stats.inc_value(f"{stats_base_key}/count")
stats.inc_value(f"{stats_base_key}/reason_count/{reason}")
return new_request
stats.inc_value(f"{stats_base_key}/max_reached")
logger.error(
"Gave up retrying %(request)s (failed %(retry_times)d times): %(reason)s",
{"request": request, "retry_times": retry_times, "reason": reason},
extra={"spider": spider},
)
return None
try:
from scrapy.http.request import NO_CALLBACK
except ImportError:
def NO_CALLBACK(response): # type: ignore[misc]
pass # pragma: no cover
class PoolError(ValueError):
pass
class TooManyBadSessionInits(RuntimeError):
pass
class PoolConfig(TypedDict):
id: str
delay: NotRequired[float]
randomize_delay: NotRequired[bool]
size: NotRequired[int]
class PoolOptions(TypedDict):
delay: NotRequired[float]
randomize_delay: NotRequired[bool]
size: NotRequired[int]
QueueSession = tuple[str, float] # (session_id, next_use_timestamp)
[docs]
class SessionConfig:
"""Default session configuration for :ref:`scrapy-zyte-api sessions
<session>`."""
#: List of address fields to use when available, and their order, when
#: :ref:`creating a pool ID for a request <session-pools>` based on the
#: content of the :reqmeta:`zyte_api_session_location` metadata key. See
#: :meth:`pool`.
ADDRESS_FIELDS: list[str] = [
"addressCountry",
"addressRegion",
"postalCode",
"streetAddress",
]
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def __init__(self, crawler):
self.crawler = crawler
settings = crawler.settings
self._setting_location = settings.getdict("ZYTE_API_SESSION_LOCATION")
self._setting_params = settings.getdict("ZYTE_API_SESSION_PARAMS")
checker_cls = settings.get("ZYTE_API_SESSION_CHECKER", None)
if checker_cls:
self._checker = _build_from_crawler(load_object(checker_cls), crawler)
else:
self._checker = None
self._enabled = crawler.settings.getbool("ZYTE_API_SESSION_ENABLED", False)
self._pool_counters = defaultdict(int)
self._param_pools: defaultdict[str, dict[str, int]] = defaultdict(dict)
[docs]
def enabled(self, request: Request) -> bool:
"""Return ``True`` if the request should use sessions from
:ref:`session management <session>` or ``False`` otherwise.
The default implementation is based on settings and request metadata
keys as described in :ref:`enable-sessions`.
"""
return request.meta.get("zyte_api_session_enabled", self._enabled)
[docs]
def process_request(self, request: Request) -> Request | None:
"""Process *request* after it has been assigned a session.
Return ``None`` to send the request as is, or return a new request
object to replace the original request.
The default implementation does not modify the request.
You can combine this method and :meth:`check` to modify requests based
on session initialization responses. For example:
#. In :meth:`__init__`, create a dictionary to store session data:
.. code-block:: python
def __init__(self, crawler):
super().__init__(crawler)
self.session_data = {}
#. In :meth:`check`, store data from the session initialization
response in ``session_data``:
.. code-block:: python
def check(self, response: Response, request: Request) -> bool:
if scrapy_zyte_api.is_session_init_request(request):
session_id = scrapy_zyte_api.get_request_session_id(request)
self.session_data[session_id] = {
"csrf_token": response.css(".csrf-token::text").get(),
}
return super().check(response, request)
#. In :meth:`process_request`, read the session data and act
accordingly, either modifying the request in place where possible,
e.g.:
.. code-block:: python
def process_request(self, request: Request) -> Optional[Request]:
session_id = scrapy_zyte_api.get_request_session_id(request)
csrf_token = self.session_data[session_id]["csrf_token"]
request.headers["CSRF-Token"] = csrf_token
Or returning an entirely new request, e.g.:
.. code-block:: python
def process_request(self, request: Request) -> Optional[Request]:
session_id = get_request_session_id(request)
csrf_token = self.session_data[session_id]["csrf_token"]
new_url = w3lib.url.add_or_replace_parameter(request.url, "csrf_token", csrf_token)
return request.replace(url=new_url)
"""
return None
[docs]
def pool(self, request: Request) -> str | PoolConfig:
"""Return the ID of the session pool to use for *request*, or a
:class:`dict` with additional session pool config.
The main aspects of the default implementation are described in
:ref:`session-pools`.
When the :reqmeta:`zyte_api_session_params` request metadata key is
used, the pool ID is the target domain followed by an integer between
brackets (e.g. ``example.com[0]``), and a log message indicates which
session initialization parameters are associated with that pool ID.
When the :reqmeta:`zyte_api_session_location` request metadata key is
used, the pool ID is the target domain followed by an at sign and the
comma-separated values of the non-empty fields from
:data:`ADDRESS_FIELDS` (e.g. ``example.com@US,NY,10001``).
Instead of a string, this method can also return a :class:`dict`
containing the pool ID under the ``id`` key, and optionally any other
key supported by :setting:`ZYTE_API_SESSION_POOLS`. For example:
.. code-block:: python
def pool(self, request):
if "ecommerce.example" in urlparse_cached(request).netloc:
return {
"id": "ecommerce.example",
"delay": 2.0,
"size": 16,
}
return super().pool(request)
The values of optional keys take precedence over the corresponding
pool-independent settings, e.g. ``delay`` takes precedence over
:setting:`ZYTE_API_SESSION_DELAY` for the corresponding pool ID, but do
not override those defined in :setting:`ZYTE_API_SESSION_POOLS`.
For any given pool ID, the values of optional keys are only taken into
account when the pool ID is first encountered. You cannot use this
method to change them at run time.
"""
meta_pool = request.meta.get("zyte_api_session_pool", "")
if meta_pool:
return meta_pool
netloc = urlparse_cached(request).netloc
meta_params = request.meta.get("zyte_api_session_params", None)
if meta_params:
param_key = json.dumps(meta_params, sort_keys=True)
try:
index = self._param_pools[netloc][param_key]
except KeyError:
index = self._pool_counters[netloc]
logger.info(
f"Session pool {netloc}[{index}] uses these session "
f"initialization parameters: {meta_params}"
)
self._pool_counters[netloc] += 1
self._param_pools[netloc][param_key] = index
return f"{netloc}[{index}]"
meta_location = request.meta.get("zyte_api_session_location", None)
if meta_location:
location_id = ",".join(
[meta_location[k] for k in self.ADDRESS_FIELDS if k in meta_location]
)
return f"{netloc}@{location_id}"
return netloc
[docs]
def location(self, request: Request) -> dict[str, str]:
"""Return the address :class:`dict` to use for location-based session
initialization for *request*.
The default implementation is based on settings and request metadata
keys as described in :ref:`session-init`.
When overriding this method, you should only return a custom value if
the default implementation returns an empty :class:`dict`, e.g.
.. code-block:: python
def location(self, request: Request) -> Dict[str, str]:
fallback = {"addressCountry": "US", "addressRegion": "NY", "postalCode": "10001"}
return super().location(request) or fallback
.. note:: An implementation of
:meth:`~scrapy_zyte_api.SessionConfig.location` can technically
override :reqmeta:`zyte_api_session_location` or
:setting:`ZYTE_API_SESSION_LOCATION`, but it is not recommended as
it breaks the :ref:`precedence chain that users expect
<session-init>`.
You should only override this method if you need a location to be
used even when no location is specified through request metadata or
settings. It can be specially useful if you can determine the right
location to use based on the request, e.g.
.. code-block:: python
def location(self, request: Request) -> Dict[str, str]:
fallback = {}
if postal_code := w3lib.url.url_query_parameter(request.url, "postalCode"):
fallback["postalCode"] = postal_code
return super().location(request) or fallback
Same as with :reqmeta:`zyte_api_session_location` and
:setting:`ZYTE_API_SESSION_LOCATION`, the returned location fields
should match those of the ``address`` parameter of the ``setLocation``
:http:`action <request:actions>` where possible, even when using an
implementation of :meth:`params` that does not rely on the
``setLocation`` action.
"""
return request.meta.get("zyte_api_session_location", self._setting_location)
[docs]
def params(self, request: Request) -> dict[str, Any] | Awaitable[dict[str, Any]]:
"""Return the Zyte API request parameters to use to initialize a
session for *request*.
The default implementation is based on settings and request metadata
keys as described in :ref:`session-init`.
When overriding this method, you should return parameters for the
target location, i.e. the output of :meth:`location`, unless that
output is an empty :class:`dict`, e.g.
.. code-block:: python
def params(self, request: Request) -> Dict[str, Any]:
if location := self.location(request):
return {
"url": "https://example.com/new-session/for-country",
"httpResponseBody": True,
"httpRequestMethod": "POST",
"httpRequestText": location["addressCountry"],
}
return {
"url": "https://example.com/new-session",
"httpResponseBody": True,
}
The returned parameters do not need to include :http:`request:url`. If
missing, it is picked from the request :ref:`triggering a session
initialization request <pool-size>`.
This method can be implemented as a coroutine function. For example:
.. code-block:: python
async def params(self, request: Request) -> Dict[str, Any]:
bootstrap_request = Request(
"https://example.com/api/get-session",
meta={
"zyte_api_session_enabled": False,
"zyte_api": {
"httpResponseBody": True,
"responseCookies": True,
},
},
)
response = await self.crawler.engine.download_async(bootstrap_request)
return {
"url": "https://example.com/new-session",
"httpResponseBody": True,
"requestCookies": response.raw_api_response["responseCookies"],
}
.. seealso:: :class:`~scrapy_zyte_api.LocationSessionConfig`
"""
if location := self.location(request):
return {
"browserHtml": True,
"actions": [
{
"action": "setLocation",
"address": location,
}
],
}
return {"browserHtml": True}
[docs]
def check(self, response: Response, request: Request) -> bool:
"""Return ``True`` if the session used to fetch *response* should be
kept, return ``False`` if it should be discarded, or raise
:exc:`~scrapy.exceptions.CloseSpider` if the spider should be closed.
The default implementation checks the outcome of the ``setLocation``
action if a location was defined, as described in :ref:`session-check`.
If you need to tell whether *request* is a :ref:`session initialization
request <session-init>` or not, use
:func:`~scrapy_zyte_api.is_session_init_request`.
.. seealso:: :class:`~scrapy_zyte_api.LocationSessionConfig`
"""
if self._checker:
return self._checker.check(response, request)
location = self.location(request)
if not location:
return True
for action in response.raw_api_response.get("actions", []): # type: ignore[attr-defined]
if action.get("action", None) != "setLocation":
continue
if action.get("error", "").startswith("Action setLocation not supported "):
logger.error(
f"Stopping the spider, tried to use the setLocation "
f"action on an unsupported website "
f"({urlparse_cached(request).netloc})."
)
raise CloseSpider("unsupported_set_location")
return action.get("status", None) == "success"
return True
try:
from web_poet import RulesRegistry
except ImportError:
class SessionConfigRulesRegistry:
def session_config_cls(self, request: Request) -> type[SessionConfig]:
return SessionConfig
def session_config(
self,
include,
*,
instead_of: type | None = SessionConfig,
exclude=None,
priority: int = 500,
**kwargs,
):
"""Mark the decorated :class:`SessionConfig` subclass as the
:ref:`session config <session-configs>` to use for the specified
URL patterns.
Usage example:
.. code-block:: python
from typing import Any, Dict
from scrapy import Request
from scrapy.http.response import Response
from scrapy_zyte_api import SessionConfig, session_config
@session_config(["ecommerce.de.example, ecommerce.us.example"])
class EcommerceExampleSessionConfig(SessionConfig):
def pool(self, request: Request) -> str:
return "ecommerce.example"
def params(self, request: Request) -> Dict[str, Any]:
if location := self.location(request):
return {
"url": request.url,
"browserHtml": True,
"actions": [
{
"action": "type",
"selector": {"type": "css", "value": ".zipcode"},
"text": location["postalCode"],
},
{
"action": "click",
"selector": {"type": "css", "value": "[type='submit']"},
},
],
}
return super().params(request)
def check(self, response: Response, request: Request) -> bool:
if location := self.location(request):
return response.css(".zipcode::text").get() == location["postalCode"]
return super().check(response, request)
Your :class:`~scrapy_zyte_api.SessionConfig` subclass must be
defined in a module that gets imported at run time. See
``SCRAPY_POET_DISCOVER`` in the :ref:`scrapy-poet setting reference
<scrapy-poet:settings>`.
The parameters of this decorator are those of
:func:`web_poet.handle_urls`, only *instead_of* is
:class:`SessionConfig` by default, *to_return* is not supported,
and session configs are registered in their own rule registry.
"""
raise RuntimeError(
"To use the @session_config decorator you first must install web-poet."
)
else:
from url_matcher import Patterns
from web_poet import ApplyRule
from web_poet.rules import Strings
class SessionConfigRulesRegistry(RulesRegistry): # type: ignore[no-redef]
def __init__(self):
rules = [ApplyRule(for_patterns=Patterns(include=[""]), use=SessionConfig)] # type: ignore[arg-type]
super().__init__(rules=rules)
def session_config_cls(self, request: Request) -> type[SessionConfig]:
cls = SessionConfig
overrides: dict[type[SessionConfig], type[SessionConfig]] = (
self.overrides_for(request.url) # type: ignore[assignment]
)
while cls in overrides:
cls = overrides[cls]
return cls
def session_config(
self,
include: Strings,
*,
instead_of: type[SessionConfig] | None = SessionConfig,
exclude: Strings | None = None,
priority: int = 500,
**kwargs,
):
return self.handle_urls(
include=include,
instead_of=instead_of, # type: ignore[arg-type]
exclude=exclude,
priority=priority,
**kwargs,
)
class FatalErrorHandler:
def __init__(self, crawler):
self.crawler = crawler
async def __aenter__(self):
return None
async def __aexit__(self, exc_type, exc, tb):
if exc_type is None:
return
close = partial(_close_spider, self.crawler)
if issubclass(exc_type, TooManyBadSessionInits):
close("bad_session_inits")
elif issubclass(exc_type, PoolError):
close("pool_error")
elif issubclass(exc_type, CloseSpider):
close(exc.reason)
session_config_registry = SessionConfigRulesRegistry()
session_config = session_config_registry.session_config
class _SessionManager:
def __init__(self, crawler: Crawler):
self._crawler = crawler
crawler.signals.connect(
self._handle_engine_start, signal=signals.engine_started
)
settings = crawler.settings
self._default_pool_delay = settings.getfloat(
"ZYTE_API_SESSION_DELAY", settings.getfloat("DOWNLOAD_DELAY")
)
self._randomize_delay = settings.getbool(
"ZYTE_API_SESSION_RANDOMIZE_DELAY",
settings.getbool("RANDOMIZE_DOWNLOAD_DELAY"),
)
self._default_pool_size = settings.getint("ZYTE_API_SESSION_POOL_SIZE", 8)
self._pending_initial_sessions: dict[str, int] = {}
self._pool_configs = settings.getdict("ZYTE_API_SESSION_POOLS")
pool_sizes = settings.getdict("ZYTE_API_SESSION_POOL_SIZES")
if pool_sizes:
logger.warning(
"ZYTE_API_SESSION_POOL_SIZES is deprecated, use "
"ZYTE_API_SESSION_POOLS instead"
)
for pool_id, pool_size in pool_sizes.items():
self._pool_configs.setdefault(pool_id, {}).setdefault("size", pool_size)
for pool, config in self._pool_configs.items():
if "size" in config:
self._pending_initial_sessions[pool] = config["size"]
self._max_check_failures = settings.getint(
"ZYTE_API_SESSION_MAX_CHECK_FAILURES", 1
)
self._check_failures: dict[str, int] = defaultdict(int)
self._max_errors = settings.getint("ZYTE_API_SESSION_MAX_ERRORS", 1)
self._errors: dict[str, int] = defaultdict(int)
max_bad_inits = settings.getint("ZYTE_API_SESSION_MAX_BAD_INITS", 8)
self._max_bad_inits: dict[str, int] = defaultdict(lambda: max_bad_inits)
max_bad_inits_per_pool = settings.getdict(
"ZYTE_API_SESSION_MAX_BAD_INITS_PER_POOL"
)
for pool, pool_max_bad_inits in max_bad_inits_per_pool.items():
self._max_bad_inits[pool] = pool_max_bad_inits
self._bad_inits: dict[str, int] = defaultdict(int)
# Transparent mode, needed to determine whether to set the session
# using ``zyte_api`` or ``zyte_api_automap``.
self._transparent_mode: bool = settings.getbool(
"ZYTE_API_TRANSPARENT_MODE", False
)
# Each pool contains the IDs of sessions that have not expired yet.
#
# While the initial sessions of a pool have not all been started, for
# every request needing a session, a new session is initialized and
# then added to the pool.
#
# Once a pool is full, sessions are picked from the pool queue, which
# should contain all pool sessions that have been initialized.
#
# As soon as a session expires, it is removed from its pool, and a task
# to initialize that new session is started.
self._pools: dict[str, set[str]] = defaultdict(set)
self._pool_cache: WeakKeyDictionary[Request, str] = WeakKeyDictionary()
# The queue is a rotating list of session IDs to use.
#
# The way to use the queue is to get a session ID with popleft(), and
# put it back to the end of the queue with append().
#
# The queue may contain session IDs from expired sessions. If the
# popped session ID cannot be found in the pool, then it should be
# discarded instead of being put back in the queue.
#
# When a new session ID is added to the pool, it is still not added to
# the queue until the session is actually initialized, when it is
# appended to the queue.
#
# If the queue is empty, sleep and try again. Sessions from the pool
# will be appended to the queue as they are initialized and ready to
# use.
self._queues: dict[str, deque[QueueSession]] = defaultdict(deque)
self._queue_max_attempts = settings.getint(
"ZYTE_API_SESSION_QUEUE_MAX_ATTEMPTS", 60
)
self._queue_wait_time = settings.getfloat(
"ZYTE_API_SESSION_QUEUE_WAIT_TIME", 1.0
)
# Contains the on-going tasks to create new sessions.
#
# Keeping a reference to those tasks until they are done is necessary
# to prevent garbage collection to remove the tasks.
self._init_tasks: set[Task] = set()
self._session_config_cache: WeakKeyDictionary[Request, SessionConfig] = (
WeakKeyDictionary()
)
self._session_config_map: dict[type[SessionConfig], SessionConfig] = {}
self._setting_params = settings.getdict("ZYTE_API_SESSION_PARAMS")
self._fatal_error_handler = FatalErrorHandler(crawler)
self._stats_per_pool: bool = settings.getbool("ZYTE_API_SESSION_STATS_PER_POOL")
def _inc_stat(self, key: str, pool: str):
pool = f"pools/{pool}/" if self._stats_per_pool else ""
key = f"scrapy-zyte-api/sessions/{pool}{key}"
assert self._crawler.stats
self._crawler.stats.inc_value(key)
async def _handle_engine_start(self):
assert self._crawler.engine
self._download_async = getattr(self._crawler.engine, "download_async", None)
self._download = None if self._download_async else self._crawler.engine.download
def _get_session_config(self, request: Request) -> SessionConfig:
try:
return self._session_config_cache[request]
except KeyError:
cls = session_config_registry.session_config_cls(request)
if cls not in self._session_config_map:
self._session_config_map[cls] = _build_from_crawler(cls, self._crawler)
self._session_config_cache[request] = self._session_config_map[cls]
return self._session_config_map[cls]
def get_pool(self, request):
try:
return self._pool_cache[request]
except KeyError:
session_config = self._get_session_config(request)
try:
pool = session_config.pool(request)
except Exception as exception:
message = (
f"Exception raised on session config pool() method call "
f"for request {request}."
)
raise PoolError(message) from exception
options: PoolOptions
if isinstance(pool, str):
pool_id = pool
options = {}
else:
try:
pool_id = pool["id"]
except (KeyError, TypeError) as exception:
message = (
f'Exception raised when accessing pool["id"] on the '
f"return value of the session config pool() method call "
f"for request {request}."
)
raise PoolError(message) from exception
else:
options = cast(
"PoolOptions", {k: v for k, v in pool.items() if k != "id"}
)
delay = options.get("delay", self._default_pool_delay)
randomize_delay = options.get("randomize_delay", self._randomize_delay)
size = options.get("size", self._default_pool_size)
if pool_id not in self._pool_configs:
self._pool_configs[pool_id] = {
"delay": delay,
"size": size,
"randomize_delay": randomize_delay,
}
self._pending_initial_sessions[pool_id] = size
else:
config = self._pool_configs[pool_id]
config.setdefault("delay", delay)
config.setdefault("randomize_delay", randomize_delay)
if "size" not in config:
self._pending_initial_sessions[pool_id] = size
config.setdefault("size", size)
self._pool_cache[request] = pool_id
return pool_id
async def _init_session(self, session_id: str, request: Request, pool: str) -> bool:
assert self._crawler.engine
session_config = self._get_session_config(request)
if meta_params := request.meta.get("zyte_api_session_params", None):
session_params = meta_params
elif (
not request.meta.get("zyte_api_session_location", None)
and self._setting_params
):
session_params = self._setting_params
else:
try:
session_params = await _ensure_awaitable(session_config.params(request))
except Exception:
self._inc_stat("init/param-error", pool)
logger.exception(
f"Unexpected exception raised while obtaining session "
f"initialization parameters for request {request}."
)
return False
session_params = deepcopy(session_params)
session_init_url = session_params.pop("url", request.url)
session_init_request = Request(
session_init_url,
meta={
SESSION_INIT_META_KEY: True,
"dont_merge_cookies": True,
"zyte_api": {**session_params, "session": {"id": session_id}},
**{
k: v
for k, v in request.meta.items()
if k in {"zyte_api_session_location", "zyte_api_session_params"}
},
},
callback=NO_CALLBACK,
)
if self._download_async is not None: # Scrapy >= 2.14
assert self._download_async
download = self._download_async(session_init_request)
elif not _DOWNLOAD_NEEDS_SPIDER:
assert self._download
deferred = self._download(session_init_request)
download = deferred_to_future(deferred)
else:
assert self._download
deferred = self._download( # type: ignore[call-arg]
session_init_request, spider=self._crawler.spider
)
download = deferred_to_future(deferred)
try:
response = await download
except Exception:
self._inc_stat("init/failed", pool)
return False
else:
try:
result = session_config.check(response, session_init_request)
except CloseSpider:
raise
except Exception:
self._inc_stat("init/check-error", pool)
logger.exception(
f"Unexpected exception raised while checking session "
f"validity on response {response}."
)
return False
outcome = "passed" if result else "failed"
self._inc_stat(f"init/check-{outcome}", pool)
return result
async def _create_session(self, request: Request, pool: str) -> str:
async with self._fatal_error_handler:
while True:
session_id = str(uuid4())
session_init_succeeded = await self._init_session(
session_id, request, pool
)
if session_init_succeeded:
self._pools[pool].add(session_id)
self._bad_inits[pool] = 0
break
self._bad_inits[pool] += 1
if self._bad_inits[pool] >= self._max_bad_inits[pool]:
raise TooManyBadSessionInits
pool_config = self._pool_configs[pool]
delay = pool_config["delay"]
sleep_delay = next_use_delay = delay
if pool_config["randomize_delay"]:
next_use_delay *= random.uniform(0.5, 1.5) # noqa: S311
sleep_delay *= random.uniform(0.5, 1.5) # noqa: S311
await sleep(sleep_delay)
next_use = time.time() + next_use_delay
self._queues[pool].append((session_id, next_use))
return session_id
async def _next_from_queue(self, request: Request, pool: str) -> str:
attempts = 0
while True:
try:
session_id, next_use = self._queues[pool].popleft()
except IndexError as ex: # No ready-to-use session available.
attempts += 1
if attempts >= self._queue_max_attempts:
raise RuntimeError(
f"Could not get a session ID from the session "
f"rotation queue after {attempts} attempts, waiting "
f"at least {self._queue_wait_time} seconds between "
f"attempts. See "
f"{_troubleshoot('could-not-get-session-id')}"
) from ex
await sleep(self._queue_wait_time)
continue
if session_id not in self._pools[pool]:
continue # Invalid session
now = time.time()
if next_use > now:
wait = next_use - now
logger.debug(
f"Waiting {wait:.3f} seconds for session {session_id} "
f"from pool {pool!r} to become available"
)
await sleep(wait)
now = time.time()
pool_config = self._pool_configs[pool]
next_use_delay = pool_config["delay"]
if pool_config["randomize_delay"]:
next_use_delay *= random.uniform(0.5, 1.5) # noqa: S311
self._queues[pool].append((session_id, now + next_use_delay))
return session_id
async def _next(self, request) -> str:
"""Return the ID of the next working session in the session pool
rotation.
*request* is needed to determine the URL to use for request
initialization.
"""
pool = self.get_pool(request)
if self._pending_initial_sessions[pool] >= 1:
self._pending_initial_sessions[pool] -= 1
session_id = await self._create_session(request, pool)
else:
session_id = await self._next_from_queue(request, pool)
return session_id
def is_init_request(self, request: Request) -> bool:
"""Return ``True`` if the request is one of the requests being used
to initialize a session, or ``False`` otherwise.
If ``True`` is returned for a request, the session ID of that request
should not be modified, or it will break the session management logic.
"""
return request.meta.get(SESSION_INIT_META_KEY, False)
def _start_session_refresh(self, session_id: str, request: Request, pool: str):
try:
self._pools[pool].remove(session_id)
except KeyError:
# More than 1 request was using the same session concurrently. Do
# not refresh the session again.
pass
else:
task = create_task(self._create_session(request, pool))
self._init_tasks.add(task)
task.add_done_callback(self._init_tasks.discard)
with contextlib.suppress(KeyError):
del self._errors[session_id]
def _start_request_session_refresh(self, request: Request, pool: str):
session_id = get_request_session_id(request)
if session_id is None:
return
self._start_session_refresh(session_id, request, pool)
@staticmethod
def allow_new_session_assignments(request):
# Since a response has been received or an exception raised, allow new
# session assignments for this request, e.g. if a new request based on
# this one (e.g. requests.replace()) is returned by the
# process_response or process_exception methods of a later downloader
# middleware.
request.meta.pop("_zyte_api_session_assigned", None)
async def check(self, response: Response, request: Request) -> bool:
"""Check the response for signs of session expiration, update the
internal session pool accordingly, and return ``False`` if the session
has expired or ``True`` if the session passed validation."""
async with self._fatal_error_handler:
if self.is_init_request(request):
return True
session_config = self._get_session_config(request)
if not session_config.enabled(request):
return True
pool = self.get_pool(request)
try:
passed = session_config.check(response, request)
except CloseSpider:
raise
except Exception:
self._inc_stat("use/check-error", pool)
logger.exception(
f"Unexpected exception raised while checking session "
f"validity on response {response}."
)
else:
outcome = "passed" if passed else "failed"
self._inc_stat(f"use/check-{outcome}", pool)
if passed:
return True
session_id = get_request_session_id(request)
if session_id is not None:
self._check_failures[session_id] += 1
if self._check_failures[session_id] < self._max_check_failures:
return False
self._start_request_session_refresh(request, pool)
return False
async def assign(self, request: Request) -> Request | None:
"""Assign a working session to *request*.
If the session config creates a new request instead of modifying the
request in place, return that new request, to replace the received
request.
"""
async with self._fatal_error_handler:
if self.is_init_request(request) or request.meta.get(
"_zyte_api_session_assigned", False
):
return None
session_config = self._get_session_config(request)
if not session_config.enabled(request):
assert self._crawler.stats
self._crawler.stats.inc_value("scrapy-zyte-api/sessions/use/disabled")
return None
session_id = await self._next(request)
# Note: If there is a session set already (e.g. a request being
# retried), it is overridden.
request.meta.setdefault("zyte_api_provider", {})["session"] = {
"id": session_id
}
if (
"zyte_api" in request.meta
or request.meta.get("zyte_api_automap", None) is False
or (
"zyte_api_automap" not in request.meta
and self._transparent_mode is False
)
):
meta_key = "zyte_api"
else:
meta_key = "zyte_api_automap"
request.meta.setdefault(meta_key, {})
if not isinstance(request.meta[meta_key], dict):
request.meta[meta_key] = {}
request.meta[meta_key]["session"] = {"id": session_id}
request.meta.setdefault("dont_merge_cookies", True)
# Mark this request as having a session assigned already, so that
# if a later downloader middleware process_request call returns a
# new request object (with a shallow copy of its meta), a new call
# to the process_request method of the session management
# middleware does not assign a new session again.
request.meta.setdefault("_zyte_api_session_assigned", True)
return session_config.process_request(request)
def is_enabled(self, request: Request) -> bool:
session_config = self._get_session_config(request)
return session_config.enabled(request)
async def handle_error(self, request: Request):
async with self._fatal_error_handler:
pool = self.get_pool(request)
self._inc_stat("use/failed", pool)
session_id = get_request_session_id(request)
if session_id is not None:
self._errors[session_id] += 1
if self._errors[session_id] < self._max_errors:
return
self._start_request_session_refresh(request, pool)
async def handle_expiration(self, request: Request):
async with self._fatal_error_handler:
pool = self.get_pool(request)
self._inc_stat("use/expired", pool)
self._start_request_session_refresh(request, pool)
class ScrapyZyteAPISessionDownloaderMiddleware:
@classmethod
def from_crawler(cls, crawler: Crawler):
return cls(crawler)
def __init__(self, crawler: Crawler):
self._crawler = crawler
self._sessions = _SessionManager(crawler)
async def process_request(
self, request: Request, spider: Spider | None = None
) -> Request | None:
return await self._sessions.assign(request)
async def process_response(
self, request: Request, response: Response, spider: Spider | None = None
) -> Request | Response | None:
if isinstance(response, DummyResponse):
return response
self._sessions.allow_new_session_assignments(request)
passed = await self._sessions.check(response, request)
if not passed:
assert self._crawler.spider
new_request_or_none = get_retry_request(
request,
spider=self._crawler.spider,
reason="session_expired",
)
if not new_request_or_none:
raise IgnoreRequest
return new_request_or_none
return response
async def process_exception(
self, request: Request, exception: Exception, spider: Spider | None = None
) -> Request | None:
if (
not isinstance(exception, RequestError)
or self._sessions.is_init_request(request)
or not self._sessions.is_enabled(request)
):
return None
self._sessions.allow_new_session_assignments(request)
if exception.parsed.type == "/problem/session-expired":
await self._sessions.handle_expiration(request)
reason = "session_expired"
elif exception.status in {520, 521}:
await self._sessions.handle_error(request)
reason = "download_error"
else:
return None
assert self._crawler.spider
return get_retry_request(
request,
spider=self._crawler.spider,
reason=reason,
)
def get_pool(self, request: Request) -> PoolConfig | str | None:
return (
self._sessions.get_pool(request)
if self._sessions.is_enabled(request)
else None
)
[docs]
class LocationSessionConfig(SessionConfig):
""":class:`~scrapy_zyte_api.SessionConfig` subclass to minimize boilerplate
when implementing location-specific session configs, i.e. session configs
where the default values should be used unless a location is set.
Provides counterparts to some :class:`~scrapy_zyte_api.SessionConfig`
methods that are only called when a location is set, and get that location
as a parameter.
"""
def params(self, request: Request) -> dict[str, Any] | Awaitable[dict[str, Any]]:
if not (location := self.location(request)):
return super().params(request)
return self.location_params(request, location)
def check(self, response: Response, request: Request) -> bool:
if not (location := self.location(request)):
return super().check(response, request)
return self.location_check(response, request, location)
[docs]
def location_params(
self, request: Request, location: dict[str, Any]
) -> dict[str, Any] | Awaitable[dict[str, Any]]:
"""Like :class:`SessionConfig.params
<scrapy_zyte_api.SessionConfig.params>`, but it is only called when a
location is set, and gets that *location* as a parameter."""
return super().params(request)
[docs]
def location_check(
self, response: Response, request: Request, location: dict[str, Any]
) -> bool:
"""Like :class:`SessionConfig.check
<scrapy_zyte_api.SessionConfig.check>`, but it is only called when a
location is set, and gets that *location* as a parameter."""
return super().check(response, request)