Skip to content

DataStream.gramian

This will compute DataStream[columns]^T * DataStream[columns]. The result will be len(columns) * len(columns), with schema same as columns.

Parameters:

Name Type Description Default
columns list

List of columns.

required
Return

A new DataStream of shape len(columns) * len(columns) which is DataStream[columns]^T * DataStream[columns].

Examples:

>>> d = qc.read_csv("lineitem.csv")

Compute the Gramian of the l_quantity and l_extendedprice columns.

>>> d = d.gramian(["l_quantity", "l_extendedprice"])

Result will be a 2x2 matrix.

Source code in pyquokka/datastream.py
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
def gramian(self, columns, demean = None):

    """
    This will compute DataStream[columns]^T * DataStream[columns]. The result will be len(columns) * len(columns), with schema same as columns.

    Args:
        columns (list): List of columns.
        demean (np.ndarray, optional): Vector of length len(columns) subtracted from
            each row before forming the outer product. Defaults to a zero vector,
            i.e. no demeaning.

    Return:
        A new DataStream of shape len(columns) * len(columns) which is DataStream[columns]^T * DataStream[columns].

    Examples:

        >>> d = qc.read_csv("lineitem.csv")

        Compute the Gramian of the l_quantity and l_extendedprice columns.

        >>> d = d.gramian(["l_quantity", "l_extendedprice"])

        Result will be a 2x2 matrix.

    """

    def udf2(x):
        # Partial Gramian x^T x of one (demeaned) batch; Gramians sum across batches.
        x = x.select(columns).to_numpy() - demean
        # Cap BLAS threads so a single batch's matmul doesn't oversubscribe the worker.
        with threadpool_limits(limits=8, user_api='blas'):
            product = np.dot(x.transpose(), x)
        return polars.from_numpy(product, schema = columns)

    for col in columns:
        assert col in self.schema, "column " + col + " not found in schema"

    if demean is not None:
        # isinstance rather than type() ==, so ndarray subclasses are accepted too.
        assert isinstance(demean, np.ndarray), "demean must be a numpy array"
        assert len(demean) == len(columns), "demean must be the same length as columns"
    else:
        demean = np.zeros(len(columns))

    if self.materialized:
        # Fast path: data is already local, compute the Gramian eagerly.
        df = self._get_materialized_df()
        stuff = df.select(columns).to_numpy() - demean
        product = np.dot(stuff.transpose(), stuff)
        return self.quokka_context.from_polars(polars.from_numpy(product, schema = columns))

    class AggExecutor(Executor):
        # Elementwise-sums the partial Gramian matrices emitted by udf2.
        def __init__(self) -> None:
            self.state = None
        def execute(self, batches, stream_id, executor_id):
            for batch in batches:
                if self.state is None:
                    self.state = polars.from_arrow(batch)
                else:
                    self.state += polars.from_arrow(batch)
        def done(self, executor_id):
            return self.state

    local_agg_executor = AggExecutor()
    agg_executor = AggExecutor()

    # Pipeline: per-batch partial Gramians -> per-channel local sums -> single global sum.
    stream = self.select(columns)
    stream = stream.transform( udf2, new_schema = columns, required_columns = set(columns), foldable=True)
    stream = stream.stateful_transform( local_agg_executor , columns, required_columns = set(columns),
                        partitioner=PassThroughPartitioner(), placement_strategy = CustomChannelsStrategy(1))
    return stream.stateful_transform( agg_executor , columns, required_columns = set(columns),
                        partitioner=BroadcastPartitioner(), placement_strategy = SingleChannelStrategy())