
collection.utils

Collection utilities for parallel API data loading.

This module provides both legacy and enhanced parallel loading capabilities, supporting the original t3api-based functions as well as the newer httpx-based clients.

parallel_load_collection

parallel_load_collection(method: Callable[P, MetrcCollectionResponse], max_workers: int | None = None, *args: P.args, **kwargs: P.kwargs) -> List[MetrcCollectionResponse]

Fetch paginated responses in parallel using a thread pool.

Makes an initial request to determine total records and page size, then fetches all remaining pages concurrently via a thread pool executor.

Parameters:

method (Callable[P, MetrcCollectionResponse], required)
    A callable that returns a MetrcCollectionResponse. Must accept a page keyword argument for pagination.

max_workers (int | None, default None)
    Maximum number of threads for concurrent fetching. Passed directly to ThreadPoolExecutor; None lets the executor choose a default.

*args (P.args)
    Positional arguments forwarded to method.

**kwargs (P.kwargs)
    Keyword arguments forwarded to method.

Returns:

List[MetrcCollectionResponse]
    A list of MetrcCollectionResponse objects, one per page, ordered by page number.

Raises:

ValueError
    If the first response is missing the total field or the page size cannot be determined (missing or zero).

Source code in t3api_utils/collection/utils.py
def parallel_load_collection(
    method: Callable[P, MetrcCollectionResponse],
    max_workers: int | None = None,
    *args: P.args,
    **kwargs: P.kwargs,
) -> List[MetrcCollectionResponse]:
    """Fetch paginated responses in parallel using a thread pool.

    Makes an initial request to determine total records and page size, then
    fetches all remaining pages concurrently via a thread pool executor.

    Args:
        method: A callable that returns a ``MetrcCollectionResponse``. Must
            accept a ``page`` keyword argument for pagination.
        max_workers: Maximum number of threads for concurrent fetching.
            Passed directly to ``ThreadPoolExecutor``; ``None`` lets the
            executor choose a default.
        *args: Positional arguments forwarded to ``method``.
        **kwargs: Keyword arguments forwarded to ``method``.

    Returns:
        A list of ``MetrcCollectionResponse`` objects, one per page, ordered
        by page number.

    Raises:
        ValueError: If the first response is missing the ``total`` field or
            the page size cannot be determined (missing or zero).
    """
    logger.info("Starting parallel data load")
    first_response = method(*args, **kwargs)

    total = first_response.get("total")
    if total is None:
        raise ValueError("Response missing required `total` attribute.")

    page_size = first_response.get("pageSize")
    if page_size is None:
        page_size = len(first_response["data"])
    if page_size == 0:
        raise ValueError("Unable to determine page size from first response.")

    num_pages = (total + page_size - 1) // page_size
    logger.info(
        f"Total records: {total}, page size: {page_size}, total pages: {num_pages}"
    )

    responses: List[MetrcCollectionResponse | None] = [None] * num_pages
    responses[0] = first_response

    def fetch_page(page_number: int) -> tuple[int, MetrcCollectionResponse]:
        logger.debug(f"Fetching page {page_number + 1}")
        response = method(*args, **kwargs, page=page_number + 1)  # type: ignore
        return page_number, response

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(fetch_page, i) for i in range(1, num_pages)]
        for count, future in enumerate(as_completed(futures), start=1):
            page_number, response = future.result()
            responses[page_number] = response
            logger.info(f"Loaded page {page_number + 1} ({count}/{num_pages - 1})")

    logger.info("Finished loading all pages")
    return [r for r in responses if r is not None]
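
A minimal usage sketch follows. It assumes MetrcCollectionResponse is dict-shaped (with total, pageSize, and data keys), as the source above suggests; the list_items callable and its license_number parameter are hypothetical placeholders for a real paginated API method.

from t3api_utils.collection.utils import parallel_load_collection

# Hypothetical stand-in for a real API method. Any callable that accepts a
# `page` keyword argument and returns a response with `total`, `pageSize`,
# and `data` works with parallel_load_collection.
def list_items(*, license_number: str, page: int = 1):
    all_items = [{"id": i, "licenseNumber": license_number} for i in range(25)]
    page_size = 10
    start = (page - 1) * page_size
    return {
        "total": len(all_items),
        "pageSize": page_size,
        "data": all_items[start : start + page_size],
    }

# Fetch every page concurrently with up to 4 worker threads; extra keyword
# arguments are forwarded to the callable on each request.
responses = parallel_load_collection(
    list_items,
    max_workers=4,
    license_number="LIC-0001",
)
print(len(responses))  # 3 pages for 25 records at a page size of 10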

extract_data

extract_data(*, responses: List[MetrcCollectionResponse]) -> List[MetrcObject]

Flatten a list of MetrcCollectionResponse objects that each have a .data property into a single list of MetrcObject items.

Parameters:

responses (List[MetrcCollectionResponse], required)
    A list of MetrcCollectionResponse objects.

Returns:

List[MetrcObject]
    A flattened list of all items from the .data attributes.

Example

>>> extract_data(responses=[Response1(data=[1, 2]), Response2(data=[3])])
[1, 2, 3]

Source code in t3api_utils/collection/utils.py
def extract_data(*, responses: List[MetrcCollectionResponse]) -> List[MetrcObject]:
    """
    Flatten a list of MetrcCollectionResponse objects that each have a `.data` property
    into a single list of MetrcObject items.

    Args:
        responses: A list of MetrcCollectionResponse objects.

    Returns:
        List[MetrcObject]: A flattened list of all items from the `.data` attributes.

    Example:
        >>> extract_data(responses=[Response1(data=[1, 2]), Response2(data=[3])])
        [1, 2, 3]
    """
    # Use nested list comprehension to flatten all `.data` lists into one
    return [item for response in responses for item in response["data"]]
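
For illustration, a small self-contained sketch, again assuming MetrcCollectionResponse is dict-shaped; in practice the responses would typically come straight from parallel_load_collection.

from t3api_utils.collection.utils import extract_data

# Two dict-shaped page responses, e.g. as returned by parallel_load_collection.
responses = [
    {"total": 3, "pageSize": 2, "data": [{"id": 1}, {"id": 2}]},
    {"total": 3, "pageSize": 2, "data": [{"id": 3}]},
]

# Note that `responses` is a keyword-only argument.
items = extract_data(responses=responses)
print(items)  # [{'id': 1}, {'id': 2}, {'id': 3}]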