DataStream.select

This will create a new DataStream that contains only selected columns from the source DataStream.

Since a DataStream is implemented as a stream of batches, you might be tempted to think of a projected DataStream as a stream of batches where each batch directly results from selecting columns from a batch in the source DataStream. While this may be the case, select() is aggressively optimized by Quokka and is most likely pushed all the way down to the input readers. As a result, you typically will not see a select node in a Quokka execution plan shown by explain().
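To make the idea of pushing a select down to the input readers concrete, here is a minimal sketch in plain Python. It does not use Quokka: the two reader functions and the inline CSV text are hypothetical stand-ins, and the real optimizer may work quite differently. The point is only that projecting inside the reader gives the same rows as reading everything and projecting afterwards.

```python
import csv
import io

# Toy stand-in for an input file; the column names follow the lineitem example.
CSV_TEXT = """l_orderkey,l_linenumber,l_orderdate
1,1,1996-03-13
1,2,1996-04-12
2,1,1997-01-28
"""

def read_all_then_select(text, columns):
    """Naive plan: the reader yields every column, then a separate step projects."""
    rows = list(csv.DictReader(io.StringIO(text)))  # full rows are materialized first
    return [{c: row[c] for c in columns} for row in rows]

def read_with_pushdown(text, columns):
    """Pushed-down plan: the reader itself keeps only the requested columns."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    keep = [header.index(c) for c in columns]  # resolve column positions once
    return [{header[i]: fields[i] for i in keep} for fields in reader]

wanted = ["l_orderdate", "l_orderkey"]
naive = read_all_then_select(CSV_TEXT, wanted)
pushed = read_with_pushdown(CSV_TEXT, wanted)
```

Both plans produce identical rows, but in the second one no separate projection step exists after the reader, which is the situation in which a select node would not appear in a plan.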

It is much better to think of a DataStream simply as a stream of rows that meet certain criteria, which may be non-deterministically batched together by the Quokka runtime. Indeed, Quokka makes no guarantees on the sizes of these batches, which are determined at runtime. This flexibility is an important reason for Quokka's superior performance.
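The following sketch illustrates why batch sizes should not matter to code that consumes a selected stream. It is plain Python, not Quokka: `batches` and `select_from_batches` are hypothetical helpers, and the sketch keeps row order fixed so that only the batch boundaries vary.

```python
from itertools import islice

def batches(rows, batch_size):
    """Yield consecutive lists of at most batch_size rows."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

def select_from_batches(batched_rows, columns):
    """Project each batch, then flatten back into a plain sequence of rows."""
    return [{c: row[c] for c in columns} for batch in batched_rows for row in batch]

rows = [
    {"l_orderkey": 1, "l_linenumber": 1, "l_orderdate": "1996-03-13"},
    {"l_orderkey": 1, "l_linenumber": 2, "l_orderdate": "1996-04-12"},
    {"l_orderkey": 2, "l_linenumber": 1, "l_orderdate": "1997-01-28"},
    {"l_orderkey": 3, "l_linenumber": 1, "l_orderdate": "1994-02-02"},
    {"l_orderkey": 3, "l_linenumber": 2, "l_orderdate": "1993-11-09"},
]

# The same rows, batched two different ways, project to the same result.
small = select_from_batches(batches(rows, 2), ["l_orderkey"])
large = select_from_batches(batches(rows, 5), ["l_orderkey"])
```

Code written against the rows rather than against a particular batch layout keeps working whatever batch sizes the runtime happens to choose.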

Parameters:

Name      Type   Description                                               Default
columns   list   a list of columns to select from the source DataStream   required
Returns:

A DataStream consisting of only the columns selected.

Examples:

>>> f = qc.read_csv("lineitem.csv")

Select only the l_orderdate and l_orderkey columns

>>> f = f.select(["l_orderdate", "l_orderkey"])

This will now fail with an assertion error, since f's schema now consists of only the two selected columns and no longer contains l_linenumber.

>>> f = f.select(["l_linenumber"])
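The failure in the last example comes from a schema check. The sketch below reproduces that check in isolation, assuming only what the select() source on this page shows: each requested column is asserted to be in the current schema. `TinyStream` is a hypothetical stand-in that tracks nothing but a schema; it is not part of Quokka.

```python
class TinyStream:
    """Hypothetical stand-in for a DataStream that tracks only a schema."""

    def __init__(self, schema):
        self.schema = list(schema)

    def select(self, columns):
        # Same check as in the select() source: every requested column
        # must already be part of the current schema.
        for column in columns:
            assert column in self.schema, "Projection column not in schema"
        # The new stream's schema is exactly the selected columns.
        return TinyStream(columns)

f = TinyStream(["l_orderkey", "l_linenumber", "l_orderdate"])
f = f.select(["l_orderdate", "l_orderkey"])

# l_linenumber was dropped by the first select, so this second select fails.
try:
    f.select(["l_linenumber"])
    second_select_failed = False
    error_message = ""
except AssertionError as exc:
    second_select_failed = True
    error_message = str(exc)
```

If a later step needs l_linenumber, it has to be included in the first select, because a select cannot bring back a column that an earlier select removed.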
Source code in pyquokka/datastream.py
def select(self, columns: list):

    """
    This will create a new DataStream that contains only selected columns from the source DataStream.

    Since a DataStream is implemented as a stream of batches, you might be tempted to think of a projected DataStream as a stream of batches where each
    batch directly results from selecting columns from a batch in the source DataStream. While this may be the case, `select()` is aggressively
    optimized by Quokka and is most likely pushed all the way down to the input readers. As a result, you typically will
    not see a select node in a Quokka execution plan shown by `explain()`.

    It is much better to think of a DataStream simply as a stream of rows that meet certain criteria, which may be non-deterministically
    batched together by the Quokka runtime. Indeed, Quokka makes no guarantees on the sizes of these batches, which are determined at runtime.
    This flexibility is an important reason for Quokka's superior performance.

    Args:
        columns (list): a list of columns to select from the source DataStream

    Returns:
        A DataStream consisting of only the columns selected.

    Examples:

        >>> f = qc.read_csv("lineitem.csv")

        Select only the l_orderdate and l_orderkey columns

        >>> f = f.select(["l_orderdate", "l_orderkey"])

        This will now fail with an assertion error, since f's schema now consists of only the two selected columns and no longer contains l_linenumber.

        >>> f = f.select(["l_linenumber"])
    """

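    # Accept the column names as either a list or a set.
    assert type(columns) == set or type(columns) == list

    # Every requested column must already exist in this DataStream's schema.
    for column in columns:
        assert column in self.schema, "Projection column not in schema"

    # An already materialized DataStream is projected directly on its DataFrame.
    if self.materialized:
        df = self._get_materialized_df().select(columns)
        return self.quokka_context.from_polars(df)

    # Otherwise, register a projection node on top of this stream.
    return self.quokka_context.new_stream(
        sources={0: self},
        partitioners={0: PassThroughPartitioner()},
        node=ProjectionNode(set(columns)),
        schema=columns,
        sorted=self.sorted,
    )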