Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/workflows/build_and_test_elixir.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ jobs:
working-directory: bindings/elixir
run: mix credo

- name: Cache PLT
uses: actions/cache@v4
with:
path: bindings/elixir/priv/plts/
key: ${{ runner.os }}-otp-${{ steps.beam.outputs.otp-version }}-elixir-${{ steps.beam.outputs.elixir-version }}-mix-${{ hashFiles('bindings/elixir/mix.lock') }}-plt
restore-keys: |
${{ runner.os }}-otp-${{ steps.beam.outputs.otp-version }}-elixir-${{ steps.beam.outputs.elixir-version }}-mix-

- name: Dialyzer
working-directory: bindings/elixir
run: mix dialyzer

- name: Run unit tests
working-directory: bindings/elixir
run: mix test
Expand Down
1 change: 1 addition & 0 deletions bindings/elixir/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ deps/

# Generated NIF shared library
priv/native/
priv/plts/

# Crash dumps
erl_crash.dump
42 changes: 23 additions & 19 deletions bindings/elixir/lib/fluss/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ defmodule Fluss.Config do
writer_retries: non_neg_integer() | nil
}

defguardp is_non_neg_integer(n) when is_integer(n) and n >= 0

@spec new(String.t()) :: t()
def new(bootstrap_servers) when is_binary(bootstrap_servers) do
%__MODULE__{bootstrap_servers: bootstrap_servers}
Expand All @@ -101,45 +103,46 @@ defmodule Fluss.Config do
do: %{config | bootstrap_servers: servers}

@spec set_connect_timeout_ms(t(), non_neg_integer()) :: t()
def set_connect_timeout_ms(%__MODULE__{} = config, ms) when is_integer(ms),
def set_connect_timeout_ms(%__MODULE__{} = config, ms) when is_non_neg_integer(ms),
do: %{config | connect_timeout_ms: ms}

@spec set_remote_file_download_thread_num(t(), non_neg_integer()) :: t()
def set_remote_file_download_thread_num(%__MODULE__{} = config, threads)
when is_integer(threads),
when is_non_neg_integer(threads),
do: %{config | remote_file_download_thread_num: threads}

@spec set_scanner_log_fetch_max_bytes(t(), non_neg_integer()) :: t()
def set_scanner_log_fetch_max_bytes(%__MODULE__{} = config, max_bytes)
when is_integer(max_bytes),
when is_non_neg_integer(max_bytes),
do: %{config | scanner_log_fetch_max_bytes: max_bytes}

@spec set_scanner_log_fetch_max_bytes_for_bucket(t(), non_neg_integer()) :: t()
def set_scanner_log_fetch_max_bytes_for_bucket(%__MODULE__{} = config, max_bytes)
when is_integer(max_bytes),
when is_non_neg_integer(max_bytes),
do: %{config | scanner_log_fetch_max_bytes_for_bucket: max_bytes}

@spec set_scanner_log_fetch_min_bytes(t(), non_neg_integer()) :: t()
def set_scanner_log_fetch_min_bytes(%__MODULE__{} = config, min_bytes)
when is_integer(min_bytes),
when is_non_neg_integer(min_bytes),
do: %{config | scanner_log_fetch_min_bytes: min_bytes}

@spec set_scanner_log_fetch_wait_max_time_ms(t(), non_neg_integer()) :: t()
def set_scanner_log_fetch_wait_max_time_ms(%__MODULE__{} = config, wait_ms)
when is_integer(wait_ms),
when is_non_neg_integer(wait_ms),
do: %{config | scanner_log_fetch_wait_max_time_ms: wait_ms}

@spec set_scanner_log_max_poll_records(t(), non_neg_integer()) :: t()
def set_scanner_log_max_poll_records(%__MODULE__{} = config, num) when is_integer(num),
def set_scanner_log_max_poll_records(%__MODULE__{} = config, num) when is_non_neg_integer(num),
do: %{config | scanner_log_max_poll_records: num}

@spec set_scanner_remote_log_prefetch_num(t(), non_neg_integer()) :: t()
def set_scanner_remote_log_prefetch_num(%__MODULE__{} = config, num) when is_integer(num),
do: %{config | scanner_remote_log_prefetch_num: num}
def set_scanner_remote_log_prefetch_num(%__MODULE__{} = config, num)
when is_non_neg_integer(num),
do: %{config | scanner_remote_log_prefetch_num: num}

@spec set_scanner_remote_log_read_concurrency(t(), non_neg_integer()) :: t()
def set_scanner_remote_log_read_concurrency(%__MODULE__{} = config, concurrency)
when is_integer(concurrency),
when is_non_neg_integer(concurrency),
do: %{config | scanner_remote_log_read_concurrency: concurrency}

@spec set_security_protocol(t(), String.t()) :: t()
Expand All @@ -163,11 +166,11 @@ defmodule Fluss.Config do
do: %{config | writer_acks: acks}

@spec set_writer_batch_size(t(), non_neg_integer()) :: t()
def set_writer_batch_size(%__MODULE__{} = config, size) when is_integer(size),
def set_writer_batch_size(%__MODULE__{} = config, size) when is_non_neg_integer(size),
do: %{config | writer_batch_size: size}

@spec set_writer_batch_timeout_ms(t(), non_neg_integer()) :: t()
def set_writer_batch_timeout_ms(%__MODULE__{} = config, ms) when is_integer(ms),
def set_writer_batch_timeout_ms(%__MODULE__{} = config, ms) when is_non_neg_integer(ms),
do: %{config | writer_batch_timeout_ms: ms}

@spec set_writer_bucket_no_key_assigner(t(), :sticky | :round_robin) :: t()
Expand All @@ -176,11 +179,11 @@ defmodule Fluss.Config do
do: %{config | writer_bucket_no_key_assigner: assigner}

