Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions core/src/main/java/org/apache/datafusion/DataFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,55 @@ public DataFrame withColumnRenamed(String oldName, String newName) {
return new DataFrame(renameColumn(nativeHandle, oldName, newName));
}

/**
 * Derive a new DataFrame that carries a column named {@code name} whose values come from the SQL
 * expression {@code expr}. When {@code name} already exists the column is replaced in place;
 * otherwise it is appended. The expression is parsed against this DataFrame's own schema, the
 * same convention as {@link #filter(String)}. The receiver remains usable and must still be
 * closed independently.
 *
 * @param name name of the column to add or replace
 * @param expr SQL expression that computes the column
 * @return a new DataFrame wrapping the handle produced by the native call
 * @throws IllegalStateException if this DataFrame is closed or already collected.
 * @throws IllegalArgumentException if {@code name} or {@code expr} is {@code null}.
 */
public DataFrame withColumn(String name, String expr) {
  // The closed-handle check comes first so a closed receiver always reports IllegalStateException.
  if (nativeHandle == 0) {
    throw new IllegalStateException("DataFrame is closed or already collected");
  }
  // name is inspected before expr, so a call with both null reports the name.
  final String problem =
      name == null
          ? "withColumn name must be non-null"
          : expr == null ? "withColumn expr must be non-null" : null;
  if (problem != null) {
    throw new IllegalArgumentException(problem);
  }
  final long derivedHandle = withColumnExpr(nativeHandle, name, expr);
  return new DataFrame(derivedHandle);
}

/**
 * Convenience overload of {@link #unnestColumns(UnnestOptions, String...)} that uses a freshly
 * constructed {@link UnnestOptions} (i.e. {@code preserveNulls = true}) to expand list or struct
 * columns into rows or fields. The receiver remains usable and must still be closed
 * independently.
 *
 * @param columns names of the columns to unnest
 * @return the DataFrame produced by the two-argument overload
 */
public DataFrame unnestColumns(String... columns) {
  final UnnestOptions defaults = new UnnestOptions();
  return unnestColumns(defaults, columns);
}

/**
 * Expand list or struct columns into rows or fields with the supplied {@link UnnestOptions}. The
 * receiver remains usable and must still be closed independently.
 *
 * @param options unnest configuration; its {@code preserveNulls} value is forwarded to the native
 *     call
 * @param columns names of the columns to unnest
 * @return a new DataFrame wrapping the handle produced by the native call
 * @throws IllegalStateException if this DataFrame is closed or already collected.
 * @throws IllegalArgumentException if {@code options}, {@code columns}, or any element of {@code
 *     columns} is {@code null}.
 */
public DataFrame unnestColumns(UnnestOptions options, String... columns) {
  if (nativeHandle == 0) {
    throw new IllegalStateException("DataFrame is closed or already collected");
  }
  if (options == null) {
    throw new IllegalArgumentException("unnestColumns options must be non-null");
  }
  if (columns == null) {
    throw new IllegalArgumentException("unnestColumns columns must be non-null");
  }
  // Reject null names here so the caller gets a precise message instead of whatever error the
  // native string conversion reports for a null element.
  for (int i = 0; i < columns.length; i++) {
    if (columns[i] == null) {
      throw new IllegalArgumentException("unnestColumns columns[" + i + "] must be non-null");
    }
  }
  return new DataFrame(unnestColumns(nativeHandle, columns, options.preserveNulls()));
}

/**
* Materialize this DataFrame as Parquet at {@code path}. The path is treated as a directory
* unless overridden via {@link ParquetWriteOptions#singleFileOutput(boolean)}. The receiver
Expand Down Expand Up @@ -231,6 +280,10 @@ public void close() {

private static native long renameColumn(long handle, String oldName, String newName);

/**
 * Native backend of {@link #withColumn(String, String)}, implemented in {@code native/src/lib.rs}
 * as {@code Java_org_apache_datafusion_DataFrame_withColumnExpr}. The native side rejects a zero
 * {@code handle}, parses {@code expr} as a SQL expression and returns the handle of a newly
 * allocated DataFrame. NOTE(review): on failure it presumably raises a Java exception and returns
 * 0 — confirm against {@code try_unwrap_or_throw}.
 */
