Join a DataStream with another DataStream. This may result in a distributed hash join or a broadcast join depending on cardinality estimates.
Parameters:
Name
Type
Description
Default
right
DataStream
the DataStream to join to.
required
on
str
You could either specify this, if the join column has the same name in this DataStream and right, or left_on and right_on
if the join columns don't have the same name.
None
left_on
str
the name of the join column in this DataStream.
None
right_on
str
the name of the join column in right.
None
suffix
str
if right has columns with the same names as columns in this DataStream, their names will be appended with the suffix in the result.
'_2'
how
str
supports "inner", "left", "semi" or "anti"
'inner'
Return
A new DataStream that's the joined result of this DataStream and "right". By default, columns from both sides will be retained,
except for right_on from the right side.
def join(self, right, on=None, left_on=None, right_on=None, suffix="_2", how="inner", maintain_sort_order=None):
    """
    Join a DataStream with another DataStream. This may result in a distributed hash join
    or a broadcast join depending on cardinality estimates.

    Args:
        right (DataStream): the DataStream to join to.
        on (str): You could either specify this, if the join column has the same name in
            this DataStream and `right`, or `left_on` and `right_on` if the join columns
            don't have the same name.
        left_on (str): the name of the join column in this DataStream.
        right_on (str): the name of the join column in `right`.
        suffix (str): if `right` has columns with the same names as columns in this
            DataStream, their names will be appended with the suffix in the result.
        how (str): supports "inner", "left", "semi" or "anti".
        maintain_sort_order (str): "left" or "right"; if set, the output preserves the sort
            order of that side. Only valid for "inner" and "left" joins, and the chosen
            side must itself be sorted.

    Return:
        A new DataStream that's the joined result of this DataStream and "right". By
        default, columns from both sides will be retained, except for `right_on` from the
        right side.

    Examples:

        >>> lineitem = qc.read_csv("lineitem.csv")
        >>> orders = qc.read_csv("orders.csv")
        >>> result = lineitem.join(orders, left_on = "l_orderkey", right_on = "o_orderkey")
        >>> result = result.select(["o_orderkey"])
    """
    assert how in {"inner", "left", "semi", "anti"}
    assert isinstance(right, DataStream), "must join against a Quokka DataStream"

    if maintain_sort_order is not None:
        assert how in {"inner", "left"}
        # our broadcast join strategy should automatically satisfy this, no need to do
        # anything special
        if type(right) is polars.DataFrame:
            assert maintain_sort_order == "left"
            assert self.sorted is not None
        else:
            assert maintain_sort_order in {"left", "right"}
            if maintain_sort_order == "left":
                assert self.sorted is not None
            else:
                assert right.sorted is not None
        if how == "left":
            assert maintain_sort_order == "right", "in a left join, can only maintain order of the right table"

    # if type(right) is polars.DataFrame and right.to_arrow().nbytes > 10485760:
    #     raise Exception("You cannot join a DataStream against a Polars DataFrame more than 10MB in size. Sorry.")

    if on is None:
        assert left_on is not None and right_on is not None
        assert left_on in self.schema, "join key not found in left table"
        assert right_on in right.schema, "join key not found in right table"
    else:
        assert on in self.schema, "join key not found in left table"
        assert on in right.schema, "join key not found in right table"
        left_on = on
        right_on = on
        on = None

    # we can't do this check since schema is now a list of names with no type info.
    # This should change in the future.
    # assert node1.schema[left_on] == node2.schema[right_on], "join column has different schema in tables"

    new_schema = self.schema.copy()
    # -1 marks a materialized (local Polars) side in the schema mapping; 0/1 are the
    # left/right stream source ids used for predicate pushdown.
    if self.materialized:
        schema_mapping = {col: {-1: col} for col in self.schema}
    else:
        schema_mapping = {col: {0: col} for col in self.schema}

    # if the right table is already materialized, the schema mapping should forget about it
    # since we can't push anything down anyways.
    # an optimization could be to push down the predicate directly to the materialized
    # Polars DataFrame in the BroadcastJoinExecutor. leave this as a TODO. this could be
    # greatly beneficial if it significantly reduces the size of the small table.
    if right.materialized:
        right_table_id = -1
    else:
        right_table_id = 1

    rename_dict = {}

    # for semi/anti joins only the key column of the right side survives into the result
    right_cols = right.schema if how not in {"semi", "anti"} else [right_on]
    for col in right_cols:
        if col == right_on:
            continue
        if col in new_schema:
            assert col + suffix not in new_schema, (
                "the suffix was not enough to guarantee unique col names", col + suffix, new_schema)
            new_schema.append(col + suffix)
            schema_mapping[col + suffix] = {right_table_id: col + suffix}
            rename_dict[col] = col + suffix
        else:
            new_schema.append(col)
            schema_mapping[col] = {right_table_id: col}

    # you only need the key column on the RHS! select overloads in DataStream or
    # Polars DataFrame runtime polymorphic
    if how in ("semi", "anti"):
        right = right.select([right_on])

    if rename_dict:
        right = right.rename(rename_dict)

    if (not self.materialized and not right.materialized) or \
            (self.materialized and not right.materialized and how != "inner"):
        # distributed hash join path. if self.materialized, rewrite the schema_mapping
        # so the left side is addressed as stream source 0.
        for col in schema_mapping:
            if next(iter(schema_mapping[col])) == -1:
                schema_mapping[col] = {0: col}

        if maintain_sort_order is None:
            assume_sorted = {}
        elif maintain_sort_order == "left":
            assume_sorted = {0: True}
        else:
            assume_sorted = {1: True}

        return self.quokka_context.new_stream(
            sources={0: self, 1: right},
            partitioners={0: HashPartitioner(left_on), 1: HashPartitioner(right_on)},
            node=JoinNode(
                schema=new_schema,
                schema_mapping=schema_mapping,
                required_columns={0: {left_on}, 1: {right_on}},
                join_spec=(how, {0: left_on, 1: right_on}),
                assume_sorted=assume_sorted),
            schema=new_schema,
        )

    elif self.materialized and not right.materialized:
        # broadcast join: ship the small materialized left side to the right stream.
        # the join key survives under the right-side name, so rename it back afterwards.
        assert how in {"inner"}
        new_schema.remove(left_on)
        new_schema = [right_on] + new_schema
        del schema_mapping[left_on]
        schema_mapping[right_on] = {1: right_on}
        new_stream = self.quokka_context.new_stream(
            sources={1: right},
            partitioners={1: PassThroughPartitioner()},
            node=BroadcastJoinNode(
                schema=new_schema,
                schema_mapping=schema_mapping,
                required_columns={1: {right_on}},
                operator=BroadcastJoinExecutor(
                    self._get_materialized_df(), small_on=left_on, big_on=right_on,
                    suffix=suffix, how=how)),
            schema=new_schema,
        )
        if right_on == left_on:
            return new_stream
        else:
            return new_stream.rename({right_on: left_on})

    elif not self.materialized and right.materialized:
        # broadcast join: ship the small materialized right side to the left stream.
        return self.quokka_context.new_stream(
            sources={0: self},
            partitioners={0: PassThroughPartitioner()},
            node=BroadcastJoinNode(
                schema=new_schema,
                schema_mapping=schema_mapping,
                required_columns={0: {left_on}},
                operator=BroadcastJoinExecutor(
                    right._get_materialized_df(), small_on=right_on, big_on=left_on,
                    suffix=suffix, how=how)),
            schema=new_schema,
        )

    else:
        # both sides are materialized locally: just join the Polars DataFrames directly.
        right_df = right._get_materialized_df()
        left_df = self._get_materialized_df()
        result = left_df.join(right_df, how=how, left_on=left_on, right_on=right_on, suffix=suffix)
        return self.quokka_context.from_polars(result)