Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 107 additions & 10 deletions native/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ mod errors;

use std::sync::{Arc, OnceLock};

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::arrow::ffi_stream::FFI_ArrowArrayStream;
use datafusion::arrow::ipc::reader::StreamReader;
use datafusion::arrow::record_batch::RecordBatchIterator;
use datafusion::dataframe::DataFrame;
use datafusion::error::DataFusionError;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use jni::objects::{JClass, JString};
use jni::sys::jlong;
use jni::objects::{JByteArray, JClass, JString};
use jni::sys::{jboolean, jlong};
use jni::JNIEnv;
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -129,13 +130,61 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_closeSessionCon
})
}

#[allow(clippy::too_many_arguments)]
fn with_parquet_options<R>(
env: &mut JNIEnv,
file_extension: JString,
parquet_pruning_set: jboolean,
parquet_pruning_value: jboolean,
skip_metadata_set: jboolean,
skip_metadata_value: jboolean,
metadata_size_hint: jlong,
schema_ipc_bytes: JByteArray,
f: impl FnOnce(ParquetReadOptions) -> JniResult<R>,
) -> JniResult<R> {
let file_ext: String = env.get_string(&file_extension)?.into();

let schema: Option<Schema> = if !schema_ipc_bytes.is_null() {
let bytes: Vec<u8> = env.convert_byte_array(&schema_ipc_bytes)?;
let reader = StreamReader::try_new(std::io::Cursor::new(bytes), None)?;
Some((*reader.schema()).clone())
} else {
None
};

let mut opts = ParquetReadOptions::default().file_extension(&file_ext);
if parquet_pruning_set != 0 {
opts = opts.parquet_pruning(parquet_pruning_value != 0);
}
if skip_metadata_set != 0 {
opts = opts.skip_metadata(skip_metadata_value != 0);
}
if metadata_size_hint >= 0 {
opts = opts.metadata_size_hint(Some(metadata_size_hint as usize));
}
if let Some(ref s) = schema {
opts = opts.schema(s);
}

f(opts)
}

#[no_mangle]
pub extern "system" fn Java_org_apache_datafusion_SessionContext_registerParquet<'local>(
pub extern "system" fn Java_org_apache_datafusion_SessionContext_registerParquetWithOptions<
'local,
>(
mut env: JNIEnv<'local>,
_class: JClass<'local>,
handle: jlong,
name: JString<'local>,
path: JString<'local>,
file_extension: JString<'local>,
parquet_pruning_set: jboolean,
parquet_pruning_value: jboolean,
skip_metadata_set: jboolean,
skip_metadata_value: jboolean,
metadata_size_hint: jlong,
schema_ipc_bytes: JByteArray<'local>,
) {
try_unwrap_or_throw(&mut env, (), |env| -> JniResult<()> {
if handle == 0 {
Expand All @@ -144,11 +193,59 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_registerParquet
let ctx = unsafe { &*(handle as *const SessionContext) };
let name: String = env.get_string(&name)?.into();
let path: String = env.get_string(&path)?.into();
runtime().block_on(async {
ctx.register_parquet(&name, &path, ParquetReadOptions::default())
.await?;
Ok::<(), DataFusionError>(())
})?;
Ok(())
with_parquet_options(
env,
file_extension,
parquet_pruning_set,
parquet_pruning_value,
skip_metadata_set,
skip_metadata_value,
metadata_size_hint,
schema_ipc_bytes,
|opts| {
runtime().block_on(async {
ctx.register_parquet(&name, &path, opts).await?;
Ok::<(), DataFusionError>(())
})?;
Ok(())
},
)
})
}

/// JNI entry point behind the Java `SessionContext.readParquetWithOptions` native method.
///
/// Reads the parquet data at `path` through the session identified by `handle`, using options
/// assembled by `with_parquet_options`, and returns the resulting `DataFrame` as a raw boxed
/// pointer cast to `jlong`. The box is intentionally not dropped here; the handle's receiver is
/// presumably responsible for releasing it (not visible in this function).
///
/// All failures go through `try_unwrap_or_throw`, which is given `0` as its fallback value.
#[no_mangle]
pub extern "system" fn Java_org_apache_datafusion_SessionContext_readParquetWithOptions<'local>(
    mut env: JNIEnv<'local>,
    _class: JClass<'local>,
    handle: jlong,
    path: JString<'local>,
    file_extension: JString<'local>,
    parquet_pruning_set: jboolean,
    parquet_pruning_value: jboolean,
    skip_metadata_set: jboolean,
    skip_metadata_value: jboolean,
    metadata_size_hint: jlong,
    schema_ipc_bytes: JByteArray<'local>,
) -> jlong {
    try_unwrap_or_throw(&mut env, 0, |env| -> JniResult<jlong> {
        // SAFETY: NOTE(review) assumes any non-zero `handle` is a still-live `SessionContext`
        // pointer previously handed to Java by this library; a zero handle yields `None`.
        let session = unsafe { (handle as *const SessionContext).as_ref() };
        let Some(session) = session else {
            return Err("SessionContext handle is null".into());
        };
        let parquet_path: String = env.get_string(&path)?.into();
        with_parquet_options(
            env,
            file_extension,
            parquet_pruning_set,
            parquet_pruning_value,
            skip_metadata_set,
            skip_metadata_value,
            metadata_size_hint,
            schema_ipc_bytes,
            |read_options| {
                // Drive the async read to completion on the shared runtime.
                let pending = session.read_parquet(parquet_path, read_options);
                let frame = runtime().block_on(pending)?;
                // Hand ownership to the caller as an opaque handle.
                let boxed = Box::new(frame);
                Ok(Box::into_raw(boxed) as jlong)
            },
        )
    })
}
84 changes: 84 additions & 0 deletions src/main/java/org/apache/datafusion/ParquetReadOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

