Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions core/src/main/java/org/apache/datafusion/DataFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,155 @@ public DataFrame unnestColumns(UnnestOptions options, String... columns) {
return new DataFrame(unnestColumns(nativeHandle, columns, options.preserveNulls()));
}

// -- Set operations ------------------------------------------------------
//
// The Java method names mirror DataFusion's Rust API verbatim. SQL semantics:
//
// union = UNION ALL (positional, keeps duplicates)
// unionDistinct = UNION (positional, deduplicated)
// unionByName = UNION ALL by column name; missing columns become NULL
// unionByNameDistinct = UNION by column name; missing columns become NULL
// intersect = INTERSECT ALL (keeps duplicates)
// intersectDistinct = INTERSECT (deduplicated)
// except = EXCEPT ALL (keeps duplicates)
// exceptDistinct = EXCEPT (deduplicated)
//
// Note: the *_distinct variants deduplicate, while the unsuffixed methods keep
// duplicates. This is the inverse of Spark's convention, where `intersect`
// deduplicates and `intersectAll` keeps duplicates -- consult the Javadoc on
// each method to confirm semantics before porting Spark code.
//
// None of these methods consume the receiver or {@code other}; both DataFrames
// remain usable after the call. The native side clones the LogicalPlan on
// each side, which is cheap (LogicalPlan is Arc-backed in DataFusion).

/**
* Concatenate this DataFrame with {@code other} by column position, keeping all duplicates (SQL
* {@code UNION ALL}). The two schemas must match positionally. Both this DataFrame and {@code
* other} remain usable after the call and must still be closed independently.
*
* @throws IllegalArgumentException if {@code other} is {@code null}.
* @throws RuntimeException if the schemas are incompatible.
*/
public DataFrame union(DataFrame other) {
return new DataFrame(unionRows(nativeHandle, otherHandle("union", other)));
}

/**
* Concatenate this DataFrame with {@code other} by column position, removing duplicates (SQL
* {@code UNION DISTINCT} -- equivalent to plain {@code UNION} in standard SQL). Both DataFrames
* remain usable.
*
* @throws IllegalArgumentException if {@code other} is {@code null}.
* @throws RuntimeException if the schemas are incompatible.
*/
public DataFrame unionDistinct(DataFrame other) {
return new DataFrame(unionDistinctRows(nativeHandle, otherHandle("unionDistinct", other)));
}

/**
* Concatenate this DataFrame with {@code other} by column name, keeping all duplicates. Columns
* present in only one side are filled with NULL on the other. Both DataFrames remain usable.
*
* @throws IllegalArgumentException if {@code other} is {@code null}.
* @throws RuntimeException if column types disagree on a shared name.
*/
public DataFrame unionByName(DataFrame other) {
return new DataFrame(unionByNameRows(nativeHandle, otherHandle("unionByName", other)));
}

/**
* Concatenate this DataFrame with {@code other} by column name, removing duplicates. Columns
* present in only one side are filled with NULL on the other. Both DataFrames remain usable.
*
* @throws IllegalArgumentException if {@code other} is {@code null}.
* @throws RuntimeException if column types disagree on a shared name.
*/
public DataFrame unionByNameDistinct(DataFrame other) {
return new DataFrame(
unionByNameDistinctRows(nativeHandle, otherHandle("unionByNameDistinct", other)));
}

/**
* Rows present in both this DataFrame and {@code other}, keeping duplicates from the receiver
* (SQL {@code INTERSECT ALL}). Both schemas must match positionally. Both DataFrames remain
* usable.
*
* <p><strong>Implementation note:</strong> DataFusion implements {@code INTERSECT ALL} as a
* left-semi join on equality, not as standard SQL bag intersection. A left row is kept iff any
* matching row exists in {@code other}. With {@code left = (1, 2, 2, 3)} and {@code right = (2,
* 3)}, the result is {@code (2, 2, 3)} -- both copies of {@code 2} survive because each finds a
* match in {@code right}. PostgreSQL / Spark {@code INTERSECT ALL} would also yield {@code (2, 2,
* 3)} here, but the two engines diverge when {@code other} has fewer copies than {@code this} of
* a row that appears in both.
*
* @throws IllegalArgumentException if {@code other} is {@code null}.
* @throws RuntimeException if the schemas are incompatible.
*/
public DataFrame intersect(DataFrame other) {
return new DataFrame(intersectRows(nativeHandle, otherHandle("intersect", other)));
}

