REST API
This module provides the RestApiReader class for interacting with RESTful APIs.
The RestApiReader class fetches data from RESTful APIs and stores the response in a DataFrame. It supports
different transports, e.g. paginated HTTP or async HTTP. The main entry point is the execute
method, which calls transport.execute() and exposes the data returned by the API calls.
For more details on how to use this class and its methods, refer to the class docstring.
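The delegation described above can be pictured with a toy stand-in. This is only a sketch of the pattern, not koheesio's implementation: `Transport`, `FakeTransport`, and `ToyReader` are hypothetical names, the transport returns plain dictionaries instead of performing HTTP calls, and a list of column names stands in for the Spark schema.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Protocol


class Transport(Protocol):
    """Hypothetical transport interface: execute() returns decoded records."""

    def execute(self) -> List[Dict[str, Any]]: ...


@dataclass
class FakeTransport:
    """Stand-in for an HTTP transport; returns canned records, no network I/O."""

    records: List[Dict[str, Any]]

    def execute(self) -> List[Dict[str, Any]]:
        return list(self.records)


@dataclass
class ToyReader:
    """Toy reader: delegates to the transport and keeps only the schema's columns."""

    transport: Transport
    columns: List[str]  # stands in for spark_schema
    rows: List[Dict[str, Any]] = field(default_factory=list)

    def execute(self) -> None:
        raw = self.transport.execute()
        # Project every record onto the declared columns; missing keys become None.
        self.rows = [{c: rec.get(c) for c in self.columns} for rec in raw]


reader = ToyReader(
    transport=FakeTransport([{"id": 1, "page": 1, "value": "a", "extra": True}]),
    columns=["id", "page", "value"],
)
reader.execute()
```

Keeping the transport behind a small interface is what allows the same reader to work with either a paginated or an async transport.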
koheesio.spark.readers.rest_api.RestApiReader
A reader class that executes an API call and stores the response in a DataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
transport | Union[InstanceOf[AsyncHttpGetStep], InstanceOf[HttpGetStep]] | The HTTP transport step. | required |
spark_schema | Union[str, StructType, List[str], Tuple[str, ...], AtomicType] | The pyspark schema of the response. | required |
Attributes:
Name | Type | Description |
---|---|---|
transport | Union[InstanceOf[AsyncHttpGetStep], InstanceOf[HttpGetStep]] | The HTTP transport step. |
spark_schema | Union[str, StructType, List[str], Tuple[str, ...], AtomicType] | The pyspark schema of the response. |
Returns:
Type | Description |
---|---|
Output | The output of the reader, which includes the DataFrame. |
Examples:
Here are some examples of how to use this class:
Example 1: Paginated Transport
import requests
from requests.adapters import HTTPAdapter
from urllib3 import Retry

# PaginatedHtppGetStep (spelled as in this example) is assumed to live alongside HttpGetStep
from koheesio.steps.http import PaginatedHtppGetStep
from koheesio.spark.readers.rest_api import RestApiReader

max_retries = 3  # illustrative value; the original example leaves it undefined

# Retry on HTTP 503 for both schemes
session = requests.Session()
retry_logic = Retry(total=max_retries, status_forcelist=[503])
session.mount("https://", HTTPAdapter(max_retries=retry_logic))
session.mount("http://", HTTPAdapter(max_retries=retry_logic))

transport = PaginatedHtppGetStep(
    url="https://api.example.com/data?page={page}",
    paginate=True,
    pages=3,
    session=session,
)
task = RestApiReader(
    transport=transport, spark_schema="id: int, page:int, value: string"
)
task.execute()
all_data = [row.asDict() for row in task.output.df.collect()]
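To make the `{page}` placeholder in Example 1 concrete, here is a minimal sketch of how such a URL template could expand into one URL per page. The helper `expand_pages` and its `first_page` default of 1 are assumptions for illustration; the transport's actual page numbering may differ.

```python
from typing import List


def expand_pages(url_template: str, pages: int, first_page: int = 1) -> List[str]:
    """Fill the {page} placeholder once per page (first_page=1 is an assumption)."""
    return [
        url_template.format(page=n)
        for n in range(first_page, first_page + pages)
    ]


urls = expand_pages("https://api.example.com/data?page={page}", pages=3)
for url in urls:
    print(url)
```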
Example 2: Async Transport
from aiohttp import ClientSession, TCPConnector
from aiohttp_retry import ExponentialRetry
from yarl import URL

from koheesio.steps.asyncio.http import AsyncHttpGetStep
from koheesio.spark.readers.rest_api import RestApiReader

session = ClientSession()
urls = [URL("http://httpbin.org/get"), URL("http://httpbin.org/get")]
retry_options = ExponentialRetry()
connector = TCPConnector(limit=10)

transport = AsyncHttpGetStep(
    client_session=session,
    url=urls,
    retry_options=retry_options,
    connector=connector,
)
task = RestApiReader(
    transport=transport, spark_schema="id: int, page:int, value: string"
)
task.execute()
all_data = [row.asDict() for row in task.output.df.collect()]
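The idea behind the async transport in Example 2, several URLs fetched concurrently under a connection cap, can be sketched with the standard library alone. This is an illustration under stated assumptions, not what AsyncHttpGetStep does internally: `fake_get` and `fetch_all` are hypothetical helpers, the "request" is a short sleep rather than real network I/O, and a semaphore plays the role that `TCPConnector(limit=10)` plays above.

```python
import asyncio
from typing import Dict, List


async def fake_get(url: str, limit: asyncio.Semaphore) -> Dict[str, str]:
    """Hypothetical fetch: waits briefly instead of doing network I/O."""
    async with limit:  # at most max_connections fetches run at once
        await asyncio.sleep(0.01)
        return {"url": url, "status": "ok"}


async def fetch_all(urls: List[str], max_connections: int = 10) -> List[Dict[str, str]]:
    """Run all fetches concurrently; results come back in input order."""
    limit = asyncio.Semaphore(max_connections)
    return await asyncio.gather(*(fake_get(u, limit) for u in urls))


results = asyncio.run(
    fetch_all(["http://httpbin.org/get", "http://httpbin.org/get"])
)
```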
spark_schema (class-attribute, instance-attribute)
spark_schema: Union[
    str, StructType, List[str], Tuple[str, ...], AtomicType
] = Field(
    ..., description="The pyspark schema of the response"
)
transport (class-attribute, instance-attribute)
transport: Union[
    InstanceOf[AsyncHttpGetStep], InstanceOf[HttpGetStep]
] = Field(
    ..., description="HTTP transport step", exclude=True
)
execute
execute() -> Output
Executes the API call and stores the response in a DataFrame.
Returns:
Type | Description |
---|---|
Output | The output of the reader, which includes the DataFrame. |