diff --git a/.github/workflows/build_and_test_elixir.yml b/.github/workflows/build_and_test_elixir.yml index 03f3329e..ce5de2df 100644 --- a/.github/workflows/build_and_test_elixir.yml +++ b/.github/workflows/build_and_test_elixir.yml @@ -91,6 +91,18 @@ jobs: working-directory: bindings/elixir run: mix credo + - name: Cache PLT + uses: actions/cache@v4 + with: + path: bindings/elixir/priv/plts/ + key: ${{ runner.os }}-otp-${{ steps.beam.outputs.otp-version }}-elixir-${{ steps.beam.outputs.elixir-version }}-mix-${{ hashFiles('bindings/elixir/mix.lock') }}-plt + restore-keys: | + ${{ runner.os }}-otp-${{ steps.beam.outputs.otp-version }}-elixir-${{ steps.beam.outputs.elixir-version }}-mix- + + - name: Dialyzer + working-directory: bindings/elixir + run: mix dialyzer + - name: Run unit tests working-directory: bindings/elixir run: mix test diff --git a/bindings/elixir/.gitignore b/bindings/elixir/.gitignore index 90277ffb..dd00538b 100644 --- a/bindings/elixir/.gitignore +++ b/bindings/elixir/.gitignore @@ -4,6 +4,7 @@ deps/ # Generated NIF shared library priv/native/ +priv/plts/ # Crash dumps erl_crash.dump diff --git a/bindings/elixir/lib/fluss/config.ex b/bindings/elixir/lib/fluss/config.ex index 8aaacf79..d7075b9e 100644 --- a/bindings/elixir/lib/fluss/config.ex +++ b/bindings/elixir/lib/fluss/config.ex @@ -88,6 +88,8 @@ defmodule Fluss.Config do writer_retries: non_neg_integer() | nil } + defguardp is_non_neg_integer(n) when is_integer(n) and n >= 0 + @spec new(String.t()) :: t() def new(bootstrap_servers) when is_binary(bootstrap_servers) do %__MODULE__{bootstrap_servers: bootstrap_servers} @@ -101,45 +103,46 @@ defmodule Fluss.Config do do: %{config | bootstrap_servers: servers} @spec set_connect_timeout_ms(t(), non_neg_integer()) :: t() - def set_connect_timeout_ms(%__MODULE__{} = config, ms) when is_integer(ms), + def set_connect_timeout_ms(%__MODULE__{} = config, ms) when is_non_neg_integer(ms), do: %{config | connect_timeout_ms: ms} @spec set_remote_file_download_thread_num(t(), non_neg_integer()) :: t() def set_remote_file_download_thread_num(%__MODULE__{} = config, threads) - when is_integer(threads), + when is_non_neg_integer(threads), do: %{config | remote_file_download_thread_num: threads} @spec set_scanner_log_fetch_max_bytes(t(), non_neg_integer()) :: t() def set_scanner_log_fetch_max_bytes(%__MODULE__{} = config, max_bytes) - when is_integer(max_bytes), + when is_non_neg_integer(max_bytes), do: %{config | scanner_log_fetch_max_bytes: max_bytes} @spec set_scanner_log_fetch_max_bytes_for_bucket(t(), non_neg_integer()) :: t() def set_scanner_log_fetch_max_bytes_for_bucket(%__MODULE__{} = config, max_bytes) - when is_integer(max_bytes), + when is_non_neg_integer(max_bytes), do: %{config | scanner_log_fetch_max_bytes_for_bucket: max_bytes} @spec set_scanner_log_fetch_min_bytes(t(), non_neg_integer()) :: t() def set_scanner_log_fetch_min_bytes(%__MODULE__{} = config, min_bytes) - when is_integer(min_bytes), + when is_non_neg_integer(min_bytes), do: %{config | scanner_log_fetch_min_bytes: min_bytes} @spec set_scanner_log_fetch_wait_max_time_ms(t(), non_neg_integer()) :: t() def set_scanner_log_fetch_wait_max_time_ms(%__MODULE__{} = config, wait_ms) - when is_integer(wait_ms), + when is_non_neg_integer(wait_ms), do: %{config | scanner_log_fetch_wait_max_time_ms: wait_ms} @spec set_scanner_log_max_poll_records(t(), non_neg_integer()) :: t() - def set_scanner_log_max_poll_records(%__MODULE__{} = config, num) when is_integer(num), + def set_scanner_log_max_poll_records(%__MODULE__{} = config, num) when is_non_neg_integer(num), do: %{config | scanner_log_max_poll_records: num} @spec set_scanner_remote_log_prefetch_num(t(), non_neg_integer()) :: t() - def set_scanner_remote_log_prefetch_num(%__MODULE__{} = config, num) when is_integer(num), - do: %{config | scanner_remote_log_prefetch_num: num} + def set_scanner_remote_log_prefetch_num(%__MODULE__{} = config, num) + when is_non_neg_integer(num), + do: %{config | scanner_remote_log_prefetch_num: num} @spec set_scanner_remote_log_read_concurrency(t(), non_neg_integer()) :: t() def set_scanner_remote_log_read_concurrency(%__MODULE__{} = config, concurrency) - when is_integer(concurrency), + when is_non_neg_integer(concurrency), do: %{config | scanner_remote_log_read_concurrency: concurrency} @spec set_security_protocol(t(), String.t()) :: t() @@ -163,11 +166,11 @@ defmodule Fluss.Config do do: %{config | writer_acks: acks} @spec set_writer_batch_size(t(), non_neg_integer()) :: t() - def set_writer_batch_size(%__MODULE__{} = config, size) when is_integer(size), + def set_writer_batch_size(%__MODULE__{} = config, size) when is_non_neg_integer(size), do: %{config | writer_batch_size: size} @spec set_writer_batch_timeout_ms(t(), non_neg_integer()) :: t() - def set_writer_batch_timeout_ms(%__MODULE__{} = config, ms) when is_integer(ms), + def set_writer_batch_timeout_ms(%__MODULE__{} = config, ms) when is_non_neg_integer(ms), do: %{config | writer_batch_timeout_ms: ms} @spec set_writer_bucket_no_key_assigner(t(), :sticky | :round_robin) :: t() @@ -176,11 +179,11 @@ defmodule Fluss.Config do do: %{config | writer_bucket_no_key_assigner: assigner} @spec set_writer_buffer_memory_size(t(), non_neg_integer()) :: t() - def set_writer_buffer_memory_size(%__MODULE__{} = config, size) when is_integer(size), + def set_writer_buffer_memory_size(%__MODULE__{} = config, size) when is_non_neg_integer(size), do: %{config | writer_buffer_memory_size: size} @spec set_writer_buffer_wait_timeout_ms(t(), non_neg_integer()) :: t() - def set_writer_buffer_wait_timeout_ms(%__MODULE__{} = config, ms) when is_integer(ms), + def set_writer_buffer_wait_timeout_ms(%__MODULE__{} = config, ms) when is_non_neg_integer(ms), do: %{config | writer_buffer_wait_timeout_ms: ms} @spec set_writer_dynamic_batch_size_enabled(t(), boolean()) :: t() @@ -189,8 +192,9 @@ defmodule Fluss.Config do do: %{config | writer_dynamic_batch_size_enabled: enabled} @spec set_writer_dynamic_batch_size_min(t(), non_neg_integer()) :: t() - def set_writer_dynamic_batch_size_min(%__MODULE__{} = config, size) when is_integer(size), - do: %{config | writer_dynamic_batch_size_min: size} + def set_writer_dynamic_batch_size_min(%__MODULE__{} = config, size) + when is_non_neg_integer(size), + do: %{config | writer_dynamic_batch_size_min: size} @spec set_writer_enable_idempotence(t(), boolean()) :: t() def set_writer_enable_idempotence(%__MODULE__{} = config, enabled) @@ -199,15 +203,15 @@ defmodule Fluss.Config do @spec set_writer_max_inflight_requests_per_bucket(t(), non_neg_integer()) :: t() def set_writer_max_inflight_requests_per_bucket(%__MODULE__{} = config, n) - when is_integer(n), + when is_non_neg_integer(n), do: %{config | writer_max_inflight_requests_per_bucket: n} @spec set_writer_request_max_size(t(), non_neg_integer()) :: t() - def set_writer_request_max_size(%__MODULE__{} = config, size) when is_integer(size), + def set_writer_request_max_size(%__MODULE__{} = config, size) when is_non_neg_integer(size), do: %{config | writer_request_max_size: size} @spec set_writer_retries(t(), non_neg_integer()) :: t() - def set_writer_retries(%__MODULE__{} = config, n) when is_integer(n), + def set_writer_retries(%__MODULE__{} = config, n) when is_non_neg_integer(n), do: %{config | writer_retries: n} @spec get_bootstrap_servers(t()) :: String.t() diff --git a/bindings/elixir/mix.exs b/bindings/elixir/mix.exs index f5d416d8..9334be90 100644 --- a/bindings/elixir/mix.exs +++ b/bindings/elixir/mix.exs @@ -27,8 +27,14 @@ defmodule Fluss.MixProject do elixir: "~> 1.15", start_permanent: Mix.env() == :prod, elixirc_paths: elixirc_paths(Mix.env()), + aliases: aliases(), deps: deps(), description: "Elixir client for Apache Fluss", + dialyzer: [ + plt_add_apps: [:ex_unit, :mix], + plt_file: {:no_warn, "priv/plts/dialyzer.plt"}, + list_unused_filters: true + ], package: package() ] end @@ -46,7 +52,23 @@ defmodule Fluss.MixProject do [ {:rustler, "~> 0.37"}, {:ex_doc, "~> 0.31", only: :dev, runtime: false}, - {:credo, "~> 1.7", only: [:dev, :test], runtime: false} + {:credo, "~> 1.7", only: [:dev, :test], runtime: false}, + {:dialyxir, "~> 1.4", only: [:dev, :test], runtime: false} + ] + end + + defp aliases do + [ + check: static_checks() + ] + end + + defp static_checks do + [ + "format --check-formatted", + "credo --strict", + "compile --warnings-as-errors", + "dialyzer" ] end diff --git a/bindings/elixir/mix.lock b/bindings/elixir/mix.lock index b1170d3f..5e931125 100644 --- a/bindings/elixir/mix.lock +++ b/bindings/elixir/mix.lock @@ -1,7 +1,9 @@ %{ "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "credo": {:hex, :credo, "1.7.17", "f92b6aa5b26301eaa5a35e4d48ebf5aa1e7094ac00ae38f87086c562caf8a22f", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1eb5645c835f0b6c9b5410f94b5a185057bcf6d62a9c2b476da971cde8749645"}, + "dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"}, "earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"}, + "erlex": {:hex, :erlex, "0.2.9", "7debbbaa9f4f368b8cd648983e0f1d7963028508e9c59e9d4ed504e94ef52a55", [:mix], [], "hexpm", "8cfffc0ec7159e6d73de2ab28a588064de80f88b2798d5cbe4482cbbc200178b"}, "ex_doc": {:hex, :ex_doc, "0.40.1", "67542e4b6dde74811cfd580e2c0149b78010fd13001fda7cfeb2b2c2ffb1344d", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "bcef0e2d360d93ac19f01a85d58f91752d930c0a30e2681145feea6bd3516e00"}, "file_system": {:hex, :file_system, "1.1.1", "31864f4685b0148f25bd3fbef2b1228457c0c89024ad67f7a81a3ffbc0bbad3a", [:mix], [], "hexpm", "7a15ff97dfe526aeefb090a7a9d3d03aa907e100e262a0f8f7746b78f8f87a5d"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, diff --git a/bindings/elixir/test/config_test.exs b/bindings/elixir/test/config_test.exs index f4b8a11c..767ac247 100644 --- a/bindings/elixir/test/config_test.exs +++ b/bindings/elixir/test/config_test.exs @@ -214,6 +214,14 @@ defmodule Fluss.ConfigTest do assert config.writer_retries == 5 end + test "set_writer_batch_size/2 rejects negative integers" do + config = Fluss.Config.new("h:9123") + + assert_raise FunctionClauseError, fn -> + Fluss.Config.set_writer_batch_size(config, -1) + end + end + test "setters chain correctly" do config = Fluss.Config.new("localhost:9123")