Skip to content

Http

This module contains async implementation of HTTP step.

koheesio.asyncio.http.AsyncHttpGetStep #

Represents an asynchronous HTTP GET step.

This class inherits from the AsyncHttpStep class and specifies the HTTP method as GET.

Attributes: method (HttpMethod): The HTTP method for the step, set to HttpMethod.GET.

method class-attribute instance-attribute #

method: HttpMethod = GET

koheesio.asyncio.http.AsyncHttpStep #

Asynchronous HTTP step for making HTTP requests using aiohttp.

Parameters:

Name Type Description Default
client_session Optional[ClientSession]

Aiohttp ClientSession.

required
url List[URL]

List of yarl.URL.

required
retry_options Optional[RetryOptionsBase]

Retry options for the request.

required
connector Optional[BaseConnector]

Connector for the aiohttp request.

required
headers Optional[Dict[str, Union[str, SecretStr]]]

Request headers.

required
Output

responses_urls : Optional[List[Tuple[Dict[str, Any], yarl.URL]]] List of responses from the API and request URL.

Examples:

import asyncio
from aiohttp import ClientSession
from aiohttp.connector import TCPConnector
from aiohttp_retry import ExponentialRetry
from koheesio.asyncio.http import AsyncHttpStep
from yarl import URL
from typing import Dict, Any, Union, List, Tuple


# Initialize the AsyncHttpStep
async def main():
    session = ClientSession()
    urls = [URL("https://example.com/api/1"), URL("https://example.com/api/2")]
    retry_options = ExponentialRetry()
    connector = TCPConnector(limit=10)
    headers = {"Content-Type": "application/json"}
    step = AsyncHttpStep(
        client_session=session,
        url=urls,
        retry_options=retry_options,
        connector=connector,
        headers=headers,
    )

    # Execute the step
    responses_urls = await step.get()

    return responses_urls


# Run the main function
responses_urls = asyncio.run(main())

client_session class-attribute instance-attribute #

client_session: Optional[ClientSession] = Field(
    default=None,
    description="Aiohttp ClientSession",
    exclude=True,
)

connector class-attribute instance-attribute #

connector: Optional[BaseConnector] = Field(
    default=None,
    description="Connector for the aiohttp request",
    exclude=True,
)

headers class-attribute instance-attribute #

headers: Dict[str, Union[str, SecretStr]] = Field(
    default_factory=dict,
    description="Request headers",
    alias="header",
    exclude=True,
)

method class-attribute instance-attribute #

method: Union[str, HttpMethod] = Field(
    default=GET,
    description="What type of Http call to perform. One of 'get', 'post', 'put', 'delete'. Defaults to 'get'.",
)

retry_options class-attribute instance-attribute #

retry_options: Optional[RetryOptionsBase] = Field(
    default=None,
    description="Retry options for the request",
    exclude=True,
)

timeout class-attribute instance-attribute #

timeout: None = Field(
    default=None, description="[Optional] Request timeout"
)

url class-attribute instance-attribute #

url: List[URL] = Field(
    default_factory=list,
    alias="urls",
    description="Expecting list, as there is no value in executing async request for one value.\n        yarl.URL is preferable, because params/data can be injected into URL instance",
    exclude=True,
)

Output #

Output class for Step

responses_urls class-attribute instance-attribute #

responses_urls: Optional[
    List[Tuple[Dict[str, Any], URL]]
] = Field(
    default=None,
    description="List of responses from the API and request URL",
    repr=False,
)

delete async #

delete() -> List[Tuple[Dict[str, Any], URL]]

Make DELETE requests.

Returns:

Type Description
List[Tuple[Dict[str, Any], URL]]

A list of response data and corresponding request URLs.

Source code in src/koheesio/asyncio/http.py
async def delete(self) -> List[Tuple[Dict[str, Any], yarl.URL]]:
    """
    Make DELETE requests.

    Returns
    -------
    List[Tuple[Dict[str, Any], yarl.URL]]
        A list of response data and corresponding request URLs.
    """
    tasks = self.__tasks_generator(method=HttpMethod.DELETE)
    responses_urls = await self._execute(tasks=tasks)

    return responses_urls

execute #

execute() -> None

Execute the step.

Raises:

Type Description
ValueError

If the specified HTTP method is not implemented in AsyncHttpStep.

Source code in src/koheesio/asyncio/http.py
def execute(self) -> None:
    """
    Execute the step.

    Raises
    ------
    ValueError
        If the specified HTTP method is not implemented in AsyncHttpStep.
    """
    # By design asyncio does not allow its event loop to be nested. This presents a practical problem:
    #   When in an environment where the event loop is already running
    #   it’s impossible to run tasks and wait for the result.
    #   Trying to do so will give the error “RuntimeError: This event loop is already running”.
    #   The issue pops up in various environments, such as web servers, GUI applications and in
    #   Jupyter/DataBricks notebooks.
    nest_asyncio.apply()

    map_method_func = {
        HttpMethod.GET: self.get,
        HttpMethod.POST: self.post,
        HttpMethod.PUT: self.put,
        HttpMethod.DELETE: self.delete,
    }

    if self.method not in map_method_func:
        raise ValueError(f"Method {self.method} not implemented in AsyncHttpStep.")

    self.output.responses_urls = asyncio.run(map_method_func[self.method]())  # type: ignore[index]

get async #

