diff --git a/core/src/main/java/org/apache/datafusion/DataFrame.java b/core/src/main/java/org/apache/datafusion/DataFrame.java index dceb497..b263564 100644 --- a/core/src/main/java/org/apache/datafusion/DataFrame.java +++ b/core/src/main/java/org/apache/datafusion/DataFrame.java @@ -171,6 +171,55 @@ public DataFrame withColumnRenamed(String oldName, String newName) { return new DataFrame(renameColumn(nativeHandle, oldName, newName)); } + /** + * Add a column to this DataFrame computed from a SQL expression. If a column with the given name + * already exists, it is replaced in place; otherwise the new column is appended. The expression + * is parsed against this DataFrame's own schema, matching the convention used by {@link + * #filter(String)}. The receiver remains usable and must still be closed independently. + * + * @throws IllegalArgumentException if {@code name} or {@code expr} is {@code null}. + */ + public DataFrame withColumn(String name, String expr) { + if (nativeHandle == 0) { + throw new IllegalStateException("DataFrame is closed or already collected"); + } + if (name == null) { + throw new IllegalArgumentException("withColumn name must be non-null"); + } + if (expr == null) { + throw new IllegalArgumentException("withColumn expr must be non-null"); + } + return new DataFrame(withColumnExpr(nativeHandle, name, expr)); + } + + /** + * Expand list or struct columns into rows or fields, with default {@link UnnestOptions} (i.e. + * {@code preserveNulls = true}). The receiver remains usable and must still be closed + * independently. + */ + public DataFrame unnestColumns(String... columns) { + return unnestColumns(new UnnestOptions(), columns); + } + + /** + * Expand list or struct columns into rows or fields with the supplied {@link UnnestOptions}. The + * receiver remains usable and must still be closed independently. + * + * @throws IllegalArgumentException if {@code options} or {@code columns} is {@code null}. + */ + public DataFrame unnestColumns(UnnestOptions options, String... columns) { + if (nativeHandle == 0) { + throw new IllegalStateException("DataFrame is closed or already collected"); + } + if (options == null) { + throw new IllegalArgumentException("unnestColumns options must be non-null"); + } + if (columns == null) { + throw new IllegalArgumentException("unnestColumns columns must be non-null"); + } + return new DataFrame(unnestColumns(nativeHandle, columns, options.preserveNulls())); + } + /** * Materialize this DataFrame as Parquet at {@code path}. The path is treated as a directory * unless overridden via {@link ParquetWriteOptions#singleFileOutput(boolean)}. The receiver @@ -231,6 +280,10 @@ public void close() { private static native long renameColumn(long handle, String oldName, String newName); + private static native long withColumnExpr(long handle, String name, String expr); + + private static native long unnestColumns(long handle, String[] columns, boolean preserveNulls); + private static native void writeParquetWithOptions( long handle, String path, diff --git a/core/src/main/java/org/apache/datafusion/UnnestOptions.java b/core/src/main/java/org/apache/datafusion/UnnestOptions.java new file mode 100644 index 0000000..0dad58b --- /dev/null +++ b/core/src/main/java/org/apache/datafusion/UnnestOptions.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +/** + * Configuration knobs for {@link DataFrame#unnestColumns(UnnestOptions, String...)}, mirroring + * DataFusion's {@code UnnestOptions}. Defaults match upstream: {@code preserveNulls = true}. + * + *

Per-column recursion (DataFusion's {@code recursions}) is intentionally not exposed yet — it + * needs a richer column-pair representation and is filed separately. + */ +public final class UnnestOptions { + + private boolean preserveNulls = true; + + /** + * When {@code true} (the default), nulls in the input column are preserved as null rows in the + * output. When {@code false}, nulls and empty lists are dropped. + */ + public UnnestOptions preserveNulls(boolean v) { + this.preserveNulls = v; + return this; + } + + /** The current {@code preserveNulls} setting. */ + public boolean preserveNulls() { + return preserveNulls; + } +} diff --git a/core/src/test/java/org/apache/datafusion/DataFrameTransformationsTest.java b/core/src/test/java/org/apache/datafusion/DataFrameTransformationsTest.java index cb5c9ef..52e9afa 100644 --- a/core/src/test/java/org/apache/datafusion/DataFrameTransformationsTest.java +++ b/core/src/test/java/org/apache/datafusion/DataFrameTransformationsTest.java @@ -133,6 +133,8 @@ void methodsThrowAfterClose() { assertThrows(IllegalStateException.class, df::distinct); assertThrows(IllegalStateException.class, () -> df.dropColumns("x")); assertThrows(IllegalStateException.class, () -> df.withColumnRenamed("x", "y")); + assertThrows(IllegalStateException.class, () -> df.withColumn("y", "x + 1")); + assertThrows(IllegalStateException.class, () -> df.unnestColumns("x")); assertThrows(IllegalStateException.class, df::count); assertThrows(IllegalStateException.class, df::show); assertThrows(IllegalStateException.class, () -> df.show(5)); @@ -154,6 +156,8 @@ void methodsThrowAfterCollect() throws Exception { assertThrows(IllegalStateException.class, df::distinct); assertThrows(IllegalStateException.class, () -> df.dropColumns("x")); assertThrows(IllegalStateException.class, () -> df.withColumnRenamed("x", "y")); + assertThrows(IllegalStateException.class, () -> df.withColumn("y", "x + 1")); + assertThrows(IllegalStateException.class, () -> df.unnestColumns("x")); assertThrows(IllegalStateException.class, df::count); assertThrows(IllegalStateException.class, df::show); assertThrows(IllegalStateException.class, () -> df.show(5)); @@ -353,4 +357,152 @@ void withColumnRenamedUnknownColumnIsNoOp() throws Exception { root.getSchema().getFields().stream().map(f -> f.getName()).toArray(String[]::new)); } } + + @Test + void withColumnAppendsNewColumn() throws Exception { + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = new SessionContext(); + DataFrame source = ctx.sql("SELECT 1 AS a, 2 AS b"); + DataFrame extended = source.withColumn("c", "a + b"); + ArrowReader reader = extended.collect(allocator)) { + assertTrue(reader.loadNextBatch()); + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + assertEquals(1, root.getRowCount()); + assertArrayEquals( + new String[] {"a", "b", "c"}, + root.getSchema().getFields().stream().map(f -> f.getName()).toArray(String[]::new)); + assertEquals(3L, ((BigIntVector) root.getVector("c")).get(0)); + } + } + + @Test + void withColumnReplacesExistingColumn() throws Exception { + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = new SessionContext(); + DataFrame source = ctx.sql("SELECT 1 AS a, 2 AS b"); + DataFrame replaced = source.withColumn("b", "a * 10"); + ArrowReader reader = replaced.collect(allocator)) { + assertTrue(reader.loadNextBatch()); + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + assertArrayEquals( + new String[] {"a", "b"}, + root.getSchema().getFields().stream().map(f -> f.getName()).toArray(String[]::new)); + assertEquals(10L, ((BigIntVector) root.getVector("b")).get(0)); + } + } + + @Test + void withColumnIsNonDestructive() { + try (SessionContext ctx = new SessionContext(); + DataFrame source = ctx.sql("SELECT 1 AS a, 2 AS b")) { + try (DataFrame extended = source.withColumn("c", "a + b")) { + assertEquals(1L, extended.count()); + } + assertEquals(1L, source.count()); + } + } + + @Test + void withColumnUnknownColumnRefThrows() { + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.sql("SELECT 1 AS x")) { + assertThrows(RuntimeException.class, () -> df.withColumn("y", "not_a_column + 1")); + } + } + + @Test + void withColumnRejectsNullArgs() { + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.sql("SELECT 1 AS x")) { + assertThrows(IllegalArgumentException.class, () -> df.withColumn(null, "x + 1")); + assertThrows(IllegalArgumentException.class, () -> df.withColumn("y", null)); + } + } + + @Test + void unnestColumnsExpandsList() throws Exception { + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = new SessionContext(); + DataFrame source = ctx.sql("SELECT 1 AS id, [10, 20, 30] AS vals"); + DataFrame unnested = source.unnestColumns("vals"); + ArrowReader reader = unnested.collect(allocator)) { + int rows = 0; + while (reader.loadNextBatch()) { + rows += reader.getVectorSchemaRoot().getRowCount(); + } + assertEquals(3, rows); + } + } + + @Test + void unnestColumnsPreserveNullsTrueKeepsNullRow() throws Exception { + String sql = + "SELECT id, vals FROM (VALUES " + + "(1, [10, 20]), " + + "(2, CAST(NULL AS BIGINT[])), " + + "(3, [30])) AS t(id, vals)"; + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = new SessionContext(); + DataFrame source = ctx.sql(sql); + DataFrame unnested = source.unnestColumns(new UnnestOptions().preserveNulls(true), "vals"); + ArrowReader reader = unnested.collect(allocator)) { + int rows = 0; + while (reader.loadNextBatch()) { + rows += reader.getVectorSchemaRoot().getRowCount(); + } + assertEquals(4, rows); + } + } + + @Test + void unnestColumnsPreserveNullsFalseDropsNullRow() throws Exception { + String sql = + "SELECT id, vals FROM (VALUES " + + "(1, [10, 20]), " + + "(2, CAST(NULL AS BIGINT[])), " + + "(3, [30])) AS t(id, vals)"; + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = new SessionContext(); + DataFrame source = ctx.sql(sql); + DataFrame unnested = + source.unnestColumns(new UnnestOptions().preserveNulls(false), "vals"); + ArrowReader reader = unnested.collect(allocator)) { + int rows = 0; + while (reader.loadNextBatch()) { + rows += reader.getVectorSchemaRoot().getRowCount(); + } + assertEquals(3, rows); + } + } + + @Test + void unnestColumnsIsNonDestructive() { + try (SessionContext ctx = new SessionContext(); + DataFrame source = ctx.sql("SELECT 1 AS id, [10, 20] AS vals")) { + try (DataFrame unnested = source.unnestColumns("vals")) { + assertEquals(2L, unnested.count()); + } + assertEquals(1L, source.count()); + } + } + + @Test + void unnestColumnsUnknownColumnThrows() { + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.sql("SELECT 1 AS x")) { + assertThrows(RuntimeException.class, () -> df.unnestColumns("not_a_column")); + } + } + + @Test + void unnestColumnsRejectsNullArgs() { + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.sql("SELECT 1 AS x, [1, 2] AS vals")) { + assertThrows( + IllegalArgumentException.class, () -> df.unnestColumns((UnnestOptions) null, "vals")); + assertThrows( + IllegalArgumentException.class, + () -> df.unnestColumns(new UnnestOptions(), (String[]) null)); + } + } } diff --git a/core/src/test/java/org/apache/datafusion/UnnestOptionsTest.java b/core/src/test/java/org/apache/datafusion/UnnestOptionsTest.java new file mode 100644 index 0000000..e80301a --- /dev/null +++ b/core/src/test/java/org/apache/datafusion/UnnestOptionsTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +class UnnestOptionsTest { + + @Test + void defaultPreserveNullsIsTrue() { + assertTrue(new UnnestOptions().preserveNulls()); + } + + @Test + void preserveNullsSetterRoundTrips() { + UnnestOptions opts = new UnnestOptions().preserveNulls(false); + assertFalse(opts.preserveNulls()); + opts.preserveNulls(true); + assertTrue(opts.preserveNulls()); + } +} diff --git a/native/src/lib.rs b/native/src/lib.rs index 9041819..042a301 100644 --- a/native/src/lib.rs +++ b/native/src/lib.rs @@ -30,6 +30,7 @@ use std::sync::{Arc, OnceLock}; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::ffi_stream::FFI_ArrowArrayStream; use datafusion::arrow::record_batch::RecordBatchIterator; +use datafusion::common::UnnestOptions; use datafusion::config::TableParquetOptions; use datafusion::dataframe::DataFrame; use datafusion::dataframe::DataFrameWriteOptions; @@ -358,6 +359,56 @@ pub extern "system" fn Java_org_apache_datafusion_DataFrame_renameColumn<'local> }) } +#[no_mangle] +pub extern "system" fn Java_org_apache_datafusion_DataFrame_withColumnExpr<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + handle: jlong, + name: JString<'local>, + expr: JString<'local>, +) -> jlong { + try_unwrap_or_throw(&mut env, 0, |env| -> JniResult { + if handle == 0 { + return Err("DataFrame handle is null".into()); + } + let df = unsafe { &*(handle as *const DataFrame) }.clone(); + let name: String = env.get_string(&name)?.into(); + let expr: String = env.get_string(&expr)?.into(); + let parsed = df.parse_sql_expr(&expr)?; + let new_df = df.with_column(&name, parsed)?; + Ok(Box::into_raw(Box::new(new_df)) as jlong) + }) +} + +#[no_mangle] +pub extern "system" fn Java_org_apache_datafusion_DataFrame_unnestColumns<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + handle: jlong, + columns: JObjectArray<'local>, + preserve_nulls: jboolean, +) -> jlong { + try_unwrap_or_throw(&mut env, 0, |env| -> JniResult { + if handle == 0 { + return Err("DataFrame handle is null".into()); + } + let df = unsafe { &*(handle as *const DataFrame) }.clone(); + + let len = env.get_array_length(&columns)?; + let mut owned: Vec = Vec::with_capacity(len as usize); + for i in 0..len { + let elem = env.get_object_array_element(&columns, i)?; + let jstr: JString = elem.into(); + owned.push(env.get_string(&jstr)?.into()); + } + let refs: Vec<&str> = owned.iter().map(String::as_str).collect(); + + let opts = UnnestOptions::new().with_preserve_nulls(preserve_nulls != 0); + let new_df = df.unnest_columns_with_options(&refs, opts)?; + Ok(Box::into_raw(Box::new(new_df)) as jlong) + }) +} + #[no_mangle] pub extern "system" fn Java_org_apache_datafusion_DataFrame_writeParquetWithOptions<'local>( mut env: JNIEnv<'local>,