
QuokkaContext.read_iceberg

Requires pyiceberg to be installed on the client. This is a new API and currently supports only the AWS Glue catalog.
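Because read_iceberg imports pyiceberg inside the method, a missing installation only surfaces when the method is called. The sketch below is a pre-flight check that can run first. The helper names module_available and missing_iceberg_dependencies are hypothetical and not part of pyquokka. The sketch also assumes that pyiceberg's Glue catalog needs boto3, which is typically installed through the pyiceberg[glue] extra.

```python
import importlib.util
from typing import List


def module_available(name: str) -> bool:
    """Return True if the top-level module `name` can be imported here."""
    # find_spec returns None when no module with this name can be located.
    return importlib.util.find_spec(name) is not None


def missing_iceberg_dependencies() -> List[str]:
    """List modules assumed to be needed by read_iceberg's Glue path that are absent."""
    # Assumption: pyiceberg's Glue catalog relies on boto3 for AWS access.
    required = ["pyiceberg", "boto3"]
    return [name for name in required if not module_available(name)]


missing = missing_iceberg_dependencies()
if missing:
    print("Missing before calling read_iceberg:", ", ".join(missing))
```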

Parameters:

table (str, required): The fully qualified table name, including the database. For example, if the table "my_table" is in the database "default", pass "default.my_table".

snapshot (int, default None): The ID of the snapshot to read. If not specified, the latest snapshot is read.
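The two parameters can be modeled with a pair of small helpers. The first splits a "database.table" name, and the second picks which snapshot is read. Both split_table_name and resolve_snapshot are hypothetical and are not pyquokka or pyiceberg APIs. The sketch assumes that "latest snapshot" means the table's current snapshot.

```python
from typing import Iterable, Optional, Tuple


def split_table_name(table: str) -> Tuple[str, str]:
    """Split a fully qualified name such as "default.my_table" into (database, table)."""
    # rpartition splits on the last dot, so "a.b.c" yields ("a.b", "c").
    database, sep, name = table.rpartition(".")
    if not sep or not database or not name:
        raise ValueError(f"expected 'database.table', got {table!r}")
    return database, name


def resolve_snapshot(
    requested: Optional[int],
    snapshot_ids: Iterable[int],
    current_id: Optional[int],
) -> Optional[int]:
    """Return the snapshot ID to read: the requested one if it exists, else the current one."""
    if requested is None:
        # No snapshot given: fall back to the table's current (latest) snapshot.
        return current_id
    if requested not in set(snapshot_ids):
        raise ValueError(f"snapshot {requested} not found")
    return requested
```

Passing only "my_table" fails the check in split_table_name. This matches the requirement that the database must be part of the name.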

Returns:

DataStream: The DataStream created from the Iceberg table.

Examples:

>>> from pyquokka.df import QuokkaContext
>>> qc = QuokkaContext()
>>> stream = qc.read_iceberg("default.my_table")
Source code in pyquokka/df.py
def read_iceberg(self, table, snapshot=None):

    """
    Requires pyiceberg to be installed on the client. This is a new API and currently supports only the AWS Glue catalog.

    Args:
        table (str): The fully qualified table name, including the database. For example, if the table "my_table" is in the database "default", pass "default.my_table".
        snapshot (int): The ID of the snapshot to read. If not specified, the latest snapshot is read.

    Returns:
        DataStream: The DataStream created from the Iceberg table.

    Examples:

        >>> from pyquokka.df import QuokkaContext
        >>> qc = QuokkaContext()
        >>> stream = qc.read_iceberg("default.my_table")
    """

    # Imported here so pyiceberg is only required when this method is used.
    from pyiceberg.catalog import load_catalog

    # Only the AWS Glue catalog is supported for now.
    catalog = load_catalog("glue", type="glue")
    try:
        iceberg_table = catalog.load_table(table)
    except Exception as e:
        # Avoid a bare except so KeyboardInterrupt/SystemExit still propagate.
        raise Exception(f"table {table} not found") from e

    # Validate the requested snapshot against the table's snapshot history.
    # An explicit check (rather than assert) still runs under `python -O`.
    if snapshot is not None:
        snapshot_ids = [s.snapshot_id for s in iceberg_table.metadata.snapshots]
        if snapshot not in snapshot_ids:
            raise ValueError(f"snapshot {snapshot} not found in table {table}")

    # Register an input node for the table (InputS3IcebergNode is defined elsewhere in pyquokka).
    self.nodes[self.latest_node_id] = InputS3IcebergNode(iceberg_table, snapshot)
    self.latest_node_id += 1
    # The DataStream's columns are the table schema's top-level field names.
    return DataStream(self, [f.name for f in iceberg_table.schema().fields], self.latest_node_id - 1)
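The checks in read_iceberg can be exercised without AWS credentials by substituting an in-memory catalog. The sketch below mirrors that control flow: load the table, validate the optional snapshot, and return the column names the resulting DataStream would expose. FakeCatalog, FakeTable, FakeSnapshot and plan_iceberg_read are all hypothetical stand-ins, not pyiceberg or pyquokka classes. KeyError and LookupError substitute for whatever errors a real catalog raises.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class FakeSnapshot:
    snapshot_id: int


@dataclass
class FakeTable:
    columns: List[str]
    snapshots: List[FakeSnapshot] = field(default_factory=list)


class FakeCatalog:
    """In-memory stand-in for a catalog (hypothetical, for illustration only)."""

    def __init__(self, tables: Dict[str, FakeTable]):
        self._tables = tables

    def load_table(self, name: str) -> FakeTable:
        # A real catalog raises its own error type; KeyError stands in for it here.
        return self._tables[name]


def plan_iceberg_read(catalog: FakeCatalog, table: str, snapshot: Optional[int] = None) -> List[str]:
    """Mirror read_iceberg's checks and return the columns the DataStream would expose."""
    try:
        iceberg_table = catalog.load_table(table)
    except KeyError as e:
        raise LookupError(f"table {table} not found") from e
    if snapshot is not None:
        snapshot_ids = [s.snapshot_id for s in iceberg_table.snapshots]
        if snapshot not in snapshot_ids:
            raise ValueError(f"snapshot {snapshot} not found in table {table}")
    return list(iceberg_table.columns)


catalog = FakeCatalog({
    "default.my_table": FakeTable(["id", "name"], [FakeSnapshot(101), FakeSnapshot(102)]),
})
print(plan_iceberg_read(catalog, "default.my_table", snapshot=101))
```

Keeping table lookup and snapshot validation separate from node creation makes each failure mode testable on its own. This is a design suggestion for testing, not a description of how pyquokka is structured.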