QuokkaContext.read_csv

Read in a CSV file or files from a table location. The location can point to a single CSV or a list of CSVs, on local disk or on S3; other clouds are currently not supported. You can provide a predefined schema as a list of column names through the schema argument, or indicate that the CSVs have a header row and let Quokka read the schema from it. You should also specify the CSV separator.

Parameters:

table_location (str), required:
    where the CSV(s) are. This mostly mimics Spark behavior. Look at the examples.

schema (list), default None:
    an optional list of column names, similar to polars.read_csv(new_columns=...).

has_header (bool), default False:
    whether there is a header row. If the schema is not provided, this must be True. If the schema is provided, this can still be True; Quokka will just ignore the header row.

sep (str), default ',':
    the field separator. Defaults to ',' but could be something else, like '|' for TPC-H tables.

Returns:

A DataStream.

Examples:

Read a single CSV. It's always better to specify the absolute path.

>>> lineitem = qc.read_csv("/home/ubuntu/tpch/lineitem.csv")

Read a directory of CSVs

>>> lineitem = qc.read_csv("/home/ubuntu/tpch/lineitem/*")

Read a single CSV from S3

>>> lineitem = qc.read_csv("s3://tpc-h-csv/lineitem/lineitem.tbl.1")

Read CSVs from S3 bucket with prefix

>>> lineitem = qc.read_csv("s3://tpc-h-csv/lineitem/*")
Source code in pyquokka/df.py
def read_csv(self, table_location: str, schema = None, has_header = False, sep=","):

    """
    Read in a CSV file or files from a table location. The location can point to a single CSV or a list of CSVs, on local
    disk or on S3; other clouds are currently not supported. You can provide a predefined schema as a list of
    column names through the schema argument, or indicate that the CSVs have a header row and let Quokka read the schema
    from it. You should also specify the CSV separator.

    Args:
        table_location (str): where the CSV(s) are. This mostly mimics Spark behavior. Look at the examples.
        schema (list): you can provide a list of column names, similar to polars.read_csv(new_columns=...)
        has_header (bool): whether there is a header row. If the schema is not provided, this must be True. If the schema IS provided, 
            this can still be True; Quokka will just ignore the header row.
        sep (str): defaults to ',' but could be something else, like '|' for TPC-H tables

    Return:
        A DataStream.

    Examples:

        Read a single CSV. It's always better to specify the absolute path.
        >>> lineitem = qc.read_csv("/home/ubuntu/tpch/lineitem.csv")

        Read a directory of CSVs 
        >>> lineitem = qc.read_csv("/home/ubuntu/tpch/lineitem/*")

        Read a single CSV from S3
        >>> lineitem = qc.read_csv("s3://tpc-h-csv/lineitem/lineitem.tbl.1")

        Read CSVs from S3 bucket with prefix
        >>> lineitem = qc.read_csv("s3://tpc-h-csv/lineitem/*")
    """

    # Infer the column names from the first line (the header row) of the raw bytes.
    def get_schema_from_bytes(resp):
        first_newline = resp.find(bytes('\n', 'utf-8'))
        if first_newline == -1:
            raise Exception("could not detect the first line break within the first 4 KB")
        resp = resp[:first_newline]
        # resp is bytes, so compare against the encoded separator when stripping a trailing one
        if resp[-1:] == sep.encode("utf-8"):
            resp = resp[:-1]
        schema = resp.decode("utf-8").split(sep)
        return schema

    # Eagerly read small inputs into a Polars DataFrame and wrap it in a materialized DataStream.
    def return_materialized_stream(where, my_schema = None):
        if has_header:
            df = polars.read_csv(where, has_header = True, separator = sep)
        else:
            df = polars.read_csv(where, new_columns = my_schema, has_header = False, separator = sep)
        self.nodes[self.latest_node_id] = InputPolarsNode(df)
        self.latest_node_id += 1
        return DataStream(self, df.columns, self.latest_node_id - 1, materialized=True)

    if schema is None:
        assert has_header, "if schema is not provided, the CSV must have a header."
    if schema is not None and has_header:
        print("You specified a schema as well as a header. Quokka should use your schema and ignore the names in the header.")

    if table_location[:5] == "s3://":

        if type(self.cluster) == LocalCluster:
            print("Warning: trying to read S3 dataset on local machine. This assumes high network bandwidth.")

        table_location = table_location[5:]
        bucket = table_location.split("/")[0]
        if "*" in table_location:
            assert table_location[-1] == "*" , "wildcard can only be the last character in address string"
            prefix = "/".join(table_location[:-1].split("/")[1:])

            s3 = boto3.client('s3')
            z = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
            if 'Contents' not in z:
                raise Exception("Wrong S3 path")
            files = [i['Key'] for i in z['Contents']]
            sizes = [i['Size'] for i in z['Contents']]
            assert 'NextContinuationToken' not in z, "too many files in S3 bucket"
            assert len(files) > 0, "no files under prefix"

            if schema is None:
                resp = s3.get_object(
                    Bucket=bucket, Key=files[0], Range='bytes={}-{}'.format(0, 4096))['Body'].read()
                schema = get_schema_from_bytes(resp)

            # if there is more than one file, there could be header problems. Only handle the single-file case for now.
            if len(files) == 1 and sizes[0] < self.sql_config["s3_csv_materialize_threshold"]:
                return return_materialized_stream("s3://" + bucket + "/" + files[0], schema)

            token = ray.get(self.catalog.register_s3_csv_source.remote(bucket, files[0], schema, sep, sum(sizes)))
            self.nodes[self.latest_node_id] = InputS3CSVNode(bucket, prefix, None, schema, sep, has_header)
            self.nodes[self.latest_node_id].set_catalog_id(token)

        else:
            key = "/".join(table_location.split("/")[1:])
            s3 = boto3.client('s3')
            try:
                response = s3.head_object(Bucket= bucket, Key=key)
                size = response['ContentLength']
            except:
                raise Exception("CSV not found in S3!")
            if schema is None:
                resp = s3.get_object(
                    Bucket=bucket, Key=key, Range='bytes={}-{}'.format(0, 4096))['Body'].read()
                schema = get_schema_from_bytes(resp)

            # materialize single S3 CSVs smaller than 10 MiB directly
            if size < 10 * 1048576:
                return return_materialized_stream("s3://" + table_location, schema)

            token = ray.get(self.catalog.register_s3_csv_source.remote(bucket, key, schema, sep, size))
            self.nodes[self.latest_node_id] = InputS3CSVNode(bucket, None, key, schema, sep, has_header)
            self.nodes[self.latest_node_id].set_catalog_id(token)
    else:

        if type(self.cluster) == EC2Cluster:
            raise NotImplementedError("Does not support reading local dataset with S3 cluster. Must use S3 bucket.")

        if "*" in table_location:
            table_location = table_location[:-1]
            assert table_location[-1] == "/", "must specify * with entire directory, doesn't support prefixes yet"
            try:
                files = [i for i in os.listdir(table_location)]
            except:
                raise Exception("Tried to get list of files at ", table_location, " failed. Make sure specify absolute path")
            assert len(files) > 0

            if schema is None:
                resp = open(table_location + files[0],"rb").read(1024 * 4)
                schema = get_schema_from_bytes(resp)

            if len(files) == 1 and os.path.getsize(table_location + files[0]) < self.sql_config["disk_csv_materialize_threshold"]:
                return return_materialized_stream(table_location + files[0], schema)

            token = ray.get(self.catalog.register_disk_csv_source.remote(table_location, schema, sep))
            self.nodes[self.latest_node_id] = InputDiskCSVNode(table_location, schema, sep, has_header)
            self.nodes[self.latest_node_id].set_catalog_id(token)
        else:
            size = os.path.getsize(table_location)
            if schema is None:
                resp = open(table_location,"rb").read(1024 * 4)
                schema = get_schema_from_bytes(resp)
            if size < self.sql_config["disk_csv_materialize_threshold"]:
                return return_materialized_stream(table_location, schema)

            token = ray.get(self.catalog.register_disk_csv_source.remote(table_location, schema, sep))
            self.nodes[self.latest_node_id] = InputDiskCSVNode(table_location, schema, sep, has_header)
            self.nodes[self.latest_node_id].set_catalog_id(token)

    self.latest_node_id += 1
    return DataStream(self, schema, self.latest_node_id - 1)
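
For context, here is a minimal end-to-end sketch of how read_csv is typically used. It assumes a local QuokkaContext constructed with no arguments, as in the Quokka tutorials, and that DataStream.collect() materializes the result as a Polars DataFrame; the file path is illustrative.

>>> from pyquokka.df import QuokkaContext
>>> qc = QuokkaContext()
>>> lineitem = qc.read_csv("/home/ubuntu/tpch/lineitem.csv", has_header=True)
>>> result = lineitem.collect()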