
DataStream.with_columns_sql

This is the SQL analog of with_columns.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `new_columns` | `str` | A SQL expression X, as in `SELECT *, X FROM DataStream`. Specify multiple columns by separating them with commas. Each column must have an alias; see the examples. | required |
| `foldable` | `bool` | Whether the function can be executed as part of the batch post-processing of the previous operation in the execution graph. Defaults to `True`. Setting this flag correctly requires some insight into how Quokka works; folding is best suited to lightweight operations. | `True` |
Returns:

A new DataStream containing all existing columns plus the new columns computed by the SQL expressions.

Examples:

>>> d = qc.read_csv("orders.csv")

Now create two columns high and low using SQL.

>>> d = d.with_columns_sql("o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' as high, "
...                        "o_orderpriority = '3-MEDIUM' or o_orderpriority = '4-NOT SPECIFIED' as low")

Columns created by an earlier call can be used in a later one:

>>> d = d.with_columns_sql('high + low as total')

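You must provide an alias for every column and separate the column definitions with commas. Because the implementation splits new_columns on every comma before parsing, a single expression cannot itself contain a comma (for example, a multi-argument function call such as coalesce(a, b)).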

Source code in pyquokka/datastream.py
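# Module-level imports this method relies on; MapNode, PassThroughPartitioner and
# required_columns_from_exp are defined elsewhere in pyquokka.
import duckdb
import polars
import sqlglot

def with_columns_sql(self, new_columns: str, foldable: bool = True):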
    """
    This is the SQL analog of with_columns. 

    Args:
        new_columns (str): A SQL expression X as in 'SELECT *, X from DataStream'. You can specify multiple columns by separating them with commas.
            You must provide an alias for each column. Please look at the examples.
        foldable (bool): Whether the function can be executed as part of the batch post-processing of the previous operation in the
            execution graph. Defaults to True. Setting this flag correctly requires some insight into how Quokka works; folding is best suited to lightweight operations.

    Returns:
        A new DataStream containing all existing columns plus the new columns computed by the SQL expressions.

    Examples:

        >>> d = qc.read_csv("orders.csv")

        Now create two columns high and low using SQL.

        >>> d = d.with_columns_sql("o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' as high, "
        ...                        "o_orderpriority = '3-MEDIUM' or o_orderpriority = '4-NOT SPECIFIED' as low")

        Columns created by an earlier call can be used in a later one.

        >>> d = d.with_columns_sql('high + low as total')

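        You must provide an alias for every column and separate the column definitions with commas. Because the input is split on
        every comma before parsing, a single expression cannot itself contain a comma.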

    """

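    # Each comma-separated piece is parsed on its own, so expressions must not contain commas.
    statements = new_columns.split(",")
    sql_statement = "select *, " + new_columns + " from batch_arrow"
    new_column_names = []
    required_columns = set()
    for statement in statements:
        node = sqlglot.parse_one(statement)
        # Every piece must be "<expr> as <name>" so the output column has a name.
        assert isinstance(node, sqlglot.exp.Alias), "must provide new name for each column: x1 as some_compute, x2 as some_compute, etc."
        new_column_names.append(node.alias)
        # Record which input columns the aliased expression reads.
        required_columns = required_columns.union(required_columns_from_exp(node.this))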

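    def polars_func(batch):
        # Evaluate the SELECT over one Polars batch with an in-memory DuckDB connection.
        con = duckdb.connect()
        con.execute("PRAGMA threads=8")
        # Register the batch explicitly under the table name used in sql_statement.
        con.register("batch_arrow", batch.to_arrow())
        result = con.execute(sql_statement).arrow()
        con.close()
        return polars.from_arrow(result)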

    return self.quokka_context.new_stream(
        sources={0: self},
        partitioners={0: PassThroughPartitioner()},
        node=MapNode(
            schema=self.schema + new_column_names,
            # New columns map to -1 (produced by this node); existing columns pass through from source 0.
            schema_mapping={
                **{new_column: {-1: new_column} for new_column in new_column_names},
                **{col: {0: col} for col in self.schema},
            },
            required_columns={0: required_columns},
            function=polars_func,
            foldable=foldable,
        ),
        schema=self.schema + new_column_names,
        sorted=self.sorted,
    )