Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class Array(BaseModel, frozen=True):
shared: float
system: float | None
replication: float
version: str
id: str


def parse_array(string_table: StringTable) -> Array | None:
Expand All @@ -57,6 +59,8 @@ def parse_array(string_table: StringTable) -> Array | None:
shared=array["space"]["shared"],
system=array["space"]["system"],
replication=array["space"]["replication"],
version=array["version"],
id=array["id"],
)


Expand Down Expand Up @@ -125,3 +129,42 @@ def check_overall_capacity(item: str, params: Mapping[str, Any], section: Array)
**MAGIC_FACTOR_DEFAULT_PARAMS,
},
)


def discovery_pure_storage_fa_version(section: Array) -> DiscoveryResult:
if section.version:
yield Service()


def check_pure_storage_fa_version(section: Array) -> CheckResult:
yield Result(
state=State.OK,
summary=f"Version: {section.version}",
)


check_plugin_pure_storage_fa_version = CheckPlugin(
sections=["pure_storage_fa_arrays"],
name="pure_storage_fa_version",
discovery_function=discovery_pure_storage_fa_version,
check_function=check_pure_storage_fa_version,
service_name="FlashArray Version",
)


def discovery_pure_storage_fa_id(section: Array) -> DiscoveryResult:
if section.id:
yield Service()


def check_pure_storage_fa_id(section: Array) -> CheckResult:
yield Result(state=State.OK, summary=f"ID: {section.id}")


