Http

This module contains several HTTP Steps that can be used to perform API calls to HTTP endpoints.

Example
from koheesio.steps.http import HttpGetStep

response = (
    HttpGetStep(url="https://google.com").execute().json_payload
)

In the above example, the response variable will contain the JSON response from the HTTP request.

koheesio.steps.http.HttpDeleteStep #

send DELETE requests

method class-attribute instance-attribute #

method: HttpMethod = DELETE

koheesio.steps.http.HttpGetStep #

send GET requests

Example

response = (
    HttpGetStep(url="https://google.com").execute().json_payload
)

In the above example, the response variable will contain the JSON response from the HTTP request.

method class-attribute instance-attribute #

method: HttpMethod = GET

koheesio.steps.http.HttpMethod #

Enumeration of allowed http methods

DELETE class-attribute instance-attribute #

DELETE = 'delete'

GET class-attribute instance-attribute #

GET = 'get'

POST class-attribute instance-attribute #

POST = 'post'

PUT class-attribute instance-attribute #

PUT = 'put'

from_string classmethod #

from_string(value: str) -> str

Returns the matching HttpMethod member for a given string value. The lookup is not case-sensitive.

Source code in src/koheesio/steps/http.py
@classmethod
def from_string(cls, value: str) -> str:
    """Allows for getting the right Method Enum by simply passing a string value
    This method is not case-sensitive
    """
    return getattr(cls, value.upper())
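
The lookup above can be tried in isolation with a small stand-in enum. This is only a sketch: `Method` is a hypothetical class that copies the four members listed for HttpMethod, not the library class itself. It shows that upper-casing the input is what makes the lookup case-insensitive, and that an unknown name surfaces as an AttributeError from `getattr`.

```python
from enum import Enum


class Method(Enum):
    """Hypothetical stand-in that mirrors the members listed for HttpMethod."""

    GET = "get"
    POST = "post"
    PUT = "put"
    DELETE = "delete"

    @classmethod
    def from_string(cls, value: str) -> "Method":
        # Member names are upper-case, so upper-casing the input
        # makes the lookup case-insensitive.
        return getattr(cls, value.upper())


print(Method.from_string("get") is Method.GET)
print(Method.from_string("Delete").value)

try:
    Method.from_string("patch")  # not a member of the enum
except AttributeError as exc:
    print(f"rejected: {exc}")
```

Note that the sketch annotates the return type as the enum itself, since `getattr` on the class returns an enum member.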

koheesio.steps.http.HttpPostStep #

send POST requests

method class-attribute instance-attribute #

method: HttpMethod = POST

koheesio.steps.http.HttpPutStep #

send PUT requests

method class-attribute instance-attribute #

method: HttpMethod = PUT

koheesio.steps.http.HttpStep #

Can be used to perform API Calls to HTTP endpoints

Authorization

The optional auth_header parameter in HttpStep allows you to pass an authorization header, such as a bearer token. For example: auth_header = "Bearer <token>".

The auth_header value is stored as a SecretStr object to prevent sensitive information from being displayed in logs.

Of course, authorization can also just be passed as part of the regular headers parameter.

For example, either one of these parameters would semantically be the same:

headers = {
    "Authorization": "Bearer <token>",
    "Content-Type": "application/json",
}

or

auth_header = "Bearer <token>"

The auth_header parameter is useful when you want to keep the authorization separate from the other headers, for example when your implementation requires you to pass some custom headers in addition to the authorization header.

Note: The auth_header parameter can accept any authorization header value, including basic authentication tokens, digest authentication strings, NTLM, etc.

Understanding Retries#

This class includes a built-in retry mechanism for handling temporary issues, such as network errors or server downtime, that might cause the HTTP request to fail. The retry mechanism is controlled by three parameters: max_retries, initial_delay, and backoff.

  • max_retries determines the number of retries after the initial request. For example, if max_retries is set to 4, the request will be attempted a total of 5 times (1 initial attempt + 4 retries). If max_retries is set to 0, no retries will be attempted, and the request will be tried only once.

  • initial_delay sets the waiting period before the first retry. If initial_delay is set to 3, the delay before the first retry will be 3 seconds. Changing the initial_delay value directly affects the amount of delay before each retry.

  • backoff controls the rate at which the delay increases for each subsequent retry. If backoff is set to 2 (the default), the delay will double with each retry. If backoff is set to 1, the delay between retries will remain constant. Changing the backoff value affects how quickly the delay increases.

Given the default values of max_retries=3, initial_delay=2, and backoff=2, the delays between retries would be 2 seconds, 4 seconds, and 8 seconds, respectively. This results in a total delay of 14 seconds before all retries are exhausted.

For example, if you set initial_delay=3 and backoff=2, the delays before the retries would be 3 seconds, 6 seconds, and 12 seconds. If you set initial_delay=2 and backoff=3, the delays before the retries would be 2 seconds, 6 seconds, and 18 seconds. If you set initial_delay=2 and backoff=1, the delays before the retries would be 2 seconds, 2 seconds, and 2 seconds.
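
The delay schedules quoted above can be reproduced with a few lines of Python. This is a minimal sketch, assuming the delay before retry n (counting from 0) is `initial_delay * backoff ** n`; that formula is inferred from the worked examples in this section, not taken from the library source, and `retry_delays` is a hypothetical helper name.

```python
from typing import List


def retry_delays(max_retries: int = 3, initial_delay: float = 2, backoff: float = 2) -> List[float]:
    """Return the wait in seconds before each retry.

    Assumes delay_n = initial_delay * backoff ** n, with n starting at 0.
    """
    return [initial_delay * backoff**attempt for attempt in range(max_retries)]


# Defaults described above: three retries, starting at 2 seconds, doubling each time.
delays = retry_delays()
print(delays, "total:", sum(delays))

# The other combinations mentioned in the text.
print(retry_delays(initial_delay=3, backoff=2))
print(retry_delays(initial_delay=2, backoff=3))
print(retry_delays(initial_delay=2, backoff=1))
```

With `max_retries=0` the helper returns an empty list, which matches the statement that the request is then tried only once.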

Parameters#

  • url : str, required. API endpoint URL.

  • headers : Dict[str, Union[str, SecretStr]], optional, default={"Content-Type": "application/json"}. Request headers.

  • auth_header : Optional[SecretStr], optional, default=None. Authorization header, such as a bearer token.

  • data : Union[Dict[str, str], str], optional, default={}. Data to be sent along with the request.

  • timeout : int, optional, default=3. Request timeout in seconds.

  • method : Union[str, HttpMethod], optional, default='get'. What type of HTTP call to perform. One of 'get', 'post', 'put', 'delete'.

  • session : requests.Session, optional, default=requests.Session(). Existing requests session object to be used for making HTTP requests. If not provided, a new session object will be created.

  • params : Optional[Dict[str, Any]]. Set of extra parameters that should be passed to the HTTP request. Note: any kwargs passed to the class will be added to this dictionary.

Output#

  • response_raw : Optional[requests.Response]. The raw requests.Response object returned by the appropriate requests.request() call.

  • response_json : Optional[Union[Dict, List]]. The JSON response for the request.

  • raw_payload : Optional[str]. The raw response for the request.

  • status_code : Optional[int]. The status return code of the request.

auth_header class-attribute instance-attribute #

auth_header: Optional[SecretStr] = Field(
    default=None,
    description="[Optional] Authorization header",
    alias="authorization_header",
    examples=["Bearer <token>"],
)

data class-attribute instance-attribute #

data: Union[Dict[str, str], str] = Field(
    default_factory=dict,
    description="[Optional] Data to be sent along with the request",
    alias="body",
)

headers class-attribute instance-attribute #

headers: Dict[str, Union[str, SecretStr]] = Field(
    default={"Content-Type": "application/json"},
    description="Request headers",
    alias="header",
)

method class-attribute instance-attribute #

method: Union[str, HttpMethod] = Field(
    default=GET,
    description="What type of Http call to perform. One of 'get', 'post', 'put', 'delete'. Defaults to 'get'.",
)

params class-attribute instance-attribute #

params: Optional[Dict[str, Any]] = Field(
    default_factory=dict,
    description="[Optional] Set of extra parameters that should be passed to HTTP request",
)

session class-attribute instance-attribute #

session: Session = Field(
    default_factory=Session,
    description="Existing requests session object to be used for making HTTP requests. If not provided, a new session object will be created.",
    exclude=True,
    repr=False,
)

timeout class-attribute instance-attribute #

timeout: int = Field(
    default=3, description="[Optional] Request timeout"
)

url class-attribute instance-attribute #

url: str = Field(
    default=..., description="API endpoint URL", alias="uri"
)

Output #

Output class for HttpStep

json_payload property #

json_payload: Union[dict, list, None]

Alias for response_json

raw_payload class-attribute instance-attribute #

raw_payload: Optional[str] = Field(
    default=None,
    alias="response_text",
    description="The raw response for the request",
)

response_json class-attribute instance-attribute #

response_json: Optional[Union[Dict, List]] = Field(
    default=None,
    alias="json_payload",
    description="The JSON response for the request",
)

response_raw class-attribute instance-attribute #

response_raw: Optional[Response] = Field(
    default=None,
    alias="response",
    description="The raw requests.Response object returned by the appropriate requests.request() call",
)

status_code class-attribute instance-attribute #

status_code: Optional[int] = Field(
    default=None,
    description="The status return code of the request",
)

decode_sensitive_headers #

decode_sensitive_headers(headers: dict) -> dict

The encode_sensitive_headers method converts authorization headers into SecretStr under the hood, so that sensitive content is not dumped into logs.

When the get_headers method is called, however, the SecretStr values must be converted back to strings; otherwise the sensitive value would appear as '**********'.

This method decodes values of the headers dictionary that are of type SecretStr into plain text.

Source code in src/koheesio/steps/http.py
@field_serializer("headers", when_used="json")
def decode_sensitive_headers(self, headers: dict) -> dict:
    """
    Authorization headers are being converted into SecretStr under the hood to avoid dumping any
    sensitive content into logs by the `encode_sensitive_headers` method.

    However, when calling the `get_headers` method, the SecretStr should be converted back to
    string, otherwise sensitive info would have looked like '**********'.

    This method decodes values of the `headers` dictionary that are of type SecretStr into plain text.
    """
    for k, v in headers.items():
        headers[k] = v.get_secret_value() if isinstance(v, SecretStr) else v
    return headers

delete #

delete() -> Response

Execute an HTTP DELETE call

Source code in src/koheesio/steps/http.py
def delete(self) -> requests.Response:
    """Execute an HTTP DELETE call"""
    self.method = HttpMethod.DELETE
    with self.request() as response:
        return response

encode_sensitive_headers #

encode_sensitive_headers() -> HttpStep

Encode potentially sensitive data into pydantic.SecretStr class to prevent them being displayed as plain text in logs.

Source code in src/koheesio/steps/http.py
@model_validator(mode="after")
def encode_sensitive_headers(self) -> "HttpStep":
    """
    Encode potentially sensitive data into pydantic.SecretStr class to prevent them
    being displayed as plain text in logs.
    """
    if auth_header := self.auth_header:
        # ensure the token is preceded with the word 'Bearer'
        self.headers["Authorization"] = auth_header
        del self.auth_header
    if auth := self.headers.get("Authorization"):
        self.headers["Authorization"] = auth if isinstance(auth, SecretStr) else SecretStr(auth)
    return self
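
The round trip between encode_sensitive_headers and decode_sensitive_headers can be illustrated without pydantic. The sketch below is an assumption-laden stand-in: `Masked` is a hypothetical class that imitates only the masking behaviour of SecretStr, and `encode_sensitive` / `decode_sensitive` are hypothetical free functions that follow the same steps as the two methods shown in this section.

```python
from typing import Dict, Optional, Union


class Masked:
    """Hypothetical stand-in for pydantic.SecretStr: hides its value when printed."""

    def __init__(self, value: str) -> None:
        self._value = value

    def get_secret_value(self) -> str:
        return self._value

    def __str__(self) -> str:
        return "**********"

    def __repr__(self) -> str:
        return "Masked('**********')"


Headers = Dict[str, Union[str, Masked]]


def encode_sensitive(headers: Headers, auth_header: Optional[str] = None) -> Headers:
    """Move auth_header into the headers and mask the Authorization value."""
    encoded = dict(headers)
    if auth_header:
        encoded["Authorization"] = auth_header
    auth = encoded.get("Authorization")
    if auth and not isinstance(auth, Masked):
        encoded["Authorization"] = Masked(auth)
    return encoded


def decode_sensitive(headers: Headers) -> Dict[str, str]:
    """Unwrap masked values so the real header can be sent with the request."""
    return {k: v.get_secret_value() if isinstance(v, Masked) else v for k, v in headers.items()}


stored = encode_sensitive({"Content-Type": "application/json"}, auth_header="Bearer <token>")
print(stored)                    # the Authorization value is masked
print(decode_sensitive(stored))  # the Authorization value is plain text again
```

Passing the token through `auth_header` or directly as an "Authorization" entry in the headers ends in the same stored state, which is the equivalence described in the Authorization section.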

execute #

execute() -> None

Executes the HTTP request.

This method simply calls self.request(), which includes the retry logic. If self.request() raises an exception, it will be propagated to the caller of this method.

Raises:

  • RequestException, HTTPError: The last exception that was caught if self.request() fails after self.max_retries attempts.

Source code in src/koheesio/steps/http.py
def execute(self) -> None:
    """
    Executes the HTTP request.

    This method simply calls `self.request()`, which includes the retry logic. If `self.request()` raises an
    exception, it will be propagated to the caller of this method.

    Raises
    ------
    requests.RequestException, requests.HTTPError
        The last exception that was caught if `self.request()` fails after `self.max_retries` attempts.
    """
    with self._request() as response:
        self.log.info(f"HTTP request to {self.url}, status code {response.status_code}")
        self.set_outputs(response)

get #

get() -> Response

Execute an HTTP GET call

Source code in src/koheesio/steps/http.py
def get(self) -> requests.Response:
    """Execute an HTTP GET call"""
    self.method = HttpMethod.GET
    with self.request() as response:
        return response

get_headers #

get_headers() -> dict

Dump headers into JSON without SecretStr masking.

Source code in src/koheesio/steps/http.py
def get_headers(self) -> dict:
    """
    Dump headers into JSON without SecretStr masking.
    """
    return json.loads(self.model_dump_json()).get("headers")

get_options #

get_options() -> dict

options to be passed to requests.request()

Source code in src/koheesio/steps/http.py
def get_options(self) -> dict:
    """options to be passed to requests.request()"""
    return {
        "url": self.url,
        "headers": self.get_headers(),
        "data": self.data,
        "timeout": self.timeout,
        **self.params,  # type: ignore
    }
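
One consequence of the dictionary layout above is worth spelling out: because params is unpacked last, a key in params that matches one of the fixed keys replaces it. The sketch below reproduces only the dictionary construction; `build_options` is a hypothetical helper and the URL is a placeholder. Whether the library intends params to override values such as timeout is not stated in this page, so treat that as an observation about Python dict literals rather than documented behaviour.

```python
from typing import Any, Dict


def build_options(
    url: str,
    headers: Dict[str, str],
    data: Any,
    timeout: int,
    params: Dict[str, Any],
) -> Dict[str, Any]:
    """Mirror the layout of get_options(): fixed keys first, params unpacked last."""
    return {
        "url": url,
        "headers": headers,
        "data": data,
        "timeout": timeout,
        **params,  # later keys win, so params can replace the fixed keys above
    }


options = build_options(
    url="https://api.example.com/items",  # hypothetical endpoint
    headers={"Content-Type": "application/json"},
    data={},
    timeout=3,
    params={"verify": False, "timeout": 10},
)
print(options["timeout"])
print(sorted(options))
```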

get_proper_http_method_from_str_value #

get_proper_http_method_from_str_value(
    method_value: str,
) -> str

Converts string value to HttpMethod enum value

Source code in src/koheesio/steps/http.py
@field_validator("method")
def get_proper_http_method_from_str_value(cls, method_value: str) -> str:
    """Converts string value to HttpMethod enum value"""
    if isinstance(method_value, str):
        try:
            method_value = HttpMethod.from_string(method_value)
        except AttributeError as e:
            raise AttributeError(
                "Only values from HttpMethod class are allowed! "
                f"Provided value: '{method_value}', allowed values: {', '.join(HttpMethod.__members__.keys())}"
            ) from e

    return method_value

post #

post() -> Response

Execute an HTTP POST call

Source code in src/koheesio/steps/http.py
def post(self) -> requests.Response:
    """Execute an HTTP POST call"""
    self.method = HttpMethod.POST
    with self.request() as response:
        return response

put #

put() -> Response

Execute an HTTP PUT call

Source code in src/koheesio/steps/http.py
def put(self) -> requests.Response:
    """Execute an HTTP PUT call"""
    self.method = HttpMethod.PUT
    with self.request() as response:
        return response

set_outputs #

set_outputs(response: Response) -> None

Types of response output

Source code in src/koheesio/steps/http.py
def set_outputs(self, response: requests.Response) -> None:
    """
    Types of response output
    """
    self.output.response_raw = response
    self.output.raw_payload = response.text
    self.output.status_code = response.status_code

    # Only decode non empty payloads to avoid triggering decoding error unnecessarily.
    if self.output.raw_payload:
        try:
            self.output.response_json = response.json()

        except json.decoder.JSONDecodeError as e:
            self.log.error(f"An error occurred while processing the JSON payload. Error message:\n{e.msg}")
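
The JSON handling in set_outputs follows a simple rule: skip empty payloads, and log rather than raise when the payload is not valid JSON. The sketch below shows that rule on plain strings using only the standard library; `parse_json_payload` and the logger name are hypothetical, and it calls `json.loads` on text instead of `response.json()`.

```python
import json
import logging
from typing import Any, Optional

log = logging.getLogger("http_sketch")  # hypothetical logger name


def parse_json_payload(raw_payload: Optional[str]) -> Optional[Any]:
    """Return the decoded JSON, or None for an empty or non-JSON payload."""
    # Only decode non-empty payloads to avoid triggering a decoding error unnecessarily.
    if not raw_payload:
        return None
    try:
        return json.loads(raw_payload)
    except json.JSONDecodeError as e:
        log.error("An error occurred while processing the JSON payload. Error message:\n%s", e.msg)
        return None


print(parse_json_payload('{"status": "ok"}'))
print(parse_json_payload(""))
print(parse_json_payload("<html>not json</html>"))
```

In this sketch, as in the listing above, a non-JSON body leaves the JSON output at None while the raw text remains available separately.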

koheesio.steps.http.PaginatedHttpGetStep #

Represents a paginated HTTP GET step.

Parameters:

  • paginate (bool, default False): Whether to paginate the API response.

  • pages (int, default 1): Number of pages to paginate.

  • offset (int, default 1): Offset for paginated API calls. Offset determines the starting page.

  • limit (int, default 100): Limit for paginated API calls.

limit class-attribute instance-attribute #

limit: Optional[int] = Field(
    default=100,
    description="Limit for paginated API calls. The url should (optionally) contain a named limit parameter, for example: api.example.com/data?limit={limit}",
)

offset class-attribute instance-attribute #

offset: Optional[int] = Field(
    default=1,
    description="Offset for paginated API calls. Offset determines the starting page. Defaults to 1. The url can (optionally) contain a named 'offset' parameter, for example: api.example.com/data?offset={offset}",
)

pages class-attribute instance-attribute #

pages: Optional[int] = Field(
    default=1,
    description="Number of pages to paginate. Defaults to 1",
)

paginate class-attribute instance-attribute #

paginate: Optional[bool] = Field(
    default=False,
    description="Whether to paginate the API response. Defaults to False. When set to True, the API response will be paginated. The url should contain a named 'page' parameter for example: api.example.com/data?page={page}",
)

execute #

execute() -> None

Executes the HTTP GET request and handles pagination.

Returns:

  • Output: The output of the HTTP GET request.

Source code in src/koheesio/steps/http.py
def execute(self) -> None:
    """
    Executes the HTTP GET request and handles pagination.

    Returns
    -------
    HttpGetStep.Output
        The output of the HTTP GET request.
    """
    # Set up pagination parameters
    offset, pages = (self.offset, self.pages + 1) if self.paginate else (1, 1)  # type: ignore
    data = []
    _basic_url = self.url

    for page in range(offset, pages):  # type: ignore[arg-type]
        if self.paginate:
            self.log.info(f"Fetching page {page} of {pages - 1}")

        self.url = self._url(basic_url=_basic_url, page=page)

        with self._request() as response:
            if isinstance(response_json := response.json(), list):
                data += response_json
            else:
                data.append(response_json)

    self.url = _basic_url
    self.output.response_json = data
    self.output.response_raw = None
    self.output.raw_payload = None
    self.output.status_code = None
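
The loop above can be exercised offline with a fake backend. This is a sketch under stated assumptions: `collect_pages` and `FAKE_API` are hypothetical, the URLs are placeholders, and the private `_url` helper is not shown on this page, so the sketch assumes it fills a named `{page}` placeholder with `str.format`. The range arithmetic and the list-versus-object merging are copied from the listing.

```python
from typing import Any, Callable, List


def collect_pages(
    url_template: str,
    fetch_json: Callable[[str], Any],
    paginate: bool = False,
    offset: int = 1,
    pages: int = 1,
) -> List[Any]:
    """Fetch pages and merge the decoded JSON responses into one list."""
    # Same arithmetic as the listing: pages is treated as the last page number.
    start, stop = (offset, pages + 1) if paginate else (1, 1)
    data: List[Any] = []
    for page in range(start, stop):
        payload = fetch_json(url_template.format(page=page))
        if isinstance(payload, list):
            data += payload       # list responses are flattened into the result
        else:
            data.append(payload)  # any other JSON value is kept as a single item
    return data


# Fake backend so the example runs offline: pages 1 and 2 return lists, page 3 an object.
FAKE_API = {
    "https://api.example.com/data?page=1": [{"id": 1}, {"id": 2}],
    "https://api.example.com/data?page=2": [{"id": 3}],
    "https://api.example.com/data?page=3": {"id": 4},
}
TEMPLATE = "https://api.example.com/data?page={page}"

print(collect_pages(TEMPLATE, FAKE_API.__getitem__, paginate=True, offset=1, pages=3))
print(collect_pages(TEMPLATE, FAKE_API.__getitem__, paginate=True, offset=2, pages=3))
print(collect_pages(TEMPLATE, FAKE_API.__getitem__, paginate=False))
```

Two details follow from the arithmetic. With offset=2 and pages=3 only pages 2 and 3 are fetched, so pages behaves as the last page number rather than a page count. Read literally, the listing also produces an empty range (`range(1, 1)`) when paginate is False, and the sketch reproduces that by returning an empty list.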

get_options #

get_options() -> dict

Returns the options to be passed to the requests.request() function.

Returns:

  • dict: The options.

Source code in src/koheesio/steps/http.py
def get_options(self) -> dict:
    """
    Returns the options to be passed to the requests.request() function.

    Returns
    -------
    dict
        The options.
    """
    options = {
        "url": self.url,
        "headers": self.get_headers(),
        "data": self.data,
        "timeout": self.timeout,
        **self._adjust_params(),  # type: ignore
    }

    return options