DataStream.groupby

Group a DataStream on a list of columns, optionally specifying an ordering requirement.

This returns a GroupedDataStream object, which currently exposes only the aggregate method. The syntax is similar to Pandas df.groupby().agg(). Eventually the GroupedDataStream object will also support different kinds of window functions.
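For readers unfamiliar with the Pandas syntax mentioned above, here is a minimal sketch of a Pandas group-and-aggregate call. It uses Pandas only, not Quokka; the sample rows and the `l_quantity` column are made up for illustration, and the exact arguments accepted by GroupedDataStream's aggregate method are not shown on this page.

```python
import pandas as pd

# Illustrative data only; the column names echo the lineitem example on this page.
df = pd.DataFrame({
    "l_orderkey": [1, 1, 2],
    "l_orderdate": ["1996-01-02", "1996-01-02", "1996-12-01"],
    "l_quantity": [17, 36, 8],
})

# Group on two key columns, then aggregate a third column per group.
totals = (
    df.groupby(["l_orderkey", "l_orderdate"])
      .agg({"l_quantity": "sum"})
      .reset_index()
)
print(totals)
```

In Pandas the grouping and the aggregation are two chained calls, which is the shape the groupby call followed by aggregate is described as following.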

Parameters:

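| Name | Type | Description | Default |
|---|---|---|---|
| groupby | list or str | A column or a list of columns to group on. | required |
| orderby | list | Ordering requirements on the groupby columns, given as a list such as [(col1, "asc"), (col2, "desc")]. Each column must be one of the groupby columns; a bare column name is treated as ascending. | None |
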
Returns:

A GroupedDataStream object that wraps the current DataStream together with the specified grouping and ordering.

Examples:

>>> lineitem = qc.read_csv("lineitem.csv")

result will be a GroupedDataStream.

>>> result = lineitem.groupby(["l_orderkey","l_orderdate"], orderby = [("l_orderkey", "asc"), ("l_orderdate", "desc")])
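The accepted forms of `groupby` and `orderby` can be sketched as a standalone helper. This is only an illustration that mirrors the checks visible in the source listing on this page: `normalize_orderby` is a hypothetical name, not part of pyquokka, and unlike the library code it raises `ValueError` instead of using assertions and does not modify the caller's list.

```python
def normalize_orderby(groupby, orderby=None):
    """Hypothetical helper mirroring the argument checks in DataStream.groupby."""
    # A single column name is promoted to a one-element list.
    if isinstance(groupby, str):
        groupby = [groupby]
    if not isinstance(groupby, list) or len(groupby) == 0:
        raise ValueError("must specify at least one group key")

    if orderby is None:
        return groupby, None
    if not isinstance(orderby, list):
        raise ValueError("orderby must be a list")

    normalized = []
    for item in orderby:
        # A bare column name defaults to ascending order.
        if isinstance(item, str):
            item = (item, "asc")
        if not isinstance(item, tuple) or len(item) != 2:
            raise ValueError("don't understand orderby format")
        column, direction = item
        if column not in groupby:
            raise ValueError(f"{column!r} is not a groupby column")
        if direction not in ("asc", "desc"):
            raise ValueError("direction must be 'asc' or 'desc'")
        normalized.append((column, direction))
    return groupby, normalized


keys, order = normalize_orderby(
    ["l_orderkey", "l_orderdate"],
    orderby=["l_orderkey", ("l_orderdate", "desc")],
)
print(keys, order)
```

The practical consequence is that an ordering can only be requested on columns that are also group keys.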
Source code in pyquokka/datastream.py
def groupby(self, groupby: list, orderby=None):

    """
    Group a DataStream on a list of columns, optionally specifying an ordering requirement.

    This returns a GroupedDataStream object, which currently exposes only the `aggregate` method. The syntax is similar to Pandas `df.groupby().agg()`.
    Eventually the GroupedDataStream object will also support different kinds of window functions.

    Args:
        groupby (list or str): a column or a list of columns to group on.
        orderby (list): a list of ordering requirements on the groupby columns, specified in a list like this:
            [(col1, "asc"), (col2, "desc")]. A bare column name is treated as ascending.

    Returns:
        A GroupedDataStream object with the specified grouping and the current DataStream.

    Examples:

        >>> lineitem = qc.read_csv("lineitem.csv")

        `result` will be a GroupedDataStream.

        >>> result = lineitem.groupby(["l_orderkey","l_orderdate"], orderby = [("l_orderkey", "asc"), ("l_orderdate", "desc")])

    """

    if type(groupby) == str:
        groupby = [groupby]

    assert type(groupby) == list and len(
        groupby) > 0, "must specify at least one group key as a list of group keys, i.e. [key1,key2]"
    if orderby is not None:
        assert type(orderby) == list
        for i in range(len(orderby)):
            if type(orderby[i]) == tuple:
                assert orderby[i][0] in groupby
                assert orderby[i][1] == "asc" or orderby[i][1] == "desc"
            elif type(orderby[i]) == str:
                assert orderby[i] in groupby
                orderby[i] = (orderby[i], "asc")
            else:
                raise Exception("don't understand orderby format")

    return GroupedDataStream(self, groupby=groupby, orderby=orderby)