#!/usr/bin/env python3
import asyncio
import base64
import contextlib
import copy
import dataclasses
import functools
import logging
import os
import pathlib
import re
import shlex
import subprocess
import tempfile
import typing
import click
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.serialization import (
Encoding,
NoEncryption,
PrivateFormat,
PublicFormat
)
import yaml
import easykube
import pyhelm3
logging.basicConfig(format = "[%(levelname)-7s] %(message)s", level = logging.INFO)
REPO_ROOT = pathlib.Path(__file__).parent.parent
CAPI_API_GROUPS = [
"cluster.x-k8s.io",
"bootstrap.cluster.x-k8s.io",
"controlplane.cluster.x-k8s.io",
"infrastructure.cluster.x-k8s.io",
"addons.stackhpc.com",
]
logger = logging.getLogger()
def log_format_arg(argument):
argument = str(argument)
if argument == "-":
return "<stdin>"
elif "\n" in argument:
return "<multi-line string>"
else:
return argument
def exec(*cmd, **kwargs):
log_formatted_command = shlex.join(log_format_arg(part) for part in cmd)
logger.info("running command: %s", log_formatted_command)
try:
proc = subprocess.run(cmd, check = True, capture_output = True)
except subprocess.CalledProcessError as exc:
        raise RuntimeError(exc.stderr.decode())
else:
return proc.stdout.decode().strip()
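# Illustrative usage of the exec helper above (not part of the original flow):
#   branch = exec("git", "branch", "--show-current")
# returns the command's stdout, decoded and stripped, or raises RuntimeError
# with the captured stderr if the command fails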
async def ekwait(ekresource, name, namespace, predicate):
"""
Wait for a Kubernetes resource to fulfil a predicate.
"""
while True:
obj = await ekresource.fetch(name, namespace = namespace)
if predicate(obj):
break
await asyncio.sleep(5)
async def ekwait_all(ekresource, predicate, **kwargs):
"""
    Wait for all Kubernetes resources matching the given criteria to fulfil a predicate.
"""
async for obj in ekresource.list(**kwargs):
if predicate(obj):
continue
await ekwait(ekresource, obj.metadata.name, obj.metadata.namespace, predicate)
def has_condition(type, status):
"""
Returns a predicate function that tests if the object has a condition with the
given type and status.
"""
def predicate(obj):
conditions = obj.get("status", {}).get("conditions", [])
return any(c["type"] == type and c["status"] == status for c in conditions)
return predicate
def _resolve(obj, path):
prop, *rest = path
if rest:
return _resolve(obj.get(prop, {}), rest)
else:
return obj.get(prop)
def has_prop(path, value):
"""
Returns a predicate function that tests if the object has a property at the given
path with the given value.
"""
def predicate(obj):
return _resolve(obj, path) == value
return predicate
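# Illustrative example of the two predicate factories above: both return
# functions suitable for passing to ekwait/ekwait_all, e.g.
#   ready = has_condition("Ready", "True")
#   deployed = has_prop(["status", "phase"], "Deployed")
#   ready({"status": {"conditions": [{"type": "Ready", "status": "True"}]}})  # True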
class ObjectGraph:
"""
Graph of Kubernetes objects.
Each node is a Kubernetes object and the edges are derived from the owner references.
"""
def __init__(self):
        # The objects in the graph indexed by their UID
self.nodes = {}
# A map of UIDs to the UIDs of the owners of the object
self.owners = {}
def add_object(self, api_version, kind, obj):
"""
Add an object to the graph.
"""
obj.setdefault("apiVersion", api_version)
obj.setdefault("kind", kind)
self.nodes[obj.metadata.uid] = obj
self.owners[obj.metadata.uid] = set(
ref["uid"]
for ref in obj.metadata.get("ownerReferences", [])
)
def __iter__(self):
"""
Iterate over the graph.
Objects are only yielded once all their owners have been yielded.
"""
seen, pending = set(), set(self.nodes.keys())
while pending:
group_uids = set(
uid
for uid in pending
if all(ouid in seen for ouid in self.owners[uid])
)
seen, pending = seen | group_uids, pending - group_uids
yield from (self.nodes[uid] for uid in group_uids)
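# Illustrative sketch of the ordering guarantee above: given a MachineDeployment
# owned by a Cluster, iterating the graph yields the Cluster first, e.g.
#   graph = ObjectGraph()
#   graph.add_object("cluster.x-k8s.io/v1beta1", "Cluster", cluster_obj)
#   graph.add_object("cluster.x-k8s.io/v1beta1", "MachineDeployment", md_obj)
#   list(graph)  # cluster_obj appears before md_obj
# (cluster_obj and md_obj are hypothetical easykube objects with metadata.uid
# set, and md_obj carrying an ownerReference to cluster_obj's UID)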
async def get_cluster_graph(ek_client, namespace):
"""
Get the graph for the cluster.
"""
graph = ObjectGraph()
    # Start by adding secrets and configmaps to the graph
    # We keep secrets that are owned by the CAPI cluster or the Helm release
eksecrets = await ek_client.api("v1").resource("secrets")
async for secret in eksecrets.list(namespace = namespace):
# If the secret is owned by a sealed secret, drop the owner reference
if "ownerReferences" in secret.metadata:
secret.metadata["ownerReferences"] = [
ref
for ref in secret.metadata["ownerReferences"]
if ref["kind"] != "SealedSecret"
]
graph.add_object(eksecrets.api_version, eksecrets.kind, secret)
# Then add all of the Cluster API resources from the namespace
for api_group in CAPI_API_GROUPS:
ekapi = await ek_client.api_preferred_version(api_group)
for resource in (await ekapi.resources()):
# Ignore subresources
if "/" in resource["name"]:
continue
ekresource = await ekapi.resource(resource["name"])
async for obj in ekresource.list(namespace = namespace):
graph.add_object(ekresource.api_version, ekresource.kind, obj)
return graph
def prepare_object(obj, new_uids):
"""
Prepare an object for insertion into the destination cluster.
"""
new_obj = copy.deepcopy(obj)
# Remove generated fields from the metadata
for key in ["creationTimestamp", "generation", "managedFields", "resourceVersion", "uid"]:
new_obj["metadata"].pop(key, None)
# Replace the UIDs in the owner references with the new ones
if "ownerReferences" in new_obj["metadata"]:
for ref in new_obj["metadata"]["ownerReferences"]:
ref["uid"] = new_uids[ref["uid"]]
# Remove kopf annotations from the addon objects
if "annotations" in new_obj["metadata"]:
new_obj["metadata"]["annotations"].pop("addons.stackhpc.com/kopf-managed", None)
new_obj["metadata"]["annotations"].pop("addons.stackhpc.com/last-handled-configuration", None)
# Discard the status, if one exists
new_obj.pop("status", None)
return new_obj
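# Note on prepare_object: the server-generated fields stripped above
# (resourceVersion, uid, etc.) are only meaningful in the source cluster and
# would be regenerated by the destination API server, which is also why owner
# reference UIDs must be remapped via new_uids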
async def move_capi_resources(ek_from, ek_to, namespace):
"""
Move Cluster API resources in the specified namespace from one cluster to another.
"""
# Pause the clusters on the from side
ekcapi = await ek_from.api_preferred_version("cluster.x-k8s.io")
ekclusters = await ekcapi.resource("clusters")
async for cluster in ekclusters.list(namespace = namespace):
await ekclusters.patch(
cluster["metadata"]["name"],
{
"spec": {
"paused": True,
},
},
namespace = namespace
)
# Get a graph of all the Kubernetes objects relating to the cluster
graph = await get_cluster_graph(ek_from, namespace)
# Create the target namespace on the destination cluster
eknamespaces = await ek_to.api("v1").resource("namespaces")
try:
await eknamespaces.create({"metadata": {"name": namespace}})
except easykube.ApiError as exc:
if exc.status_code != 409:
raise
# Copy the objects to the new cluster
# When we iterate the graph, we get the objects in the order that they need to be copied
# Maintain an index of the UIDs in the source cluster to the UIDs in the dest cluster
new_uids = {}
for obj in graph:
new_obj = await ek_to.client_side_apply_object(prepare_object(obj, new_uids))
new_uids[obj.metadata.uid] = new_obj.metadata.uid
# Resume the clusters on the destination side
ekcapi = await ek_to.api_preferred_version("cluster.x-k8s.io")
ekclusters = await ekcapi.resource("clusters")
async for cluster in ekclusters.list(namespace = namespace):
await ekclusters.json_patch(
cluster["metadata"]["name"],
[
{
"op": "remove",
"path": "/spec/paused",
},
],
namespace = namespace
)
@contextlib.contextmanager
def kind_cluster():
"""
    Context manager that manages a kind cluster, yielding configured Helm and easykube
    clients along with the path to the kubeconfig.
"""
kind_cluster_name = os.environ.get("KIND_CLUSTER_NAME", "kind")
kind_cluster_version = os.environ.get("KIND_CLUSTER_VERSION", "v1.30.0")
with tempfile.NamedTemporaryFile() as kubeconfig:
kubeconfig.close()
stdout = exec("kind", "get", "clusters")
if any(line.strip() == kind_cluster_name for line in stdout.splitlines()):
exec("kind", "export", "kubeconfig", "--kubeconfig", kubeconfig.name)
else:
exec(
"kind",
"create",
"cluster",
"--kubeconfig",
kubeconfig.name,
"--image",
f"kindest/node:{kind_cluster_version}"
)
helm_client = pyhelm3.Client(kubeconfig = kubeconfig.name)
ek_client = easykube.Configuration.from_kubeconfig(kubeconfig.name).async_client()
yield helm_client, ek_client, kubeconfig.name
# If the task executes without error, tear down the kind cluster
# If not, we leave it behind for debugging
exec("kind", "delete", "cluster")
@dataclasses.dataclass
class HelmChartInfo:
"""
Information about a Helm chart.
"""
name: str
repo: str
version: str
@dataclasses.dataclass
class HelmReleaseInfo:
"""
Information about a Helm release.
"""
name: str
namespace: str
chart: HelmChartInfo
values: typing.List[typing.Dict[str, typing.Any]]
def find_resource(resources, kind, name, namespace):
"""
Finds the specified resource in the given list of resources.
"""
return next(
resource
for resource in resources
if (
resource["kind"] == kind and
resource["metadata"]["name"] == name and
resource["metadata"].get("namespace") == namespace
)
)
def extract_values(resource, key):
"""
Extracts YAML-formatted values from the specified resource and key.
"""
data = resource["data"][key]
if resource["kind"] == "Secret":
data = base64.b64decode(data)
return yaml.safe_load(data)
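# Note: ConfigMap data values are plain YAML strings while Secret data values
# are base64-encoded, hence the conditional decode above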
def fetch_helm_release_info(path, name, namespace):
"""
Renders the given path using kustomize, finds the specified HelmRelease and extracts
the information required to run the Helm command ourselves.
"""
resources = list(yaml.safe_load_all(exec("kustomize", "build", path)))
release = find_resource(resources, "HelmRelease", name, namespace)
chart_ref = release["spec"]["chartRef"]
chart = find_resource(resources, chart_ref["kind"], chart_ref["name"], namespace)
source_ref = chart["spec"]["sourceRef"]
repo = find_resource(resources, source_ref["kind"], source_ref["name"], namespace)
return HelmReleaseInfo(
name = release["spec"]["releaseName"],
namespace = release["spec"].get("targetNamespace", namespace),
chart = HelmChartInfo(
name = chart["spec"]["chart"],
repo = repo["spec"]["url"],
version = chart["spec"]["version"]
),
values = [
extract_values(
find_resource(
resources,
ref["kind"],
ref["name"],
namespace
),
ref["valuesKey"]
)
for ref in release["spec"].get("valuesFrom", [])
]
)
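# Illustrative sketch of the Flux resource chain this function assumes
# (resource names are hypothetical):
#   HelmRelease  --spec.chartRef-->   HelmChart (spec.chart, spec.version)
#   HelmChart    --spec.sourceRef-->  HelmRepository (spec.url)
# plus optional spec.valuesFrom entries pointing at ConfigMaps/Secrets whose
# valuesKey holds the YAML values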
async def bootstrap_flux(helm_client):
"""
    Bootstraps Flux on the cluster targeted by the given Helm client.
    In order to ensure that the bootstrapped Flux can be adopted for self-management later,
    we extract the chart and values for the release from the same Flux resources that will
    be used to set up that self-management.
"""
flux_config = REPO_ROOT / "components" / "flux"
release_info = fetch_helm_release_info(flux_config, "flux", "flux-system")
await helm_client.ensure_release(
release_info.name,
await helm_client.get_chart(
release_info.chart.name,
repo = release_info.chart.repo,
version = release_info.chart.version
),
*release_info.values,
namespace = release_info.namespace,
wait = True
)
async def bootstrap_sealed_secrets(ek_client, path, kubeconfig_path):
"""
    Bootstraps the sealed secrets controller onto the target cluster
and adds the cloud credentials as a sealed secret.
"""
await kustomize_install(ek_client, REPO_ROOT / "components" / "sealed-secrets")
await wait_for_helm_releases(ek_client)
# Seal the cluster credentials using kubeseal
credentials_path = path / "credentials.yaml"
sealed_credentials_path = path / "credentials-sealed.yaml"
exec(
"kubeseal",
"--kubeconfig", kubeconfig_path,
"--format", "yaml",
"--controller-name", "sealed-secrets",
"--controller-namespace", "sealed-secrets-system",
"--secret-file", credentials_path,
"--sealed-secret-file", sealed_credentials_path,
)
async def kustomize_install(ek_client, path):
"""
Builds the given path using kustomize and installs the resulting objects.
"""
objs = yaml.safe_load_all(exec("kustomize", "build", str(path)))
for obj in objs:
await ek_client.client_side_apply_object(obj)
async def wait_for_helm_releases(ek_client):
ekapi = await ek_client.api_preferred_version("helm.toolkit.fluxcd.io")
ekresource = await ekapi.resource("helmreleases")
await ekwait_all(
ekresource,
has_condition("Ready", "True"),
all_namespaces = True
)
async def wait_for_capi_providers(ek_client):
"""
Waits for the Cluster API providers to become ready.
"""
ekapi = await ek_client.api_preferred_version("operator.cluster.x-k8s.io")
for resource in [
"coreproviders",
"bootstrapproviders",
"controlplaneproviders",
"infrastructureproviders",
]:
ekresource = await ekapi.resource(resource)
await ekwait_all(ekresource, has_condition("Ready", "True"), all_namespaces = True)
async def wait_for_cluster(ek_client, namespace):
"""
Waits for all the Cluster API resources in the given namespace to become ready.
The return value is the kubeconfig for the first cluster in the namespace.
"""
# Wait for the cluster to become ready
ekcapi = await ek_client.api_preferred_version("cluster.x-k8s.io")
ekclusters = await ekcapi.resource("clusters")
await ekwait_all(
ekclusters,
has_condition("Ready", "True"),
namespace = namespace
)
# Wait for all the machine deployments to become ready
ekmds = await ekcapi.resource("machinedeployments")
await ekwait_all(
ekmds,
has_condition("MachineSetReady", "True"),
namespace = namespace
)
    # Wait for all the addons to become ready
    ekaddons = await ek_client.api_preferred_version("addons.stackhpc.com")
    ekmanifests = await ekaddons.resource("manifests")
    await ekwait_all(
        ekmanifests,
        has_prop(["status", "phase"], "Deployed"),
        namespace = namespace
    )
    ekhelmreleases = await ekaddons.resource("helmreleases")
    await ekwait_all(
        ekhelmreleases,
        has_prop(["status", "phase"], "Deployed"),
        namespace = namespace
    )
    # Return the kubeconfig for the first CAPI cluster in the namespace
    cluster = await ekclusters.first(namespace = namespace)
    name = cluster["metadata"]["name"]
    eksecrets = await ek_client.api("v1").resource("secrets")
    capi_kubeconfig = await eksecrets.fetch(f"{name}-kubeconfig", namespace = namespace)
    return base64.b64decode(capi_kubeconfig.data["value"])
def fetch_git_info():
"""
Returns the URL of the remote origin and current branch for the current repo.
"""
origin = exec("git", "remote", "get-url", "origin")
# Check if the origin is an SSH URL
# If it is, we need to correct the format for Flux
if re.search("^https?://", origin) is None:
origin = "ssh://{}".format("/".join(origin.rsplit(":", maxsplit = 1)))
branch = exec("git", "branch", "--show-current")
return origin, branch
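# Illustrative example of the SSH URL correction above:
#   git@github.com:org/repo.git  becomes  ssh://git@github.com/org/repo.git
# i.e. the scp-style remote is rewritten into the URL form that Flux accepts
# for git sources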
@contextlib.contextmanager
def generate_ssh_keypair():
"""
Generates an SSH keypair and yields the public key and the path to the private key.
"""
private_key = Ed25519PrivateKey.generate()
public_key = private_key.public_key()
public_bytes = public_key.public_bytes(Encoding.OpenSSH, PublicFormat.OpenSSH)
private_bytes = private_key.private_bytes(Encoding.PEM, PrivateFormat.OpenSSH, NoEncryption())
with tempfile.NamedTemporaryFile() as private_key_file:
private_key_file.write(private_bytes)
private_key_file.flush()
yield public_bytes.decode(), private_key_file.name
async def delete_cluster(ek_client, namespace):
"""
Deletes all the Cluster API clusters in the given namespace and waits for them to be gone.
"""
ekcapi = await ek_client.api_preferred_version("cluster.x-k8s.io")
ekclusters = await ekcapi.resource("clusters")
async for cluster in ekclusters.list(namespace = namespace):
await ekclusters.delete(cluster.metadata.name, namespace = namespace)
while True:
try:
await ekclusters.fetch(cluster.metadata.name, namespace = namespace)
except easykube.ApiError as exc:
if exc.status_code == 404:
break
else:
raise
await asyncio.sleep(5)
def as_sync(func):
"""
Decorator that converts an async function into a sync one.
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
return asyncio.run(func(*args, **kwargs))
return wrapper
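# Illustrative usage (this mirrors the click commands below):
#   @as_sync
#   async def main(): ...
#   main()  # runs the coroutine to completion via asyncio.run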
def resolve_cluster(cluster):
"""
Resolves the specified cluster and returns the path.
"""
path = REPO_ROOT / "clusters" / cluster
if path.exists():
return path
else:
        raise RuntimeError(f"specified cluster does not exist: {cluster}")
@click.group()
def manage():
"""
    Manage bootstrap and teardown for a cluster.
"""
@manage.command()
@click.argument("cluster")
@as_sync
async def bootstrap(cluster):
"""
Bootstrap the specified cluster.
"""
path = resolve_cluster(cluster)
with kind_cluster() as (helm_kind, ek_kind, kubeconfig_kind):
# Bootstrap Flux on the kind cluster
await bootstrap_flux(helm_kind)
# Bootstrap the sealed secrets on the kind cluster
await bootstrap_sealed_secrets(ek_kind, path, kubeconfig_kind)
# Install the Flux resources for the CAPI cluster on the kind cluster
await kustomize_install(ek_kind, path)
await wait_for_helm_releases(ek_kind)
# Wait for the CAPI cluster to become ready and extract the kubeconfig
kubeconfig_content = await wait_for_cluster(ek_kind, "capi-self")
# Initialise Helm and easykube clients for the CAPI cluster
kubeconfig_capi = path / "kubeconfig"
with open(kubeconfig_capi, "wb") as fh:
fh.write(kubeconfig_content)
ek_capi = easykube.Configuration.from_kubeconfig(kubeconfig_capi).async_client()
helm_capi = pyhelm3.Client(kubeconfig = kubeconfig_capi)
# Bootstrap Flux on the CAPI cluster
await bootstrap_flux(helm_capi)
# Bootstrap Cluster API on the CAPI cluster
await kustomize_install(ek_capi, REPO_ROOT / "components" / "cluster-api")
await wait_for_helm_releases(ek_capi)
await wait_for_capi_providers(ek_capi)
# Move the cluster resources from the kind cluster to the CAPI cluster
await move_capi_resources(ek_kind, ek_capi, "capi-self")
await wait_for_cluster(ek_capi, "capi-self")
# At this point, the cluster is self-managed
# Bootstrap the sealed secrets on the CAPI cluster
await bootstrap_sealed_secrets(ek_capi, path, kubeconfig_capi)
# Commit and push the changes to the remote
exec("git", "add", path)
exec("git", "commit", "-m", f"Bootstrap cluster - {cluster}")
exec("git", "push")
# Get the git origin and branch for the source
origin, branch = fetch_git_info()
# Configure the git source
flux_command = [
"flux",
"create",
"source",
"git",
cluster,
"--kubeconfig", kubeconfig_capi,
"--url", origin,
"--branch", branch,
"--silent",
]
if origin.startswith("ssh://"):
with generate_ssh_keypair() as (public_key, private_key_file):
click.echo(click.style("Add the following SSH public key as a deploy key for the repository:", bold = True, fg = "green"))
click.echo(click.style(public_key, bold = True, fg = "green"))
click.pause("Press any key to continue")
exec(*flux_command, "--private-key-file", private_key_file)
else:
exec(*flux_command)
# Configure the kustomization to manage the cluster resources
exec(
"flux",
"create",
"kustomization",
cluster,
"--kubeconfig", kubeconfig_capi,
"--source", f"GitRepository/{cluster}",
"--path", f"./clusters/{cluster}",
"--prune",
"--interval", "5m",
"--timeout", "5m"
)
@manage.command()
@click.argument("cluster")
@as_sync
async def teardown(cluster):
"""
Tear down the specified cluster.
"""
path = resolve_cluster(cluster)
# Initialise an easykube client using the kubeconfig
kubeconfig_capi = path / "kubeconfig"
ek_capi = easykube.Configuration.from_kubeconfig(kubeconfig_capi).async_client()
with kind_cluster() as (helm_kind, ek_kind, _):
# Bootstrap Flux on the kind cluster
await bootstrap_flux(helm_kind)
# Bootstrap Cluster API on the kind cluster
await kustomize_install(ek_kind, REPO_ROOT / "components" / "cluster-api")
await wait_for_helm_releases(ek_kind)
await wait_for_capi_providers(ek_kind)
# Move the cluster resources from the CAPI cluster to the kind cluster
await move_capi_resources(ek_capi, ek_kind, "capi-self")
# Delete the cluster
await delete_cluster(ek_kind, "capi-self")
if __name__ == "__main__":
manage()