diff --git a/bindings/elixir/lib/fluss/admin.ex b/bindings/elixir/lib/fluss/admin.ex index 6dbdb3a9..3be8e96f 100644 --- a/bindings/elixir/lib/fluss/admin.ex +++ b/bindings/elixir/lib/fluss/admin.ex @@ -50,6 +50,21 @@ defmodule Fluss.Admin do end end + @spec get_server_nodes(t()) :: {:ok, [Fluss.ServerNode.t()]} | {:error, Fluss.Error.t()} + def get_server_nodes(admin) do + admin + |> Native.admin_get_server_nodes() + |> Native.await_nif() + end + + @spec get_server_nodes!(t()) :: [Fluss.ServerNode.t()] + def get_server_nodes!(admin) do + case get_server_nodes(admin) do + {:ok, nodes} -> nodes + {:error, %Fluss.Error{} = err} -> raise err + end + end + @spec create_database(t(), String.t(), boolean()) :: :ok | {:error, Fluss.Error.t()} def create_database(admin, name, ignore_if_exists \\ true) do admin @@ -79,6 +94,21 @@ defmodule Fluss.Admin do end end + @spec database_exists(t(), String.t()) :: {:ok, boolean()} | {:error, Fluss.Error.t()} + def database_exists(admin, database_name) do + admin + |> Native.admin_database_exists(database_name) + |> Native.await_nif() + end + + @spec database_exists!(t(), String.t()) :: boolean() + def database_exists!(admin, database_name) do + case database_exists(admin, database_name) do + {:ok, exists} -> exists + {:error, %Fluss.Error{} = err} -> raise err + end + end + @spec create_table(t(), String.t(), String.t(), Fluss.TableDescriptor.t(), boolean()) :: :ok | {:error, Fluss.Error.t()} def create_table(admin, database, table, descriptor, ignore_if_exists \\ true) do @@ -108,4 +138,19 @@ defmodule Fluss.Admin do {:error, %Fluss.Error{} = err} -> raise err end end + + @spec table_exists(t(), String.t(), String.t()) :: {:ok, boolean()} | {:error, Fluss.Error.t()} + def table_exists(admin, database_name, table_name) do + admin + |> Native.admin_table_exists(database_name, table_name) + |> Native.await_nif() + end + + @spec table_exists!(t(), String.t(), String.t()) :: boolean() + def table_exists!(admin, database_name, table_name) do + case table_exists(admin, database_name, table_name) do + {:ok, exists} -> exists + {:error, %Fluss.Error{} = err} -> raise err + end + end end diff --git a/bindings/elixir/lib/fluss/native.ex b/bindings/elixir/lib/fluss/native.ex index 865dda14..b0e41f04 100644 --- a/bindings/elixir/lib/fluss/native.ex +++ b/bindings/elixir/lib/fluss/native.ex @@ -25,6 +25,8 @@ defmodule Fluss.Native do # Admin def admin_new(_conn), do: :erlang.nif_error(:nif_not_loaded) + def admin_get_server_nodes(_admin), do: :erlang.nif_error(:nif_not_loaded) + def admin_create_database(_admin, _name, _ignore_if_exists), do: :erlang.nif_error(:nif_not_loaded) @@ -33,6 +35,8 @@ defmodule Fluss.Native do def admin_list_databases(_admin), do: :erlang.nif_error(:nif_not_loaded) + def admin_database_exists(_admin, _database_name), do: :erlang.nif_error(:nif_not_loaded) + def admin_create_table(_admin, _db, _table, _descriptor, _ignore_if_exists), do: :erlang.nif_error(:nif_not_loaded) @@ -41,6 +45,9 @@ defmodule Fluss.Native do def admin_list_tables(_admin, _database), do: :erlang.nif_error(:nif_not_loaded) + def admin_table_exists(_admin, _database_name, _table_name), + do: :erlang.nif_error(:nif_not_loaded) + # Schema / TableDescriptor def table_descriptor_new(_schema, _bucket_count, _properties), do: :erlang.nif_error(:nif_not_loaded) diff --git a/bindings/elixir/lib/fluss/server_node.ex b/bindings/elixir/lib/fluss/server_node.ex new file mode 100644 index 00000000..5441aa23 --- /dev/null +++ b/bindings/elixir/lib/fluss/server_node.ex @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.ServerNode do + @moduledoc """ + A node in the Fluss cluster — either a coordinator or a tablet server. + Returned by `Fluss.Admin.get_server_nodes/1`. + """ + + @enforce_keys [:id, :uid, :host, :port, :server_type] + defstruct [:id, :uid, :host, :port, :server_type] + + @type t :: %__MODULE__{ + id: integer(), + uid: String.t(), + host: String.t(), + port: non_neg_integer(), + server_type: :tablet_server | :coordinator_server + } + + @spec url(t()) :: String.t() + def url(%__MODULE__{host: host, port: port}), do: "#{host}:#{port}" +end diff --git a/bindings/elixir/native/fluss_nif/src/admin.rs b/bindings/elixir/native/fluss_nif/src/admin.rs index e3f29aeb..ddb83e68 100644 --- a/bindings/elixir/native/fluss_nif/src/admin.rs +++ b/bindings/elixir/native/fluss_nif/src/admin.rs @@ -21,9 +21,41 @@ use crate::connection::ConnectionResource; use crate::schema::TableDescriptorResource; use fluss::client::FlussAdmin; use fluss::metadata::TablePath; -use rustler::{Env, ResourceArc, Term}; +use fluss::{ServerNode, ServerType}; +use rustler::{Env, NifStruct, NifUnitEnum, ResourceArc, Term}; use std::sync::Arc; +#[derive(NifUnitEnum)] +pub enum NifServerType { + TabletServer, + CoordinatorServer, +} + +#[derive(NifStruct)] +#[module = "Fluss.ServerNode"] +pub struct NifServerNode { + pub id: i32, + pub uid: String, + pub host: String, + pub port: u32, + pub server_type: NifServerType, +} + +impl NifServerNode { + pub fn from_core(node: &ServerNode) -> Self { + Self { + id: node.id(), + uid: node.uid().to_string(), + host: node.host().to_string(), + port: node.port(), + server_type: match node.server_type() { + ServerType::TabletServer => NifServerType::TabletServer, + ServerType::CoordinatorServer => NifServerType::CoordinatorServer, + }, + } + } +} + pub struct AdminResource { pub inner: Arc, } @@ -41,6 +73,15 @@ fn admin_new( Ok(ResourceArc::new(AdminResource { inner })) } +#[rustler::nif] +fn admin_get_server_nodes<'a>(env: Env<'a>, admin: ResourceArc) -> Term<'a> { + async_nif::spawn_task_with_result(env, async move { + let nodes: Vec = admin.inner.get_server_nodes().await?; + let wrapped: Vec = nodes.iter().map(NifServerNode::from_core).collect(); + Ok(wrapped) + }) +} + #[rustler::nif] fn admin_create_database<'a>( env: Env<'a>, @@ -76,6 +117,17 @@ fn admin_list_databases<'a>(env: Env<'a>, admin: ResourceArc) -> async_nif::spawn_task_with_result(env, async move { admin.inner.list_databases().await }) } +#[rustler::nif] +fn admin_database_exists<'a>( + env: Env<'a>, + admin: ResourceArc, + database_name: String, +) -> Term<'a> { + async_nif::spawn_task_with_result(env, async move { + admin.inner.database_exists(&database_name).await + }) +} + #[rustler::nif] fn admin_create_table<'a>( env: Env<'a>, @@ -119,3 +171,16 @@ fn admin_list_tables<'a>( async move { admin.inner.list_tables(&database_name).await }, ) } + +#[rustler::nif] +fn admin_table_exists<'a>( + env: Env<'a>, + admin: ResourceArc, + database_name: String, + table_name: String, +) -> Term<'a> { + async_nif::spawn_task_with_result(env, async move { + let table_path = TablePath::new(database_name, table_name); + admin.inner.table_exists(&table_path).await + }) +} diff --git a/bindings/elixir/test/integration/admin_test.exs b/bindings/elixir/test/integration/admin_test.exs new file mode 100644 index 00000000..1f670350 --- /dev/null +++ b/bindings/elixir/test/integration/admin_test.exs @@ -0,0 +1,133 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.Integration.AdminTest do + use Fluss.Test.IntegrationCase, async: false + + @database "fluss" + + describe "get_server_nodes/1" do + test "returns a non-empty list of %Fluss.ServerNode{} structs", %{admin: admin} do + assert {:ok, nodes} = Fluss.Admin.get_server_nodes(admin) + assert is_list(nodes) + assert nodes != [] + + for node <- nodes do + assert %Fluss.ServerNode{} = node + assert is_integer(node.id) + assert is_binary(node.uid) and node.uid != "" + assert is_binary(node.host) and node.host != "" + assert is_integer(node.port) and node.port > 0 + assert node.server_type in [:coordinator_server, :tablet_server] + end + end + + test "cluster has exactly one coordinator", %{admin: admin} do + {:ok, nodes} = Fluss.Admin.get_server_nodes(admin) + coordinators = Enum.filter(nodes, &(&1.server_type == :coordinator_server)) + + assert [_coordinator] = coordinators + end + end + + describe "get_server_nodes!/1" do + test "returns the list directly without an :ok tuple", %{admin: admin} do + nodes = Fluss.Admin.get_server_nodes!(admin) + assert is_list(nodes) + assert nodes != [] + assert %Fluss.ServerNode{} = hd(nodes) + end + end + + describe "database_exists/2" do + test "returns {:ok, true} for an existing database", %{admin: admin} do + db = "fluss_data_sources_#{:rand.uniform(100_000)}" + + :ok = Fluss.Admin.create_database(admin, db, true) + on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end) + + assert {:ok, true} = Fluss.Admin.database_exists(admin, db) + end + + test "returns {:ok, false} for a non-existent database", %{admin: admin} do + db = "fluss_data_sources_#{:rand.uniform(100_000)}" + + assert Fluss.Admin.database_exists(admin, db) == {:ok, false} + end + end + + describe "database_exists!/2" do + test "returns true for an existing database", %{admin: admin} do + db = "fluss_data_sources_#{:rand.uniform(100_000)}" + + :ok = Fluss.Admin.create_database(admin, db) + on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end) + + assert Fluss.Admin.database_exists!(admin, db) + end + + test "returns false for a non-existent database", %{admin: admin} do + db = "fluss_data_sources_#{:rand.uniform(100_000)}" + + refute Fluss.Admin.database_exists!(admin, db) + end + end + + describe "table_exists/3" do + test "returns {:ok, true} for an existing table", %{admin: admin} do + table = "fluss_table_#{:rand.uniform(100_000)}" + + schema = + Fluss.Schema.new() + |> Fluss.Schema.column("c1", :int) + |> Fluss.Schema.column("c2", :string) + + descriptor = Fluss.TableDescriptor.new!(schema) + :ok = Fluss.Admin.create_table(admin, @database, table, descriptor, true) + on_exit(fn -> Fluss.Admin.drop_table(admin, @database, table, true) end) + + assert {:ok, true} = Fluss.Admin.table_exists(admin, @database, table) + end + + test "returns {:ok, false} for a non-existent table", %{admin: admin} do + table = "fluss_table_#{:rand.uniform(100_000)}" + assert {:ok, false} = Fluss.Admin.table_exists(admin, @database, table) + end + end + + describe "table_exists!/3" do + test "returns true for an existing table", %{admin: admin} do + table = "fluss_table_#{:rand.uniform(100_000)}" + + schema = + Fluss.Schema.new() + |> Fluss.Schema.column("c1", :int) + |> Fluss.Schema.column("c2", :string) + + descriptor = Fluss.TableDescriptor.new!(schema) + :ok = Fluss.Admin.create_table(admin, @database, table, descriptor, true) + on_exit(fn -> Fluss.Admin.drop_table(admin, @database, table, true) end) + + assert Fluss.Admin.table_exists!(admin, @database, table) + end + + test "returns false for a non-existent table", %{admin: admin} do + table = "fluss_table_#{:rand.uniform(100_000)}" + refute Fluss.Admin.table_exists!(admin, @database, table) + end + end +end diff --git a/bindings/elixir/test/integration/log_table_test.exs b/bindings/elixir/test/integration/log_table_test.exs index b3041b95..16a670fe 100644 --- a/bindings/elixir/test/integration/log_table_test.exs +++ b/bindings/elixir/test/integration/log_table_test.exs @@ -16,29 +16,10 @@ # under the License. defmodule Fluss.Integration.LogTableTest do - use ExUnit.Case, async: false - - alias Fluss.Test.Cluster - - @moduletag :integration + use Fluss.Test.IntegrationCase, async: false @database "fluss" - setup_all do - case Cluster.ensure_started() do - {:ok, servers} -> - config = Fluss.Config.new(servers) - - # Wait for cluster to be fully ready (connection + admin working) - {conn, admin} = connect_with_retry(config, 90) - - %{conn: conn, admin: admin, config: config} - - {:error, reason} -> - raise "Failed to start Fluss cluster: #{reason}" - end - end - describe "append and scan" do test "append rows and scan with log scanner", %{conn: conn, admin: admin} do table_name = "ex_test_append_and_scan_#{:rand.uniform(100_000)}" @@ -388,26 +369,4 @@ defmodule Fluss.Integration.LogTableTest do defp cleanup_table(admin, table_name) do Fluss.Admin.drop_table(admin, @database, table_name, true) end - - defp connect_with_retry(config, timeout_s) do - deadline = System.monotonic_time(:second) + timeout_s - do_connect_retry(config, deadline, nil) - end - - defp do_connect_retry(config, deadline, last_error) do - if System.monotonic_time(:second) >= deadline do - raise "Could not connect to Fluss cluster: #{inspect(last_error)}" - end - - try do - conn = Fluss.Connection.new!(config) - admin = Fluss.Admin.new!(conn) - {:ok, _databases} = Fluss.Admin.list_databases(admin) - {conn, admin} - rescue - e -> - Process.sleep(2_000) - do_connect_retry(config, deadline, e) - end - end end diff --git a/bindings/elixir/test/support/cluster.ex b/bindings/elixir/test/support/cluster.ex index 40f0f68d..e6ff691d 100644 --- a/bindings/elixir/test/support/cluster.ex +++ b/bindings/elixir/test/support/cluster.ex @@ -46,6 +46,28 @@ defmodule Fluss.Test.Cluster do end end + def connect_with_retry(config, timeout_s) do + deadline = System.monotonic_time(:second) + timeout_s + do_connect_retry(config, deadline, nil) + end + + defp do_connect_retry(config, deadline, last_error) do + if System.monotonic_time(:second) >= deadline do + raise "Could not connect to Fluss cluster: #{inspect(last_error)}" + end + + try do + conn = Fluss.Connection.new!(config) + admin = Fluss.Admin.new!(conn) + {:ok, _databases} = Fluss.Admin.list_databases(admin) + {conn, admin} + rescue + e -> + Process.sleep(2_000) + do_connect_retry(config, deadline, e) + end + end + defp start_cluster do with {:ok, cli} <- find_cli_binary(), {output, 0} <- diff --git a/bindings/elixir/test/support/integration_case.ex b/bindings/elixir/test/support/integration_case.ex new file mode 100644 index 00000000..b5aa91a6 --- /dev/null +++ b/bindings/elixir/test/support/integration_case.ex @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.Test.IntegrationCase do + @moduledoc """ + Testing Template for Fluss integration tests + """ + use ExUnit.CaseTemplate + alias Fluss.Test.Cluster + + using do + quote do + @moduletag :integration + alias Fluss.Test.Cluster + end + end + + setup_all do + case Cluster.ensure_started() do + {:ok, servers} -> + config = Fluss.Config.new(servers) + {conn, admin} = Cluster.connect_with_retry(config, 90) + %{conn: conn, admin: admin, config: config} + + {:error, reason} -> + raise "Failed to start Fluss cluster: #{reason}" + end + end +end