Memory

Create a Spark DataFrame directly from data stored in a Python variable.

koheesio.spark.readers.memory.DataFormat #

Data formats supported by the InMemoryDataReader

CSV class-attribute instance-attribute #

CSV = 'csv'

JSON class-attribute instance-attribute #

JSON = 'json'

koheesio.spark.readers.memory.InMemoryDataReader #

Directly read data from a Python variable and convert it to a Spark DataFrame.

Read data that is stored in one of the supported formats (see DataFormat) directly from a variable and convert it to a Spark DataFrame. Use cases include converting the JSON output of an API into a DataFrame, or reading CSV data received via an API (e.g. the Box API).

The advantage of this reader is that it reads the data directly from the Python variable, without needing to write it to disk first. This is useful when the data is small and does not need to be stored permanently.
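For instance, a minimal sketch of the JSON-from-API use case (the endpoint URL and the use of requests are illustrative assumptions; read() is the Reader convenience method that runs execute and returns the resulting DataFrame):

import requests

from koheesio.spark.readers.memory import DataFormat, InMemoryDataReader

# Hypothetical endpoint: the JSON payload goes straight from memory to a DataFrame
payload = requests.get("https://api.example.com/items").text
df = InMemoryDataReader(format=DataFormat.JSON, data=payload).read()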

Parameters:

data : Union[str, list, dict, bytes]
    Source data. Required.
format : DataFormat
    File / data format. Required.
schema_ : Optional[StructType]
    Schema that will be applied during the creation of the Spark DataFrame. Default: None.
params : Optional[Dict[str, Any]]
    Set of extra parameters that should be passed to the appropriate reader (csv / json). Reader-specific parameters (e.g. multiLine for the JSON reader) can also be passed as keyword arguments; these are merged with the params parameter. Default: empty dict.
Example

# Read CSV data from a string; read() runs the reader and returns the DataFrame
df1 = InMemoryDataReader(format=DataFormat.CSV, data='foo,bar\nA,1\nB,2').read()

# Read JSON data from a single string, or from a list of JSON strings
df2 = InMemoryDataReader(format=DataFormat.JSON, data='{"foo": "A", "bar": 1}').read()
df3 = InMemoryDataReader(
    format=DataFormat.JSON,
    data=['{"foo": "A", "bar": 1}', '{"foo": "B", "bar": 2}'],
).read()
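Reader-specific options can be supplied either through params or as keyword arguments. A minimal sketch, assuming the standard Spark multiLine JSON option mentioned in the params description above:

# multiLine is merged into params and passed through to the JSON reader
df4 = InMemoryDataReader(
    format=DataFormat.JSON,
    data='{"foo": "A",\n "bar": 1}',
    multiLine=True,
).read()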

data class-attribute instance-attribute #

data: Union[str, list, dict, bytes] = Field(
    default=..., description="Source data"
)

format class-attribute instance-attribute #

format: DataFormat = Field(
    default=..., description="File / data format"
)

params class-attribute instance-attribute #

params: Dict[str, Any] = Field(
    default_factory=dict,
    description="[Optional] Set of extra parameters that should be passed to the appropriate reader (csv / json)",
)

schema_ class-attribute instance-attribute #

schema_: Optional[StructType] = Field(
    default=None,
    alias="schema",
    description="[Optional] Schema that will be applied during the creation of Spark DataFrame",
)
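A minimal sketch of supplying a schema; note that the field is populated via its schema alias. The StructType below is an assumption that matches the CSV example above:

from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Schema matching the CSV example, passed via the `schema` alias of `schema_`
schema = StructType(
    [
        StructField("foo", StringType(), True),
        StructField("bar", IntegerType(), True),
    ]
)
df = InMemoryDataReader(
    format=DataFormat.CSV,
    data='foo,bar\nA,1\nB,2',
    schema=schema,
).read()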

execute #

execute() -> Output

Execute method appropriate to the specific data format

Source code in src/koheesio/spark/readers/memory.py
def execute(self) -> Reader.Output:
    """
    Execute method appropriate to the specific data format
    """
    if self.data is None:
        raise ValueError("Data is not provided")

    # Normalize raw bytes to a UTF-8 string before parsing
    if isinstance(self.data, bytes):
        self.data = self.data.decode("utf-8")

    # Dispatch to the matching format-specific helper (e.g. _csv or _json)
    _func = getattr(InMemoryDataReader, f"_{self.format}")
    _df = partial(_func, self)()
    self.output.df = _df
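As a sketch of the calling convention this implies: execute() populates output.df, so the DataFrame is retrieved from the reader's output rather than from a return value.

reader = InMemoryDataReader(format=DataFormat.CSV, data='foo,bar\nA,1\nB,2')
reader.execute()
reader.output.df.show()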