Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions bindings/elixir/lib/fluss/admin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ defmodule Fluss.Admin do
end
end

@spec get_server_nodes(t()) :: {:ok, [Fluss.ServerNode.t()]} | {:error, Fluss.Error.t()}
def get_server_nodes(admin) do
admin
|> Native.admin_get_server_nodes()
|> Native.await_nif()
end

@spec get_server_nodes!(t()) :: [Fluss.ServerNode.t()]
def get_server_nodes!(admin) do
case get_server_nodes(admin) do
{:ok, nodes} -> nodes
{:error, %Fluss.Error{} = err} -> raise err
end
end

@spec create_database(t(), String.t(), boolean()) :: :ok | {:error, Fluss.Error.t()}
def create_database(admin, name, ignore_if_exists \\ true) do
admin
Expand Down Expand Up @@ -79,6 +94,21 @@ defmodule Fluss.Admin do
end
end

@spec database_exists(t(), String.t()) :: {:ok, boolean()} | {:error, Fluss.Error.t()}
def database_exists(admin, database_name) do
admin
|> Native.admin_database_exists(database_name)
|> Native.await_nif()
end

@spec database_exists!(t(), String.t()) :: boolean()
def database_exists!(admin, database_name) do
case database_exists(admin, database_name) do
{:ok, exists} -> exists
{:error, %Fluss.Error{} = err} -> raise err
end
end

@spec create_table(t(), String.t(), String.t(), Fluss.TableDescriptor.t(), boolean()) ::
:ok | {:error, Fluss.Error.t()}
def create_table(admin, database, table, descriptor, ignore_if_exists \\ true) do
Expand Down Expand Up @@ -108,4 +138,19 @@ defmodule Fluss.Admin do
{:error, %Fluss.Error{} = err} -> raise err
end
end

@spec table_exists(t(), String.t(), String.t()) :: {:ok, boolean()} | {:error, Fluss.Error.t()}
def table_exists(admin, database_name, table_name) do
admin
|> Native.admin_table_exists(database_name, table_name)
|> Native.await_nif()
end

@spec table_exists!(t(), String.t(), String.t()) :: boolean()
def table_exists!(admin, database_name, table_name) do
case table_exists(admin, database_name, table_name) do
{:ok, exists} -> exists
{:error, %Fluss.Error{} = err} -> raise err
end
end
end
7 changes: 7 additions & 0 deletions bindings/elixir/lib/fluss/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ defmodule Fluss.Native do
# Admin
def admin_new(_conn), do: :erlang.nif_error(:nif_not_loaded)

def admin_get_server_nodes(_admin), do: :erlang.nif_error(:nif_not_loaded)

def admin_create_database(_admin, _name, _ignore_if_exists),
do: :erlang.nif_error(:nif_not_loaded)

Expand All @@ -33,6 +35,8 @@ defmodule Fluss.Native do

def admin_list_databases(_admin), do: :erlang.nif_error(:nif_not_loaded)

def admin_database_exists(_admin, _database_name), do: :erlang.nif_error(:nif_not_loaded)

def admin_create_table(_admin, _db, _table, _descriptor, _ignore_if_exists),
do: :erlang.nif_error(:nif_not_loaded)

Expand All @@ -41,6 +45,9 @@ defmodule Fluss.Native do

def admin_list_tables(_admin, _database), do: :erlang.nif_error(:nif_not_loaded)

def admin_table_exists(_admin, _database_name, _table_name),
do: :erlang.nif_error(:nif_not_loaded)

# Schema / TableDescriptor
def table_descriptor_new(_schema, _bucket_count, _properties),
do: :erlang.nif_error(:nif_not_loaded)
Expand Down
37 changes: 37 additions & 0 deletions bindings/elixir/lib/fluss/server_node.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

defmodule Fluss.ServerNode do
@moduledoc """
A node in the Fluss cluster — either a coordinator or a tablet server.
Returned by `Fluss.Admin.get_server_nodes/1`.
"""

@enforce_keys [:id, :uid, :host, :port, :server_type]
defstruct [:id, :uid, :host, :port, :server_type]

@type t :: %__MODULE__{
id: integer(),
uid: String.t(),
host: String.t(),
port: non_neg_integer(),
server_type: :tablet_server | :coordinator_server
}

@spec url(t()) :: String.t()
def url(%__MODULE__{host: host, port: port}), do: "#{host}:#{port}"
end
67 changes: 66 additions & 1 deletion bindings/elixir/native/fluss_nif/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,41 @@ use crate::connection::ConnectionResource;
use crate::schema::TableDescriptorResource;
use fluss::client::FlussAdmin;
use fluss::metadata::TablePath;
use rustler::{Env, ResourceArc, Term};
use fluss::{ServerNode, ServerType};
use rustler::{Env, NifStruct, NifUnitEnum, ResourceArc, Term};
use std::sync::Arc;

#[derive(NifUnitEnum)]
pub enum NifServerType {
TabletServer,
CoordinatorServer,
}

#[derive(NifStruct)]
#[module = "Fluss.ServerNode"]
pub struct NifServerNode {
pub id: i32,
pub uid: String,
pub host: String,
pub port: u32,
pub server_type: NifServerType,
}

impl NifServerNode {
pub fn from_core(node: &ServerNode) -> Self {
Self {
id: node.id(),
uid: node.uid().to_string(),
host: node.host().to_string(),
port: node.port(),
server_type: match node.server_type() {
ServerType::TabletServer => NifServerType::TabletServer,
ServerType::CoordinatorServer => NifServerType::CoordinatorServer,
},
}
}
}

pub struct AdminResource {
pub inner: Arc<FlussAdmin>,
}
Expand All @@ -41,6 +73,15 @@ fn admin_new(
Ok(ResourceArc::new(AdminResource { inner }))
}