private static native long withColumnExpr(long handle, String name, String expr);

/**
 * Native backend of {@link #unnestColumns(UnnestOptions, String...)}, implemented in {@code
 * native/src/lib.rs} as {@code Java_org_apache_datafusion_DataFrame_unnestColumns}. The native
 * side rejects a zero {@code handle}, reads every element of {@code columns} as a string and
 * returns the handle of a newly allocated DataFrame. NOTE(review): on failure it presumably
 * raises a Java exception and returns 0 — confirm against {@code try_unwrap_or_throw}.
 */
private static native long unnestColumns(long handle, String[] columns, boolean preserveNulls);

private static native void writeParquetWithOptions(
long handle,
String path,
Expand Down
46 changes: 46 additions & 0 deletions core/src/main/java/org/apache/datafusion/UnnestOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

/**
 * Options accepted by {@link DataFrame#unnestColumns(UnnestOptions, String...)}, mirroring
 * DataFusion's {@code UnnestOptions}. Defaults match upstream: {@code preserveNulls = true}.
 *
 * <p>Per-column recursion (DataFusion's {@code recursions}) is intentionally not exposed yet — it
 * needs a richer column-pair representation and is filed separately.
 */
public final class UnnestOptions {

  /** Backing state for the {@code preserveNulls} option; starts out {@code true}. */
  private boolean keepNulls = true;

  /**
   * Fluent setter for {@code preserveNulls}. With {@code true} (the default), nulls in the input
   * column are preserved as null rows in the output. With {@code false}, nulls and empty lists
   * are dropped.
   *
   * @return this instance, so calls can be chained
   */
  public UnnestOptions preserveNulls(boolean v) {
    keepNulls = v;
    return this;
  }

  /** Reports the {@code preserveNulls} value currently held by this instance. */
  public boolean preserveNulls() {
    return this.keepNulls;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ void methodsThrowAfterClose() {
assertThrows(IllegalStateException.class, df::distinct);
assertThrows(IllegalStateException.class, () -> df.dropColumns("x"));
assertThrows(IllegalStateException.class, () -> df.withColumnRenamed("x", "y"));
assertThrows(IllegalStateException.class, () -> df.withColumn("y", "x + 1"));
assertThrows(IllegalStateException.class, () -> df.unnestColumns("x"));
assertThrows(IllegalStateException.class, df::count);
assertThrows(IllegalStateException.class, df::show);
assertThrows(IllegalStateException.class, () -> df.show(5));
Expand All @@ -154,6 +156,8 @@ void methodsThrowAfterCollect() throws Exception {
assertThrows(IllegalStateException.class, df::distinct);
assertThrows(IllegalStateException.class, () -> df.dropColumns("x"));
assertThrows(IllegalStateException.class, () -> df.withColumnRenamed("x", "y"));
assertThrows(IllegalStateException.class, () -> df.withColumn("y", "x + 1"));
assertThrows(IllegalStateException.class, () -> df.unnestColumns("x"));
assertThrows(IllegalStateException.class, df::count);
assertThrows(IllegalStateException.class, df::show);
assertThrows(IllegalStateException.class, () -> df.show(5));
Expand Down Expand Up @@ -353,4 +357,152 @@ void withColumnRenamedUnknownColumnIsNoOp() throws Exception {
root.getSchema().getFields().stream().map(f -> f.getName()).toArray(String[]::new));
}
}

/** A previously unused column name is appended after the existing columns. */
@Test
void withColumnAppendsNewColumn() throws Exception {
  try (BufferAllocator alloc = new RootAllocator();
      SessionContext session = new SessionContext();
      DataFrame base = session.sql("SELECT 1 AS a, 2 AS b");
      DataFrame withSum = base.withColumn("c", "a + b");
      ArrowReader batches = withSum.collect(alloc)) {
    assertTrue(batches.loadNextBatch());
    VectorSchemaRoot batch = batches.getVectorSchemaRoot();
    assertEquals(1, batch.getRowCount());

    // Schema order matters: the new column must come last.
    String[] expectedNames = {"a", "b", "c"};
    String[] actualNames =
        batch.getSchema().getFields().stream().map(f -> f.getName()).toArray(String[]::new);
    assertArrayEquals(expectedNames, actualNames);

    BigIntVector sumColumn = (BigIntVector) batch.getVector("c");
    assertEquals(3L, sumColumn.get(0));
  }
}

/** Reusing an existing column name overwrites that column without changing the schema order. */
@Test
void withColumnReplacesExistingColumn() throws Exception {
  try (BufferAllocator alloc = new RootAllocator();
      SessionContext session = new SessionContext();
      DataFrame base = session.sql("SELECT 1 AS a, 2 AS b");
      DataFrame overwritten = base.withColumn("b", "a * 10");
      ArrowReader batches = overwritten.collect(alloc)) {
    assertTrue(batches.loadNextBatch());
    VectorSchemaRoot batch = batches.getVectorSchemaRoot();

    // Still exactly two columns, in their original positions.
    String[] expectedNames = {"a", "b"};
    String[] actualNames =
        batch.getSchema().getFields().stream().map(f -> f.getName()).toArray(String[]::new);
    assertArrayEquals(expectedNames, actualNames);

    BigIntVector replacedColumn = (BigIntVector) batch.getVector("b");
    assertEquals(10L, replacedColumn.get(0));
  }
}

/** The receiver of {@code withColumn} stays usable after the derived DataFrame is closed. */
@Test
void withColumnIsNonDestructive() {
  try (SessionContext session = new SessionContext();
      DataFrame base = session.sql("SELECT 1 AS a, 2 AS b")) {
    // Scope the derived frame tightly so it is closed before the receiver is queried again.
    try (DataFrame derived = base.withColumn("c", "a + b")) {
      assertEquals(1L, derived.count());
    }
    // The source must still answer queries once the derived frame is gone.
    assertEquals(1L, base.count());
  }
}

/** An expression that references a column missing from the schema is rejected. */
@Test
void withColumnUnknownColumnRefThrows() {
  try (SessionContext session = new SessionContext();
      DataFrame frame = session.sql("SELECT 1 AS x")) {
    // "not_a_column" does not exist in the single-column schema above.
    assertThrows(RuntimeException.class, () -> frame.withColumn("y", "not_a_column + 1"));
  }
}

/** Both the column name and the expression must be non-null. */
@Test
void withColumnRejectsNullArgs() {
  try (SessionContext session = new SessionContext();
      DataFrame frame = session.sql("SELECT 1 AS x")) {
    // Null name.
    assertThrows(IllegalArgumentException.class, () -> frame.withColumn(null, "x + 1"));
    // Null expression.
    assertThrows(IllegalArgumentException.class, () -> frame.withColumn("y", null));
  }
}

/** Unnesting a three-element list column yields one output row per element. */
@Test
void unnestColumnsExpandsList() throws Exception {
  try (BufferAllocator alloc = new RootAllocator();
      SessionContext session = new SessionContext();
      DataFrame base = session.sql("SELECT 1 AS id, [10, 20, 30] AS vals");
      DataFrame expanded = base.unnestColumns("vals");
      ArrowReader batches = expanded.collect(alloc)) {
    // Sum across batches; the result is not assumed to arrive in a single batch.
    int totalRows = 0;
    while (batches.loadNextBatch()) {
      VectorSchemaRoot batch = batches.getVectorSchemaRoot();
      totalRows = totalRows + batch.getRowCount();
    }
    assertEquals(3, totalRows);
  }
}

/** With {@code preserveNulls = true} the row whose list is NULL survives as one output row. */
@Test
void unnestColumnsPreserveNullsTrueKeepsNullRow() throws Exception {
  // Three input rows: a two-element list, a NULL list, and a one-element list.
  final String query =
      "SELECT id, vals FROM (VALUES "
          + "(1, [10, 20]), "
          + "(2, CAST(NULL AS BIGINT[])), "
          + "(3, [30])) AS t(id, vals)";
  try (BufferAllocator alloc = new RootAllocator();
      SessionContext session = new SessionContext();
      DataFrame base = session.sql(query);
      DataFrame expanded = base.unnestColumns(new UnnestOptions().preserveNulls(true), "vals");
      ArrowReader batches = expanded.collect(alloc)) {
    int totalRows = 0;
    while (batches.loadNextBatch()) {
      VectorSchemaRoot batch = batches.getVectorSchemaRoot();
      totalRows = totalRows + batch.getRowCount();
    }
    // 2 rows from id=1, 1 null row from id=2, 1 row from id=3.
    assertEquals(4, totalRows);
  }
}

/** With {@code preserveNulls = false} the row whose list is NULL produces no output row. */
@Test
void unnestColumnsPreserveNullsFalseDropsNullRow() throws Exception {
  // Three input rows: a two-element list, a NULL list, and a one-element list.
  final String query =
      "SELECT id, vals FROM (VALUES "
          + "(1, [10, 20]), "
          + "(2, CAST(NULL AS BIGINT[])), "
          + "(3, [30])) AS t(id, vals)";
  try (BufferAllocator alloc = new RootAllocator();
      SessionContext session = new SessionContext();
      DataFrame base = session.sql(query);
      DataFrame expanded = base.unnestColumns(new UnnestOptions().preserveNulls(false), "vals");
      ArrowReader batches = expanded.collect(alloc)) {
    int totalRows = 0;
    while (batches.loadNextBatch()) {
      VectorSchemaRoot batch = batches.getVectorSchemaRoot();
      totalRows = totalRows + batch.getRowCount();
    }
    // 2 rows from id=1, none from id=2, 1 row from id=3.
    assertEquals(3, totalRows);
  }
}

/** The receiver of {@code unnestColumns} stays usable after the derived DataFrame is closed. */
@Test
void unnestColumnsIsNonDestructive() {
  try (SessionContext session = new SessionContext();
      DataFrame base = session.sql("SELECT 1 AS id, [10, 20] AS vals")) {
    // Scope the derived frame tightly so it is closed before the receiver is queried again.
    try (DataFrame expanded = base.unnestColumns("vals")) {
      assertEquals(2L, expanded.count());
    }
    // The source still reports its single, un-expanded row.
    assertEquals(1L, base.count());
  }
}

/** Asking to unnest a column that is not part of the schema is rejected. */
@Test
void unnestColumnsUnknownColumnThrows() {
  try (SessionContext session = new SessionContext();
      DataFrame frame = session.sql("SELECT 1 AS x")) {
    // "not_a_column" does not exist in the single-column schema above.
    assertThrows(RuntimeException.class, () -> frame.unnestColumns("not_a_column"));
  }
}

/** Neither the options object nor the column array may be null. */
@Test
void unnestColumnsRejectsNullArgs() {
  try (SessionContext session = new SessionContext();
      DataFrame frame = session.sql("SELECT 1 AS x, [1, 2] AS vals")) {
    // The cast selects the (UnnestOptions, String...) overload with a null options argument.
    assertThrows(
        IllegalArgumentException.class,
        () -> frame.unnestColumns((UnnestOptions) null, "vals"));
    // The cast passes a null array rather than a one-element array holding null.
    assertThrows(
        IllegalArgumentException.class,
        () -> frame.unnestColumns(new UnnestOptions(), (String[]) null));
  }
}
}
41 changes: 41 additions & 0 deletions core/src/test/java/org/apache/datafusion/UnnestOptionsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

/** Unit tests for the {@link UnnestOptions} value holder. */
class UnnestOptionsTest {

  /** A freshly constructed instance reports {@code preserveNulls = true}. */
  @Test
  void defaultPreserveNullsIsTrue() {
    UnnestOptions fresh = new UnnestOptions();
    assertTrue(fresh.preserveNulls());
  }

  /** The fluent setter is observable through the getter, in both directions. */
  @Test
  void preserveNullsSetterRoundTrips() {
    UnnestOptions options = new UnnestOptions().preserveNulls(false);
    assertFalse(options.preserveNulls());

    // Flip back on the same instance.
    options.preserveNulls(true);
    assertTrue(options.preserveNulls());
  }
}
51 changes: 51 additions & 0 deletions native/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::sync::{Arc, OnceLock};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::ffi_stream::FFI_ArrowArrayStream;
use datafusion::arrow::record_batch::RecordBatchIterator;
use datafusion::common::UnnestOptions;
use datafusion::config::TableParquetOptions;
use datafusion::dataframe::DataFrame;
use datafusion::dataframe::DataFrameWriteOptions;
Expand Down Expand Up @@ -358,6 +359,56 @@ pub extern "system" fn Java_org_apache_datafusion_DataFrame_renameColumn<'local>
})
}