import org.apache.arrow.vector.types.pojo.Schema;

/**
 * Configuration knobs for parquet sources passed to {@link SessionContext#registerParquet(String,
 * String, ParquetReadOptions)} and {@link SessionContext#readParquet(String, ParquetReadOptions)}.
 *
 * <p>Mirrors a subset of DataFusion's {@code ParquetReadOptions}. All setters return {@code this}
 * for fluent chaining. Defaults: {@code fileExtension = ".parquet"}; all other fields {@code null}
 * (meaning the SessionConfig default is used, or the schema is inferred from the file).
 */
public final class ParquetReadOptions {

  private String fileExtension = ".parquet";
  // null means "not set" for every field below.
  private Boolean parquetPruning;
  private Boolean skipMetadata;
  private Long metadataSizeHint;
  private Schema schema;

  /**
   * Sets the file extension used to select parquet files.
   *
   * @param ext the extension, e.g. {@code ".parquet"}; must not be {@code null}
   * @return this instance, for chaining
   * @throws NullPointerException if {@code ext} is {@code null}
   */
  public ParquetReadOptions fileExtension(String ext) {
    // Fail here with a clear message instead of later inside native string conversion.
    this.fileExtension = java.util.Objects.requireNonNull(ext, "fileExtension must not be null");
    return this;
  }

  /**
   * Explicitly enables or disables parquet pruning.
   *
   * @return this instance, for chaining
   */
  public ParquetReadOptions parquetPruning(boolean v) {
    this.parquetPruning = v;
    return this;
  }

  /**
   * Explicitly enables or disables skipping of parquet metadata.
   *
   * @return this instance, for chaining
   */
  public ParquetReadOptions skipMetadata(boolean v) {
    this.skipMetadata = v;
    return this;
  }

  /**
   * Sets the metadata size hint.
   *
   * @param bytes the hint in bytes; must not be negative
   * @return this instance, for chaining
   * @throws IllegalArgumentException if {@code bytes} is negative
   */
  public ParquetReadOptions metadataSizeHint(long bytes) {
    // Negative values are how "unset" is signalled across the JNI boundary, so a negative hint
    // would be silently ignored; reject it up front instead.
    if (bytes < 0) {
      throw new IllegalArgumentException("metadataSizeHint must be >= 0, got " + bytes);
    }
    this.metadataSizeHint = bytes;
    return this;
  }

  /**
   * Sets an explicit schema; {@code null} restores the default of inferring it from the file.
   *
   * @return this instance, for chaining
   */
  public ParquetReadOptions schema(Schema schema) {
    this.schema = schema;
    return this;
  }

  /** Returns the configured file extension. */
  String fileExtension() {
    return fileExtension;
  }

  /** Returns the parquet pruning setting, or {@code null} if it was never set. */
  Boolean parquetPruning() {
    return parquetPruning;
  }

  /** Returns the skip-metadata setting, or {@code null} if it was never set. */
  Boolean skipMetadata() {
    return skipMetadata;
  }

  /** Returns the metadata size hint, or {@code null} if it was never set. */
  Long metadataSizeHint() {
    return metadataSizeHint;
  }

  /** Returns the explicit schema, or {@code null} if none was supplied. */
  Schema schema() {
    return schema;
  }
}
95 changes: 93 additions & 2 deletions src/main/java/org/apache/datafusion/SessionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@

package org.apache.datafusion;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Schema;

/**
* A DataFusion session context.
*
Expand Down Expand Up @@ -54,10 +64,70 @@ public DataFrame sql(String query) {
}

/** Registers a parquet file as a table using a default-constructed {@link ParquetReadOptions}. */
public void registerParquet(String name, String path) {
  ParquetReadOptions defaults = new ParquetReadOptions();
  registerParquet(name, path, defaults);
}

/**
 * Registers the parquet data at {@code path} as table {@code name}, applying {@code options}.
 *
 * @throws IllegalStateException if this context's native handle is zero (reported as closed).
 * @throws RuntimeException if registration fails (path not found, schema mismatch, etc.).
 */
