use arrow::{
array::{Array, ArrayData, PrimitiveArray},
datatypes::{ArrowPrimitiveType, DataType, Field, Schema, SchemaBuilder},
error::ArrowError,
ffi::{to_ffi, FFI_ArrowArray, FFI_ArrowSchema},
ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream},
record_batch::{RecordBatch, RecordBatchIterator, RecordBatchReader},
};
use extendr_api::prelude::*;
/// Allocates an empty `nanoarrow_array` R object by calling
/// `nanoarrow::nanoarrow_allocate_array()` with `args`.
///
/// # Panics
/// Panics if evaluating `nanoarrow::nanoarrow_allocate_array` fails (for example
/// when `nanoarrow` is not installed) or if it does not evaluate to an R function.
///
/// # Errors
/// Propagates any error raised by the R function call itself.
pub fn allocate_array(args: Pairlist) -> Result<Robj> {
    let allocator_obj = R!("nanoarrow::nanoarrow_allocate_array")
        .expect("`nanoarrow` must be installed");
    let allocator = allocator_obj
        .as_function()
        .expect("`nanoarrow_allocate_array()` must be available");
    allocator.call(args)
}
/// Allocates an empty `nanoarrow_array_stream` R object by calling
/// `nanoarrow::nanoarrow_allocate_array_stream()` with `args`.
///
/// # Panics
/// Panics if evaluating `nanoarrow::nanoarrow_allocate_array_stream` fails (for
/// example when `nanoarrow` is not installed) or if it does not evaluate to an R
/// function.
///
/// # Errors
/// Propagates any error raised by the R function call itself.
pub fn allocate_array_stream(args: Pairlist) -> Result<Robj> {
    R!("nanoarrow::nanoarrow_allocate_array_stream")
        .expect("`nanoarrow` must be installed")
        .as_function()
        // Name the function actually being looked up so the panic is actionable.
        .expect("`nanoarrow_allocate_array_stream()` must be available")
        .call(args)
}
/// Allocates an empty `nanoarrow_schema` R object by calling
/// `nanoarrow::nanoarrow_allocate_schema()` with `args`.
///
/// # Panics
/// Panics if evaluating `nanoarrow::nanoarrow_allocate_schema` fails (for example
/// when `nanoarrow` is not installed) or if it does not evaluate to an R function.
///
/// # Errors
/// Propagates any error raised by the R function call itself.
pub fn allocate_schema(args: Pairlist) -> Result<Robj> {
    let allocator_obj = R!("nanoarrow::nanoarrow_allocate_schema")
        .expect("`nanoarrow` must be installed");
    let allocator = allocator_obj
        .as_function()
        .expect("`nanoarrow_allocate_schema()` must be available");
    allocator.call(args)
}
/// Calls `nanoarrow::nanoarrow_pointer_move()` with `args`.
///
/// In this file it is always called with `(source_address_string, destination_robj)`.
///
/// # Panics
/// Panics if evaluating `nanoarrow::nanoarrow_pointer_move` fails (for example
/// when `nanoarrow` is not installed) or if it does not evaluate to an R function.
///
/// # Errors
/// Propagates any error raised by the R function call itself.
pub fn move_pointer(args: Pairlist) -> Result<Robj> {
    let mover_obj = R!("nanoarrow::nanoarrow_pointer_move")
        .expect("`nanoarrow` must be installed");
    let mover = mover_obj
        .as_function()
        .expect("`nanoarrow_pointer_move()` must be available");
    mover.call(args)
}
/// Attaches `schema` to `arr` by calling
/// `nanoarrow::nanoarrow_array_set_schema(arr, schema)`.
///
/// NOTE(review): any error returned by the R call is discarded, because this
/// function returns `()`. Callers cannot tell whether the schema was attached.
///
/// # Panics
/// Panics if evaluating `nanoarrow::nanoarrow_array_set_schema` fails (for example
/// when `nanoarrow` is not installed) or if it does not evaluate to an R function.
pub fn set_array_schema(arr: &Robj, schema: &Robj) {
    let setter_obj = R!("nanoarrow::nanoarrow_array_set_schema")
        .expect("`nanoarrow` must be installed");
    let setter = setter_obj
        .as_function()
        .expect("`nanoarrow_array_set_schema()` must be available");
    // Outcome intentionally ignored (see NOTE above).
    let _ = setter.call(pairlist!(arr, schema));
}
/// Conversion of a borrowed Arrow value into an R object.
///
/// The implementations in this module produce `nanoarrow` R objects: arrays,
/// schemas, or array streams.
pub trait ToArrowRobj {
    /// Exports `self` to a freshly allocated `nanoarrow` R object.
    fn to_arrow_robj(&self) -> Result<Robj>;
}
impl ToArrowRobj for ArrayData {
    /// Exports this [`ArrayData`] through the Arrow C data interface and wraps it
    /// in a `nanoarrow_array` R object, with the exported schema attached.
    ///
    /// # Panics
    /// Panics if `to_ffi` rejects the data.
    ///
    /// # Errors
    /// Returns an error if allocating either R object fails, or if moving the
    /// exported structs into them fails. Previously a failed move was ignored, and
    /// an unfilled object was returned as if it had succeeded.
    fn to_arrow_robj(&self) -> Result<Robj> {
        let (mut ffi_array, mut ffi_schema) = to_ffi(self).expect("success converting arrow data");
        // The pointer move is expected to write to the source structs (clearing
        // their `release` callbacks), so the addresses must come from `&mut`
        // borrows. Writing through an address derived from `&` is not sound.
        let array_addr = (&mut ffi_array as *mut FFI_ArrowArray as usize).to_string();
        let schema_addr = (&mut ffi_schema as *mut FFI_ArrowSchema as usize).to_string();
        let arr_to_fill = allocate_array(pairlist!())?;
        let schema_to_fill = allocate_schema(pairlist!())?;
        // `ffi_array` / `ffi_schema` stay alive in this frame until the moves are done.
        move_pointer(pairlist!(array_addr, &arr_to_fill))?;
        move_pointer(pairlist!(schema_addr, &schema_to_fill))?;
        set_array_schema(&arr_to_fill, &schema_to_fill);
        Ok(arr_to_fill)
    }
}
impl<T: ArrowPrimitiveType> ToArrowRobj for PrimitiveArray<T> {
    /// Exports the array by taking a copy of its [`ArrayData`] and delegating to
    /// the [`ArrayData`] implementation.
    fn to_arrow_robj(&self) -> Result<Robj> {
        // `to_data` is the borrowing accessor; it yields the same `ArrayData`
        // that `into_data` on a reference would.
        self.to_data().to_arrow_robj()
    }
}
impl ToArrowRobj for Field {
fn to_arrow_robj(&self) -> Result<Robj> {
let ffi_schema = FFI_ArrowSchema::try_from(self).expect("Field is FFI compatible");
let ffi_schema_ptr = &ffi_schema as *const FFI_ArrowSchema as usize;
let schema_addr_chr = ffi_schema_ptr.to_string();
let schema_to_fill = allocate_schema(pairlist!())?;
let _ = move_pointer(pairlist!(schema_addr_chr, &schema_to_fill));
Ok(schema_to_fill)
}
}
impl ToArrowRobj for Schema {
fn to_arrow_robj(&self) -> Result<Robj> {
let ffi_schema = FFI_ArrowSchema::try_from(self).expect("valid Schema");
let ffi_schema_ptr = &ffi_schema as *const FFI_ArrowSchema as usize;
let schema_addr_chr = ffi_schema_ptr.to_string();
let schema_to_fill = allocate_schema(pairlist!())?;
let _ = move_pointer(pairlist!(schema_addr_chr, &schema_to_fill));
Ok(schema_to_fill)
}
}
impl ToArrowRobj for DataType {
fn to_arrow_robj(&self) -> Result<Robj> {
let ffi_schema = FFI_ArrowSchema::try_from(self).expect("valid Schema");
let ffi_schema_ptr = &ffi_schema as *const FFI_ArrowSchema as usize;
let schema_addr_chr = ffi_schema_ptr.to_string();
let schema_to_fill = allocate_schema(pairlist!())?;
let _ = move_pointer(pairlist!(schema_addr_chr, &schema_to_fill));
Ok(schema_to_fill)
}
}
impl ToArrowRobj for RecordBatch {
    /// Exports this batch as a single-batch Arrow C stream wrapped in a
    /// `nanoarrow_array_stream` R object.
    ///
    /// # Errors
    /// Returns an error if allocating the R object or the pointer move fails.
    /// Previously a failed move was ignored.
    fn to_arrow_robj(&self) -> Result<Robj> {
        // `schema()` already returns an owned `SchemaRef`; no extra clone needed.
        let reader = RecordBatchIterator::new(vec![Ok(self.clone())], self.schema());
        let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
        let mut stream = FFI_ArrowArrayStream::new(reader);
        let stream_addr = (&mut stream as *mut FFI_ArrowArrayStream as usize).to_string();
        let stream_to_fill = allocate_array_stream(pairlist!())?;
        // `stream` stays alive in this frame until the move has completed.
        move_pointer(pairlist!(stream_addr, &stream_to_fill))?;
        Ok(stream_to_fill)
    }
}
/// Conversion of an owned Arrow value (or reader/stream) into an R object.
///
/// The implementations in this module produce `nanoarrow` R objects, consuming
/// `self` in the process.
pub trait IntoArrowRobj {
    /// Consumes `self` and exports it to a freshly allocated `nanoarrow` R object.
    fn into_arrow_robj(self) -> Result<Robj>;
}
// Implements `IntoArrowRobj` for a type that already implements `ToArrowRobj`,
// by borrowing the owned value and delegating to the borrowing conversion.
macro_rules! impl_into_arrow {
    ($target:ident) => {
        impl IntoArrowRobj for $target {
            fn into_arrow_robj(self) -> Result<Robj> {
                <$target as ToArrowRobj>::to_arrow_robj(&self)
            }
        }
    };
}
// Owned types whose `ToArrowRobj` implementation is reused for `IntoArrowRobj`.
impl_into_arrow! { ArrayData }
impl_into_arrow! { DataType }
impl_into_arrow! { Field }
impl_into_arrow! { RecordBatch }
impl_into_arrow! { Schema }
impl<T: ArrowPrimitiveType> IntoArrowRobj for PrimitiveArray<T> {
    /// Consumes the array, turns it into its [`ArrayData`], and exports that as a
    /// `nanoarrow_array` R object.
    fn into_arrow_robj(self) -> Result<Robj> {
        let data = self.into_data();
        data.to_arrow_robj()
    }
}
/// Re-exports an [`ArrowArrayStreamReader`] as a `nanoarrow_array_stream` R object.
///
/// # Errors
/// Returns an error if allocating the R object or the pointer move fails.
/// Previously a failed move was ignored, and an unfilled stream was returned.
fn to_arrow_robj_stream_reader(reader: ArrowArrayStreamReader) -> Result<Robj> {
    let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
    let mut stream = FFI_ArrowArrayStream::new(reader);
    let stream_addr = (&mut stream as *mut FFI_ArrowArrayStream as usize).to_string();
    let stream_to_fill = allocate_array_stream(pairlist!())?;
    // `stream` stays alive in this frame until the move has completed.
    move_pointer(pairlist!(stream_addr, &stream_to_fill))?;
    Ok(stream_to_fill)
}
impl IntoArrowRobj for ArrowArrayStreamReader {
    /// Exports the reader as a `nanoarrow_array_stream` R object by delegating to
    /// `to_arrow_robj_stream_reader`.
    fn into_arrow_robj(self) -> Result<Robj> {
        to_arrow_robj_stream_reader(self)
    }
}
impl IntoArrowRobj for Box<dyn RecordBatchReader + Send> {
    /// Wraps the boxed reader in an Arrow C stream and moves it into a freshly
    /// allocated `nanoarrow_array_stream` R object.
    ///
    /// # Errors
    /// Returns an error if allocating the R object or the pointer move fails.
    /// Previously a failed move was ignored, and an unfilled stream was returned.
    fn into_arrow_robj(self) -> Result<Robj> {
        let mut stream = FFI_ArrowArrayStream::new(self);
        let stream_addr = (&mut stream as *mut FFI_ArrowArrayStream as usize).to_string();
        let stream_to_fill = allocate_array_stream(pairlist!())?;
        // `stream` stays alive in this frame until the move has completed.
        move_pointer(pairlist!(stream_addr, &stream_to_fill))?;
        Ok(stream_to_fill)
    }
}
impl IntoArrowRobj for Vec<RecordBatch> {
fn into_arrow_robj(self) -> Result<Robj> {
if self.is_empty() {
let sb = SchemaBuilder::new();
let schema = sb.finish();
let empty_iter = vec![].into_iter();
let rb = arrow::record_batch::RecordBatchIterator::new(empty_iter, schema.into());
return rb.into_arrow_robj();
}
let schema = self[0].schema();
let res = self.into_iter().map(Ok::<RecordBatch, ArrowError>);
let rbit = arrow::record_batch::RecordBatchIterator::new(res, schema);
let reader: Box<dyn RecordBatchReader + Send> = Box::new(rbit);
reader.into_arrow_robj()
}
}
impl<I> IntoArrowRobj for RecordBatchIterator<I>
where
    I: IntoIterator<Item = std::result::Result<RecordBatch, ArrowError>> + Send + 'static,
    <I as IntoIterator>::IntoIter: Send,
{
    /// Type-erases the iterator into a boxed [`RecordBatchReader`] and exports it
    /// as a `nanoarrow_array_stream` R object.
    fn into_arrow_robj(self) -> Result<Robj> {
        (Box::new(self) as Box<dyn RecordBatchReader + Send>).into_arrow_robj()
    }
}