/**
* Rows present in both this DataFrame and {@code other}, deduplicated (SQL {@code INTERSECT}).
* Both schemas must match positionally. Both DataFrames remain usable.
*
* @throws IllegalArgumentException if {@code other} is {@code null}.
* @throws RuntimeException if the schemas are incompatible.
*/
public DataFrame intersectDistinct(DataFrame other) {
return new DataFrame(
intersectDistinctRows(nativeHandle, otherHandle("intersectDistinct", other)));
}

/**
* Rows present in this DataFrame but not in {@code other}, keeping duplicates from the receiver
* (SQL {@code EXCEPT ALL}). Both schemas must match positionally. Both DataFrames remain usable.
*
* <p><strong>Implementation note:</strong> DataFusion implements {@code EXCEPT ALL} as a
* left-anti join on equality, not as standard SQL bag difference. A left row is kept iff
* <em>no</em> matching row exists in {@code other} -- the multiplicity of matches is irrelevant.
* With {@code left = (1, 1, 2, 2, 3)} and {@code right = (1, 3)}, the result is {@code (2, 2)}:
* both copies of {@code 2} survive (no match in {@code right}); both copies of {@code 1} and the
* {@code 3} drop. PostgreSQL / Spark {@code EXCEPT ALL} would yield the same answer here, but the
* two engines diverge when {@code right} contains fewer copies than {@code left} of a row that
* appears in both.
*
* @throws IllegalArgumentException if {@code other} is {@code null}.
* @throws RuntimeException if the schemas are incompatible.
*/
public DataFrame except(DataFrame other) {
return new DataFrame(exceptRows(nativeHandle, otherHandle("except", other)));
}

/**
* Rows present in this DataFrame but not in {@code other}, deduplicated (SQL {@code EXCEPT}).
* Both schemas must match positionally. Both DataFrames remain usable.
*
* @throws IllegalArgumentException if {@code other} is {@code null}.
* @throws RuntimeException if the schemas are incompatible.
*/
public DataFrame exceptDistinct(DataFrame other) {
return new DataFrame(exceptDistinctRows(nativeHandle, otherHandle("exceptDistinct", other)));
}

/**
* Validate the receiver and the other DataFrame and return {@code other.nativeHandle}. Common
* preamble for the eight set-operation methods so the validation logic stays in one place.
*/
private long otherHandle(String op, DataFrame other) {
if (nativeHandle == 0) {
throw new IllegalStateException("DataFrame is closed or already collected");
}
if (other == null) {
throw new IllegalArgumentException(op + " other must be non-null");
}
if (other.nativeHandle == 0) {
throw new IllegalStateException(op + " other DataFrame is closed or already collected");
}
return other.nativeHandle;
}

