This will write out the entire contents of the DataStream to a list of Parquets.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table_location` | `str` | The root directory to write the output Parquets to. Similar to Spark, Quokka by default writes out a directory of Parquets instead of dumping all the results to a single Parquet so the output can be done in parallel. If your dataset is small and you want a single file, you can adjust the `output_line_limit` parameter. Example table_locations: `s3://bucket/prefix` for cloud, absolute path `/home/user/files` for disk; see the short sketch below this table. | *required* |
| `output_line_limit` | `int` | The row group size in each output file. | `5000000` |
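For instance, building on the two forms of `table_location` described above, a minimal sketch (the `stream` variable, the bucket name, and the local paths are placeholders, not from the original docs) could look like:

```python
# 'stream' stands in for any Quokka DataStream you want to persist.

# Local disk: an absolute path is required; a directory of Parquets is written.
stream.write_parquet("/home/user/files")

# Cloud: point at an S3 prefix instead; the bucket's region is looked up for you.
stream.write_parquet("s3://bucket/prefix")

# Per the description above, adjust output_line_limit (the row group size per
# output file) if a small dataset should end up in a single, larger file.
stream.write_parquet("/home/user/files", output_line_limit=50_000_000)
```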
Return:
DataStream containing the filenames of the Parquets that were produced.
Examples:
```python
>>> f = qc.read_csv("lineitem.csv")
>>> f = f.filter("l_orderkey < 10 and l_partkey > 5")
>>> f.write_parquet("/home/user/test-out")
```
You should create the directory beforehand! This will write out a list of Parquets to `/home/user/test-out`.
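Since `write_parquet` returns a DataStream of the produced filenames, a natural follow-up is to materialize that stream and see exactly which files were written. A minimal sketch, assuming Quokka's `collect()` is available to pull a DataStream into a local DataFrame (that call is an assumption, not part of this method's docs):

```python
>>> f = qc.read_csv("lineitem.csv")
>>> f = f.filter("l_orderkey < 10 and l_partkey > 5")
>>> filenames = f.write_parquet("/home/user/test-out")
>>> filenames.collect()  # assumed API: materializes the filename DataStream locally
```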
```python
def write_parquet(self, table_location, output_line_limit=5000000):
    """
    This will write out the entire contents of the DataStream to a list of Parquets.

    Args:
        table_location (str): the root directory to write the output Parquets to. Similar to Spark, Quokka by default
            writes out a directory of Parquets instead of dumping all the results to a single Parquet so the output can be
            done in parallel. If your dataset is small and you want a single file, you can adjust the output_line_limit
            parameter. Example table_locations: s3://bucket/prefix for cloud, absolute path /home/user/files for disk.
        output_line_limit (int): the row group size in each output file.

    Return:
        DataStream containing the filenames of the Parquets that were produced.

    Examples:
        >>> f = qc.read_csv("lineitem.csv")
        >>> f = f.filter("l_orderkey < 10 and l_partkey > 5")
        >>> f.write_parquet("/home/user/test-out")

        You should create the directory beforehand! This will write out a list of Parquets to /home/user/test-out.
    """

    if self.materialized:
        df = self._get_materialized_df()
        df.write_parquet(table_location)
        return

    if table_location[:5] == "s3://":

        if type(self.quokka_context.cluster) == LocalCluster:
            print("Warning: trying to write S3 dataset on local machine. This assumes high network bandwidth.")

        table_location = table_location[5:]
        bucket = table_location.split("/")[0]
        try:
            client = boto3.client("s3")
            region = client.get_bucket_location(Bucket=bucket)["LocationConstraint"]
        except:
            raise Exception("Bucket does not exist.")
        executor = OutputExecutor(
            table_location, "parquet", region=region, row_group_size=output_line_limit)

    else:

        if type(self.quokka_context.cluster) == EC2Cluster:
            raise NotImplementedError("Does not support writing local dataset with S3 cluster. Must use S3 bucket.")

        assert table_location[0] == "/", "You must supply absolute path to directory."
        executor = OutputExecutor(
            table_location, "parquet", region="local", row_group_size=output_line_limit)

    name_stream = self.quokka_context.new_stream(
        sources={0: self},
        partitioners={0: PassThroughPartitioner()},
        node=StatefulNode(
            schema=["filename"],
            # this is a stateful node, but predicates and projections can be pushed down.
            schema_mapping={"filename": {-1: "filename"}},
            required_columns={0: set(self.schema)},
            operator=executor
        ),
        schema=["filename"],
    )

    return name_stream
```
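Because `output_line_limit` is forwarded to the `OutputExecutor` as `row_group_size`, each output file should contain row groups of at most that many rows. A quick, optional way to sanity-check a local write with plain pyarrow (a sketch; the output path reuses the example above, and the `*.parquet` file naming is an assumption about how the executor names its files):

```python
import glob
import pyarrow.parquet as pq

# Inspect every Parquet file produced under the output directory.
for path in glob.glob("/home/user/test-out/*.parquet"):
    md = pq.ParquetFile(path).metadata
    print(path, "rows:", md.num_rows, "row groups:", md.num_row_groups)
    # With the default output_line_limit, no row group should exceed 5,000,000 rows.
    for i in range(md.num_row_groups):
        assert md.row_group(i).num_rows <= 5_000_000
```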