/// JNI binding behind the Java `DataFrame.withColumnExpr` native method.
///
/// Parses `expr` as a SQL expression against the DataFrame referenced by `handle`, applies it
/// under the column name `name` via `with_column`, and returns the raw pointer of the newly
/// boxed DataFrame as a `jlong`. Failures are routed through `try_unwrap_or_throw`, which is
/// given `0` as its fallback value.
#[no_mangle]
pub extern "system" fn Java_org_apache_datafusion_DataFrame_withColumnExpr<'local>(
    mut env: JNIEnv<'local>,
    _class: JClass<'local>,
    handle: jlong,
    name: JString<'local>,
    expr: JString<'local>,
) -> jlong {
    try_unwrap_or_throw(&mut env, 0, |env| -> JniResult<jlong> {
        if handle == 0 {
            return Err("DataFrame handle is null".into());
        }
        // SAFETY: `handle` is non-zero (checked above). NOTE(review): this assumes the Java side
        // only passes handles obtained from `Box::into_raw(Box::new(DataFrame))` that have not
        // been freed yet — confirm against the close/collect paths.
        let source: &DataFrame = unsafe { &*(handle as *const DataFrame) };

        let column_name = String::from(env.get_string(&name)?);
        let sql_expr = String::from(env.get_string(&expr)?);

        // Parsing only needs a shared borrow; the clone is taken where `with_column` consumes it.
        let parsed = source.parse_sql_expr(&sql_expr)?;
        let derived = source.clone().with_column(&column_name, parsed)?;

        let boxed = Box::new(derived);
        Ok(Box::into_raw(boxed) as jlong)
    })
}