@spec set_writer_buffer_memory_size(t(), non_neg_integer()) :: t()
def set_writer_buffer_memory_size(%__MODULE__{} = config, size) when is_integer(size),
def set_writer_buffer_memory_size(%__MODULE__{} = config, size) when is_non_neg_integer(size),
do: %{config | writer_buffer_memory_size: size}

@spec set_writer_buffer_wait_timeout_ms(t(), non_neg_integer()) :: t()
def set_writer_buffer_wait_timeout_ms(%__MODULE__{} = config, ms) when is_integer(ms),
def set_writer_buffer_wait_timeout_ms(%__MODULE__{} = config, ms) when is_non_neg_integer(ms),
do: %{config | writer_buffer_wait_timeout_ms: ms}

@spec set_writer_dynamic_batch_size_enabled(t(), boolean()) :: t()
Expand All @@ -189,8 +192,9 @@ defmodule Fluss.Config do
do: %{config | writer_dynamic_batch_size_enabled: enabled}

@spec set_writer_dynamic_batch_size_min(t(), non_neg_integer()) :: t()
def set_writer_dynamic_batch_size_min(%__MODULE__{} = config, size) when is_integer(size),
do: %{config | writer_dynamic_batch_size_min: size}
def set_writer_dynamic_batch_size_min(%__MODULE__{} = config, size)
when is_non_neg_integer(size),
do: %{config | writer_dynamic_batch_size_min: size}

@spec set_writer_enable_idempotence(t(), boolean()) :: t()
def set_writer_enable_idempotence(%__MODULE__{} = config, enabled)
Expand All @@ -199,15 +203,15 @@ defmodule Fluss.Config do

@spec set_writer_max_inflight_requests_per_bucket(t(), non_neg_integer()) :: t()
def set_writer_max_inflight_requests_per_bucket(%__MODULE__{} = config, n)
when is_integer(n),
when is_non_neg_integer(n),
do: %{config | writer_max_inflight_requests_per_bucket: n}

@spec set_writer_request_max_size(t(), non_neg_integer()) :: t()
def set_writer_request_max_size(%__MODULE__{} = config, size) when is_integer(size),
def set_writer_request_max_size(%__MODULE__{} = config, size) when is_non_neg_integer(size),
do: %{config | writer_request_max_size: size}

@spec set_writer_retries(t(), non_neg_integer()) :: t()
def set_writer_retries(%__MODULE__{} = config, n) when is_integer(n),
def set_writer_retries(%__MODULE__{} = config, n) when is_non_neg_integer(n),
do: %{config | writer_retries: n}

@spec get_bootstrap_servers(t()) :: String.t()
Expand Down
24 changes: 23 additions & 1 deletion bindings/elixir/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,14 @@ defmodule Fluss.MixProject do
elixir: "~> 1.15",
start_permanent: Mix.env() == :prod,
elixirc_paths: elixirc_paths(Mix.env()),
aliases: aliases(),
deps: deps(),
description: "Elixir client for Apache Fluss",
dialyzer: [
plt_add_apps: [:ex_unit, :mix],
plt_file: {:no_warn, "priv/plts/dialyzer.plt"},
list_unused_filters: true
],
package: package()
]
end
Expand All @@ -46,7 +52,23 @@ defmodule Fluss.MixProject do
[
{:rustler, "~> 0.37"},
{:ex_doc, "~> 0.31", only: :dev, runtime: false},
{:credo, "~> 1.7", only: [:dev, :test], runtime: false}
{:credo, "~> 1.7", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.4", only: [:dev, :test], runtime: false}
]
end

defp aliases do
[
check: static_checks()
]
end

defp static_checks do
[
"format --check-formatted",
"credo --strict",
"compile --warnings-as-errors",
"dialyzer"
]
end

Expand Down
2 changes: 2 additions & 0 deletions bindings/elixir/mix.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
%{
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"credo": {:hex, :credo, "1.7.17", "f92b6aa5b26301eaa5a35e4d48ebf5aa1e7094ac00ae38f87086c562caf8a22f", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1eb5645c835f0b6c9b5410f94b5a185057bcf6d62a9c2b476da971cde8749645"},
"dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"},
"earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"},
"erlex": {:hex, :erlex, "0.2.9", "7debbbaa9f4f368b8cd648983e0f1d7963028508e9c59e9d4ed504e94ef52a55", [:mix], [], "hexpm", "8cfffc0ec7159e6d73de2ab28a588064de80f88b2798d5cbe4482cbbc200178b"},
"ex_doc": {:hex, :ex_doc, "0.40.1", "67542e4b6dde74811cfd580e2c0149b78010fd13001fda7cfeb2b2c2ffb1344d", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "bcef0e2d360d93ac19f01a85d58f91752d930c0a30e2681145feea6bd3516e00"},
"file_system": {:hex, :file_system, "1.1.1", "31864f4685b0148f25bd3fbef2b1228457c0c89024ad67f7a81a3ffbc0bbad3a", [:mix], [], "hexpm", "7a15ff97dfe526aeefb090a7a9d3d03aa907e100e262a0f8f7746b78f8f87a5d"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
Expand Down
8 changes: 8 additions & 0 deletions bindings/elixir/test/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,14 @@ defmodule Fluss.ConfigTest do
assert config.writer_retries == 5
end

test "set_writer_batch_size/2 rejects negative integers" do
config = Fluss.Config.new("h:9123")

assert_raise FunctionClauseError, fn ->
Fluss.Config.set_writer_batch_size(config, -1)
end
end

test "setters chain correctly" do
config =
Fluss.Config.new("localhost:9123")
Expand Down
Loading