#[rustler::nif]
fn admin_get_server_nodes<'a>(env: Env<'a>, admin: ResourceArc<AdminResource>) -> Term<'a> {
async_nif::spawn_task_with_result(env, async move {
let nodes: Vec<ServerNode> = admin.inner.get_server_nodes().await?;
let wrapped: Vec<NifServerNode> = nodes.iter().map(NifServerNode::from_core).collect();
Ok(wrapped)
})
}

#[rustler::nif]
fn admin_create_database<'a>(
env: Env<'a>,
Expand Down Expand Up @@ -76,6 +117,17 @@ fn admin_list_databases<'a>(env: Env<'a>, admin: ResourceArc<AdminResource>) ->
async_nif::spawn_task_with_result(env, async move { admin.inner.list_databases().await })
}

#[rustler::nif]
fn admin_database_exists<'a>(
env: Env<'a>,
admin: ResourceArc<AdminResource>,
database_name: String,
) -> Term<'a> {
async_nif::spawn_task_with_result(env, async move {
admin.inner.database_exists(&database_name).await
})
}

#[rustler::nif]
fn admin_create_table<'a>(
env: Env<'a>,
Expand Down Expand Up @@ -119,3 +171,16 @@ fn admin_list_tables<'a>(
async move { admin.inner.list_tables(&database_name).await },
)
}

#[rustler::nif]
fn admin_table_exists<'a>(
env: Env<'a>,
admin: ResourceArc<AdminResource>,
database_name: String,
table_name: String,
) -> Term<'a> {
async_nif::spawn_task_with_result(env, async move {
let table_path = TablePath::new(database_name, table_name);
admin.inner.table_exists(&table_path).await
})
}
133 changes: 133 additions & 0 deletions bindings/elixir/test/integration/admin_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

defmodule Fluss.Integration.AdminTest do
use Fluss.Test.IntegrationCase, async: false

@database "fluss"

describe "get_server_nodes/1" do
test "returns a non-empty list of %Fluss.ServerNode{} structs", %{admin: admin} do
assert {:ok, nodes} = Fluss.Admin.get_server_nodes(admin)
assert is_list(nodes)
assert nodes != []

for node <- nodes do
assert %Fluss.ServerNode{} = node
assert is_integer(node.id)
assert is_binary(node.uid) and node.uid != ""
assert is_binary(node.host) and node.host != ""
assert is_integer(node.port) and node.port > 0
assert node.server_type in [:coordinator_server, :tablet_server]
end
end

test "cluster has exactly one coordinator", %{admin: admin} do
{:ok, nodes} = Fluss.Admin.get_server_nodes(admin)
coordinators = Enum.filter(nodes, &(&1.server_type == :coordinator_server))

assert [_coordinator] = coordinators
end
end

describe "get_server_nodes!/1" do
test "returns the list directly without an :ok tuple", %{admin: admin} do
nodes = Fluss.Admin.get_server_nodes!(admin)
assert is_list(nodes)
assert nodes != []
assert %Fluss.ServerNode{} = hd(nodes)
end
end

describe "database_exists/2" do
test "returns {:ok, true} for an existing database", %{admin: admin} do
db = "fluss_data_sources_#{:rand.uniform(100_000)}"

:ok = Fluss.Admin.create_database(admin, db, true)
on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end)

assert {:ok, true} = Fluss.Admin.database_exists(admin, db)
end

test "returns {:ok, false} for a non-existent database", %{admin: admin} do
db = "fluss_data_sources_#{:rand.uniform(100_000)}"

assert Fluss.Admin.database_exists(admin, db) == {:ok, false}
end
end

describe "database_exists!/2" do
test "returns true for an existing database", %{admin: admin} do
db = "fluss_data_sources_#{:rand.uniform(100_000)}"

:ok = Fluss.Admin.create_database(admin, db)
on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end)

assert Fluss.Admin.database_exists!(admin, db)
end

test "returns false for a non-existent database", %{admin: admin} do
db = "fluss_data_sources_#{:rand.uniform(100_000)}"

refute Fluss.Admin.database_exists!(admin, db)
end
end

describe "table_exists/3" do
test "returns {:ok, true} for an existing table", %{admin: admin} do
table = "fluss_table_#{:rand.uniform(100_000)}"

schema =
Fluss.Schema.new()
|> Fluss.Schema.column("c1", :int)
|> Fluss.Schema.column("c2", :string)

descriptor = Fluss.TableDescriptor.new!(schema)
:ok = Fluss.Admin.create_table(admin, @database, table, descriptor, true)
on_exit(fn -> Fluss.Admin.drop_table(admin, @database, table, true) end)

assert {:ok, true} = Fluss.Admin.table_exists(admin, @database, table)
end

test "returns {:ok, false} for a non-existent table", %{admin: admin} do
table = "fluss_table_#{:rand.uniform(100_000)}"
assert {:ok, false} = Fluss.Admin.table_exists(admin, @database, table)
end
end

describe "table_exists!/3" do
test "returns true for an existing table", %{admin: admin} do
table = "fluss_table_#{:rand.uniform(100_000)}"

schema =
Fluss.Schema.new()
|> Fluss.Schema.column("c1", :int)
|> Fluss.Schema.column("c2", :string)

descriptor = Fluss.TableDescriptor.new!(schema)
:ok = Fluss.Admin.create_table(admin, @database, table, descriptor, true)
on_exit(fn -> Fluss.Admin.drop_table(admin, @database, table, true) end)

assert Fluss.Admin.table_exists!(admin, @database, table)
end

test "returns false for a non-existent table", %{admin: admin} do
table = "fluss_table_#{:rand.uniform(100_000)}"
refute Fluss.Admin.table_exists!(admin, @database, table)
end
end
end
Loading
Loading