check_plugin_pure_storage_fa_id = CheckPlugin(
sections=["pure_storage_fa_arrays"],
name="pure_storage_fa_id",
discovery_function=discovery_pure_storage_fa_id,
check_function=check_pure_storage_fa_id,
service_name="FlashArray ID",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!/usr/bin/env python3
# Copyright (C) 2023 Checkmk GmbH - License: GNU General Public License v2
# This file is part of Checkmk (https://checkmk.com). It is subject to the terms and
# conditions defined in the file COPYING, which is part of this source code package.

import json
from datetime import datetime, timezone
from pydantic import BaseModel
from typing import Optional


from cmk.agent_based.v2 import (
AgentSection,
CheckPlugin,
CheckResult,
DiscoveryResult,
Result,
Service,
State,
StringTable,
)


class Certificate(BaseModel, frozen=True):
name: Optional[str] = None
status: Optional[str] = None
state: Optional[str] = None
country: Optional[str] = None
certificate_type: Optional[str] = None
intermediate_certificate: Optional[str] = None
email: Optional[str] = None
subject_alternative_names: Optional[list] = None
common_name: Optional[str] = None
organization: Optional[str] = None
organizational_unit: Optional[str] = None
locality: Optional[str] = None
key_algorithm: Optional[str] = None
key_size: Optional[int] = None
issued_by: Optional[str] = None
issued_to: Optional[str] = None
valid_from: Optional[int] = None
valid_to: Optional[int] = None


def parse_certificates(string_table: StringTable) -> list[Certificate]:
json_data = json.loads(string_table[0][0])
if "items" not in json_data:
return None
parsed = []
for cert in json_data["items"]:
parsed.append(Certificate(**cert))

return parsed


agent_section_pure_storage_fa_certificates = AgentSection(
name="pure_storage_fa_certificates",
parse_function=parse_certificates,
)


def discover_certificates(section: list[Certificate]) -> DiscoveryResult:
for cert in section:
yield Service(item=cert.name)


def check_certificates(item, section: list[Certificate]) -> CheckResult:
for certificate in section:
if item == certificate.name:
now = datetime.now(timezone.utc)
valid_to = datetime.fromtimestamp(
certificate.valid_to / 1000, tz=timezone.utc
)

if valid_to < now:
yield Result(state=State.CRIT, summary="Certificate ran out")
else:
yield Result(
state=State.OK,
summary=f"{certificate.status} certificate valid until {valid_to.strftime('%d.%m.%Y')}",
)


check_plugin_pure_storage_fa_certificates = CheckPlugin(
name="pure_storage_fa_certificates",
service_name="PureStorage Certificate Status %s",
discovery_function=discover_certificates,
check_function=check_certificates,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/usr/bin/env python3
# Copyright (C) 2023 Checkmk GmbH - License: GNU General Public License v2
# This file is part of Checkmk (https://checkmk.com). It is subject to the terms and
# conditions defined in the file COPYING, which is part of this source code package.

import json
from pydantic import BaseModel
from typing import Optional

from cmk.agent_based.v2 import (
AgentSection,
CheckPlugin,
CheckResult,
DiscoveryResult,
Result,
Service,
State,
StringTable,
)


class Connections(BaseModel, frozen=True):
id: Optional[str] = None
name: Optional[str] = None
management_address: Optional[str] = None
replication_addresses: Optional[list] = None
status: Optional[str] = None
type: Optional[str] = None
replication_transport: Optional[str] = None
version: Optional[str] = None
throttle: Optional[dict] = None


def parse_connections(string_table: StringTable) -> list[Connections]:
json_data = json.loads(string_table[0][0])
if "items" not in json_data:
return None
parsed = []
for connection in json_data["items"]:
parsed.append(Connections(**connection))

return parsed


agent_section_pure_storage_fa_connections = AgentSection(
name="pure_storage_fa_connections",
parse_function=parse_connections,
)


def discover_connections(section: list[Connections]) -> DiscoveryResult:
for connections in section:
params = {
"discovered_state": ["connected", connections.status],
"discovered_throttled": ["False", connections.throttle],
}
yield Service(item=connections.name, parameters=dict(params))


def check_connections(item, params, section: list[Connections]) -> CheckResult:
if section == []:
yield Result(state=State.CRIT, summary="No Array Connection found")
return

for connection in section:
if item == connection.name:
if connection.status in params["discovered_state"]:
if connection.throttle in params["discovered_throttled"]:
summary = f"Array {connection.name} is {connection.status}."
details = f"Throttled: {connection.throttle}\nVersion: {connection.version}\nManagement Address: {connection.management_address}\nReplication Addresses: {connection.replication_addresses}\nType: {connection.type}"
state = State.OK
else:
summary = f"Array {connection.name} is {connection.status}, but throttled."
details = f"Version: {connection.version}\nManagement Address: {connection.management_address}\nReplication Addresses: {connection.replication_addresses}\nType: {connection.type}"
state = State.WARN
else:
summary = (
f"Array {connection.name} is {connection.status}, but throttled."
)
details = f"Throttled: {connection.throttle}\nVersion: {connection.version}\nManagement Address: {connection.management_address}\nReplication Addresses: {connection.replication_addresses}\nType: {connection.type}"
state = State.CRIT

yield Result(state=state, summary=summary, details=details)


check_plugin_pure_storage_fa_connections = CheckPlugin(
name="pure_storage_fa_connections",
service_name="Connection to %s",
discovery_function=discover_connections,
check_function=check_connections,
check_ruleset_name="array_connections_default_levels",
check_default_parameters={},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env python3
# Copyright (C) 2023 Checkmk GmbH - License: GNU General Public License v2
# This file is part of Checkmk (https://checkmk.com). It is subject to the terms and
# conditions defined in the file COPYING, which is part of this source code package.

import json
from typing import List
from pydantic import BaseModel

from cmk.agent_based.v2 import (
AgentSection,
CheckPlugin,
CheckResult,
DiscoveryResult,
Result,
Service,
State,
StringTable,
check_levels,
Metric,
)


class DNSServer(BaseModel, frozen=True):
domain: str
dns_server: List[str]


def parse_dns(string_table: StringTable) -> DNSServer | None:
json_data = json.loads(string_table[0][0])
if "items" not in json_data:
return None
return DNSServer(
domain=json_data["items"][0]["domain"],
dns_server=json_data["items"][0]["nameservers"],
)


agent_section_pure_storage_fa_dns = AgentSection(
name="pure_storage_fa_dns",
parse_function=parse_dns,
)


def discover_dns(section: DNSServer) -> DiscoveryResult:
if section is not None:
yield Service()


def check_dns(section: DNSServer) -> CheckResult:
if len(section.dns_server) == 0:
yield Result(
state=State.CRIT,
summary=f"No DNS Server for Domain {section.domain} found!",
)
yield Metric("pure_storage_fa_dns", len(section.dns_server))
else:
yield from check_levels(
len(section.dns_server),
metric_name="pure_storage_fa_dns",
render_func=lambda v: (
f"{v} DNS Server in Domain {section.domain} found: {', '.join(section.dns_server)}"
),
)


check_plugin_pure_storage_fa_dns = CheckPlugin(
name="pure_storage_fa_dns",
service_name="DNS Server",
discovery_function=discover_dns,
check_function=check_dns,
)
Loading
Loading