This will write out the entire contents of the DataStream to a list of CSVs.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table_location` | `str` | The root directory to write the output CSVs to. Similar to Spark, Quokka by default writes out a directory of CSVs instead of dumping all the results to a single CSV, so the output can be done in parallel. If your dataset is small and you want a single file, you can adjust the `output_line_limit` parameter (see the sketch after this table). Example table_locations: `s3://bucket/prefix` for cloud, absolute path `/home/user/files` for disk. | *required* |
| `output_line_limit` | `int` | How many rows each CSV in the output should have. The current implementation simply buffers this many rows in memory instead of using file appends, so you should have enough memory! | `1000000` |
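To get a single output file for a small dataset, one option is to raise `output_line_limit` above the dataset's row count. A minimal sketch, assuming `qc` is a QuokkaContext as in the Examples below and that `lineitem.csv` has fewer rows than the chosen limit:

```python
f = qc.read_csv("lineitem.csv")
# With output_line_limit above the total row count, the writer buffers all
# rows as one in-memory batch and emits a single CSV shard. The limit here
# is a hypothetical value; pick anything larger than your dataset, keeping
# in mind that this many rows must fit in memory.
f.write_csv("/home/user/test-out", output_line_limit=100_000_000)
```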
Return:
DataStream containing the filenames of the CSVs that were produced (see the collection sketch after the Examples).
Examples:
```python
>>> f = qc.read_csv("lineitem.csv")
>>> f = f.filter("l_orderkey < 10 and l_partkey > 5")
>>> f.write_csv("/home/user/test-out")
```
Make sure to create the directory first! This will write out a list of CSVs to /home/user/test-out.
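Because the return value is itself a DataStream with a single `filename` column, you can materialize it to see which shards were written. A sketch, assuming `collect()` materializes a DataStream into a local DataFrame as elsewhere in the Quokka API:

```python
name_stream = f.write_csv("/home/user/test-out")
# collect() is assumed here to trigger execution and return the filenames
# of the CSV shards that write_csv produced.
filenames = name_stream.collect()
print(filenames)
```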
Source code:

```python
def write_csv(self, table_location, output_line_limit=1000000):
    """
    This will write out the entire contents of the DataStream to a list of CSVs.

    Args:
        table_location (str): the root directory to write the output CSVs to.
            Similar to Spark, Quokka by default writes out a directory of CSVs
            instead of dumping all the results to a single CSV so the output
            can be done in parallel. If your dataset is small and you want a
            single file, you can adjust the output_line_limit parameter.
            Example table_locations: s3://bucket/prefix for cloud, absolute
            path /home/user/files for disk.
        output_line_limit (int): how many rows each CSV in the output should
            have. The current implementation simply buffers this many rows in
            memory instead of using file appends, so you should have enough
            memory!

    Return:
        DataStream containing the filenames of the CSVs that were produced.

    Examples:

        >>> f = qc.read_csv("lineitem.csv")
        >>> f = f.filter("l_orderkey < 10 and l_partkey > 5")
        >>> f.write_csv("/home/user/test-out")

        Make sure to create the directory first! This will write out a list
        of CSVs to /home/user/test-out.
    """

    assert "*" not in table_location, "* not supported, just supply the path."

    # A materialized DataStream can be written out directly from its DataFrame.
    if self.materialized:
        df = self._get_materialized_df()
        df.write_csv(table_location)
        return

    if table_location[:5] == "s3://":

        if type(self.quokka_context.cluster) == LocalCluster:
            print("Warning: trying to write S3 dataset on local machine. This assumes high network bandwidth.")

        # Look up the bucket's region; fail early if the bucket does not exist.
        table_location = table_location[5:]
        bucket = table_location.split("/")[0]
        try:
            client = boto3.client("s3")
            region = client.get_bucket_location(Bucket=bucket)["LocationConstraint"]
        except:
            raise Exception("Bucket does not exist.")
        executor = OutputExecutor(table_location, "csv", region=region, row_group_size=output_line_limit)

    else:

        if type(self.quokka_context.cluster) == EC2Cluster:
            raise NotImplementedError("Does not support writing local dataset with S3 cluster. Must use S3 bucket.")

        assert table_location[0] == "/", "You must supply absolute path to directory."
        assert os.path.isdir(table_location), "Must supply an existing directory"

        executor = OutputExecutor(table_location, "csv", region="local", row_group_size=output_line_limit)

    name_stream = self.quokka_context.new_stream(
        sources={0: self},
        partitioners={0: PassThroughPartitioner()},
        node=StatefulNode(
            schema=["filename"],
            # this is a stateful node, but predicates and projections can be pushed down.
            schema_mapping={"filename": {-1: "filename"}},
            required_columns={0: set(self.schema)},
            operator=executor
        ),
        schema=["filename"],
    )
    return name_stream
```
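For cloud output, the same call accepts an `s3://bucket/prefix` location; as the source above shows, the bucket's region is looked up with `get_bucket_location` and a missing bucket raises an exception. A hedged sketch with a hypothetical bucket name:

```python
f = qc.read_csv("lineitem.csv")
# "my-bucket" is a placeholder; the bucket must already exist, since
# write_csv verifies it via boto3's get_bucket_location and raises otherwise.
names = f.write_csv("s3://my-bucket/lineitem-out")
```

Note the two cluster caveats in the source: writing to S3 from a LocalCluster only prints a bandwidth warning and proceeds, while writing to a local path from an EC2Cluster raises NotImplementedError.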