Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions core/src/main/java/org/apache/datafusion/DataFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,121 @@ public DataFrame unnestColumns(UnnestOptions options, String... columns) {
return new DataFrame(unnestColumns(nativeHandle, columns, options.preserveNulls()));
}

/**
* Equi-join this DataFrame with {@code right} on the named columns, using the given {@link
* JoinType}. The receiver and {@code right} both remain usable and must still be closed
* independently.
*
* <p>Equivalent to SQL {@code left <type> JOIN right ON l.leftCols[0] = r.rightCols[0] AND ...}.
* {@code leftCols} and {@code rightCols} must have the same length.
*
* @throws IllegalArgumentException if any argument is {@code null} or {@code leftCols.length !=
* rightCols.length}.
* @throws IllegalStateException if either DataFrame is closed or already collected.
* @throws RuntimeException if join planning fails (column collision in the combined schema,
* unknown column names, etc.).
*/
public DataFrame join(DataFrame right, JoinType type, String[] leftCols, String[] rightCols) {
checkJoinArgs(right, type, leftCols, rightCols);
return new DataFrame(
joinDataFrame(nativeHandle, right.nativeHandle, type.code(), leftCols, rightCols, null));
}

/**
* Equi-join this DataFrame with {@code right}, restricting the result with a residual SQL filter
* parsed against the <em>combined</em> schema (left columns followed by right columns; columns
* may be qualified with the relation alias when ambiguous). The receiver and {@code right} both
* remain usable and must still be closed independently.
*
* <p>For outer joins, the filter is applied only to matched rows; unmatched rows are passed
* through with nulls on the unmatched side, matching DataFusion's semantics.
*
* @throws IllegalArgumentException if any argument is {@code null} or {@code leftCols.length !=
* rightCols.length}.
* @throws IllegalStateException if either DataFrame is closed or already collected.
* @throws RuntimeException if join planning or filter parsing fails.
*/
public DataFrame join(
DataFrame right, JoinType type, String[] leftCols, String[] rightCols, String filter) {
checkJoinArgs(right, type, leftCols, rightCols);
if (filter == null) {
throw new IllegalArgumentException("join filter must be non-null");
}
return new DataFrame(
joinDataFrame(nativeHandle, right.nativeHandle, type.code(), leftCols, rightCols, filter));
}

/**
* Join this DataFrame with {@code right} using arbitrary SQL predicates parsed against the
* <em>combined</em> schema. Each predicate is parsed independently and the join evaluates their
* conjunction. Predicates may reference columns from either side and may be qualified with the
* relation alias when ambiguous (e.g. {@code "left.x = right.x"}). The receiver and {@code right}
* both remain usable and must still be closed independently.
*
* <p>DataFusion's optimiser identifies and rewrites equality predicates into hash-join keys
* automatically, so {@code joinOn(right, INNER, "l.id = r.id")} plans equivalently to {@link
* #join(DataFrame, JoinType, String[], String[])} with a single key. Use {@code joinOn} when the
* predicate is not a simple equality, e.g. inequality joins or range conditions.
*
* @throws IllegalArgumentException if {@code right} or {@code type} is {@code null}, or {@code
* predicates} is {@code null} or empty, or any predicate is {@code null}.
* @throws IllegalStateException if either DataFrame is closed or already collected.
* @throws RuntimeException if predicate parsing or join planning fails.
*/
public DataFrame joinOn(DataFrame right, JoinType type, String... predicates) {
if (right == null) {
throw new IllegalArgumentException("joinOn right must be non-null");
}
if (type == null) {
throw new IllegalArgumentException("joinOn type must be non-null");
}
if (predicates == null || predicates.length == 0) {
throw new IllegalArgumentException("joinOn predicates must be non-null and non-empty");
}
for (String p : predicates) {
if (p == null) {
throw new IllegalArgumentException("joinOn predicates must not contain null");
}
}
if (nativeHandle == 0) {
throw new IllegalStateException("DataFrame is closed or already collected");
}
if (right.nativeHandle == 0) {
throw new IllegalStateException("right DataFrame is closed or already collected");
}
return new DataFrame(
joinOnDataFrame(nativeHandle, right.nativeHandle, type.code(), predicates));
}

private void checkJoinArgs(
DataFrame right, JoinType type, String[] leftCols, String[] rightCols) {
if (right == null) {
throw new IllegalArgumentException("join right must be non-null");
}
if (type == null) {
throw new IllegalArgumentException("join type must be non-null");
}
if (leftCols == null) {
throw new IllegalArgumentException("join leftCols must be non-null");
}
if (rightCols == null) {
throw new IllegalArgumentException("join rightCols must be non-null");
}
if (leftCols.length != rightCols.length) {
throw new IllegalArgumentException(
"join leftCols and rightCols must have the same length, got "
+ leftCols.length
+ " and "
+ rightCols.length);
}
if (nativeHandle == 0) {
throw new IllegalStateException("DataFrame is closed or already collected");
}
if (right.nativeHandle == 0) {
throw new IllegalStateException("right DataFrame is closed or already collected");
}
}

/**
* Materialize this DataFrame as Parquet at {@code path}. The path is treated as a directory
* unless overridden via {@link ParquetWriteOptions#singleFileOutput(boolean)}. The receiver
Expand Down Expand Up @@ -472,6 +587,17 @@ public void close() {

private static native long unnestColumns(long handle, String[] columns, boolean preserveNulls);

private static native long joinDataFrame(
long leftHandle,
long rightHandle,
byte joinType,
String[] leftCols,
String[] rightCols,
String filter);

private static native long joinOnDataFrame(
long leftHandle, long rightHandle, byte joinType, String[] predicates);

private static native void writeParquetWithOptions(
long handle,
String path,
Expand Down
61 changes: 61 additions & 0 deletions core/src/main/java/org/apache/datafusion/JoinType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

/**
* Join algorithm requested for {@link DataFrame#join} / {@link DataFrame#joinOn}. Mirrors
* DataFusion's {@code JoinType} enum one-to-one.
*
* <ul>
* <li>{@link #INNER} — rows where the join condition matches in both sides.
* <li>{@link #LEFT} / {@link #RIGHT} — outer joins; unmatched rows on the named side are kept and
* padded with nulls on the other side.
* <li>{@link #FULL} — full outer join; unmatched rows from either side are kept with nulls.
* <li>{@link #LEFT_SEMI} / {@link #RIGHT_SEMI} — returns rows from the named side that have at
* least one match on the other; only the named side's columns appear in the output.
* <li>{@link #LEFT_ANTI} / {@link #RIGHT_ANTI} — returns rows from the named side that have no
* match on the other; only the named side's columns appear in the output.
* <li>{@link #LEFT_MARK} / {@link #RIGHT_MARK} — returns one row per row of the named side, with
* an additional boolean {@code mark} column indicating whether the join condition matched.
* </ul>
*/
public enum JoinType {
INNER((byte) 0),
LEFT((byte) 1),
RIGHT((byte) 2),
FULL((byte) 3),
LEFT_SEMI((byte) 4),
RIGHT_SEMI((byte) 5),
LEFT_ANTI((byte) 6),
RIGHT_ANTI((byte) 7),
LEFT_MARK((byte) 8),
RIGHT_MARK((byte) 9);

private final byte code;

JoinType(byte code) {
this.code = code;
}

/** Stable byte code for FFI. */
public byte code() {
return code;
}
}
Loading
Loading