DataStream.union

The union of two streams is a stream that contains every row of both streams. The two streams must have the same schema. Note: DataStreams are unordered, so do not make any assumptions about the order of rows in the result.

Parameters:

Name     Type          Description                               Default
other    DataStream    Another DataStream of the same schema.    required

Returns:

A new DataStream of the same schema as the two streams, with rows from both.

Examples:

>>> d = qc.read_csv("lineitem.csv")
>>> d1 = qc.read_csv("lineitem1.csv")
>>> d1 = d.union(d1)
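
The semantics described above can be illustrated without Quokka at all. The following is a toy model in plain Python, not the library's implementation: `union_rows` and `as_multiset` are hypothetical helpers, rows are modeled as dicts, and "same schema" is approximated by comparing column names. It shows that duplicates are kept and that results should be compared without relying on row order.

```python
from collections import Counter

def union_rows(left, right):
    """Toy model of a union: every row of both inputs, duplicates kept."""
    # Approximate the same-schema requirement by comparing column names.
    if left and right and set(left[0]) != set(right[0]):
        raise ValueError("inputs must share a schema")
    return left + right

def as_multiset(rows):
    # Order-insensitive view of the rows, for comparing unordered results.
    return Counter(tuple(sorted(row.items())) for row in rows)

a = [{"id": 1, "qty": 5}, {"id": 2, "qty": 7}]
b = [{"id": 2, "qty": 7}]

out = union_rows(a, b)
print(len(out))                                # 3
print(as_multiset(out) == as_multiset(b + a))  # True
```

Comparing multisets rather than lists is the safe way to test code that consumes a union, since the order in which rows arrive is not guaranteed.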
Source code in pyquokka/datastream.py
def union(self, other):

    """
    The union of two streams is a stream that contains all the elements of both streams.
    The two streams must have the same schema.
    Note: since DataStreams are not ordered, you should not make any assumptions on the ordering of the rows in the result.

    Args:
        other (DataStream): another DataStream of the same schema.

    Return:
        A new DataStream of the same schema as the two streams, with rows from both.

    Examples:

        >>> d = qc.read_csv("lineitem.csv")
        >>> d1 = qc.read_csv("lineitem1.csv")
        >>> d1 = d.union(d1)

    """

    assert self.schema == other.schema
    assert self.sorted == other.sorted

    class UnionExecutor(Executor):
        def __init__(self) -> None:
            self.state = None
        def execute(self,batches,stream_id, executor_id):
            return pa.concat_tables(batches)
        def done(self,executor_id):
            return

    executor = UnionExecutor()

    node = StatefulNode(
        schema=self.schema,
        # cannot push through any predicates or projections!
        schema_mapping={col: {0: col, 1: col} for col in self.schema},
        required_columns={0: set(), 1: set()},
        operator=executor
    )

    return self.quokka_context.new_stream(
        sources={0: self, 1: other},
        partitioners={0: PassThroughPartitioner(), 1: PassThroughPartitioner()},
        node=node,
        schema=self.schema,
        sorted=None
    )