45 changes: 45 additions & 0 deletions tests/unit/vertexai/genai/test_evals.py
@@ -34,8 +34,10 @@
from vertexai._genai import _evals_metric_loaders
from vertexai._genai import _gcs_utils
from vertexai._genai import _observability_data_converter
from vertexai._genai import _transformers
from vertexai._genai import evals
from vertexai._genai import types as vertexai_genai_types
from vertexai._genai.types import common as common_types
from google.genai import client
from google.genai import errors as genai_errors
from google.genai import types as genai_types
@@ -218,6 +220,49 @@ def test_get_api_client_with_none_location(
mock_vertexai_client.assert_not_called()


class TestTransformers:
"""Unit tests for transformers."""

def test_t_inline_results(self):
eval_result = common_types.EvaluationResult(
eval_case_results=[
common_types.EvalCaseResult(
eval_case_index=0,
response_candidate_results=[
common_types.ResponseCandidateResult(
response_index=0,
metric_results={
"tool_use_quality": common_types.EvalCaseMetricResult(
score=0.0,
explanation="Failed tool use",
)
}
)
]
)
],
evaluation_dataset=[
common_types.EvaluationDataset(
eval_cases=[
common_types.EvalCase(
    prompt=genai_types.Content(
        parts=[genai_types.Part(text="test prompt")]
    )
)
]
)
],
metadata=common_types.EvaluationRunMetadata(
candidate_names=["gemini-pro"]
)
)

payload = _transformers.t_inline_results([eval_result])

assert len(payload) == 1
assert payload[0]["metric"] == "tool_use_quality"
assert payload[0]["request"]["prompt"]["text"] == "test prompt"
assert len(payload[0]["candidate_results"]) == 1
assert payload[0]["candidate_results"][0]["candidate"] == "gemini-pro"
assert payload[0]["candidate_results"][0]["score"] == 0.0


class TestEvals:
"""Unit tests for the GenAI client."""

114 changes: 114 additions & 0 deletions vertexai/_genai/_transformers.py
@@ -20,6 +20,7 @@
from google.genai._common import get_value_by_path as getv

from . import _evals_constant
from . import _evals_data_converters
from . import types

_METRIC_RES_NAME_RE = r"^projects/[^/]+/locations/[^/]+/evaluationMetrics/[^/]+$"
@@ -241,3 +242,116 @@ def t_metric_for_registry(
raise ValueError(f"Unsupported metric type: {metric_name}")

return metric_payload_item


def t_inline_results(
eval_results: list[Any],
) -> list[dict[str, Any]]:
"""Transforms a list of SDK EvaluationResults into API EvaluationResults."""
api_results = []

for eval_result in eval_results:
metadata = getv(eval_result, ["metadata"])
candidate_names = getv(metadata, ["candidate_names"]) if metadata else []
candidate_names = candidate_names or []

eval_dataset = getv(eval_result, ["evaluation_dataset"])
eval_cases = []
if isinstance(eval_dataset, list) and eval_dataset:
eval_cases = getv(eval_dataset[0], ["eval_cases"]) or []

eval_case_results = getv(eval_result, ["eval_case_results"]) or []

for case_result in eval_case_results:
case_idx = getv(case_result, ["eval_case_index"]) or 0

eval_case = None
if 0 <= case_idx < len(eval_cases):
eval_case = eval_cases[case_idx]

prompt_payload = {}
if eval_case:
agent_data = getv(eval_case, ["agent_data"])
prompt = getv(eval_case, ["prompt"])

if agent_data:
if hasattr(agent_data, "model_dump"):
prompt_payload["agent_data"] = agent_data.model_dump()
else:
prompt_payload["agent_data"] = agent_data
elif prompt:
text = _evals_data_converters._get_content_text(prompt) # pylint: disable=protected-access
if text:
prompt_payload["text"] = str(text)

cand_results = getv(case_result, ["response_candidate_results"]) or []
for resp_cand_result in cand_results:
resp_idx = getv(resp_cand_result, ["response_index"]) or 0
cand_name = f"candidate-{resp_idx}"
if 0 <= resp_idx < len(candidate_names):
cand_name = candidate_names[resp_idx]

metric_results = getv(resp_cand_result, ["metric_results"]) or {}

for metric_name, metric_res in metric_results.items():
api_rubric_verdicts = []
rubric_verdicts = getv(metric_res, ["rubric_verdicts"]) or []

for verdict in rubric_verdicts:
verdict_dict = {}
eval_rubric = getv(verdict, ["evaluated_rubric"])

if eval_rubric:
rubric_content = getv(eval_rubric, ["content"])
if rubric_content:
text = getv(rubric_content, ["text"])
prop = getv(rubric_content, ["property"])

content_dict = {}
if text:
content_dict["text"] = str(text)
if prop:
desc = getv(prop, ["description"])
if desc:
content_dict["property"] = {
"description": str(desc)
}
verdict_dict["evaluated_rubric"] = {
"content": content_dict
}

score = getv(verdict, ["score"])
if score is not None:
verdict_dict["score"] = float(score)

explanation = getv(verdict, ["explanation"])
if explanation:
verdict_dict["explanation"] = str(explanation)

if verdict_dict:
api_rubric_verdicts.append(verdict_dict)

score = getv(metric_res, ["score"])
explanation = getv(metric_res, ["explanation"])

candidate_result_payload = {
"candidate": str(cand_name),
"metric": str(metric_name),
}
if score is not None:
candidate_result_payload["score"] = float(score)
if explanation:
candidate_result_payload["explanation"] = str(explanation)
if api_rubric_verdicts:
candidate_result_payload["rubric_verdicts"] = (
api_rubric_verdicts
)

api_eval_result = {
"request": {"prompt": prompt_payload},
"metric": str(metric_name),
"candidate_results": [candidate_result_payload],
}
api_results.append(api_eval_result)

return api_results
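
For reference, a sketch of the payload `t_inline_results` produces for the single-case, single-candidate, single-metric input exercised by the unit test above (inferred from this function; optional keys such as "rubric_verdicts" appear only when the corresponding values are set):

[
    {
        "request": {"prompt": {"text": "test prompt"}},
        "metric": "tool_use_quality",
        "candidate_results": [
            {
                "candidate": "gemini-pro",
                "metric": "tool_use_quality",
                "score": 0.0,
                "explanation": "Failed tool use",
            }
        ],
    }
]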
147 changes: 147 additions & 0 deletions vertexai/_genai/evals.py
@@ -595,6 +595,33 @@ def _GenerateInstanceRubricsRequest_to_vertex(
return to_object


def _GenerateLossClustersParameters_to_vertex(
from_object: Union[dict[str, Any], object],
parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
to_object: dict[str, Any] = {}
if getv(from_object, ["location"]) is not None:
setv(to_object, ["location"], getv(from_object, ["location"]))

if getv(from_object, ["evaluation_set"]) is not None:
setv(to_object, ["evaluationSet"], getv(from_object, ["evaluation_set"]))

if getv(from_object, ["inline_results"]) is not None:
setv(
to_object,
["inlineResults", "evaluationResults"],
t.t_inline_results(getv(from_object, ["inline_results"])),
)

if getv(from_object, ["configs"]) is not None:
setv(to_object, ["configs"], [item for item in getv(from_object, ["configs"])])

if getv(from_object, ["config"]) is not None:
setv(to_object, ["config"], getv(from_object, ["config"]))

return to_object
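
# Illustration only, not part of the converter: a parameter model carrying
#     {"evaluation_set": "projects/p/locations/l/evaluationSets/s",
#      "inline_results": [eval_result]}
# is serialized to the Vertex wire format as
#     {"evaluationSet": "projects/p/locations/l/evaluationSets/s",
#      "inlineResults": {"evaluationResults": t.t_inline_results([eval_result])}}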


def _GenerateUserScenariosParameters_to_vertex(
from_object: Union[dict[str, Any], object],
parent_object: Optional[dict[str, Any]] = None,
@@ -1268,6 +1295,65 @@ def _generate_user_scenarios(
self._api_client._verify_response(return_value)
return return_value

def _generate_loss_clusters(
self,
*,
location: Optional[str] = None,
evaluation_set: Optional[str] = None,
inline_results: Optional[list[types.EvaluationResultOrDict]] = None,
configs: Optional[list[types.LossAnalysisConfigOrDict]] = None,
config: Optional[types.GenerateLossClustersConfigOrDict] = None,
) -> types.GenerateLossClustersOperation:
"""
Generates loss clusters from evaluation results.
"""

parameter_model = types._GenerateLossClustersParameters(
location=location,
evaluation_set=evaluation_set,
inline_results=inline_results,
configs=configs,
config=config,
)

request_url_dict: Optional[dict[str, str]]
if not self._api_client.vertexai:
raise ValueError("This method is only supported in the Vertex AI client.")
else:
request_dict = _GenerateLossClustersParameters_to_vertex(parameter_model)
request_url_dict = request_dict.get("_url")
if request_url_dict:
path = ":generateLossClusters".format_map(request_url_dict)
else:
path = ":generateLossClusters"

query_params = request_dict.get("_query")
if query_params:
path = f"{path}?{urlencode(query_params)}"
# TODO: remove the hack that pops config.
request_dict.pop("config", None)

http_options: Optional[types.HttpOptions] = None
if (
parameter_model.config is not None
and parameter_model.config.http_options is not None
):
http_options = parameter_model.config.http_options

request_dict = _common.convert_to_dict(request_dict)
request_dict = _common.encode_unserializable_types(request_dict)

response = self._api_client.request("post", path, request_dict, http_options)

response_dict = {} if not response.body else json.loads(response.body)

return_value = types.GenerateLossClustersOperation._from_response(
response=response_dict, kwargs=parameter_model.model_dump()
)

self._api_client._verify_response(return_value)
return return_value
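
# Note: with no URL placeholders or query parameters, the request path is just
# ":generateLossClusters"; the API client is assumed to resolve it against the
# configured project/location base URL. The async variant below mirrors this
# method, differing only in awaiting self._api_client.async_request.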

def _generate_rubrics(
self,
*,
@@ -2833,6 +2919,67 @@ async def _generate_user_scenarios(
self._api_client._verify_response(return_value)
return return_value

async def _generate_loss_clusters(
self,
*,
location: Optional[str] = None,
evaluation_set: Optional[str] = None,
inline_results: Optional[list[types.EvaluationResultOrDict]] = None,
configs: Optional[list[types.LossAnalysisConfigOrDict]] = None,
config: Optional[types.GenerateLossClustersConfigOrDict] = None,
) -> types.GenerateLossClustersOperation:
"""
Generates loss clusters from evaluation results.
"""

parameter_model = types._GenerateLossClustersParameters(
location=location,
evaluation_set=evaluation_set,
inline_results=inline_results,
configs=configs,
config=config,
)

request_url_dict: Optional[dict[str, str]]
if not self._api_client.vertexai:
raise ValueError("This method is only supported in the Vertex AI client.")
else:
request_dict = _GenerateLossClustersParameters_to_vertex(parameter_model)
request_url_dict = request_dict.get("_url")
if request_url_dict:
path = ":generateLossClusters".format_map(request_url_dict)
else:
path = ":generateLossClusters"

query_params = request_dict.get("_query")
if query_params:
path = f"{path}?{urlencode(query_params)}"
# TODO: remove the hack that pops config.
request_dict.pop("config", None)

http_options: Optional[types.HttpOptions] = None
if (
parameter_model.config is not None
and parameter_model.config.http_options is not None
):
http_options = parameter_model.config.http_options

request_dict = _common.convert_to_dict(request_dict)
request_dict = _common.encode_unserializable_types(request_dict)

response = await self._api_client.async_request(
"post", path, request_dict, http_options
)

response_dict = {} if not response.body else json.loads(response.body)

return_value = types.GenerateLossClustersOperation._from_response(
response=response_dict, kwargs=parameter_model.model_dump()
)

self._api_client._verify_response(return_value)
return return_value

async def _generate_rubrics(
self,
*,
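
A minimal call sketch for the new surface (hypothetical setup: `_generate_loss_clusters` is private and follows the other generated methods in this module, so the client entry point and argument values shown here are assumptions, not part of this PR):

import vertexai

client = vertexai.Client(project="my-project", location="us-central1")  # hypothetical project values
operation = client.evals._generate_loss_clusters(
    location="us-central1",
    inline_results=[eval_result],  # SDK EvaluationResult objects, serialized via t_inline_results
)
print(operation)  # a types.GenerateLossClustersOperation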