
DataStream.rename

Renames columns in the DataStream according to rename_dict. This is similar to polars.rename. The keys you supply in rename_dict must be present in the schema, and the rename operation must not lead to duplicate column names.

Note that this will lead to a physical operation at runtime. It might also complicate join reordering, so it should be avoided if possible.

Parameters:

Name | Type | Description | Default
rename_dict | dict | Key is the old column name, value is the new column name. | required

Returns:

A DataStream with new schema according to rename.

Examples:

>>> f = qc.read_csv("lineitem.csv")

Rename the l_orderdate and l_orderkey columns

>>> f = f.rename({"l_orderdate": "orderdate", "l_orderkey": "orderkey"})

This will now fail, since you renamed l_orderdate

>>> f = f.select(["l_orderdate"])
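
The two rules stated above (every key must be in the schema, and the result must not contain duplicate column names) can be illustrated without Quokka at all. The following is a minimal pure-Python sketch; `renamed_schema` is a hypothetical helper written for this illustration and is not part of pyquokka, which enforces its checks with `assert` statements instead of raising `ValueError`.

```python
def renamed_schema(schema, rename_dict):
    """Return the column list after a rename, or raise if the rename is invalid.

    Hypothetical helper for illustration only; not a pyquokka function.
    """
    if not isinstance(rename_dict, dict):
        raise TypeError("rename_dict must be a dict")
    for old, new in rename_dict.items():
        # Rule 1: every key must name an existing column.
        if old not in schema:
            raise ValueError(f"{old!r} is not in the schema")
        # Rule 2a: a new name must not collide with an existing column.
        if new in schema:
            raise ValueError(f"{new!r} is already in the schema")
    new_schema = [rename_dict.get(col, col) for col in schema]
    # Rule 2b: two old columns must not be mapped to the same new name.
    if len(set(new_schema)) != len(new_schema):
        raise ValueError("rename would produce duplicate column names")
    return new_schema


schema = ["l_orderkey", "l_orderdate", "l_quantity"]
result = renamed_schema(
    schema, {"l_orderdate": "orderdate", "l_orderkey": "orderkey"}
)
print(result)
```

After this rename the old names no longer exist in the schema, which is why the `select(["l_orderdate"])` call in the example above fails.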
Source code in pyquokka/datastream.py
def rename(self, rename_dict):

    """
    Renames columns in the DataStream according to rename_dict. This is similar to 
    [`polars.rename`](https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.DataFrame.rename.html).
    The keys you supply in rename_dict must be present in the schema, and the rename operation
    must not lead to duplicate column names.

    Note this will lead to a physical operation at runtime. This might also complicate join reordering, so should be avoided if possible.

    Args:
        rename_dict (dict): key is old column name, value is new column name.

    Return:
        A DataStream with new schema according to rename. 

    Examples:
        >>> f = qc.read_csv("lineitem.csv")

        Rename the l_orderdate and l_orderkey columns

        >>> f = f.rename({"l_orderdate": "orderdate", "l_orderkey": "orderkey"})

        This will now fail, since you renamed l_orderdate

        >>> f = f.select(["l_orderdate"])
    """

    new_sorted = {}
    assert type(
        rename_dict) == dict, "must specify a dictionary like Polars"
    for key in rename_dict:
        assert key in self.schema, "key in rename dict must be in schema"
        assert rename_dict[key] not in self.schema, "new name must not be in current schema"
        if self.sorted is not None and key in self.sorted:
            new_sorted[rename_dict[key]] = self.sorted[key]

    if self.materialized:
        df = self._get_materialized_df().rename(rename_dict)
        return self.quokka_context.from_polars(df)

    # the fact you can write this in one line is why I love Python
    new_schema = [col if col not in rename_dict else rename_dict[col]
                  for col in self.schema]
    schema_mapping = {}
    for key in rename_dict:
        schema_mapping[rename_dict[key]] = {0: key}
    for key in self.schema:
        if key not in rename_dict:
            schema_mapping[key] = {0: key}

    def f(x): return x.rename(rename_dict)

    return self.quokka_context.new_stream(
        sources={0: self},
        partitioners={0: PassThroughPartitioner()},
        node=MapNode(
            schema=new_schema,
            schema_mapping=schema_mapping,
            required_columns={0: set(rename_dict.keys())},
            function=f,
            foldable=True
        ),
        schema=new_schema,
        sorted = new_sorted if len(new_sorted) > 0 else None

    )
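
To make the `schema_mapping` built in the listing above easier to picture, here is a small standalone sketch. It assumes, based only on the code shown, that each output column maps to `{0: source_column}`, where `0` is the index of the single input stream passed in `sources`. `build_schema_mapping` is a hypothetical name used for this illustration, not a pyquokka function.

```python
def build_schema_mapping(schema, rename_dict):
    """Map each output column name to {source_index: input_column_name}.

    Hypothetical helper mirroring the two loops in the listing above.
    """
    mapping = {}
    for col in schema:
        # Renamed columns appear under their new name; others keep their name.
        out_name = rename_dict.get(col, col)
        mapping[out_name] = {0: col}
    return mapping


mapping = build_schema_mapping(
    ["l_orderkey", "l_orderdate", "l_quantity"],
    {"l_orderdate": "orderdate", "l_orderkey": "orderkey"},
)
print(mapping)
```

Every output column, renamed or not, gets an entry, so a consumer of the mapping can trace any column back to the input stream it came from.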