DataStream.write_csv

This will write out the entire contents of the DataStream to a list of CSVs.

Parameters:

table_location (str), required:
    the root directory to write the output CSVs to. Similar to Spark, Quokka by default writes out a directory of CSVs instead of dumping all the results to a single CSV, so the output can be done in parallel. If your dataset is small and you want a single file, you can adjust the output_line_limit parameter. Example table_locations: s3://bucket/prefix for cloud, or an absolute path such as /home/user/files for disk.

output_line_limit (int), default 1000000:
    how many rows each CSV in the output should have. The current implementation simply buffers this many rows in memory instead of using file appends, so you should have enough memory!
Return:

DataStream containing the filenames of the CSVs that were produced.

Examples:

>>> f = qc.read_csv("lineitem.csv")
>>> f = f.filter("l_orderkey < 10 and l_partkey > 5")
>>> f.write_csv("/home/user/test-out") 

Make sure to create the directory first! This will write out a list of CSVs to /home/user/test-out.
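
If you want a single output file for a small result, one approach (a hedged sketch, not an official recipe) is to create the output directory up front and raise output_line_limit above the expected row count. Calling collect() on the returned filename DataStream is assumed here to trigger execution:

>>> import os
>>> os.makedirs("/home/user/test-out", exist_ok=True)  # the directory must already exist
>>> f = qc.read_csv("lineitem.csv")
>>> f = f.filter("l_orderkey < 10 and l_partkey > 5")
>>> names = f.write_csv("/home/user/test-out", output_line_limit=100000000)
>>> names.collect()  # assumed to trigger the write and return the filenames that were produced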

Source code in pyquokka/datastream.py
def write_csv(self, table_location, output_line_limit=1000000):
    """
    This will write out the entire contents of the DataStream to a list of CSVs. 

    Args:
        table_location (str): the root directory to write the output CSVs to. Similar to Spark, Quokka by default
            writes out a directory of CSVs instead of dumping all the results to a single CSV so the output can be
            done in parallel. If your dataset is small and you want a single file, you can adjust the output_line_limit
            parameter. Example table_locations: s3://bucket/prefix for cloud, absolute path /home/user/files for disk.
        output_line_limit (int): how many rows each CSV in the output should have. The current implementation simply buffers
            this many rows in memory instead of using file appends, so you should have enough memory!

    Return:
        DataStream containing the filenames of the CSVs that were produced. 

    Examples:

        >>> f = qc.read_csv("lineitem.csv")
        >>> f = f.filter("l_orderkey < 10 and l_partkey > 5")
        >>> f.write_csv("/home/user/test-out") 

        Make sure to create the directory first! This will write out a list of CSVs to /home/user/test-out.
    """

    assert "*" not in table_location, "* not supported, just supply the path."

    if self.materialized:
        df = self._get_materialized_df()
        df.write_csv(table_location)
        return

    if table_location[:5] == "s3://":

        if type(self.quokka_context.cluster) == LocalCluster:
            print(
                "Warning: trying to write S3 dataset on local machine. This assumes high network bandwidth.")

        table_location = table_location[5:]
        bucket = table_location.split("/")[0]
        try:
            client = boto3.client("s3")
            region = client.get_bucket_location(Bucket=bucket)["LocationConstraint"]
        except:
            raise Exception("Bucket does not exist.")
        executor = OutputExecutor(
            table_location, "csv", region=region, row_group_size=output_line_limit)

    else:

        if type(self.quokka_context.cluster) == EC2Cluster:
            raise NotImplementedError(
                "Does not support wQuokkariting local dataset with S3 cluster. Must use S3 bucket.")

        assert table_location[0] == "/", "You must supply absolute path to directory."
        assert os.path.isdir(
            table_location), "Must supply an existing directory"

        executor = OutputExecutor(
            table_location, "csv", region="local", row_group_size=output_line_limit)

    name_stream = self.quokka_context.new_stream(
        sources={0: self},
        partitioners={0: PassThroughPartitioner()},
        node=StatefulNode(
            schema=["filename"],
            # this is a stateful node, but predicates and projections can be pushed down.
            schema_mapping={"filename": {-1: "filename"}},
            required_columns={0: set(self.schema)},
            operator=executor
        ),
        schema=["filename"],

    )

    return name_stream
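
As a side note on the S3 branch above: the bucket's region is resolved with boto3's get_bucket_location before the OutputExecutor is created. A minimal standalone sketch of that lookup, assuming AWS credentials are configured and using a placeholder bucket name (note that boto3 reports LocationConstraint as None for buckets in us-east-1):

>>> import boto3
>>> client = boto3.client("s3")
>>> region = client.get_bucket_location(Bucket="my-bucket")["LocationConstraint"]
>>> region or "us-east-1"  # None means the bucket lives in us-east-1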