Skip to content

Commit 433ff36

Browse files
authored
Added registry management functions. Schema recovery working (#6)
1 parent 3c17f6a commit 433ff36

File tree

7 files changed

+354
-51
lines changed

7 files changed

+354
-51
lines changed

kafka_schema_registry_admin/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@
77
__email__ = "john@ews-network.net"
88
__version__ = "0.4.0"
99

10-
from .kafka_schema_registry_admin import SchemaRegistry
10+
from .kafka_schema_registry_admin import CompatibilityMode, RegistryMode, SchemaRegistry
1111

12-
__all__ = ["SchemaRegistry"]
12+
__all__ = ["SchemaRegistry", "RegistryMode", "CompatibilityMode"]

kafka_schema_registry_admin/client_wrapper/errors.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ def __init__(self, code, details):
4444
if error_message.startswith(
4545
"Schema being registered is incompatible with an earlier schema for subject"
4646
):
47-
raise IncompatibleSchema(code, details)
48-
super().__init__(details, code, details[1:])
47+
raise IncompatibleSchema(code, details[1:])
48+
else:
49+
super().__init__(details, code, details[1:])
4950

5051

5152
class IncompatibleSchema(ApiGenericException):
@@ -84,6 +85,16 @@ def __init__(self, code, details):
8485
super().__init__("Forbidden", code, details)
8586

8687

88+
class UnprocessableEntity(ApiGenericException):
89+
def __init__(self, code, details):
90+
super().__init__("Unprocessable Entity", code, details)
91+
92+
93+
class UnexpectedException(ApiGenericException):
94+
def __init__(self, code, details):
95+
super().__init__("Unexpected Error", code, details)
96+
97+
8798
class SchemaRegistryApiException(ApiGenericException):
8899
"""
89100
Top class for DatabaseUser exceptions
@@ -94,11 +105,18 @@ class SchemaRegistryApiException(ApiGenericException):
94105
404: NotFoundException,
95106
401: UnauthorizedException,
96107
403: ForbiddenException,
108+
422: UnprocessableEntity,
97109
}
98110

99111
def __init__(self, code, details):
100-
exception_class = self.EXCEPTION_CLASSES.get(code, ApiGenericException)
101-
super().__init__("Api Error", code, details)
112+
exception_class = self.EXCEPTION_CLASSES.get(code, UnexpectedException)
113+
super().__init__(
114+
details[0] is isinstance(details[0], str)
115+
and details[0]
116+
or "SchemaRegistry Api Error",
117+
code,
118+
details,
119+
)
102120
self.exception_instance = exception_class(code, details)
103121

104122

kafka_schema_registry_admin/kafka_schema_registry_admin.py

Lines changed: 163 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,22 @@ class Type(Enum):
3030
PROTOBUFF = "PROTOBUF"
3131

3232

33+
class RegistryMode(Enum):
34+
IMPORT = "IMPORT"
35+
READONLY = "READONLY"
36+
READWRITE = "READWRITE"
37+
38+
39+
class CompatibilityMode(Enum):
40+
BACKWARD = "BACKWARD"
41+
BACKWARD_TRANSITIVE = "BACKWARD_TRANSITIVE"
42+
FORWARD = "FORWARD"
43+
FORWARD_TRANSITIVE = "FORWARD_TRANSITIVE"
44+
FULL = "FULL"
45+
FULL_TRANSITIVE = "FULL_TRANSITIVE"
46+
NONE = "NONE"
47+
48+
3349
class SchemaRegistry:
3450

3551
def __init__(self, base_url: str, *args, **kwargs):
@@ -133,7 +149,13 @@ def post_subject_schema(
133149
return req
134150

135151
def post_subject_schema_version(
136-
self, subject_name, definition, normalize: bool = False, schema_type=None
152+
self,
153+
subject_name,
154+
definition,
155+
normalize: bool = False,
156+
schema_type=None,
157+
version_id: int = None,
158+
schema_id: int = None,
137159
) -> Response:
138160
"""
139161
`API Doc <https://docs.confluent.io/platform/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions>`__
@@ -154,6 +176,11 @@ def post_subject_schema_version(
154176
url = f"/subjects/{subject_name}/versions"
155177
if normalize:
156178
url = f"{url}?normalize=true"
179+
"""When trying to do recovery, SR must be in import mode either globally or for the subject itself."""
180+
if version_id and schema_id:
181+
payload["version"] = version_id
182+
payload["id"] = schema_id
183+
157184
req = self.client.post(url, json=payload)
158185
return req
159186

@@ -215,9 +242,10 @@ def post_compatibility_subject_versions(
215242
references: list = None,
216243
) -> Response:
217244
url = f"/compatibility/subjects/{subject_name}/versions"
218-
return self.validate_subject_compatibility(
245+
payload = self.set_subject_validity_payload(
219246
url, definition, schema_type, verbose=verbose, references=references
220247
)
248+
return self.client.post(url, json=payload)
221249

222250
def post_compatibility_subject_version_id(
223251
self,
@@ -229,18 +257,19 @@ def post_compatibility_subject_version_id(
229257
references: list = None,
230258
) -> Response:
231259
url = f"/compatibility/subjects/{subject_name}/versions/{version_id}"
232-
return self.validate_subject_compatibility(
260+
payload = self.set_subject_validity_payload(
233261
url, definition, schema_type, verbose=verbose, references=references
234262
)
263+
return self.client.post(url, json=payload)
235264

236-
def validate_subject_compatibility(
237-
self,
265+
@staticmethod
266+
def set_subject_validity_payload(
238267
url: str,
239268
definition,
240269
schema_type,
241270
verbose: bool = False,
242271
references: list = None,
243-
) -> Response:
272+
) -> dict:
244273
if verbose:
245274
url = f"{url}?verbose=true"
246275
LOG.debug(url)
@@ -254,9 +283,7 @@ def validate_subject_compatibility(
254283
payload = {"schema": definition, "schemaType": schema_type}
255284
if references and isinstance(references, list):
256285
payload["references"] = references
257-
258-
req = self.client.post(url, json=payload)
259-
return req
286+
return payload
260287

261288
def get_compatibility_subject_config(self, subject_name) -> Response:
262289
url = f"/config/{subject_name}/"
@@ -268,3 +295,130 @@ def put_compatibility_subject_config(self, subject_name, compatibility) -> Respo
268295
payload = {"compatibility": compatibility}
269296
req = self.client.put(url, json=payload)
270297
return req
298+
299+
def get_mode(self, as_str: bool = False) -> Response | str:
300+
"""
301+
`API Doc <https://docs.confluent.io/platform/current/schema-registry/develop/api.html#mode>`__
302+
"""
303+
url_path: str = "/mode"
304+
req = self.client.get(url_path)
305+
if as_str:
306+
return RegistryMode[req.json().get("mode")].value
307+
return req
308+
309+
def put_mode(self, mode: str | RegistryMode, force: bool = False) -> Response:
310+
"""
311+
`API Doc <https://docs.confluent.io/platform/current/schema-registry/develop/api.html#put--mode>`__
312+
"""
313+
url_path: str = "/mode"
314+
if force:
315+
url_path += "?force=true"
316+
req = self.client.put(url_path, json={"mode": mode})
317+
return req
318+
319+
def get_subject_mode(self, subject_name: str) -> Response:
320+
"""
321+
`API Doc <https://docs.confluent.io/platform/current/schema-registry/develop/api.html#get--mode-(string-%20subject)>`__
322+
"""
323+
url_path: str = f"/mode/{subject_name}"
324+
req = self.client.get(url_path)
325+
return req
326+
327+
def put_subject_mode(
328+
self, subject_name: str, mode: str | RegistryMode, force: bool = False
329+
) -> Response:
330+
"""
331+
`API Doc <https://docs.confluent.io/platform/current/schema-registry/develop/api.html#put--mode-(string-%20subject)>`__
332+
"""
333+
url_path: str = f"/mode/{subject_name}"
334+
if force:
335+
url_path += "?force=true"
336+
req = self.client.put(url_path, json={"mode": mode})
337+
return req
338+
339+
def get_config(self):
340+
"""
341+
`API Doc <https://docs.confluent.io/platform/current/schema-registry/develop/api.html#get--config>`__
342+
"""
343+
url_path: str = "/config"
344+
req = self.client.get(url_path)
345+
return req
346+
347+
def put_config(
348+
self,
349+
alias: str = None,
350+
normalize: bool = False,
351+
compatibility: str | CompatibilityMode = "NONE",
352+
):
353+
"""
354+
`API Doc <https://docs.confluent.io/platform/current/schema-registry/develop/api.html#put--config>`__
355+
"""
356+
url_path: str = "/config"
357+
payload: dict = {}
358+
if compatibility:
359+
payload["compatibility"] = compatibility
360+
if alias:
361+
payload["alias"] = alias
362+
if normalize:
363+
payload["normalize"] = normalize
364+
req = self.client.put(url_path, json=payload)
365+
return req
366+
367+
def get_subject_config(
368+
self,
369+
subject: str,
370+
default: bool = False,
371+
alias: str = None,
372+
normalize: bool = False,
373+
compatibility: str | CompatibilityMode = "NONE",
374+
):
375+
"""
376+
`API Doc <https://docs.confluent.io/platform/current/schema-registry/develop/api.html#get--config>`__
377+
"""
378+
url_path: str = f"/config/{subject}"
379+
if default:
380+
url_path += "?defaultToGlobal=true"
381+
payload: dict = {}
382+
if compatibility:
383+
payload["compatibility"] = compatibility
384+
if alias:
385+
payload["alias"] = alias
386+
if normalize:
387+
payload["normalize"] = normalize
388+
req = self.client.get(url_path, json=payload)
389+
return req
390+
391+
def put_subject_config(
392+
self,
393+
subject: str,
394+
alias: str = None,
395+
normalize: bool = False,
396+
compatibility: str | CompatibilityMode = "NONE",
397+
):
398+
"""
399+
`API Doc <https://docs.confluent.io/platform/current/schema-registry/develop/api.html#put--config>`__
400+
"""
401+
url_path: str = f"/config/{subject}"
402+
payload: dict = {}
403+
if compatibility:
404+
payload["compatibility"] = compatibility
405+
if alias:
406+
payload["alias"] = alias
407+
if normalize:
408+
payload["normalize"] = normalize
409+
req = self.client.put(url_path, json=payload)
410+
return req
411+
412+
def delete_subject_config(
413+
self,
414+
subject: str,
415+
alias: str = None,
416+
normalize: bool = False,
417+
compatibility: str | CompatibilityMode = "NONE",
418+
):
419+
"""
420+
`API Doc <https://docs.confluent.io/platform/current/schema-registry/develop/api.html#delete--config-(string-%20subject)>`__
421+
"""
422+
url_path: str = f"/config/{subject}"
423+
req = self.client.delete(url_path)
424+
return req

tests/conftest.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,42 @@
22

33
from os import path
44

5+
import pytest
56
from testcontainers.compose import DockerCompose
67

8+
from kafka_schema_registry_admin import SchemaRegistry
9+
710
HERE = path.abspath(path.dirname(__file__))
811

9-
compose = DockerCompose(
10-
path.abspath(f"{HERE}/.."), compose_file_name="docker-compose.yaml", wait=True
12+
13+
docker_compose = DockerCompose(
14+
path.abspath(f"{HERE}/.."),
15+
compose_file_name="docker-compose.yaml",
16+
wait=True,
17+
pull=True,
1118
)
1219

20+
docker_compose.stop(down=True)
21+
docker_compose.start()
22+
sr_port = int(docker_compose.get_service_port("schema-registry", 8081))
23+
base_url: str = f"http://localhost:{sr_port}"
24+
docker_compose.wait_for(f"{base_url}/subjects")
25+
26+
27+
@pytest.fixture(scope="session")
28+
def local_registry():
29+
return SchemaRegistry(base_url)
30+
31+
32+
@pytest.fixture(scope="session")
33+
def authed_local_registry():
34+
return SchemaRegistry(
35+
base_url,
36+
**{"basic_auth.username": "confluent", "basic_auth.password": "confluent"},
37+
)
38+
1339

1440
def pytest_sessionfinish(session, exitstatus):
15-
# compose.stop()
41+
docker_compose.stop()
1642
print("Testing session has finished")
1743
print(f"Exit status: {exitstatus}")

tests/test_registry_config.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# SPDX-License-Identifier: Apache License 2.0
2+
# Copyright 2020-2021 John Mille <john@ews-network.net>
3+
import json
4+
from copy import deepcopy
5+
from os import path
6+
7+
import pytest
8+
9+
from kafka_schema_registry_admin import CompatibilityMode, RegistryMode, SchemaRegistry
10+
11+
12+
def test_changing_compatibility(authed_local_registry):
13+
config = authed_local_registry.get_config().json()
14+
print("CONFIG 1?", config)
15+
authed_local_registry.put_config(
16+
compatibility=CompatibilityMode.FULL.value, normalize=True
17+
)
18+
config = authed_local_registry.get_config().json()
19+
print("CONFIG 2?", config)
20+
21+
22+
def test_changing_mode(authed_local_registry):
23+
mode = authed_local_registry.get_mode(as_str=True)
24+
assert mode == RegistryMode.READWRITE.value
25+
26+
authed_local_registry.put_mode(RegistryMode.READONLY.value)
27+
assert (
28+
authed_local_registry.get_mode().json()["mode"] == RegistryMode.READONLY.value
29+
)
30+
31+
authed_local_registry.put_mode(RegistryMode.IMPORT.value)
32+
assert authed_local_registry.get_mode().json()["mode"] == RegistryMode.IMPORT.value

0 commit comments

Comments
 (0)