"""Certbot PKCS#12 installer plugin."""
from certbot import interfaces
from certbot.display import util as display_util
from certbot.plugins import common
from OpenSSL import crypto
def _load_bytes(path):
    """Read the file at *path* in binary mode and return its full contents."""
    with open(path, mode='rb') as stream:
        contents = stream.read()
    return contents
def _load_key(path):
    """Parse the PEM-encoded private key stored at *path*.

    The file is read in full and handed to pyOpenSSL's PEM key loader;
    the loader's result is returned unchanged.
    """
    pem_data = _load_bytes(path)
    return crypto.load_privatekey(crypto.FILETYPE_PEM, pem_data)
def _load_cert(path):
    """Parse the PEM-encoded certificate stored at *path*.

    The file is read in full and handed to pyOpenSSL's PEM certificate
    loader; the loader's result is returned unchanged.
    """
    pem_data = _load_bytes(path)
    return crypto.load_certificate(crypto.FILETYPE_PEM, pem_data)
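def _load_certs(path):
    """Yield every certificate found in the PEM bundle at *path*.

    The bundle is cut at each ``-----BEGIN CERTIFICATE-----`` marker and
    each piece is parsed on its own. The marker is matched without its
    line ending, so a bundle saved with CRLF line endings is still split
    into its individual certificates instead of being handed to the
    parser as one blob. Text ahead of the first marker is skipped rather
    than parsed as a certificate.
    """
    marker = b'-----BEGIN CERTIFICATE-----'
    # The first element is whatever precedes the first marker (normally
    # empty); it never holds a certificate.
    _preamble, *pieces = _load_bytes(path).split(marker)
    for piece in pieces:
        body = piece.strip()
        if body:
            # Re-attach the marker that split() consumed.
            yield crypto.load_certificate(
                crypto.FILETYPE_PEM, marker + b'\n' + body)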
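class Installer(common.Plugin, interfaces.Installer):
    """Certbot installer that bundles the key and certificates into PKCS#12.

    The archive contains the private key, so its confidentiality rests on
    the mode of the output file and on the passphrase, when one is
    configured. Only ``deploy_cert`` does real work; the remaining
    installer hooks are no-ops.
    """

    description = "PKCS#12 installer plugin."

    @classmethod
    def add_parser_arguments(cls, add):
        """Register the plugin's ``location`` and ``passphrase`` options.

        NOTE(review): a passphrase given on the command line can end up in
        shell history and process listings; supplying it through a
        permission-restricted configuration file avoids that exposure.
        """
        add("location", help="Location of PKCS#12 archive.")
        add("passphrase", help="PKCS#12 archive passphrase.")

    def prepare(self):
        """No preparation is needed for this plugin."""
        pass

    def more_info(self):
        """Return a one-line description of what the plugin does."""
        return 'Install the key and certificate in a PKCS#12 archive.'

    def get_all_names(self):
        """Return an empty list; this plugin does not discover domain names."""
        return []

    def deploy_cert(self, domain, cert_path, key_path,
                    chain_path, fullchain_path):
        """Write the key, leaf certificate and chain to the configured archive.

        ``domain`` and ``fullchain_path`` are accepted for the installer
        interface but are not used. The archive is created with owner-only
        permissions because it carries the private key.
        """
        # Function-scope import keeps this change self-contained.
        import os

        passphrase = self.conf('passphrase')
        if passphrase is not None:
            passphrase = passphrase.encode()
        # NOTE(review): when no passphrase is configured, None is passed to
        # export() and the private key in the archive is not protected by
        # any secret; consider requiring a passphrase.

        # NOTE(review): crypto.PKCS12 is believed to be deprecated in newer
        # pyOpenSSL releases; confirm the pinned version still provides it.
        pkcs12 = crypto.PKCS12()
        pkcs12.set_privatekey(_load_key(key_path))
        pkcs12.set_certificate(_load_cert(cert_path))
        pkcs12.set_ca_certificates(_load_certs(chain_path))
        out_bytes = pkcs12.export(passphrase=passphrase)

        location = self.conf('location')
        # A plain open() would create the file with umask-derived
        # permissions, which are commonly world-readable. Mode 0o600
        # restricts a newly created archive to its owner. A file that
        # already exists keeps its current mode, since the mode argument
        # applies only on creation.
        # O_BINARY exists only on Windows; it is 0 elsewhere.
        flags = (os.O_WRONLY | os.O_CREAT | os.O_TRUNC
                 | getattr(os, 'O_BINARY', 0))
        fd = os.open(location, flags, 0o600)
        with os.fdopen(fd, 'wb') as f:
            f.write(out_bytes)

        display_util.notify(f'The PKCS#12 archive is stored at {location}.')

    def enhance(self, domain, enhancement, options=None):
        """No enhancements are implemented; this is a no-op."""
        pass

    def supported_enhancements(self):
        """Return an empty list; no enhancements are supported."""
        return []

    def save(self, title=None, temporary=False):
        """No-op: the plugin keeps no configuration state to save."""
        pass

    def rollback_checkpoints(self, rollback=1):
        """No-op: the plugin creates no checkpoints."""
        pass

    def recovery_routine(self):
        """No-op: there is nothing to recover."""
        pass

    def config_test(self):
        """No-op: there is no server configuration to test."""
        pass

    def restart(self):
        """No-op: there is no service to restart."""
        pass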