public void registerParquet(String name, String path, ParquetReadOptions options) {
  if (nativeHandle == 0) {
    throw new IllegalStateException("SessionContext is closed");
  }
  String extension = options.fileExtension();
  Boolean pruning = options.parquetPruning();
  Boolean skipMetadata = options.skipMetadata();
  Long sizeHint = options.metadataSizeHint();
  Schema explicitSchema = options.schema();

  // Nullable options cross the JNI boundary as (set, value) pairs; -1 marks an unset size hint
  // and a null byte array marks an absent schema.
  byte[] schemaBytes = explicitSchema == null ? null : serializeSchemaIpc(explicitSchema);
  registerParquetWithOptions(
      nativeHandle,
      name,
      path,
      extension,
      pruning != null,
      Boolean.TRUE.equals(pruning),
      skipMetadata != null,
      Boolean.TRUE.equals(skipMetadata),
      sizeHint == null ? -1L : sizeHint,
      schemaBytes);
}

/**
 * Reads a parquet file as a {@link DataFrame} without registering it, using a
 * default-constructed {@link ParquetReadOptions}.
 */
public DataFrame readParquet(String path) {
  ParquetReadOptions defaults = new ParquetReadOptions();
  return readParquet(path, defaults);
}

/**
 * Read a parquet file as a {@link DataFrame} with the supplied {@link ParquetReadOptions}.
 *
 * <p>Nothing is registered on the session; the native call returns a handle that is wrapped in a
 * new {@link DataFrame}.
 *
 * @throws IllegalStateException if this context's native handle is zero (reported as closed).
 * @throws RuntimeException if the read fails.
 */
public DataFrame readParquet(String path, ParquetReadOptions options) {
  if (nativeHandle == 0) {
    throw new IllegalStateException("SessionContext is closed");
  }
  // Nullable options cross the JNI boundary as (set, value) pairs; -1 marks an unset size hint
  // and a null byte array marks an absent schema.
  long dfHandle =
      readParquetWithOptions(
          nativeHandle,
          path,
          options.fileExtension(),
          options.parquetPruning() != null,
          options.parquetPruning() != null && options.parquetPruning(),
          options.skipMetadata() != null,
          options.skipMetadata() != null && options.skipMetadata(),
          options.metadataSizeHint() != null ? options.metadataSizeHint() : -1L,
          options.schema() != null ? serializeSchemaIpc(options.schema()) : null);
  return new DataFrame(dfHandle);
}

/**
 * Serializes {@code schema} to bytes by writing an Arrow stream that is started and immediately
 * ended, with no record batches written in between.
 *
 * <p>The resulting bytes are what gets passed across JNI as the {@code schemaIpcBytes} argument.
 *
 * @param schema the schema to serialize
 * @return the bytes collected from the in-memory stream
 * @throws RuntimeException wrapping any {@link IOException} raised while writing
 */
private static byte[] serializeSchemaIpc(Schema schema) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// The allocator, root and writer are scoped to this call and closed by try-with-resources.
try (BufferAllocator allocator = new RootAllocator();
VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
ArrowStreamWriter writer = new ArrowStreamWriter(root, null, Channels.newChannel(baos))) {
// No batches are written between start() and end().
writer.start();
writer.end();
} catch (IOException e) {
throw new RuntimeException("Failed to serialize Arrow schema for JNI", e);
}
// Read the buffer only after the writer has been closed.
return baos.toByteArray();
}

@Override
Expand All @@ -72,7 +142,28 @@ public void close() {

private static native long createDataFrame(long handle, String sql);

private static native void registerParquet(long handle, String name, String path);
/**
 * Native counterpart of {@link #registerParquet(String, String, ParquetReadOptions)}.
 *
 * <p>Optional settings are flattened into primitives: each {@code *Set} flag says whether the
 * matching {@code *Value} should be applied, a negative {@code metadataSizeHint} means "not set",
 * and a {@code null} {@code schemaIpcBytes} means no explicit schema was supplied.
 */
private static native void registerParquetWithOptions(
long handle,
String name,
String path,
String fileExtension,
boolean parquetPruningSet,
boolean parquetPruningValue,
boolean skipMetadataSet,
boolean skipMetadataValue,
long metadataSizeHint,
byte[] schemaIpcBytes);

/**
 * Native counterpart of {@link #readParquet(String, ParquetReadOptions)}; returns the native
 * handle that the caller wraps in a {@link DataFrame}.
 *
 * <p>Optional settings are flattened into primitives: each {@code *Set} flag says whether the
 * matching {@code *Value} should be applied, a negative {@code metadataSizeHint} means "not set",
 * and a {@code null} {@code schemaIpcBytes} means no explicit schema was supplied.
 */
private static native long readParquetWithOptions(
long handle,
String path,
String fileExtension,
boolean parquetPruningSet,
boolean parquetPruningValue,
boolean skipMetadataSet,
boolean skipMetadataValue,
long metadataSizeHint,
byte[] schemaIpcBytes);

private static native void closeSessionContext(long handle);
}
Loading
Loading