
DataStream.stateful_transform

EXPERIMENTAL API

This is like `transform`, except you can use a stateful object as your transformation function. This is useful, for example, if you want to run a heavy PyTorch model on each incoming batch and don't want to reload the model for every function call; remember that the `transform` API only supports stateless transformations. You could also implement much more complicated stateful transformations, such as your own aggregation function if you are not satisfied with the performance of Quokka's default operators.
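As a sketch of the pattern, here is what a custom aggregation executor could look like. This is illustrative only: the stand-in `Executor` base class, the `execute(batch)` signature, and the plain-dict batches are all assumptions made for this sketch; the page only guarantees that a real executor subclasses `pyquokka.executors.Executor` and exposes `execute` and `done`.

```python
class Executor:
    # Stand-in for pyquokka.executors.Executor; the real interface
    # exposes execute and done, but its exact signatures may differ.
    def execute(self, batch):
        raise NotImplementedError

    def done(self):
        raise NotImplementedError


class RunningSumExecutor(Executor):
    """Accumulates a per-column sum across all incoming batches."""

    def __init__(self):
        self.state = {}  # column name -> running sum, kept across calls

    def execute(self, batch):
        # batch: mapping of column name -> list of values (a stand-in
        # for the Polars DataFrame batches Quokka actually passes)
        for col, values in batch.items():
            self.state[col] = self.state.get(col, 0) + sum(values)
        return None  # nothing to emit until the stream is exhausted

    def done(self):
        # called once when the input stream ends; emit the final state
        return dict(self.state)


executor = RunningSumExecutor()
executor.execute({"x": [1, 2, 3], "y": [10]})
executor.execute({"x": [4], "y": [20, 30]})
result = executor.done()
# result == {"x": 10, "y": 60}
```

The point of the pattern is that `self.state` survives across `execute` calls, so expensive setup (like loading a model) happens once per executor rather than once per batch.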

This API is still being finalized, and a version that takes multiple input streams will be added. This is the part of the DataStream-level API closest to the underlying execution engine: Quokka's engine essentially executes a series of stateful transformations on batches of data. The difficulty is deciding how much of that underlying API to expose so that it stays useful without requiring the user to understand how the Quokka runtime works. To that end, we have to come up with suitable partitioner and placement-strategy abstraction classes and interfaces.

If you are interested in helping us hammer out this API, please talk to me: zihengw@stanford.edu.

Parameters:

- `executor` (`pyquokka.executors.Executor`), required: The stateful executor. It must be a subclass of `pyquokka.executors.Executor` and expose the `execute` and `done` functions. More details forthcoming.
- `new_schema` (`list`), required: The names of the columns of the Polars DataFrame that the transformation function produces.
- `required_columns` (`set`), required: The names of the columns that are required for this transformation. This argument is made mandatory because it is usually trivial to supply and can greatly speed things up.
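One reason `required_columns` speeds things up is column pruning: only the named columns need to flow to the executor. A toy illustration of that pruning step, where plain dicts stand in for the Polars DataFrame batches the real engine uses:

```python
def prune(batch: dict, required_columns: set) -> dict:
    # keep only the columns the transformation actually needs,
    # so less data is shuffled to and buffered by the executor
    return {col: vals for col, vals in batch.items() if col in required_columns}


batch = {"user_id": [1, 2], "payload": ["a", "b"], "ts": [9, 9]}
pruned = prune(batch, {"user_id", "ts"})
# pruned == {"user_id": [1, 2], "ts": [9, 9]}
```

In the actual implementation this corresponds to the `self.select(required_columns)` call made before the stateful node is wired up.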
Return

A transformed DataStream.

Examples:

Check the code for the gramian function.

Source code in pyquokka/datastream.py
def stateful_transform(self, executor: Executor, new_schema: list, required_columns: set,
                       partitioner=PassThroughPartitioner(), placement_strategy = CustomChannelsStrategy(1)):

    """

    **EXPERIMENTAL API** 

    This is like `transform`, except you can use a stateful object as your transformation function. This is useful, for example, if you want to run
    a heavy PyTorch model on each incoming batch and don't want to reload the model for every function call; remember that the `transform` API only
    supports stateless transformations. You could also implement much more complicated stateful transformations, such as your own aggregation
    function if you are not satisfied with the performance of Quokka's default operators.

    This API is still being finalized, and a version that takes multiple input streams will be added. This is the part of the DataStream-level
    API closest to the underlying execution engine: Quokka's engine essentially executes a series of stateful transformations on batches of data.
    The difficulty is deciding how much of that underlying API to expose so that it stays useful without requiring the user to understand
    how the Quokka runtime works. To that end, we have to come up with suitable partitioner and placement-strategy abstraction classes and interfaces.

    If you are interested in helping us hammer out this API, please talk to me: zihengw@stanford.edu.

    Args:
        executor (pyquokka.executors.Executor): The stateful executor. It must be a subclass of `pyquokka.executors.Executor`, and expose the `execute` 
            and `done` functions. More details forthcoming.
        new_schema (list): The names of the columns of the Polars DataFrame that the transformation function produces. 
        required_columns (set): The names of the columns that are required for this transformation. This argument is made mandatory
            because it is usually trivial to supply and can greatly speed things up.

    Return:
        A transformed DataStream.

    Examples:
        Check the code for the `gramian` function.
    """

    assert type(required_columns) == set
    assert issubclass(type(executor), Executor), "user defined executor must be an instance of a \
        child class of the Executor class defined in pyquokka.executors. You must override the execute and done methods."

    select_stream = self.select(required_columns)

    custom_node = StatefulNode(
        schema=new_schema,
        # cannot push through any predicates or projections!
        schema_mapping={col: {-1: col} for col in new_schema},
        required_columns={0: required_columns},
        operator=executor
    )

    custom_node.set_placement_strategy(placement_strategy)

    return self.quokka_context.new_stream(
        sources={0: select_stream},
        partitioners={0: partitioner},
        node=custom_node,
        schema=new_schema
    )