/**
* Materialize this DataFrame as Parquet at {@code path}. The path is treated as a directory
* unless overridden via {@link ParquetWriteOptions#singleFileOutput(boolean)}. The receiver
Expand Down Expand Up @@ -354,6 +503,22 @@ public void close() {

private static native long unnestColumns(long handle, String[] columns, boolean preserveNulls);

private static native long unionRows(long handle, long otherHandle);

private static native long unionDistinctRows(long handle, long otherHandle);

private static native long unionByNameRows(long handle, long otherHandle);

private static native long unionByNameDistinctRows(long handle, long otherHandle);

private static native long intersectRows(long handle, long otherHandle);

private static native long intersectDistinctRows(long handle, long otherHandle);

private static native long exceptRows(long handle, long otherHandle);

private static native long exceptDistinctRows(long handle, long otherHandle);

private static native void writeParquetWithOptions(
long handle,
String path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ void chainFilterSelectCount() {

@Test
void methodsThrowAfterClose() {
try (SessionContext ctx = new SessionContext()) {
try (SessionContext ctx = new SessionContext();
DataFrame other = ctx.sql("SELECT 1 AS x")) {
DataFrame df = ctx.sql("SELECT 1 AS x");
df.close();
assertThrows(IllegalStateException.class, () -> df.select("x"));
Expand All @@ -135,6 +136,14 @@ void methodsThrowAfterClose() {
assertThrows(IllegalStateException.class, () -> df.withColumnRenamed("x", "y"));
assertThrows(IllegalStateException.class, () -> df.withColumn("y", "x + 1"));
assertThrows(IllegalStateException.class, () -> df.unnestColumns("x"));
assertThrows(IllegalStateException.class, () -> df.union(other));
assertThrows(IllegalStateException.class, () -> df.unionDistinct(other));
assertThrows(IllegalStateException.class, () -> df.unionByName(other));
assertThrows(IllegalStateException.class, () -> df.unionByNameDistinct(other));
assertThrows(IllegalStateException.class, () -> df.intersect(other));
assertThrows(IllegalStateException.class, () -> df.intersectDistinct(other));
assertThrows(IllegalStateException.class, () -> df.except(other));
assertThrows(IllegalStateException.class, () -> df.exceptDistinct(other));
assertThrows(IllegalStateException.class, df::count);
assertThrows(IllegalStateException.class, df::show);
assertThrows(IllegalStateException.class, () -> df.show(5));
Expand All @@ -145,7 +154,8 @@ void methodsThrowAfterClose() {
void methodsThrowAfterCollect() throws Exception {
try (BufferAllocator allocator = new RootAllocator();
SessionContext ctx = new SessionContext();
DataFrame df = ctx.sql("SELECT 1 AS x")) {
DataFrame df = ctx.sql("SELECT 1 AS x");
DataFrame other = ctx.sql("SELECT 1 AS x")) {
try (ArrowReader reader = df.collect(allocator)) {
assertTrue(reader.loadNextBatch());
}
Expand All @@ -158,6 +168,14 @@ void methodsThrowAfterCollect() throws Exception {
assertThrows(IllegalStateException.class, () -> df.withColumnRenamed("x", "y"));
assertThrows(IllegalStateException.class, () -> df.withColumn("y", "x + 1"));
assertThrows(IllegalStateException.class, () -> df.unnestColumns("x"));
assertThrows(IllegalStateException.class, () -> df.union(other));
assertThrows(IllegalStateException.class, () -> df.unionDistinct(other));
assertThrows(IllegalStateException.class, () -> df.unionByName(other));
assertThrows(IllegalStateException.class, () -> df.unionByNameDistinct(other));
assertThrows(IllegalStateException.class, () -> df.intersect(other));
assertThrows(IllegalStateException.class, () -> df.intersectDistinct(other));
assertThrows(IllegalStateException.class, () -> df.except(other));
assertThrows(IllegalStateException.class, () -> df.exceptDistinct(other));
assertThrows(IllegalStateException.class, df::count);
assertThrows(IllegalStateException.class, df::show);
assertThrows(IllegalStateException.class, () -> df.show(5));
Expand Down Expand Up @@ -504,4 +522,150 @@ void unnestColumnsRejectsNullArgs() {
() -> df.unnestColumns(new UnnestOptions(), (String[]) null));
}
}

// -- Set operations -------------------------------------------------------

@Test
void unionKeepsDuplicates() {
try (SessionContext ctx = new SessionContext();
DataFrame left = ctx.sql("SELECT * FROM (VALUES (1), (2), (2)) AS t(x)");
DataFrame right = ctx.sql("SELECT * FROM (VALUES (2), (3)) AS t(x)");
DataFrame combined = left.union(right)) {
// union = UNION ALL: 3 + 2 = 5 rows, duplicates preserved.
assertEquals(5L, combined.count());
}
}

@Test
void unionDistinctRemovesDuplicates() {
try (SessionContext ctx = new SessionContext();
DataFrame left = ctx.sql("SELECT * FROM (VALUES (1), (2), (2)) AS t(x)");
DataFrame right = ctx.sql("SELECT * FROM (VALUES (2), (3)) AS t(x)");
DataFrame combined = left.unionDistinct(right)) {
// unionDistinct = UNION (deduplicated): {1, 2, 3} = 3 rows.
assertEquals(3L, combined.count());
}
}

@Test
void unionByNameAlignsColumnsByName() {
try (SessionContext ctx = new SessionContext();
// Left: (a, b). Right: (b, a) -- same names, swapped column positions.
DataFrame left = ctx.sql("SELECT 1 AS a, 2 AS b");
DataFrame right = ctx.sql("SELECT 4 AS b, 3 AS a");
DataFrame combined = left.unionByName(right)) {
// 2 rows: positional union would mix columns, but unionByName aligns them.
assertEquals(2L, combined.count());
}
}

@Test
void unionByNameDistinctRemovesDuplicates() {
try (SessionContext ctx = new SessionContext();
DataFrame left = ctx.sql("SELECT 1 AS a, 2 AS b");
DataFrame right = ctx.sql("SELECT 2 AS b, 1 AS a");
DataFrame combined = left.unionByNameDistinct(right)) {
// After name-aligning, both rows are (a=1, b=2). Distinct collapses to 1.
assertEquals(1L, combined.count());
}
}

@Test
void intersectKeepsCommonRowsWithDuplicates() {
try (SessionContext ctx = new SessionContext();
DataFrame left = ctx.sql("SELECT * FROM (VALUES (1), (2), (2), (3)) AS t(x)");
DataFrame right = ctx.sql("SELECT * FROM (VALUES (2), (2), (3)) AS t(x)");
DataFrame inter = left.intersect(right)) {
// INTERSECT ALL: row r appears min(count(r) in left, count(r) in right) times.
// 2 appears 2 times in both => 2 rows; 3 appears 1 time in both => 1 row. Total 3.
assertEquals(3L, inter.count());
}
}

@Test
void intersectDistinctReturnsUniqueCommonRows() {
try (SessionContext ctx = new SessionContext();
DataFrame left = ctx.sql("SELECT * FROM (VALUES (1), (2), (2), (3)) AS t(x)");
DataFrame right = ctx.sql("SELECT * FROM (VALUES (2), (2), (3)) AS t(x)");
DataFrame inter = left.intersectDistinct(right)) {
// INTERSECT (deduplicated): {2, 3} = 2 rows.
assertEquals(2L, inter.count());
}
}

@Test
void exceptKeepsLeftMinusRightWithDuplicates() {
try (SessionContext ctx = new SessionContext();
DataFrame left = ctx.sql("SELECT * FROM (VALUES (1), (1), (2), (2), (3)) AS t(x)");
DataFrame right = ctx.sql("SELECT * FROM (VALUES (1), (3)) AS t(x)");
DataFrame diff = left.except(right)) {
// DataFusion's EXCEPT ALL is implemented as a LeftAnti join, so a left row is kept iff
// its key has no match in right. With left {1, 1, 2, 2, 3} and right {1, 3}: both 1s and
// the 3 are dropped, both 2s are kept. Total 2 rows -- duplicates of 2 preserved.
assertEquals(2L, diff.count());
}
}

@Test
void exceptDistinctReturnsUniqueLeftOnlyRows() {
try (SessionContext ctx = new SessionContext();
DataFrame left = ctx.sql("SELECT * FROM (VALUES (1), (1), (2), (2), (3)) AS t(x)");
DataFrame right = ctx.sql("SELECT * FROM (VALUES (1), (3)) AS t(x)");
DataFrame diff = left.exceptDistinct(right)) {
// EXCEPT (DISTINCT): left is deduplicated to {1, 2, 3}, then anti-joined against
// right {1, 3}. Result: {2} = 1 row.
assertEquals(1L, diff.count());
}
}

@Test
void setOpsAreNonDestructive() {
try (SessionContext ctx = new SessionContext();
DataFrame left = ctx.sql("SELECT * FROM (VALUES (1), (2)) AS t(x)");
DataFrame right = ctx.sql("SELECT * FROM (VALUES (2), (3)) AS t(x)")) {
try (DataFrame combined = left.union(right)) {
assertEquals(4L, combined.count());
}
// Both originals still usable after the set-op call.
assertEquals(2L, left.count());
assertEquals(2L, right.count());
}
}

@Test
void setOpsRejectNullOther() {
try (SessionContext ctx = new SessionContext();
DataFrame df = ctx.sql("SELECT 1 AS x")) {
assertThrows(IllegalArgumentException.class, () -> df.union(null));
assertThrows(IllegalArgumentException.class, () -> df.unionDistinct(null));
assertThrows(IllegalArgumentException.class, () -> df.unionByName(null));
assertThrows(IllegalArgumentException.class, () -> df.unionByNameDistinct(null));
assertThrows(IllegalArgumentException.class, () -> df.intersect(null));
assertThrows(IllegalArgumentException.class, () -> df.intersectDistinct(null));
assertThrows(IllegalArgumentException.class, () -> df.except(null));
assertThrows(IllegalArgumentException.class, () -> df.exceptDistinct(null));
}
}

@Test
void setOpsRejectClosedOther() {
try (SessionContext ctx = new SessionContext();
DataFrame df = ctx.sql("SELECT 1 AS x")) {
DataFrame other = ctx.sql("SELECT 1 AS x");
other.close();
assertThrows(IllegalStateException.class, () -> df.union(other));
assertThrows(IllegalStateException.class, () -> df.intersectDistinct(other));
assertThrows(IllegalStateException.class, () -> df.exceptDistinct(other));
}
}

@Test
void unionWithIncompatibleSchemaThrows() {
// Different column count -- LogicalPlanBuilder rejects at plan-build time.
try (SessionContext ctx = new SessionContext();
DataFrame left = ctx.sql("SELECT 1 AS x, 2 AS y");
DataFrame right = ctx.sql("SELECT 1 AS x")) {
assertThrows(RuntimeException.class, () -> left.union(right));
}
}
}
Loading