#[no_mangle]
pub extern "system" fn Java_org_apache_datafusion_DataFrame_unnestColumns<'local>(
mut env: JNIEnv<'local>,
_class: JClass<'local>,
handle: jlong,
columns: JObjectArray<'local>,
preserve_nulls: jboolean,
) -> jlong {
try_unwrap_or_throw(&mut env, 0, |env| -> JniResult<jlong> {
if handle == 0 {
return Err("DataFrame handle is null".into());
}
let df = unsafe { &*(handle as *const DataFrame) }.clone();

let len = env.get_array_length(&columns)?;
let mut owned: Vec<String> = Vec::with_capacity(len as usize);
for i in 0..len {
let elem = env.get_object_array_element(&columns, i)?;
let jstr: JString = elem.into();
owned.push(env.get_string(&jstr)?.into());
}
let refs: Vec<&str> = owned.iter().map(String::as_str).collect();

let opts = UnnestOptions::new().with_preserve_nulls(preserve_nulls != 0);
let new_df = df.unnest_columns_with_options(&refs, opts)?;
Ok(Box::into_raw(Box::new(new_df)) as jlong)
})
}

#[no_mangle]
pub extern "system" fn Java_org_apache_datafusion_DataFrame_writeParquetWithOptions<'local>(
mut env: JNIEnv<'local>,
Expand Down
Loading