Read in a CSV file or files from a table location. The source can be a single CSV or a list of CSVs, on disk or on S3; other clouds are not currently supported. You can either supply a predefined schema as a list of column names via the schema argument, or indicate that the CSV has a header row and let Quokka read the schema from it. You should also specify the CSV's separator.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table_location` | `str` | Where the CSV(s) are located. This mostly mimics Spark behavior; see the examples. | required |
| `schema` | `list` | An optional list of column names, similar to `polars.read_csv(new_columns=...)`. | `None` |
| `has_header` | `bool` | Whether the file has a header row. If `schema` is not provided, this must be `True`. If `schema` is provided, this can still be `True`; Quokka will simply ignore the header row (see the example after this table). | `False` |
| `sep` | `str` | The field separator. Defaults to `','` but can be something else, such as the pipe character used by TPC-H tables. | `','` |
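For instance, if a file has a header row but you prefer your own column names, you can pass both arguments and the header row will be skipped. This is a hedged sketch; the path and column names here are hypothetical:

>>> ds = qc.read_csv("/home/ubuntu/data/sales.csv", schema=["order_id", "amount"], has_header=True)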
Returns:

A DataStream.
Examples:

Read a single CSV. It is best to always specify the absolute path.

>>> lineitem = qc.read_csv("/home/ubuntu/tpch/lineitem.csv")

Read a directory of CSVs:

>>> lineitem = qc.read_csv("/home/ubuntu/tpch/lineitem/*")

Read a single CSV from S3:

>>> lineitem = qc.read_csv("s3://tpc-h-csv/lineitem/lineitem.tbl.1")

Read CSVs from an S3 bucket with a prefix:

>>> lineitem = qc.read_csv("s3://tpc-h-csv/lineitem/*")
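Read a headerless TPC-H table with an explicit schema and a pipe separator. This is a hedged sketch: the path is hypothetical, and the column list is the standard TPC-H lineitem schema rather than something Quokka prescribes.

>>> lineitem = qc.read_csv("/home/ubuntu/tpch/lineitem.tbl",
...     schema=["l_orderkey", "l_partkey", "l_suppkey", "l_linenumber",
...             "l_quantity", "l_extendedprice", "l_discount", "l_tax",
...             "l_returnflag", "l_linestatus", "l_shipdate", "l_commitdate",
...             "l_receiptdate", "l_shipinstruct", "l_shipmode", "l_comment"],
...     sep="|")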
Source code:

~~~python
def read_csv(self, table_location: str, schema=None, has_header=False, sep=","):
    """
    Read in a CSV file or files from a table location. It can be a single CSV or a list of CSVs.
    It can be CSV(s) on disk or CSV(s) on S3. Currently other clouds are not supported. The CSVs
    can have a predefined schema using a list of column names in the schema argument, or you can
    specify the CSV has a header row and Quokka will read the schema from it. You should also
    specify the CSV's separator.

    Args:
        table_location (str): where the CSV(s) are. This mostly mimics Spark behavior. Look at the examples.
        schema (list): you can provide a list of column names, similar to polars.read_csv(new_columns=...)
        has_header (bool): whether there is a header row. If the schema is not provided, this must be True.
            If the schema IS provided, this can still be True; Quokka will just ignore the header row.
        sep (str): defaults to ',' but could be something else, like '|' for TPC-H tables

    Return:
        A DataStream.

    Examples:

        Read a single CSV. It's better to always specify the absolute path.
        >>> lineitem = qc.read_csv("/home/ubuntu/tpch/lineitem.csv")

        Read a directory of CSVs.
        >>> lineitem = qc.read_csv("/home/ubuntu/tpch/lineitem/*")

        Read a single CSV from S3.
        >>> lineitem = qc.read_csv("s3://tpc-h-csv/lineitem/lineitem.tbl.1")

        Read CSVs from an S3 bucket with a prefix.
        >>> lineitem = qc.read_csv("s3://tpc-h-csv/lineitem/*")
    """

    def get_schema_from_bytes(resp):
        # treat everything before the first line break as the header row
        first_newline = resp.find(bytes('\n', 'utf-8'))
        if first_newline == -1:
            raise Exception("could not detect the first line break within the first 4 KB")
        # decode before comparing against sep (comparing a raw byte to a str is always False)
        line = resp[:first_newline].decode("utf-8")
        if line[-1] == sep:
            line = line[:-1]
        return line.split(sep)

    def return_materialized_stream(where, my_schema=None):
        # small inputs are read eagerly into a Polars DataFrame and wrapped in a materialized DataStream
        if has_header:
            df = polars.read_csv(where, has_header=True, separator=sep)
        else:
            df = polars.read_csv(where, new_columns=my_schema, has_header=False, separator=sep)
        self.nodes[self.latest_node_id] = InputPolarsNode(df)
        self.latest_node_id += 1
        return DataStream(self, df.columns, self.latest_node_id - 1, materialized=True)

    if schema is None:
        assert has_header, "if schema is not provided, the CSV must have a header."
    if schema is not None and has_header:
        print("You specified a schema as well as a header. Quokka will use your schema and ignore the names in the header.")

    if table_location[:5] == "s3://":

        if type(self.cluster) == LocalCluster:
            print("Warning: trying to read an S3 dataset on a local machine. This assumes high network bandwidth.")

        table_location = table_location[5:]
        bucket = table_location.split("/")[0]

        if "*" in table_location:
            assert table_location[-1] == "*", "wildcard can only be the last character in address string"
            prefix = "/".join(table_location[:-1].split("/")[1:])
            s3 = boto3.client('s3')
            z = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
            if 'Contents' not in z:
                raise Exception("Wrong S3 path")
            files = [i['Key'] for i in z['Contents']]
            sizes = [i['Size'] for i in z['Contents']]
            assert 'NextContinuationToken' not in z, "too many files in S3 bucket"
            assert len(files) > 0, "no files under prefix"
            if schema is None:
                # sniff the schema from the first 4 KB of the first file
                resp = s3.get_object(Bucket=bucket, Key=files[0], Range='bytes={}-{}'.format(0, 4096))['Body'].read()
                schema = get_schema_from_bytes(resp)
            # if there is more than one file, there could be header problems. Just do one file right now.
            if len(files) == 1 and sizes[0] < self.sql_config["s3_csv_materialize_threshold"]:
                return return_materialized_stream("s3://" + bucket + "/" + files[0], schema)
            token = ray.get(self.catalog.register_s3_csv_source.remote(bucket, files[0], schema, sep, sum(sizes)))
            self.nodes[self.latest_node_id] = InputS3CSVNode(bucket, prefix, None, schema, sep, has_header)
            self.nodes[self.latest_node_id].set_catalog_id(token)
        else:
            key = "/".join(table_location.split("/")[1:])
            s3 = boto3.client('s3')
            try:
                response = s3.head_object(Bucket=bucket, Key=key)
                size = response['ContentLength']
            except Exception:
                raise Exception("CSV not found in S3!")
            if schema is None:
                resp = s3.get_object(Bucket=bucket, Key=key, Range='bytes={}-{}'.format(0, 4096))['Body'].read()
                schema = get_schema_from_bytes(resp)
            if size < 10 * 1048576:
                return return_materialized_stream("s3://" + table_location, schema)
            token = ray.get(self.catalog.register_s3_csv_source.remote(bucket, key, schema, sep, size))
            self.nodes[self.latest_node_id] = InputS3CSVNode(bucket, None, key, schema, sep, has_header)
            self.nodes[self.latest_node_id].set_catalog_id(token)
    else:

        if type(self.cluster) == EC2Cluster:
            raise NotImplementedError("Does not support reading a local dataset with an EC2 cluster. Must use an S3 bucket.")

        if "*" in table_location:
            table_location = table_location[:-1]
            assert table_location[-1] == "/", "must specify * against an entire directory, prefixes are not supported yet"
            try:
                files = [i for i in os.listdir(table_location)]
            except Exception:
                raise Exception("Tried to get the list of files at " + table_location + " and failed. Make sure to specify an absolute path.")
            assert len(files) > 0
            if schema is None:
                resp = open(table_location + files[0], "rb").read(1024 * 4)
                schema = get_schema_from_bytes(resp)
            if len(files) == 1 and os.path.getsize(table_location + files[0]) < self.sql_config["disk_csv_materialize_threshold"]:
                return return_materialized_stream(table_location + files[0], schema)
            token = ray.get(self.catalog.register_disk_csv_source.remote(table_location, schema, sep))
            self.nodes[self.latest_node_id] = InputDiskCSVNode(table_location, schema, sep, has_header)
            self.nodes[self.latest_node_id].set_catalog_id(token)
        else:
            size = os.path.getsize(table_location)
            if schema is None:
                resp = open(table_location, "rb").read(1024 * 4)
                schema = get_schema_from_bytes(resp)
            if size < self.sql_config["disk_csv_materialize_threshold"]:
                return return_materialized_stream(table_location, schema)
            token = ray.get(self.catalog.register_disk_csv_source.remote(table_location, schema, sep))
            self.nodes[self.latest_node_id] = InputDiskCSVNode(table_location, schema, sep, has_header)
            self.nodes[self.latest_node_id].set_catalog_id(token)

    self.latest_node_id += 1
    return DataStream(self, schema, self.latest_node_id - 1)
~~~
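The schema inference above boils down to a small amount of logic: read the first 4 KB, take everything before the first newline as the header row, and split it on the separator. Below is a minimal standalone sketch of that approach for a local file; the function name is ours, not Quokka's.

~~~python
def sniff_csv_schema(path: str, sep: str = ",") -> list:
    # read at most the first 4 KB, mirroring the Range / read(1024 * 4) calls above
    with open(path, "rb") as f:
        head = f.read(4096)
    first_newline = head.find(b"\n")
    if first_newline == -1:
        raise Exception("could not detect a line break in the first 4 KB")
    line = head[:first_newline].decode("utf-8")
    # tolerate a trailing separator at the end of the header row
    if line.endswith(sep):
        line = line[:-1]
    return line.split(sep)
~~~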