|
1 | 1 | """Module for DNSPolicy related classes"""
|
2 | 2 |
|
3 | 3 | from dataclasses import dataclass
|
4 |
| -from typing import Optional |
| 4 | +from typing import Optional, Literal |
5 | 5 |
|
6 | 6 | from testsuite.gateway import Referencable
|
| 7 | +from testsuite.kubernetes import KubernetesObject |
7 | 8 | from testsuite.kubernetes.client import KubernetesClient
|
8 | 9 | from testsuite.kuadrant.policy import Policy
|
9 | 10 | from testsuite.utils import asdict, check_condition
|
@@ -31,6 +32,33 @@ class LoadBalancing:
|
31 | 32 | weight: Optional[int] = None
|
32 | 33 |
|
33 | 34 |
|
| 35 | +@dataclass |
| 36 | +class AdditionalHeadersRef: |
| 37 | + """Object representing DNSPolicy additionalHeadersRef field""" |
| 38 | + |
| 39 | + name: str |
| 40 | + |
| 41 | + |
| 42 | +@dataclass |
| 43 | +class HealthCheck: # pylint: disable=invalid-name |
| 44 | + """Object representing DNSPolicy health check specification""" |
| 45 | + |
| 46 | + additionalHeadersRef: Optional[AdditionalHeadersRef] = None |
| 47 | + path: Optional[str] = None |
| 48 | + failureThreshold: Optional[int] = None |
| 49 | + interval: Optional[str] = None |
| 50 | + port: Optional[int] = None |
| 51 | + protocol: Literal["HTTP", "HTTPS"] = "HTTP" |
| 52 | + |
| 53 | + |
| 54 | +class DNSHealthCheckProbe(KubernetesObject): |
| 55 | + """DNSHealthCheckProbe object""" |
| 56 | + |
| 57 | + def is_healthy(self) -> bool: |
| 58 | + """Returns True if DNSHealthCheckProbe endpoint is healthy""" |
| 59 | + return self.refresh().model.status.healthy |
| 60 | + |
| 61 | + |
34 | 62 | class DNSPolicy(Policy):
|
35 | 63 | """DNSPolicy object"""
|
36 | 64 |
|
@@ -61,6 +89,16 @@ def create_instance(
|
61 | 89 |
|
62 | 90 | return cls(model, context=cluster.context)
|
63 | 91 |
|
| 92 | + def set_health_check(self, health_check: HealthCheck): |
| 93 | + """Sets health check for DNSPolicy""" |
| 94 | + self.model["spec"]["healthCheck"] = asdict(health_check) |
| 95 | + |
| 96 | + def get_dns_health_probe(self) -> DNSHealthCheckProbe: |
| 97 | + """Returns DNSHealthCheckProbe object for the created DNSPolicy""" |
| 98 | + with self.context: |
| 99 | + dns_probe = self.get_owned("dnsrecords.kuadrant.io")[0].get_owned("DNSHealthCheckProbe")[0] |
| 100 | + return DNSHealthCheckProbe(dns_probe.model, context=self.context) |
| 101 | + |
64 | 102 | def wait_for_full_enforced(self, timelimit=300):
|
65 | 103 | """Wait for a Policy to be fully Enforced with increased timelimit for DNSPolicy"""
|
66 | 104 | super().wait_for_full_enforced(timelimit=timelimit)
|
0 commit comments