get() -> List[Tuple[Dict[str, Any], URL]]

Make GET requests.

Returns:

Type Description
List[Tuple[Dict[str, Any], URL]]

A list of response data and corresponding request URLs.

Source code in src/koheesio/asyncio/http.py
async def get(self) -> List[Tuple[Dict[str, Any], yarl.URL]]:
    """
    Make GET requests.

    Returns
    -------
    List[Tuple[Dict[str, Any], yarl.URL]]
        A list of response data and corresponding request URLs.
    """
    tasks = self.__tasks_generator(method=HttpMethod.GET)
    responses_urls = await self._execute(tasks=tasks)

    return responses_urls

get_headers #

get_headers() -> Union[None, dict]

Get the request headers.

Returns:

Type Description
Optional[Dict[str, Union[str, SecretStr]]]

The request headers.

Source code in src/koheesio/asyncio/http.py
def get_headers(self) -> Union[None, dict]:
    """
    Get the request headers.

    Returns
    -------
    Optional[Dict[str, Union[str, SecretStr]]]
        The request headers.
    """
    _headers = None

    if self.headers:
        _headers = {k: v.get_secret_value() if isinstance(v, SecretStr) else v for k, v in self.headers.items()}

        for k, v in self.headers.items():
            if isinstance(v, SecretStr):
                self.headers[k] = v.get_secret_value()

    return _headers or self.headers

get_options #

get_options() -> None

Get the options of the step.

Source code in src/koheesio/asyncio/http.py
def get_options(self) -> None:
    """
    Get the options of the step.
    """
    warnings.warn("get_options is not implemented in AsyncHttpStep.")

post async #

post() -> List[Tuple[Dict[str, Any], URL]]

Make POST requests.

Returns:

Type Description
List[Tuple[Dict[str, Any], URL]]

A list of response data and corresponding request URLs.

Source code in src/koheesio/asyncio/http.py
async def post(self) -> List[Tuple[Dict[str, Any], yarl.URL]]:
    """
    Make POST requests.

    Returns
    -------
    List[Tuple[Dict[str, Any], yarl.URL]]
        A list of response data and corresponding request URLs.
    """
    tasks = self.__tasks_generator(method=HttpMethod.POST)
    responses_urls = await self._execute(tasks=tasks)

    return responses_urls

put async #

put() -> List[Tuple[Dict[str, Any], URL]]

Make PUT requests.

Returns:

Type Description
List[Tuple[Dict[str, Any], URL]]

A list of response data and corresponding request URLs.

Source code in src/koheesio/asyncio/http.py
async def put(self) -> List[Tuple[Dict[str, Any], yarl.URL]]:
    """
    Make PUT requests.

    Returns
    -------
    List[Tuple[Dict[str, Any], yarl.URL]]
        A list of response data and corresponding request URLs.
    """
    tasks = self.__tasks_generator(method=HttpMethod.PUT)
    responses_urls = await self._execute(tasks=tasks)

    return responses_urls

request async #

request(
    method: HttpMethod, url: URL, **kwargs
) -> Tuple[Dict[str, Any], URL]

Make an HTTP request.

Parameters:

Name Type Description Default
method HttpMethod

The HTTP method to use for the request.

required
url URL

The URL to make the request to.

required
kwargs Any

Additional keyword arguments to pass to the request.

{}

Returns:

Type Description
Tuple[Dict[str, Any], URL]

A tuple containing the response data and the request URL.

Source code in src/koheesio/asyncio/http.py
async def request(  # type: ignore[no-untyped-def]
    self,
    method: HttpMethod,
    url: yarl.URL,
    **kwargs,
) -> Tuple[Dict[str, Any], yarl.URL]:
    """
    Make an HTTP request.

    Parameters
    ----------
    method : HttpMethod
        The HTTP method to use for the request.
    url : yarl.URL
        The URL to make the request to.
    kwargs : Any
        Additional keyword arguments to pass to the request.

    Returns
    -------
    Tuple[Dict[str, Any], yarl.URL]
        A tuple containing the response data and the request URL.
    """
    async with self.__retry_client.request(method=method, url=url, **kwargs) as response:
        res = await response.json()

    return res, response.request_info.url

set_outputs #

set_outputs(response) -> None

Set the outputs of the step.

Parameters:

Name Type Description Default
response Any

The response data.

required
Source code in src/koheesio/asyncio/http.py
def set_outputs(self, response) -> None:  # type: ignore[no-untyped-def]
    """
    Set the outputs of the step.

    Parameters
    ----------
    response : Any
        The response data.
    """
    warnings.warn("set outputs is not implemented in AsyncHttpStep.")

validate_timeout #

validate_timeout(timeout: Any) -> None

Validate the 'timeout' field.

Parameters:

Name Type Description Default
timeout Any

The value of the 'timeout' field.

required

Raises:

Type Description
ValueError

If 'data' is not allowed in AsyncHttpStep.

Source code in src/koheesio/asyncio/http.py
@field_validator("timeout")
def validate_timeout(cls, timeout: Any) -> None:
    """
    Validate the 'timeout' field.

    Parameters
    ----------
    timeout : Any
        The value of the 'timeout' field.

    Raises
    ------
    ValueError
        If 'data' is not allowed in AsyncHttpStep.
    """
    if timeout:
        raise ValueError("timeout is not allowed in AsyncHttpStep. Provide timeout through retry_options.")