Package simple_acme_dns

simple_acme_dns is a Python ACME client specifically tailored to the DNS-01 challenge. This makes it easy to manage ACME certificates and accounts all within Python without the need for an external tool like certbot. Although this module is intended for use with Let's Encrypt, it will support any CA utilizing the ACME v2 protocol.

Expand source code
# Copyright 2023 Jared Hendrickson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
simple_acme_dns is a Python ACME client specifically tailored to the DNS-01 challenge. This makes it easy to manage ACME
certificates and accounts all within Python without the need for an external tool like `certbot`. Although this module
is intended for use with Let's Encrypt, it will support any CA utilizing the ACME v2 protocol.
"""
import datetime
import json
import pathlib
import time

import OpenSSL
import josepy as jose
import validators
from acme import challenges
from acme import client
from acme import crypto_util
from acme import messages
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa, ec
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption

from . import errors
from . import tools

# Constants and Variables
# DNS label prefixed to each (wildcard-stripped) domain to build the TXT record name used for DNS-01 verification
DNS_LABEL = '_acme-challenge'
__pdoc__ = {"tests": False}    # Excludes 'tests' submodule from documentation


class ACMEClient:
    """
    A basic ACME client object to interface with a CA using the ACME DNS-01 challenge.
    """
    # This package is all about user simplicity, keeping all items contained to one class supports this.
    # pylint: disable=too-many-instance-attributes,too-many-public-methods

    def __init__(
            self,
            domains: list = None,
            email: str = None,
            directory: str = "https://acme-staging-v02.api.letsencrypt.org/directory",
            nameservers: list = None,
            new_account: bool = False,
            generate_csr: bool = False
    ):
        """
        Args:
            domains (list): A list of domains to request a certificate for.
            email (str): An email address to use when registering new ACME accounts.
            directory (str): The ACME directory URL to interact with.
            nameservers (list): A list of DNS server hosts to query when checking DNS propagation.
            new_account (bool): Automatically create a new ACME account upon creation.
            generate_csr (bool): Automatically generate a new private key and CSR upon creation.

        Examples:
            >>> import simple_acme_dns
            >>> client = simple_acme_dns.ACMEClient(
            ...     domains=["test1.example.com", "test2.example.com"],
            ...     email="example@example.com",
            ...     directory="https://acme-staging-v02.api.letsencrypt.org/directory",
            ...     nameservers=["8.8.8.8", "1.1.1.1"],
            ...     new_account=True,
            ...     generate_csr=True
            ... )
        """
        self.csr = ''.encode()
        self.directory = directory
        self.directory_obj = None
        self.account_key = None
        self.account = None
        self.account_path = None
        self.nameservers = nameservers
        self.net = None
        self.order = None
        self.final_order = None
        self.responses = []
        self.answers = []
        self._domains = domains if domains else []
        self._email = email
        self._certificate = ''.encode()
        self._private_key = ''.encode()
        self._acme_client = None
        self._verification_tokens = {}

        # Automatically create a new account if requested
        if new_account:
            self.new_account()
        # Automatically create a new private key and CSR
        if generate_csr:
            self.generate_private_key_and_csr()

    def generate_csr(self) -> bytes:
        """
        Builds a new CSR from the object's current `private_key` and `domains` values.

        Returns:
            bytes: The CSR bytes-string produced by `acme.crypto_util.make_csr()`. The same value is stored in the
                `csr` property of this object.

        Raises:
            simple_acme_dns.errors.InvalidDomain: When no `domains` have been set.

        Examples:
            >>> client.generate_csr()
            b'-----BEGIN CERTIFICATE REQUEST-----\\nMIHxMIGZAgECMAAwWTATBgckjkn...'
        """
        new_csr = crypto_util.make_csr(self.private_key, self.domains)
        # Assign through the property so the setter's type check is applied
        self.csr = new_csr
        return self.csr

    def generate_private_key(self, key_type: str = 'ec256') -> bytes:
        """
        Generates a new RSA or EC private key and stores it in the `private_key` property.

        Args:
            key_type (str): The requested `private_key` type. Options are: [`ec256`, `ec384`, `rsa2048`, `rsa4096`]

        Returns:
            bytes: The PEM encoded private key data bytes-string. This method will update the `private_key` property
                of the object with the same value.

        Raises:
            simple_acme_dns.errors.InvalidKeyType: When an unknown/unsupported `key_type` is requested.

        Examples:
            >>> client.generate_private_key(key_type="ec384")
            b'-----BEGIN EC PRIVATE KEY-----\\nMIGkAgEBBDAZRFNLcQdVJmLh42p8F4D92...'
        """
        options = ['ec256', 'ec384', 'rsa2048', 'rsa4096']

        # Reject unsupported key types before generating anything
        if key_type not in options:
            msg = f"Invalid private key rtype '{key_type}'. Options {options}"
            raise errors.InvalidKeyType(msg)

        if key_type in ('ec256', 'ec384'):
            # EC keys are generated with `cryptography` and dumped in the traditional OpenSSL PEM format
            curve = ec.SECP256R1() if key_type == 'ec256' else ec.SECP384R1()
            ec_key = ec.generate_private_key(curve, default_backend())
            self.private_key = ec_key.private_bytes(
                encoding=Encoding.PEM,
                format=PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=NoEncryption()
            )
        else:
            # RSA keys are generated and dumped with pyOpenSSL
            bits = 2048 if key_type == 'rsa2048' else 4096
            rsa_key = OpenSSL.crypto.PKey()
            rsa_key.generate_key(OpenSSL.crypto.TYPE_RSA, bits)
            self.private_key = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, rsa_key)

        return self.private_key

    def generate_private_key_and_csr(self, key_type: str = 'ec256') -> tuple:
        """
        Generates a new private key and CSR.

        Args:
            key_type (str): The requested `private_key` type. Options are: [`ec256`, `ec384`, `rsa2048`, `rsa4096`]

        Returns:
            tuple: A tuple with the first value containing the private key, the second value contains the CSR. This
                method will update the `private_key` and `csr` properties of this object with the same values.

        Examples:
            >>> client.generate_private_key_and_csr(key_type="rsa2048")
            (b'-----BEGIN PRIVATE KEY-----\\nMIIEvAIBA...', b'-----BEGIN CERTIFICATE REQUEST-----\\nMIHxM...')
        """
        self.generate_private_key(key_type=key_type)
        self.generate_csr()
        return self.private_key, self.csr

    def request_verification_tokens(self) -> dict:
        """
        Creates a new ACME order from the current `csr` and collects the DNS-01 verification token(s) for each
        domain in that order. Each token must be uploaded as a DNS TXT record for its DNS name to complete
        verification.

        Returns:
            dict: A dictionary where the key is the DNS name (the `_acme-challenge` label prefixed to the domain,
                wildcard removed), and the value is a list of verification token strings that must be uploaded
                as TXT records for that DNS name.

        Examples:
            >>> client.request_verification_tokens()
            {
                "_acme-challenge.test1.example.com": ["moY32lkdsZ3VWHM1mdM..."],
                "_acme-challenge.test2.example.com": ["asldfkjslweietj23_b...", "nMIIEvAIBA2-212_w..."]
            }
        """
        # Start from a clean slate: previously stored responses belong to an older order
        self.responses = {}
        self.order = self.acme_client.new_order(self.csr)
        tokens_by_record = {}

        for identifier, dns_challenges in self.challenges.items():
            # A wildcard domain and its base domain share one TXT record name, hence the list of tokens per name
            record_name = f"{DNS_LABEL}.{self.strip_wildcard(identifier)}"

            for dns_challenge in dns_challenges:
                response, validation = dns_challenge.response_and_validation(self.acme_client.net.key)
                tokens_by_record.setdefault(record_name, []).append(validation)

                # Keep the response so `request_certificate()` can look it up by challenge token
                self.responses[dns_challenge.chall.token] = response

        # Store the tokens and hand them back through the validating property
        self._verification_tokens = tokens_by_record
        return self.verification_tokens

    def request_certificate(self, wait: int = 0, timeout: int = 90) -> bytes:
        """
        Answers every pending DNS-01 challenge, then polls and finalizes the order to obtain the certificate.
        If you request the certificate before DNS has propagated and verification fails, you must start
        the verification process over entirely by requesting new verification tokens.

        Args:
            wait (int): The amount of time (in seconds) to wait before requesting a challenge answer from the
                server. This is only necessary if you are not using the `check_dns_propagation()` method to verify the
                DNS records exist and would rather wait a specific amount of time instead.
            timeout (int): The amount of time (in seconds) to wait for the ACME server to respond with a certificate.
                If the ACME server does not respond within this time-frame, the request will be considered a failure.

        Returns:
            bytes: The PEM encoded certificate data bytes-string. This method will update the `certificate` attribute
                with this value.

        Examples:
            >>> client.request_certificate()
            b'-----BEGIN CERTIFICATE-----\\nMIIEfzCCA2egAwI...'
        """
        # Optional user-requested delay; the deadline only starts counting once the delay has elapsed
        time.sleep(wait)
        deadline = datetime.datetime.now() + datetime.timedelta(seconds=timeout)

        # Submit the stored response for every challenge of every domain in the order
        for challenge_list in self.challenges.values():
            for challenge in challenge_list:
                answer = self.acme_client.answer_challenge(challenge, self.responses[challenge.chall.token])
                self.answers.append(answer)

        # Finalize the order and keep the full certificate chain
        self.final_order = self.acme_client.poll_and_finalize(self.order, deadline=deadline)
        self.certificate = self.final_order.fullchain_pem.encode()
        return self.certificate

    def revoke_certificate(self, reason: int = 0) -> None:
        """
        Asks the ACME server to revoke the certificate currently stored in the `certificate` property.

        Args:
            reason (int): The numeric reason for revocation identifier. In most cases, this can be left as `0`.
                For more information, refer to: https://letsencrypt.org/docs/revoking/#specifying-a-reason-code

        Examples:
            >>> client.revoke_certificate()
        """
        # Parse the stored PEM certificate and wrap it in the comparable type passed to the ACME client
        x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, self.certificate)
        comparable_cert = jose.ComparableX509(x509_cert)
        self.acme_client.revoke(comparable_cert, reason)

    def new_account(self) -> None:
        """
        Registers a new ACME account at the set ACME `directory` URL. By running this method, you are agreeing to the
        ACME server's terms of use.

        Raises:
            simple_acme_dns.errors.InvalidEmail: When no `email` is set on this object.

        Examples:
            >>> client.new_account()
        """
        # Every new account gets a freshly generated RSA2048 account key
        generated_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
        self.account_key = jose.JWKRSA(key=generated_key)

        # Fetch the ACME directory over the new network object and build the ACME client from it
        self.net = client.ClientNetwork(self.account_key, user_agent='simple_acme_dns/v2')
        directory_json = self.net.get(self.directory).json()
        self.directory_obj = messages.Directory.from_json(directory_json)
        self.acme_client = client.ClientV2(self.directory_obj, net=self.net)

        # Register the account, agreeing to the terms of service on the user's behalf
        new_registration = messages.NewRegistration.from_data(email=self.email, terms_of_service_agreed=True)
        self.account = self.acme_client.new_account(new_registration)

    def deactivate_account(self, delete: bool = True) -> None:
        """
        Deactivates the current account registration. This action is irreversible.

        Args:
            delete (bool): Indicate whether the associated account file on the local system should also be
                deleted after deactivation.

        Examples:
            >>> client.deactivate_account()
        """
        # Deactivate the registration on the ACME server first
        self.acme_client.deactivate_registration(self.account)

        # Nothing to clean up unless deletion was requested and this object is linked to an account file
        if not (self.account_path and delete):
            return

        try:
            pathlib.Path(self.account_path).unlink()
        except FileNotFoundError:
            # The file is already gone, which is the desired end state
            pass

    def export_account(self, save_certificate: bool = True, save_private_key: bool = False) -> str:
        """
        Exports the object as a JSON string. This allows the ACME account data to be exported to a string that can
        be re-imported for use later.

        Args:
            save_certificate (bool): Indicate whether the existing certificate should also be stored in the
                JSON string.
            save_private_key (booL): Indicate whether the private key should also be stored in the JSON string.

        Returns:
            str: The current object encoded as a JSON string.

        Examples:
            >>> client.export_account(save_certificate=True, save_private_key=True)
            '{"account": {"body": {"key": {"n": "vtByzpW..."}}}}'
        """
        # Format our object into a serializable format
        acct_data = {
            'account': self.account.to_json(),
            'account_key': self.account_key.json_dumps(),
            'directory': self.directory,
            'domains': self.domains,
            'certificate': self.certificate.decode() if save_certificate else '',
            'private_key': self.private_key.decode() if save_private_key else ''
        }

        return json.dumps(acct_data)

    def export_account_to_file(
            self,
            path: str = '.',
            name: str = 'account.json',
            save_certificate: bool = True,
            save_private_key: bool = False
    ) -> None:
        """
        Exports the object as a JSON string. This allows the ACME account data to be exported to a file that can
        be re-imported for use later.

        Args:
            path (str): The directory path to save the account file to. Defaults to the current working directory.
            name (str): The file name to save. Defaults to `account.json`.
            save_certificate (bool): Indicate whether the existing certificate should also be stored in the
                JSON string.
            save_private_key (booL): Indicate whether the private key should also be stored in the JSON string.

        Raises:
            simple_acme_dns.errors.InvalidPath: when the requested directory path does not exist.

        Examples:
            >>> client.export_account_to_file(
            ...     path="/tmp/",
            ...     name="my_acme_account.json",
            ...     save_certificate=True,
            ...     save_private_key=True
            ... )
        """
        dir_path = pathlib.Path(path).absolute()

        # Ensure our path is an existing directory, throw an error otherwise
        if dir_path.is_dir():
            # Open the file and write our JSON content
            with open(str(dir_path.joinpath(name)), 'w', encoding="utf-8") as account_file:
                account_file.write(self.export_account(save_certificate, save_private_key))
                self.account_path = str(dir_path.joinpath(name))
        else:
            msg = f"Directory at '{path}' does not exist."
            raise errors.InvalidPath(msg)

    @staticmethod
    def load_account(json_data: str) -> 'ACMEClient':
        """
        Rebuilds an `ACMEClient` from a JSON data string created by the `export_account()` method and re-queries
        the account registration from the ACME server.

        Args:
            json_data (str): The JSON account data string to import.

        Returns:
            simple_acme_dns.ACMEClient: The imported ACMEClient object.

        Examples:
            >>> client = simple_acme_dns.ACMEClient.load_account('{"account": {"body": {"key": {"n": "vtByzpW..."}}}}')
        """
        exported = json.loads(json_data)
        restored = ACMEClient()

        # Restore the simple values through their properties so the usual validation applies
        restored.directory = exported.get('directory', None)
        restored.domains = exported.get('domains', [])
        restored.certificate = exported.get('certificate', '').encode()
        restored.private_key = exported.get('private_key', '').encode()

        # The email is recovered from the first account contact entry, which carries a 'mailto:' prefix
        first_contact = exported['account']['body']['contact'][0]
        restored.email = first_contact.replace('mailto:', '')

        # Rebuild the registration resource and the account key from their serialized forms
        account_json = json.dumps(exported['account'])
        restored.account = messages.RegistrationResource.json_loads(account_json)
        restored.account_key = jose.JWKRSA.json_loads(exported['account_key'])

        # Re-initialize the ACME client and refresh the registration from the server
        restored.net = client.ClientNetwork(restored.account_key, user_agent='simple_acme_dns/1.0.0')
        directory_json = restored.net.get(restored.directory).json()
        restored.directory_obj = messages.Directory.from_json(directory_json)
        restored.acme_client = client.ClientV2(restored.directory_obj, net=restored.net)
        restored.account = restored.acme_client.query_registration(restored.account)

        return restored

    @staticmethod
    def load_account_from_file(filepath: str) -> 'ACMEClient':
        """
        Loads an existing account from a JSON file created by the `export_account_to_file()` method.

        Args:
            filepath (str): The JSON file path to import.

        Returns:
            simple_acme_dns.ACMEClient: The imported ACMEClient object. Its `account_path` attribute is set to the
                absolute path of the loaded file.

        Raises:
            simple_acme_dns.errors.InvalidPath: When no file exists at the JSON file path (this includes the path
                pointing at a directory).

        Examples:
            >>> client = simple_acme_dns.ACMEClient.load_account_from_file('/tmp/my_acme_account.json')
        """
        filepath = pathlib.Path(filepath).absolute()

        # Require a regular file: a directory at this path would pass a plain existence check and then fail
        # inside open() with an OS error instead of the documented InvalidPath.
        if not filepath.is_file():
            raise errors.InvalidPath(f"No JSON account file found at '{filepath}'")

        # Read the file's contents and load them into a new object.
        with open(filepath, 'r', encoding="utf-8") as json_file:
            json_data = json_file.read()

        obj = ACMEClient.load_account(json_data)
        # Link the object to its file so `deactivate_account()` can delete it later
        obj.account_path = filepath
        return obj

    def check_dns_propagation(
            self,
            timeout: int = 300,
            interval: int = 2,
            authoritative: bool = False,
            round_robin: bool = True,
            verbose: bool = False
    ) -> bool:
        """
        Checks each of our domains' TXT records until every verification token is found or until the timeout is
        reached. This method should be executed before executing the `request_certificate()` method. This method can
        take several minutes to complete, ensure you adjust the timeout value accordingly.

        Args:
            timeout (int): The amount of time (in seconds) to continue trying to verify the TXT records.
            interval (int): The amount of time (in seconds) between DNS requests per domain.
            authoritative (bool): Identify and use the authoritative nameserver for each domain instead of the
                `nameservers` values.
            round_robin (bool): Rotate between each nameserver instead of the default failover behavior.
            verbose (bool): Print DNS answers to the console.

        Returns:
            bool: A boolean indicating whether all the `domains` correctly return their verification token in
                the corresponding TXT records.

        Examples:
            >>> client.nameservers = ["8.8.8.8", "1.1.1.1"]
            >>> client.check_dns_propagation(
            ...     timeout=180,
            ...     interval=5,
            ...     authoritative=False,
            ...     round_robin=True,
            ...     verbose=True
            ... )
            Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 8.8.8.8
            Token 'O32-fd_...' for '_acme-challenge.test2.example.com' not found in [] via 8.8.8.8
            Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 1.1.1.1
            Token 'O32-fd_...' for '_acme-challenge.test2.example.com' found in ['O32-fd_...'] via 1.1.1.1
            Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 8.8.8.8
            Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 1.1.1.1
            Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' found in ['moY3Cd0...'] via 8.8.8.8
            True
        """
        deadline = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
        confirmed_tokens = []

        # One TXT query object per (DNS name, token) pair that must be verified
        checks = [
            (
                record_name,
                token,
                tools.DNSQuery(
                    record_name,
                    rtype='TXT',
                    authoritative=authoritative,
                    nameservers=self.nameservers,
                    round_robin=round_robin
                )
            )
            for record_name, tokens in self._verification_tokens.items()
            for token in tokens
        ]

        # Keep polling until the deadline passes
        while datetime.datetime.now() < deadline:
            for record_name, token, query in checks:
                # Tokens that were already found are not queried again
                if token in confirmed_tokens:
                    continue

                query.resolve()
                token_found = token in query.values
                if token_found:
                    confirmed_tokens.append(token)

                # If verbose mode is enabled, print the result of this lookup to the console
                if verbose:
                    action = 'found' if token_found else 'not found'
                    values = query.values
                    nameserver = query.last_nameserver
                    print(f"Token '{token}' for '{record_name}' {action} in {values} via {nameserver}")

            # Done as soon as every check has found its token
            if len(confirmed_tokens) == len(checks):
                return True

            # Avoid flooding the DNS server(s) by briefly pausing between rounds of checks
            time.sleep(interval)

        return False

    @staticmethod
    def strip_wildcard(domain: str) -> str:
        """
        Strips the wildcard portion of a domain (*.) if present.

        Args:
            domain (str): The domain string to strip wildcards from.

        Returns:
            str: The domain string without the wildcard portion.
        """
        # If wildcard domain, strip of the wildcard to validate domain
        return domain[2:] if domain[:2].startswith("*.") else domain

    @property
    def challenges(self) -> dict:
        """
        Getter for the `challenges` property. Returns the DNS-01 challenges found in our current ACME order.

        Returns:
            dict: A dictionary where the key is the domain name (the authorization's identifier value), and the
                value is a list of that domain's DNS-01 challenge objects.

        Raises:
            simple_acme_dns.errors.OrderNotFound: When this property is called before the `order` object exists.
            simple_acme_dns.errors.ChallengeUnavailable: When the order contains no DNS-01 challenges.
        """
        # Do not allow this property to be called if an order has not been created beforehand.
        if not self.order:
            raise errors.OrderNotFound("Cannot get 'challenges' without an ACME order.")

        challs = {}

        # Collect the DNS-01 challenges of every authorization, grouped by the domain that was requested
        for auth in self.order.authorizations:
            for challenge in auth.body.challenges:
                if isinstance(challenge.chall, challenges.DNS01):
                    domain = auth.body.identifier.value
                    challs.setdefault(domain, []).append(challenge)

        # If no challenges were found, throw an error. The message is already fully interpolated, so it is not
        # passed through str.format(), which would choke on a directory URL containing braces.
        if not challs:
            msg = f"ACME server at '{self.directory}' does not support DNS-01 challenge."
            raise errors.ChallengeUnavailable(msg)

        return challs

    @property
    def acme_client(self) -> client.ClientV2:
        """
        Getter for the `acme_client` property. This checks that the ACME client is set up whenever it's referenced.

        Returns:
            acme.client.ClientV2: The ClientV2 object needed to interact with the ACME server.

        Raises:
            simple_acme_dns.errors.InvalidAccount: When no account registration is configured for this object.
        """
        # Hand back the client only once one has actually been configured
        if isinstance(self._acme_client, client.ClientV2):
            return self._acme_client

        raise errors.InvalidAccount(
            'No account registration found. You must register a new account or load an existing account first.'
        )

    @acme_client.setter
    def acme_client(self, value: client.ClientV2):
        """
        Setter for the `acme_client` property. This ensures the acme_client is an acme.client.ClientV2 object.

        Args:
            value (acme.client.ClientV2): The `acme_client` value being set.

        Raises:
            simple_acme_dns.errors.InvalidAccount: When the `value` is not an acme.client.ClientV2 object.
        """
        is_client = isinstance(value, client.ClientV2)
        if not is_client:
            raise errors.InvalidAccount(f"Value '{value}' is not an acme.client.ClientV2 object.")

        self._acme_client = value

    @property
    def email(self) -> str:
        """
        Getter for the `email` property. This checks that an email exists when it's referenced.

        Returns:
            str:  A string representation of the email address

        Raises:
            simple_acme_dns.errors.InvalidEmail: When `email` is not set.

        """
        if not self._email:
            msg = 'No account email found. You must set the _email value first.'
            raise errors.InvalidEmail(msg)

        return self._email

    @email.setter
    def email(self, value: str):
        """
        Setter for the `email` property. This ensures an email address is valid before setting.

        Args:
            value (str): The `email` value being set.

        Raises:
            simple_acme_dns.errors.InvalidEmail: When the `value` is not a valid email address.
        """
        # `validators.email()` returns a falsy result for an invalid address
        is_valid = validators.email(value)
        if not is_valid:
            raise errors.InvalidEmail(f"Value '{value}' is not a valid email address.")

        self._email = value

    @property
    def verification_tokens(self) -> dict:
        """
        Getter for the `verification_tokens` property. This checks that verification tokens already
        exist whenever they are referenced.

        Returns:
            dict: A dictionary where the key is the verification domain name, and the value is a list of tokens for
                that domain. A DNS TXT entry must be created for each domain name with each of it's corresponding
                tokens as the TXT value.

        Raises:
            simple_acme_dns.errors.InvalidVerificationToken: When no verification tokens have been requested using
                the `request_verification_tokens()` method.
        """
        if not self._verification_tokens:
            msg = 'No verification tokens found. You must run request_verification_tokens() first.'
            raise errors.InvalidVerificationToken(msg)

        return self._verification_tokens

    @property
    def domains(self) -> list:
        """
        Getter for the `domains` property. This checks that domains are already set whenever it's referenced.

        Returns:
            list: A list of domain names currently set.

        Raises:
             simple_acme_dns.errors.InvalidDomain: When no `domains` have been set.
        """
        if not self._domains:
            msg = 'No domains found. You must set the domains value first.'
            raise errors.InvalidDomain(msg)

        return self._domains

    @domains.setter
    def domains(self, value) -> None:
        """
        Setter for the `domains` property. This checks that the assigned domains value is a list of valid FQDNs.

        Args:
            value (str): A list of domains to be set.

        Returns:
            list: A list of valid domain names.

        Raises:
            simple_acme_dns.errors.InvalidDomain: When one or more domains are invalid.
        """
        # Ensure set value is a list
        if not isinstance(value, list):
            msg = "Domains must be of type 'list'."
            raise errors.InvalidDomain(msg)

        # Ensure each domain within the list is an RFC2181 compliant hostname
        for domain in value:
            # Check that value (minus the wildcard if present) is a valid FQDN
            if not validators.domain(self.strip_wildcard(domain)):
                msg = f"Invalid domain name '{domain}'. Domain name must adhere to RFC2181."
                raise errors.InvalidDomain(msg)

        # If we've made it this far, the value is valid. Set it.
        self._domains = value

    @property
    def csr(self) -> bytes:
        """
        Getter for the `csr` property. This checks that a CSR is already set whenever it's referenced.

        Returns:
            bytes: The current PEM encoded CSR data bytes-string if present.

        Raises:
            simple_acme_dns.errors.InvalidCSR: When the `csr` value has not been set yet.
        """
        # Throw an error if the CSR is referenced before it is set
        if not self._csr:
            raise errors.InvalidCSR("CSR value must be set before referencing 'csr'.")

        return self._csr

    @csr.setter
    def csr(self, value: bytes) -> None:
        """
        Setter for the 'csr' property. This ensures the set value is a bytes-string.

        Args:
            value (bytes): The CSR value being set.

        Raises:
            simple_acme_dns.errors.InvalidCSR: When the `csr` value being set is not of type `bytes`.
        """
        # Convert string assignments to bytes
        if not isinstance(value, bytes):
            raise errors.InvalidCSR("CSR must be type 'bytes'.")

        self._csr = value

    @property
    def certificate(self) -> bytes:
        """
        Getter for the `certificate` property. This checks that a certificate is already set whenever it's referenced.

        Returns:
            bytes: The current PEM encoded certificate data bytes-string if present.
        """
        return self._certificate

    @certificate.setter
    def certificate(self, value: bytes) -> None:
        """
        Setter for the 'certificate' property. This ensures the set value is a bytes-string.

        Args:
            value (bytes): The certificate value being set.

        Raises:
            simple_acme_dns.errors.InvalidCertificate: When the `certificate` value being set is not of type `bytes`.
        """
        # Convert string assignments to bytes
        if not isinstance(value, bytes):
            raise errors.InvalidCertificate("Certificate must be type 'bytes'.")

        self._certificate = value

    @property
    def private_key(self) -> bytes:
        """
        Getter for the 'private_key' property. This checks that a private_key is already set whenever it's referenced.

        Returns:
            bytes: The PEM encoded `private_key` bytes-string that is currently set if present.
        """
        return self._private_key

    @private_key.setter
    def private_key(self, value: bytes) -> None:
        """
        Setter for the 'private_key' property. This ensures the set value is a bytes-string.

        Args:
            value (bytes): The `private_key` value being set.

        Raises:
            simple_acme_dns.errors.InvalidPrivateKey: When no `private_key` value has been set.
        """
        # Convert string assignments to bytes
        if not isinstance(value, bytes):
            raise errors.InvalidPrivateKey("Private key must be type 'bytes'.")

        self._private_key = value

Sub-modules

simple_acme_dns.errors

Custom exception classes for simple_acme_dns.

simple_acme_dns.tools

DNS tools to assist ACME verification.

Classes

class ACMEClient (domains: list = None, email: str = None, directory: str = 'https://acme-staging-v02.api.letsencrypt.org/directory', nameservers: list = None, new_account: bool = False, generate_csr: bool = False)

A basic ACME client object to interface with a CA using the ACME DNS-01 challenge.

Args

domains : list
A list of domains to request a certificate for.
email : str
An email address to use when registering new ACME accounts.
directory : str
The ACME directory URL to interact with.
nameservers : list
A list of DNS server hosts to query when checking DNS propagation.
new_account : bool
Automatically create a new ACME account upon creation.
generate_csr : bool
Automatically generate a new private key and CSR upon creation.

Examples

>>> import simple_acme_dns
>>> client = simple_acme_dns.ACMEClient(
...     domains=["test1.example.com", "test2.example.com"],
...     email="example@example.com",
...     directory="https://acme-staging-v02.api.letsencrypt.org/directory",
...     nameservers=["8.8.8.8", "1.1.1.1"],
...     new_account=True,
...     generate_csr=True
... )
Expand source code
class ACMEClient:
    """
    A basic ACME client object to interface with a CA using the ACME DNS-01 challenge.
    """
    # This package is all about user simplicity, keeping all items contained to one class supports this.
    # pylint: disable=too-many-instance-attributes,too-many-public-methods

    def __init__(
            self,
            domains: list = None,
            email: str = None,
            directory: str = "https://acme-staging-v02.api.letsencrypt.org/directory",
            nameservers: list = None,
            new_account: bool = False,
            generate_csr: bool = False
    ):
        """
        Args:
            domains (list): A list of domains to request a certificate for.
            email (str): An email address to use when registering new ACME accounts.
            directory (str): The ACME directory URL to interact with.
            nameservers (list): A list of DNS server hosts to query when checking DNS propagation.
            new_account (bool): Automatically create a new ACME account upon creation.
            generate_csr (bool): Automatically generate a new private key and CSR upon creation.

        Examples:
            >>> import simple_acme_dns
            >>> client = simple_acme_dns.ACMEClient(
            ...     domains=["test1.example.com", "test2.example.com"],
            ...     email="example@example.com",
            ...     directory="https://acme-staging-v02.api.letsencrypt.org/directory",
            ...     nameservers=["8.8.8.8", "1.1.1.1"],
            ...     new_account=True,
            ...     generate_csr=True
            ... )
        """
        self.csr = ''.encode()
        self.directory = directory
        self.directory_obj = None
        self.account_key = None
        self.account = None
        self.account_path = None
        self.nameservers = nameservers
        self.net = None
        self.order = None
        self.final_order = None
        self.responses = []
        self.answers = []
        self._domains = domains if domains else []
        self._email = email
        self._certificate = ''.encode()
        self._private_key = ''.encode()
        self._acme_client = None
        self._verification_tokens = {}

        # Automatically create a new account if requested
        if new_account:
            self.new_account()
        # Automatically create a new private key and CSR
        if generate_csr:
            self.generate_private_key_and_csr()

    def generate_csr(self) -> bytes:
        """
        Builds a new CSR from the object's `private_key` and `domains` values and stores it in `csr`.

        Returns:
            bytes: The X509 CSR data bytes-string, read back from the `csr` property after it was updated.

        Examples:
            >>> client.generate_csr()
            b'-----BEGIN CERTIFICATE REQUEST-----\\nMIHxMIGZAgECMAAwWTATBgckjkn...'
        """
        csr_data = crypto_util.make_csr(self.private_key, self.domains)
        self.csr = csr_data
        return self.csr

    def generate_private_key(self, key_type: str = 'ec256') -> bytes:
        """
        Generates a new RSA or EC private key and stores it in the `private_key` property.

        EC keys are serialized as unencrypted PEM in the traditional OpenSSL format; RSA keys are generated and
        dumped as PEM through pyOpenSSL.

        Args:
            key_type (str): The requested `private_key` type. Options are: [`ec256`, `ec384`, `rsa2048`, `rsa4096`]

        Returns:
            bytes: The PEM encoded private key data bytes-string. This method will update the `private_key` property
                of the object with the same value.

        Raises:
            simple_acme_dns.errors.InvalidKeyType: When an unknown/unsupported `key_type` is requested.

        Examples:
            >>> client.generate_private_key(key_type="ec384")
            b'-----BEGIN EC PRIVATE KEY-----\\nMIGkAgEBBDAZRFNLcQdVJmLh42p8F4D92...'
        """
        # Supported key types, listed in the order they are reported in the error message
        ec_curves = {'ec256': ec.SECP256R1, 'ec384': ec.SECP384R1}
        rsa_sizes = {'rsa2048': 2048, 'rsa4096': 4096}
        options = [*ec_curves, *rsa_sizes]

        # Reject unsupported types before doing any work. List membership compares by equality, so even an
        # unhashable `key_type` ends up here instead of failing on a dict lookup.
        if key_type not in options:
            msg = f"Invalid private key type '{key_type}'. Options {options}"
            raise errors.InvalidKeyType(msg)

        if key_type in ec_curves:
            key = ec.generate_private_key(ec_curves[key_type](), default_backend())
            self.private_key = key.private_bytes(
                encoding=Encoding.PEM,
                format=PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=NoEncryption()
            )
        else:
            key = OpenSSL.crypto.PKey()
            key.generate_key(OpenSSL.crypto.TYPE_RSA, rsa_sizes[key_type])
            self.private_key = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)

        return self.private_key

    def generate_private_key_and_csr(self, key_type: str = 'ec256') -> tuple:
        """
        Generates a new private key, then a CSR based on that key.

        Args:
            key_type (str): The requested `private_key` type. Options are: [`ec256`, `ec384`, `rsa2048`, `rsa4096`]

        Returns:
            tuple: A two item tuple holding the private key first and the CSR second. Both values are read back
                from the `private_key` and `csr` properties, which this method updates.

        Examples:
            >>> client.generate_private_key_and_csr(key_type="rsa2048")
            (b'-----BEGIN PRIVATE KEY-----\\nMIIEvAIBA...', b'-----BEGIN CERTIFICATE REQUEST-----\\nMIHxM...')
        """
        # The key has to exist first because the CSR is built from the stored private key
        self.generate_private_key(key_type=key_type)
        self.generate_csr()

        key_and_csr = (self.private_key, self.csr)
        return key_and_csr

    def request_verification_tokens(self) -> dict:
        """
        Creates a new ACME order for the current CSR and collects the DNS-01 verification tokens for it. Each
        token must be published as a DNS TXT record under its DNS name to complete verification.

        Returns:
            dict: A dictionary where the key is the DNS name (the domain prefixed with the ACME challenge label,
                wildcard removed), and the value is a list of verification token strings that must be uploaded
                as TXT records for that DNS name.

        Examples:
            >>> client.request_verification_tokens()
            {
                "_acme-challenge.test1.example.com": ["moY32lkdsZ3VWHM1mdM..."],
                "_acme-challenge.test2.example.com": ["asldfkjslweietj23_b...", "nMIIEvAIBA2-212_w..."]
            }
        """
        tokens_by_name = {}
        self.responses = {}
        self.order = self.acme_client.new_order(self.csr)

        for requested_domain, domain_challenges in self.challenges.items():
            # A wildcard domain and its base domain share one TXT record name, so their tokens land in one list
            record_name = f"{DNS_LABEL}.{self.strip_wildcard(requested_domain)}"

            for challenge in domain_challenges:
                response, validation = challenge.response_and_validation(self.acme_client.net.key)
                tokens_by_name.setdefault(record_name, []).append(validation)

                # Keep the response retrievable by challenge token for when the challenge is answered
                self.responses[challenge.chall.token] = response

        self._verification_tokens = tokens_by_name
        return self.verification_tokens

    def request_certificate(self, wait: int = 0, timeout: int = 90) -> bytes:
        """
        Answers every pending DNS-01 challenge, then polls and finalizes the order to obtain the certificate.
        If you request the certificate before DNS has propagated and verification fails, you must start
        the verification process over entirely by requesting new verification tokens.

        Args:
            wait (int): The amount of time (in seconds) to wait before requesting a challenge answer from the
                server. This is only necessary if you are not using the `check_dns_propagation()` method to verify the
                DNS records exist and would rather wait a specific amount of time instead.
            timeout (int): The amount of time (in seconds) to wait for the ACME server to respond with a certificate.
                The deadline is computed after the `wait` period has elapsed.

        Returns:
            bytes: The PEM encoded certificate data bytes-string. This method will update the `certificate` attribute
                with this value.

        Examples:
            >>> client.request_certificate()
            b'-----BEGIN CERTIFICATE-----\\nMIIEfzCCA2egAwI...'
        """
        # Optional user-requested pause before talking to the server
        time.sleep(wait)
        deadline = datetime.datetime.now() + datetime.timedelta(seconds=timeout)

        # Answer every challenge of every domain with the response saved when the tokens were requested
        for challenge_list in self.challenges.values():
            for challenge in challenge_list:
                saved_response = self.responses[challenge.chall.token]
                answer = self.acme_client.answer_challenge(challenge, saved_response)
                self.answers.append(answer)

        # Finalize the order and keep the issued certificate chain
        self.final_order = self.acme_client.poll_and_finalize(self.order, deadline=deadline)
        fullchain = self.final_order.fullchain_pem
        self.certificate = fullchain.encode()
        return self.certificate

    def revoke_certificate(self, reason: int = 0) -> None:
        """
        Asks the ACME server to revoke the certificate currently stored in `certificate`.

        Args:
            reason (int): The numeric reason for revocation identifier. In most cases, this can be left as `0`.
                For more information, refer to: https://letsencrypt.org/docs/revoking/#specifying-a-reason-code

        Examples:
            >>> client.revoke_certificate()
        """
        # Parse the stored PEM data and wrap it in the comparable type passed to the ACME client
        x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, self.certificate)
        comparable_cert = jose.ComparableX509(x509_cert)

        self.acme_client.revoke(comparable_cert, reason)

    def new_account(self) -> None:
        """
        Registers a new ACME account at the set ACME `directory` URL. By running this method, you are agreeing to the
        ACME server's terms of use.

        A fresh RSA 2048 account key is generated, the ACME client is (re)built around it, and the resulting
        registration is stored in `account`.

        Examples:
            >>> client.new_account()
        """
        # Every new account gets its own RSA 2048 account key
        key_material = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
        self.account_key = jose.JWKRSA(key=key_material)

        # Build the network layer, fetch the directory and create the ACME client from it
        self.net = client.ClientNetwork(self.account_key, user_agent='simple_acme_dns/v2')
        directory_json = self.net.get(self.directory).json()
        self.directory_obj = messages.Directory.from_json(directory_json)
        self.acme_client = client.ClientV2(self.directory_obj, net=self.net)

        # Register the account, agreeing to the terms of service
        new_registration = messages.NewRegistration.from_data(email=self.email, terms_of_service_agreed=True)
        self.account = self.acme_client.new_account(new_registration)

    def deactivate_account(self, delete: bool = True) -> None:
        """
        Deactivates the current account registration. This action is irreversible.

        Args:
            delete (bool): Indicate whether the associated account file on the local system should also be
                deleted after deactivation.

        Examples:
            >>> client.deactivate_account()
        """
        # Deactivate the registration on the ACME server first
        self.acme_client.deactivate_registration(self.account)

        # Nothing more to do unless a linked account file exists and its removal was requested
        if not (self.account_path and delete):
            return

        try:
            pathlib.Path(self.account_path).unlink()
        except FileNotFoundError:
            # The file is already gone, which is the desired end state
            pass

    def export_account(self, save_certificate: bool = True, save_private_key: bool = False) -> str:
        """
        Exports the object as a JSON string. This allows the ACME account data to be exported to a string that can
        be re-imported for use later.

        Args:
            save_certificate (bool): Indicate whether the existing certificate should also be stored in the
                JSON string.
            save_private_key (booL): Indicate whether the private key should also be stored in the JSON string.

        Returns:
            str: The current object encoded as a JSON string.

        Examples:
            >>> client.export_account(save_certificate=True, save_private_key=True)
            '{"account": {"body": {"key": {"n": "vtByzpW..."}}}}'
        """
        # Format our object into a serializable format
        acct_data = {
            'account': self.account.to_json(),
            'account_key': self.account_key.json_dumps(),
            'directory': self.directory,
            'domains': self.domains,
            'certificate': self.certificate.decode() if save_certificate else '',
            'private_key': self.private_key.decode() if save_private_key else ''
        }

        return json.dumps(acct_data)

    def export_account_to_file(
            self,
            path: str = '.',
            name: str = 'account.json',
            save_certificate: bool = True,
            save_private_key: bool = False
    ) -> None:
        """
        Exports the object as a JSON string. This allows the ACME account data to be exported to a file that can
        be re-imported for use later.

        Args:
            path (str): The directory path to save the account file to. Defaults to the current working directory.
            name (str): The file name to save. Defaults to `account.json`.
            save_certificate (bool): Indicate whether the existing certificate should also be stored in the
                JSON string.
            save_private_key (booL): Indicate whether the private key should also be stored in the JSON string.

        Raises:
            simple_acme_dns.errors.InvalidPath: when the requested directory path does not exist.

        Examples:
            >>> client.export_account_to_file(
            ...     path="/tmp/",
            ...     name="my_acme_account.json",
            ...     save_certificate=True,
            ...     save_private_key=True
            ... )
        """
        dir_path = pathlib.Path(path).absolute()

        # Ensure our path is an existing directory, throw an error otherwise
        if dir_path.is_dir():
            # Open the file and write our JSON content
            with open(str(dir_path.joinpath(name)), 'w', encoding="utf-8") as account_file:
                account_file.write(self.export_account(save_certificate, save_private_key))
                self.account_path = str(dir_path.joinpath(name))
        else:
            msg = f"Directory at '{path}' does not exist."
            raise errors.InvalidPath(msg)

    @staticmethod
    def load_account(json_data: str) -> 'ACMEClient':
        """
        Rebuilds a client from a JSON data string created by the `export_account()` method and refreshes the
        account registration from the ACME server.

        Args:
            json_data (str): The JSON account data string to import.

        Returns:
            simple_acme_dns.ACMEClient: The imported ACMEClient object.

        Examples:
            >>> client = simple_acme_dns.ACMEClient.load_account('{"account": {"body": {"key": {"n": "vtByzpW..."}}}}')
        """
        saved = json.loads(json_data)
        loaded = ACMEClient()

        # Restore the saved values; the property setters validate them along the way
        loaded.directory = saved.get('directory')
        loaded.domains = saved.get('domains', [])
        loaded.certificate = saved.get('certificate', '').encode()
        loaded.private_key = saved.get('private_key', '').encode()

        # The email address is taken from the first contact entry of the saved registration
        first_contact = saved['account']['body']['contact'][0]
        loaded.email = first_contact.replace('mailto:', '')

        account_json = json.dumps(saved['account'])
        loaded.account = messages.RegistrationResource.json_loads(account_json)
        loaded.account_key = jose.JWKRSA.json_loads(saved['account_key'])

        # Rebuild the ACME client around the restored account key and re-query the registration
        loaded.net = client.ClientNetwork(loaded.account_key, user_agent='simple_acme_dns/1.0.0')
        directory_json = loaded.net.get(loaded.directory).json()
        loaded.directory_obj = messages.Directory.from_json(directory_json)
        loaded.acme_client = client.ClientV2(loaded.directory_obj, net=loaded.net)
        loaded.account = loaded.acme_client.query_registration(loaded.account)

        return loaded

    @staticmethod
    def load_account_from_file(filepath: str) -> 'ACMEClient':
        """
        Rebuilds a client from a JSON file created by the `export_account_to_file()` method. The absolute
        file path is remembered in the `account_path` attribute of the returned object.

        Args:
            filepath (str): The JSON file path to import.

        Returns:
            simple_acme_dns.ACMEClient: The imported ACMEClient object.

        Raises:
            simple_acme_dns.errors.InvalidPath: When the JSON file path does not exist.

        Examples:
            >>> client = simple_acme_dns.ACMEClient.load_account_from_file('/tmp/my_acme_account.json')
        """
        account_file_path = pathlib.Path(filepath).absolute()

        # Bail out early when there is nothing to read
        if not account_file_path.exists():
            raise errors.InvalidPath(f"No JSON account file found at '{account_file_path}'")

        with open(account_file_path, 'r', encoding="utf-8") as json_file:
            json_data = json_file.read()

        # Load the contents into a new object and link it to the file it came from
        loaded = ACMEClient.load_account(json_data)
        loaded.account_path = account_file_path
        return loaded

    def check_dns_propagation(
            self,
            timeout: int = 300,
            interval: int = 2,
            authoritative: bool = False,
            round_robin: bool = True,
            verbose: bool = False
    ) -> bool:
        """
        Check's each of our domain's TXT record until the value matches it's verification token or until the timeout is
        reached. This method should be executed before executing the `request_certificates()` method. This method can
        take several minutes to complete, ensure you adjust the timeout value accordingly.

        Args:
            timeout (int): The amount of time (in seconds) to continue trying to verify the TXT records.
            interval (int): The amount of time (in seconds) between DNS requests per domain.
            authoritative (bool): Identify and use the authoritative nameserver for each domain instead of the
                `nameservers` values.
            round_robin (bool): Rotate between each nameserver instead of the default failover behavior.
            verbose (bool): Print DNS answers to the console.

        Returns:
            bool: A boolean indicating whether all the `domains` correctly return their verification token in
                the corresponding TXT records.

        Examples:
            >>> client.nameservers = ["8.8.8.8", "1.1.1.1"]
            >>> client.check_dns_propagation(
            ...     timeout=180,
            ...     interval=5,
            ...     authoritative=False,
            ...     round_robin=True,
            ...     verbose=False
            ... )
            Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 8.8.8.8
            Token 'O32-fd_...' for '_acme-challenge.test2.example.com' not found in [] via 8.8.8.8
            Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 1.1.1.1
            Token 'O32-fd_...' for '_acme-challenge.test2.example.com' found in ['O32-fd_...'] via 1.1.1.1
            Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 8.8.8.8
            Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 1.1.1.1
            Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' found in ['moY3Cd0...'] via 8.8.8.8
            True
        """
        # Variables
        verified = []
        resolvers = []
        timeout = datetime.datetime.now() + datetime.timedelta(seconds=timeout)

        # Create a DNS resolver objects for each domain being verified.
        for domain, tokens in self._verification_tokens.items():
            # Create a resolver for each token required for verification of this domain.
            for token in tokens:
                resolv = tools.DNSQuery(
                    domain,
                    rtype='TXT',
                    authoritative=authoritative,
                    nameservers=self.nameservers,
                    round_robin=round_robin
                )
                resolvers.append((domain, token, resolv))

        # Loop until we have exceeded our timeout value
        while datetime.datetime.now() < timeout:
            # Loop through each domain being verified
            for domain, token, resolver in resolvers:
                # Only try to verify the domain if it has not already been verified
                if token not in verified:
                    resolver.resolve()
                    # Save this domain as verified if our token was found in the TXT record values
                    if token in resolver.values:
                        verified.append(token)
                    # If verbose mode is enabled, print the results to the console
                    if verbose:
                        action = ('found' if token in verified else 'not found')
                        values = resolver.values
                        nameserver = resolver.last_nameserver
                        msg = f"Token '{token}' for '{domain}' {action} in {values} via {nameserver}"
                        print(msg)

            # Check that all resolvers completed verification
            if len(verified) == len(resolvers):
                return True

            # Avoid flooding the DNS server(s) by briefly pausing between DNS checks
            time.sleep(interval)

        return False

    @staticmethod
    def strip_wildcard(domain: str) -> str:
        """
        Strips the wildcard portion of a domain (*.) if present.

        Args:
            domain (str): The domain string to strip wildcards from.

        Returns:
            str: The domain string without the wildcard portion.
        """
        # If wildcard domain, strip of the wildcard to validate domain
        return domain[2:] if domain[:2].startswith("*.") else domain

    @property
    def challenges(self) -> dict:
        """
        Getter for the `challenges` property. Returns the DNS-01 challenges found in our current ACME order.
        The result is rebuilt from the order's authorizations on every access.

        Returns:
            dict: A dictionary where the key is the domain name, and the value is a list of Challenge objects.

        Raises:
            simple_acme_dns.errors.OrderNotFound: When this property is called before the `order` object exists.
            simple_acme_dns.errors.ChallengeUnavailable: When none of the order's authorizations offers a
                DNS-01 challenge.
        """
        # Do not allow this property to be called if an order has not been created beforehand.
        if not self.order:
            raise errors.OrderNotFound("Cannot get 'challenges' without an ACME order.")

        dns_challenges = {}
        for auth in self.order.authorizations:
            for challenge in auth.body.challenges:
                # Only DNS-01 challenges are of interest; other challenge types are skipped
                if isinstance(challenge.chall, challenges.DNS01):
                    # Group the challenge under the domain originally requested in this authorization
                    domain = auth.body.identifier.value
                    dns_challenges.setdefault(domain, []).append(challenge)

        # If no challenges were found, throw an error
        if not dns_challenges:
            # The f-string is already fully rendered; formatting it a second time would fail on a directory
            # value containing curly braces
            msg = f"ACME server at '{self.directory}' does not support DNS-01 challenge."
            raise errors.ChallengeUnavailable(msg)

        return dns_challenges

    @property
    def acme_client(self) -> client.ClientV2:
        """
        Getter for the `acme_client` property. The stored client is only handed out when it is an
        `acme.client.ClientV2` object.

        Returns:
            acme.client.ClientV2: The ClientV2 object needed to interact with the ACME server.

        Raises:
            simple_acme_dns.errors.InvalidAccount: When no account registration is configured for this object.
        """
        if isinstance(self._acme_client, client.ClientV2):
            return self._acme_client

        msg = 'No account registration found. You must register a new account or load an existing account first.'
        raise errors.InvalidAccount(msg)

    @acme_client.setter
    def acme_client(self, value: client.ClientV2):
        """
        Setter for the `acme_client` property. This ensures the acme_client is an acme.client.ClientV2 object.

        Args:
            value (acme.client.ClientV2): The `acme_client` value being set.

        Raises:
            simple_acme_dns.errors.InvalidAccount: When the `value` is not an acme.client.ClientV2 object.
        """
        # Accept genuine ClientV2 objects and reject everything else
        if isinstance(value, client.ClientV2):
            self._acme_client = value
            return

        raise errors.InvalidAccount(f"Value '{value}' is not an acme.client.ClientV2 object.")

    @property
    def email(self) -> str:
        """
        Getter for the `email` property. This checks that an email exists when it's referenced.

        Returns:
            str:  A string representation of the email address

        Raises:
            simple_acme_dns.errors.InvalidEmail: When `email` is not set.

        """
        if not self._email:
            msg = 'No account email found. You must set the _email value first.'
            raise errors.InvalidEmail(msg)

        return self._email

    @email.setter
    def email(self, value: str):
        """
        Setter for the `email` property. The value is only stored when `validators.email()` accepts it.

        Args:
            value (str): The `email` value being set.

        Raises:
            simple_acme_dns.errors.InvalidEmail: When the `value` is not a valid email address.
        """
        # Store addresses that pass validation and reject everything else
        if validators.email(value):
            self._email = value
            return

        raise errors.InvalidEmail(f"Value '{value}' is not a valid email address.")

    @property
    def verification_tokens(self) -> dict:
        """
        Getter for the `verification_tokens` property. This checks that verification tokens already
        exist whenever they are referenced.

        Returns:
            dict: A dictionary where the key is the verification domain name, and the value is a list of tokens for
                that domain. A DNS TXT entry must be created for each domain name with each of it's corresponding
                tokens as the TXT value.

        Raises:
            simple_acme_dns.errors.InvalidVerificationToken: When no verification tokens have been requested using
                the `request_verification_tokens()` method.
        """
        if not self._verification_tokens:
            msg = 'No verification tokens found. You must run request_verification_tokens() first.'
            raise errors.InvalidVerificationToken(msg)

        return self._verification_tokens

    @property
    def domains(self) -> list:
        """
        Getter for the `domains` property. This checks that domains are already set whenever it's referenced.

        Returns:
            list: A list of domain names currently set.

        Raises:
             simple_acme_dns.errors.InvalidDomain: When no `domains` have been set.
        """
        if not self._domains:
            msg = 'No domains found. You must set the domains value first.'
            raise errors.InvalidDomain(msg)

        return self._domains

    @domains.setter
    def domains(self, value) -> None:
        """
        Setter for the `domains` property. This checks that the assigned domains value is a list of valid FQDNs.

        Args:
            value (str): A list of domains to be set.

        Returns:
            list: A list of valid domain names.

        Raises:
            simple_acme_dns.errors.InvalidDomain: When one or more domains are invalid.
        """
        # Ensure set value is a list
        if not isinstance(value, list):
            msg = "Domains must be of type 'list'."
            raise errors.InvalidDomain(msg)

        # Ensure each domain within the list is an RFC2181 compliant hostname
        for domain in value:
            # Check that value (minus the wildcard if present) is a valid FQDN
            if not validators.domain(self.strip_wildcard(domain)):
                msg = f"Invalid domain name '{domain}'. Domain name must adhere to RFC2181."
                raise errors.InvalidDomain(msg)

        # If we've made it this far, the value is valid. Set it.
        self._domains = value

    @property
    def csr(self) -> bytes:
        """
        Getter for the `csr` property. This checks that a CSR is already set whenever it's referenced.

        Returns:
            bytes: The current PEM encoded CSR data bytes-string if present.

        Raises:
            simple_acme_dns.errors.InvalidCSR: When the `csr` value has not been set yet.
        """
        # Throw an error if the CSR is referenced before it is set
        if not self._csr:
            raise errors.InvalidCSR("CSR value must be set before referencing 'csr'.")

        return self._csr

    @csr.setter
    def csr(self, value: bytes) -> None:
        """
        Setter for the 'csr' property. This ensures the set value is a bytes-string.

        Args:
            value (bytes): The CSR value being set.

        Raises:
            simple_acme_dns.errors.InvalidCSR: When the `csr` value being set is not of type `bytes`.
        """
        # Convert string assignments to bytes
        if not isinstance(value, bytes):
            raise errors.InvalidCSR("CSR must be type 'bytes'.")

        self._csr = value

    @property
    def certificate(self) -> bytes:
        """
        Getter for the `certificate` property. This checks that a certificate is already set whenever it's referenced.

        Returns:
            bytes: The current PEM encoded certificate data bytes-string if present.
        """
        return self._certificate

    @certificate.setter
    def certificate(self, value: bytes) -> None:
        """
        Setter for the 'certificate' property. This ensures the set value is a bytes-string.

        Args:
            value (bytes): The certificate value being set.

        Raises:
            simple_acme_dns.errors.InvalidCertificate: When the `certificate` value being set is not of type `bytes`.
        """
        # Convert string assignments to bytes
        if not isinstance(value, bytes):
            raise errors.InvalidCertificate("Certificate must be type 'bytes'.")

        self._certificate = value

    @property
    def private_key(self) -> bytes:
        """
        Getter for the 'private_key' property. This checks that a private_key is already set whenever it's referenced.

        Returns:
            bytes: The PEM encoded `private_key` bytes-string that is currently set if present.
        """
        return self._private_key

    @private_key.setter
    def private_key(self, value: bytes) -> None:
        """
        Setter for the 'private_key' property. This ensures the set value is a bytes-string.

        Args:
            value (bytes): The `private_key` value being set.

        Raises:
            simple_acme_dns.errors.InvalidPrivateKey: When no `private_key` value has been set.
        """
        # Convert string assignments to bytes
        if not isinstance(value, bytes):
            raise errors.InvalidPrivateKey("Private key must be type 'bytes'.")

        self._private_key = value

Static methods

def load_account(json_data: str) ‑> ACMEClient

Loads an existing account from a JSON data string created by the export_account() method.

Args

json_data : str
The JSON account data string to import.

Returns

ACMEClient
The imported ACMEClient object.

Examples

>>> client = simple_acme_dns.ACMEClient.load_account('{"account": {"body": {"key": {"n": "vtByzpW..."}}}}')
Expand source code
@staticmethod
def load_account(json_data: str) -> 'ACMEClient':
    """
    Rebuilds an `ACMEClient` object from a JSON data string created by the `export_account()` method.

    The serialized values are restored onto a new object, after which the ACME directory is fetched and the
    account registration is queried through the freshly created ACME client.

    Args:
        json_data (str): The JSON account data string to import.

    Returns:
        simple_acme_dns.ACMEClient: The imported ACMEClient object.

    Examples:
        >>> client = simple_acme_dns.ACMEClient.load_account('{"account": {"body": {"key": {"n": "vtByzpW..."}}}}')
    """
    payload = json.loads(json_data)
    restored = ACMEClient()

    # Restore the plain values; certificate and private key are stored as text and re-encoded to bytes
    restored.directory = payload.get('directory', None)
    restored.domains = payload.get('domains', [])
    restored.certificate = payload.get('certificate', '').encode()
    restored.private_key = payload.get('private_key', '').encode()

    # The account email is the first contact entry with its 'mailto:' prefix removed
    primary_contact = payload['account']['body']['contact'][0]
    restored.email = primary_contact.replace('mailto:', '')

    # Rebuild the registration resource and the account key from their JSON forms
    account_json = json.dumps(payload['account'])
    restored.account = messages.RegistrationResource.json_loads(account_json)
    restored.account_key = jose.JWKRSA.json_loads(payload['account_key'])

    # Recreate the network layer and ACME client, then refresh the registration from the server
    restored.net = client.ClientNetwork(restored.account_key, user_agent='simple_acme_dns/1.0.0')
    directory_json = restored.net.get(restored.directory).json()
    restored.directory_obj = messages.Directory.from_json(directory_json)
    restored.acme_client = client.ClientV2(restored.directory_obj, net=restored.net)
    restored.account = restored.acme_client.query_registration(restored.account)

    return restored
def load_account_from_file(filepath: str) ‑> ACMEClient

Loads an existing account from a JSON file created by the export_account_to_file() method.

Args

filepath : str
The JSON file path to import.

Returns

ACMEClient
The imported ACMEClient object.

Raises

InvalidPath
When the JSON file path does not exist.

Examples

>>> client = simple_acme_dns.ACMEClient.load_account_from_file('/tmp/my_acme_account.json')
Expand source code
@staticmethod
def load_account_from_file(filepath: str) -> 'ACMEClient':
    """
    Rebuilds an `ACMEClient` object from a JSON file created by the `export_account_to_file()` method.

    The returned object's `account_path` is set to the absolute path of the file that was loaded.

    Args:
        filepath (str): The JSON file path to import.

    Returns:
        simple_acme_dns.ACMEClient: The imported ACMEClient object.

    Raises:
        simple_acme_dns.errors.InvalidPath: When the JSON file path does not exist.

    Examples:
        >>> client = simple_acme_dns.ACMEClient.load_account_from_file('/tmp/my_acme_account.json')
    """
    account_file = pathlib.Path(filepath).absolute()

    # Bail out early when there is nothing at the requested location
    if not account_file.exists():
        raise errors.InvalidPath(f"No JSON account file found at '{account_file}'")

    # Read the whole file and hand the JSON text to the string-based loader
    loaded = ACMEClient.load_account(account_file.read_text(encoding="utf-8"))
    loaded.account_path = account_file

    return loaded
def strip_wildcard(domain: str) ‑> str

Strips the wildcard portion of a domain (*.) if present.

Args

domain : str
The domain string to strip wildcards from.

Returns

str
The domain string without the wildcard portion.
Expand source code
@staticmethod
def strip_wildcard(domain: str) -> str:
    """
    Removes a leading wildcard label (`*.`) from a domain, leaving any other domain untouched.

    Args:
        domain (str): The domain string to strip wildcards from.

    Returns:
        str: The domain string without the wildcard portion.
    """
    # Only a wildcard at the very start of the name is removed
    if domain.startswith("*."):
        return domain[2:]

    return domain

Instance variables

var acme_client : acme.client.ClientV2

Getter for the acme_client property. This checks that the ACME client is set up whenever it's referenced.

Returns

acme.client.ClientV2
The ClientV2 object needed to interact with the ACME server.

Raises

InvalidAccount
When no account registration is configured for this object.
Expand source code
@property
def acme_client(self) -> client.ClientV2:
    """
    Getter for the `acme_client` property. The stored client is only returned when it is a `ClientV2` object.

    Returns:
        acme.client.ClientV2: The ClientV2 object needed to interact with the ACME server.

    Raises:
        simple_acme_dns.errors.InvalidAccount: When no account registration is configured for this object.
    """
    configured = self._acme_client

    # Anything other than a ClientV2 instance means no account was registered or loaded
    if isinstance(configured, client.ClientV2):
        return configured

    msg = 'No account registration found. You must register a new account or load an existing account first.'
    raise errors.InvalidAccount(msg)
var certificate : bytes

Getter for the certificate property. The stored value is returned as-is; this getter does not verify that a certificate has actually been set.

Returns

bytes
The current PEM encoded certificate data bytes-string if present.
Expand source code
@property
def certificate(self) -> bytes:
    """
    Getter for the `certificate` property. The stored value is returned as-is; this getter does not verify
    that a certificate has actually been set.

    Returns:
        bytes: The current PEM encoded certificate data bytes-string if present.
    """
    return self._certificate
var challenges : dict

Getter for the challenges property. Returns current DNS challenges found in our current ACME order.

Returns

dict
A dictionary where the key is the domain name, and the value is a list of Challenge objects.

Raises

OrderNotFound
When this property is called before the order object exists.
Expand source code
@property
def challenges(self) -> dict:
    """
    Getter for the `challenges` property. Returns current DNS challenges found in our current ACME order.

    Returns:
        dict: A dictionary where the key is the domain name, and the value is a list of Challenge objects.

    Raises:
        simple_acme_dns.errors.OrderNotFound: When this property is called before the `order` object exists.
        simple_acme_dns.errors.ChallengeUnavailable: When none of the order's authorizations offer a DNS-01
            challenge.
    """
    # Do not allow this property to be called if an order has not been created beforehand.
    if not self.order:
        raise errors.OrderNotFound("Cannot get 'challenges' without an ACME order.")

    challs = {}

    # Collect the DNS-01 challenges of every authorization, grouped by the domain originally requested
    for auth in self.order.authorizations:
        for challenge in auth.body.challenges:
            if isinstance(challenge.chall, challenges.DNS01):
                challs.setdefault(auth.body.identifier.value, []).append(challenge)

    # If no challenges were found, throw an error
    if not challs:
        # The f-string is already fully rendered; running str.format() over it again would raise if the
        # directory value happened to contain '{' or '}'.
        raise errors.ChallengeUnavailable(
            f"ACME server at '{self.directory}' does not support DNS-01 challenge."
        )

    return challs
var csr : bytes

Getter for the csr property. This checks that a CSR is already set whenever it's referenced.

Returns

bytes
The current PEM encoded CSR data bytes-string if present.

Raises

InvalidCSR
When the csr value has not been set yet.
Expand source code
@property
def csr(self) -> bytes:
    """
    Getter for the `csr` property. A CSR must have been set before this property is referenced.

    Returns:
        bytes: The current PEM encoded CSR data bytes-string if present.

    Raises:
        simple_acme_dns.errors.InvalidCSR: When the `csr` value has not been set yet.
    """
    current = self._csr

    # A falsy value (such as an empty bytes-string) means no CSR has been set
    if current:
        return current

    raise errors.InvalidCSR("CSR value must be set before referencing 'csr'.")
var domains : list

Getter for the domains property. This checks that domains are already set whenever it's referenced.

Returns

list
A list of domain names currently set.

Raises

InvalidDomain
When no domains have been set.
Expand source code
@property
def domains(self) -> list:
    """
    Getter for the `domains` property. Domains must have been set before this property is referenced.

    Returns:
        list: A list of domain names currently set.

    Raises:
         simple_acme_dns.errors.InvalidDomain: When no `domains` have been set.
    """
    current = self._domains

    # A falsy value (such as an empty list) means no domains have been set
    if current:
        return current

    raise errors.InvalidDomain('No domains found. You must set the domains value first.')
var email : str

Getter for the email property. This checks that an email exists when it's referenced.

Returns

str
A string representation of the email address

Raises

InvalidEmail
When email is not set.
Expand source code
@property
def email(self) -> str:
    """
    Getter for the `email` property. An email must have been set before this property is referenced.

    Returns:
        str:  A string representation of the email address

    Raises:
        simple_acme_dns.errors.InvalidEmail: When `email` is not set.
    """
    current = self._email

    # A falsy value (such as an empty string) means no email has been set
    if current:
        return current

    raise errors.InvalidEmail('No account email found. You must set the _email value first.')
var private_key : bytes

Getter for the 'private_key' property. The stored value is returned as-is; this getter does not verify that a private key has actually been set.

Returns

bytes
The PEM encoded private_key bytes-string that is currently set if present.
Expand source code
@property
def private_key(self) -> bytes:
    """
    Getter for the `private_key` property. The stored value is returned as-is; this getter does not verify
    that a private key has actually been set.

    Returns:
        bytes: The PEM encoded `private_key` bytes-string that is currently set if present.
    """
    return self._private_key
var verification_tokens : dict

Getter for the verification_tokens property. This checks that verification tokens already exist whenever they are referenced.

Returns

dict
A dictionary where the key is the verification domain name, and the value is a list of tokens for that domain. A DNS TXT entry must be created for each domain name with each of its corresponding tokens as the TXT value.

Raises

InvalidVerificationToken
When no verification tokens have been requested using the request_verification_tokens() method.
Expand source code
@property
def verification_tokens(self) -> dict:
    """
    Getter for the `verification_tokens` property. Verification tokens must already exist when this property
    is referenced.

    Returns:
        dict: A dictionary where the key is the verification domain name, and the value is a list of tokens for
            that domain. A DNS TXT entry must be created for each domain name with each of its corresponding
            tokens as the TXT value.

    Raises:
        simple_acme_dns.errors.InvalidVerificationToken: When no verification tokens have been requested using
            the `request_verification_tokens()` method.
    """
    current = self._verification_tokens

    # A falsy value (such as an empty dict) means no tokens have been requested
    if current:
        return current

    raise errors.InvalidVerificationToken(
        'No verification tokens found. You must run request_verification_tokens() first.'
    )

Methods

def check_dns_propagation(self, timeout: int = 300, interval: int = 2, authoritative: bool = False, round_robin: bool = True, verbose: bool = False) ‑> bool

Checks the TXT record of each of our domains until its value matches the domain's verification token or until the timeout is reached. This method should be executed before executing the request_certificate() method. This method can take several minutes to complete; ensure you adjust the timeout value accordingly.

Args

timeout : int
The amount of time (in seconds) to continue trying to verify the TXT records.
interval : int
The amount of time (in seconds) between DNS requests per domain.
authoritative : bool
Identify and use the authoritative nameserver for each domain instead of the nameservers values.
round_robin : bool
Rotate between each nameserver instead of the default failover behavior.
verbose : bool
Print DNS answers to the console.

Returns

bool
A boolean indicating whether all the domains correctly return their verification token in the corresponding TXT records.

Examples

>>> client.nameservers = ["8.8.8.8", "1.1.1.1"]
>>> client.check_dns_propagation(
...     timeout=180,
...     interval=5,
...     authoritative=False,
...     round_robin=True,
...     verbose=False
... )
Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 8.8.8.8
Token 'O32-fd_...' for '_acme-challenge.test2.example.com' not found in [] via 8.8.8.8
Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 1.1.1.1
Token 'O32-fd_...' for '_acme-challenge.test2.example.com' found in ['O32-fd_...'] via 1.1.1.1
Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 8.8.8.8
Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 1.1.1.1
Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' found in ['moY3Cd0...'] via 8.8.8.8
True
Expand source code
def check_dns_propagation(
        self,
        timeout: int = 300,
        interval: int = 2,
        authoritative: bool = False,
        round_robin: bool = True,
        verbose: bool = False
) -> bool:
    """
    Checks the TXT record of each of our domains until its value matches the domain's verification token or
    until the timeout is reached. This method should be executed before executing the `request_certificate()`
    method. This method can take several minutes to complete; ensure you adjust the timeout value accordingly.

    Every (domain, token) pair is tracked independently, and the timeout is measured with a monotonic clock so
    that wall-clock adjustments (such as DST changes) cannot lengthen or shorten it.

    Args:
        timeout (int): The amount of time (in seconds) to continue trying to verify the TXT records.
        interval (int): The amount of time (in seconds) between DNS requests per domain.
        authoritative (bool): Identify and use the authoritative nameserver for each domain instead of the
            `nameservers` values.
        round_robin (bool): Rotate between each nameserver instead of the default failover behavior.
        verbose (bool): Print DNS answers to the console.

    Returns:
        bool: A boolean indicating whether all the `domains` correctly return their verification token in
            the corresponding TXT records. `False` is returned when the timeout elapses first (including a
            timeout of zero, in which case no DNS query is made).

    Examples:
        >>> client.nameservers = ["8.8.8.8", "1.1.1.1"]
        >>> client.check_dns_propagation(
        ...     timeout=180,
        ...     interval=5,
        ...     authoritative=False,
        ...     round_robin=True,
        ...     verbose=False
        ... )
        Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 8.8.8.8
        Token 'O32-fd_...' for '_acme-challenge.test2.example.com' not found in [] via 8.8.8.8
        Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 1.1.1.1
        Token 'O32-fd_...' for '_acme-challenge.test2.example.com' found in ['O32-fd_...'] via 1.1.1.1
        Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 8.8.8.8
        Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' not found in [] via 1.1.1.1
        Token 'moY3Cd0...' for '_acme-challenge.test1.example.com' found in ['moY3Cd0...'] via 8.8.8.8
        True
    """
    # A monotonic deadline is immune to wall-clock jumps, unlike a naive local datetime
    deadline = time.monotonic() + timeout

    # Create one resolver for each token required for verification of each domain
    pending = [
        (
            domain,
            token,
            tools.DNSQuery(
                domain,
                rtype='TXT',
                authoritative=authoritative,
                nameservers=self.nameservers,
                round_robin=round_robin
            )
        )
        for domain, tokens in self._verification_tokens.items()
        for token in tokens
    ]

    # Loop until we have exceeded our timeout value
    while time.monotonic() < deadline:
        unverified = []

        # Query only the (domain, token) pairs that have not been verified yet
        for domain, token, resolver in pending:
            resolver.resolve()
            found = token in resolver.values

            # If verbose mode is enabled, print the results to the console
            if verbose:
                action = 'found' if found else 'not found'
                values = resolver.values
                nameserver = resolver.last_nameserver
                print(f"Token '{token}' for '{domain}' {action} in {values} via {nameserver}")

            if not found:
                unverified.append((domain, token, resolver))

        pending = unverified

        # Every pair has been verified once nothing is left pending
        if not pending:
            return True

        # Avoid flooding the DNS server(s) by briefly pausing between DNS checks
        time.sleep(interval)

    return False
def deactivate_account(self, delete: bool = True) ‑> None

Deactivates the current account registration. This action is irreversible.

Args

delete : bool
Indicate whether the associated account file on the local system should also be deleted after deactivation.

Examples

>>> client.deactivate_account()
Expand source code
def deactivate_account(self, delete: bool = True) -> None:
    """
    Deactivates the current account registration on the ACME server. This action is irreversible.

    Args:
        delete (bool): Indicate whether the associated account file on the local system should also be
            deleted after deactivation.

    Examples:
        >>> client.deactivate_account()
    """
    # Ask the ACME server to deactivate the registration first
    self.acme_client.deactivate_registration(self.account)

    # Nothing else to do unless a linked account file exists and its deletion was requested
    if not (self.account_path and delete):
        return

    # Remove the linked file; a file that is already gone is not an error
    try:
        pathlib.Path(self.account_path).unlink()
    except FileNotFoundError:
        pass
def export_account(self, save_certificate: bool = True, save_private_key: bool = False) ‑> str

Exports the object as a JSON string. This allows the ACME account data to be exported to a string that can be re-imported for use later.

Args

save_certificate : bool
Indicate whether the existing certificate should also be stored in the JSON string.
save_private_key : bool
Indicate whether the private key should also be stored in the JSON string.

Returns

str
The current object encoded as a JSON string.

Examples

>>> client.export_account(save_certificate=True, save_private_key=True)
'{"account": {"body": {"key": {"n": "vtByzpW..."}}}}'
Expand source code
def export_account(self, save_certificate: bool = True, save_private_key: bool = False) -> str:
    """
    Serializes the object to a JSON string, so the ACME account data can be stored and re-imported for use
    later.

    Args:
        save_certificate (bool): Indicate whether the existing certificate should also be stored in the
            JSON string.
        save_private_key (bool): Indicate whether the private key should also be stored in the JSON string.

    Returns:
        str: The current object encoded as a JSON string.

    Examples:
        >>> client.export_account(save_certificate=True, save_private_key=True)
        '{"account": {"body": {"key": {"n": "vtByzpW..."}}}}'
    """
    # Gather each value in a serializable form
    account_json = self.account.to_json()
    account_key_json = self.account_key.json_dumps()
    directory = self.directory
    domains = self.domains

    # Certificate and private key are only included on request; otherwise an empty string is stored
    certificate_pem = self.certificate.decode() if save_certificate else ''
    private_key_pem = self.private_key.decode() if save_private_key else ''

    return json.dumps({
        'account': account_json,
        'account_key': account_key_json,
        'directory': directory,
        'domains': domains,
        'certificate': certificate_pem,
        'private_key': private_key_pem
    })
def export_account_to_file(self, path: str = '.', name: str = 'account.json', save_certificate: bool = True, save_private_key: bool = False) ‑> None

Exports the object as a JSON string. This allows the ACME account data to be exported to a file that can be re-imported for use later.

Args

path : str
The directory path to save the account file to. Defaults to the current working directory.
name : str
The file name to save. Defaults to account.json.
save_certificate : bool
Indicate whether the existing certificate should also be stored in the JSON string.
save_private_key : bool
Indicate whether the private key should also be stored in the JSON string.

Raises

InvalidPath
When the requested directory path does not exist.

Examples

>>> client.export_account_to_file(
...     path="/tmp/",
...     name="my_acme_account.json",
...     save_certificate=True,
...     save_private_key=True
... )
Expand source code
def export_account_to_file(
        self,
        path: str = '.',
        name: str = 'account.json',
        save_certificate: bool = True,
        save_private_key: bool = False
) -> None:
    """
    Exports the object as a JSON file. This allows the ACME account data to be exported to a file that can
    be re-imported for use later. After a successful write, `account_path` is set to the written file's path.

    The account data is serialized before the target file is opened, so a failure while exporting leaves any
    existing file at that location untouched.

    Args:
        path (str): The directory path to save the account file to. Defaults to the current working directory.
        name (str): The file name to save. Defaults to `account.json`.
        save_certificate (bool): Indicate whether the existing certificate should also be stored in the
            JSON string.
        save_private_key (bool): Indicate whether the private key should also be stored in the JSON string.

    Raises:
        simple_acme_dns.errors.InvalidPath: When the requested directory path does not exist.

    Examples:
        >>> client.export_account_to_file(
        ...     path="/tmp/",
        ...     name="my_acme_account.json",
        ...     save_certificate=True,
        ...     save_private_key=True
        ... )
    """
    dir_path = pathlib.Path(path).absolute()

    # Ensure our path is an existing directory, throw an error otherwise
    if not dir_path.is_dir():
        msg = f"Directory at '{path}' does not exist."
        raise errors.InvalidPath(msg)

    # Serialize first: opening with 'w' truncates the file, so exporting inside the `with` block would wipe
    # a previously saved account whenever the export itself raised.
    account_json = self.export_account(save_certificate, save_private_key)
    account_file = str(dir_path.joinpath(name))

    with open(account_file, 'w', encoding="utf-8") as file_handle:
        file_handle.write(account_json)

    self.account_path = account_file
def generate_csr(self) ‑> bytes

Generates a new CSR using the object's domains and private_key attributes.

Returns

bytes
The X509 CSR data bytes-string. This method will update the csr attribute of the object with the same value.

Examples

>>> client.generate_csr()
b'-----BEGIN CERTIFICATE REQUEST-----\nMIHxMIGZAgECMAAwWTATBgckjkn...'
Expand source code
def generate_csr(self) -> bytes:
    """
    Builds a new CSR from the object's `private_key` and `domains` attributes and stores it in `csr`.

    Returns:
        bytes: The X509 CSR data bytes-string. This method will update the `csr` attribute of the object
            with the same value.

    Examples:
        >>> client.generate_csr()
        b'-----BEGIN CERTIFICATE REQUEST-----\\nMIHxMIGZAgECMAAwWTATBgckjkn...'
    """
    signing_request = crypto_util.make_csr(self.private_key, self.domains)
    self.csr = signing_request

    # Return the value through the property, exactly as it is now stored
    return self.csr
def generate_private_key(self, key_type: str = 'ec256') ‑> bytes

Generates a new RSA or EC private key.

Args

key_type : str
The requested private_key type. Options are: [ec256, ec384, rsa2048, rsa4096]

Returns

bytes
The PEM encoded private key data bytes-string. This method will update the private_key property of the object with the same value.

Raises

InvalidKeyType
When an unknown/unsupported key_type is requested.

Examples

>>> client.generate_private_key(key_type="ec384")
b'-----BEGIN EC PRIVATE KEY-----\nMIGkAgEBBDAZRFNLcQdVJmLh42p8F4D92...'
Expand source code
def generate_private_key(self, key_type: str = 'ec256') -> bytes:
    """
    Generates a new RSA or EC private key and stores it in the `private_key` property.

    Args:
        key_type (str): The requested `private_key` type. Options are: [`ec256`, `ec384`, `rsa2048`, `rsa4096`]

    Returns:
        bytes: The PEM encoded private key data bytes-string. This method will update the `private_key` property
            of the object with the same value.

    Raises:
        simple_acme_dns.errors.InvalidKeyType: When an unknown/unsupported `key_type` is requested.

    Examples:
        >>> client.generate_private_key(key_type="ec384")
        b'-----BEGIN EC PRIVATE KEY-----\\nMIGkAgEBBDAZRFNLcQdVJmLh42p8F4D92...'
    """
    if key_type in ('ec256', 'ec384'):
        # EC keys differ only in the curve; both are serialized as unencrypted traditional OpenSSL PEM
        curve = ec.SECP256R1() if key_type == 'ec256' else ec.SECP384R1()
        ec_key = ec.generate_private_key(curve, default_backend())
        self.private_key = ec_key.private_bytes(
            encoding=Encoding.PEM,
            format=PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=NoEncryption()
        )
    elif key_type in ('rsa2048', 'rsa4096'):
        # RSA keys differ only in the key size; both are generated and dumped through pyOpenSSL
        bits = 2048 if key_type == 'rsa2048' else 4096
        rsa_key = OpenSSL.crypto.PKey()
        rsa_key.generate_key(OpenSSL.crypto.TYPE_RSA, bits)
        self.private_key = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, rsa_key)
    else:
        # Anything else is not a supported key type
        options = ['ec256', 'ec384', 'rsa2048', 'rsa4096']
        msg = f"Invalid private key rtype '{key_type}'. Options {options}"
        raise errors.InvalidKeyType(msg)

    return self.private_key
def generate_private_key_and_csr(self, key_type: str = 'ec256') ‑> tuple

Generates a new private key and CSR.

Args

key_type : str
The requested private_key type. Options are: [ec256, ec384, rsa2048, rsa4096]

Returns

tuple
A tuple with the first value containing the private key, the second value contains the CSR. This method will update the private_key and csr properties of this object with the same values.

Examples

>>> client.generate_private_key_and_csr(key_type="rsa2048")
(b'-----BEGIN PRIVATE KEY-----\nMIIEvAIBA...', b'-----BEGIN CERTIFICATE REQUEST-----\nMIHxM...')
Expand source code
def generate_private_key_and_csr(self, key_type: str = 'ec256') -> tuple:
    """
    Generates a new private key followed by a new CSR.

    Args:
        key_type (str): The requested `private_key` type. Options are: [`ec256`, `ec384`, `rsa2048`, `rsa4096`]

    Returns:
        tuple: A tuple with the first value containing the private key, the second value contains the CSR. This
            method will update the `private_key` and `csr` properties of this object with the same values.

    Examples:
        >>> client.generate_private_key_and_csr(key_type="rsa2048")
        (b'-----BEGIN PRIVATE KEY-----\\nMIIEvAIBA...', b'-----BEGIN CERTIFICATE REQUEST-----\\nMIHxM...')
    """
    # The key must exist before the CSR can be generated from it
    self.generate_private_key(key_type=key_type)
    self.generate_csr()

    # Read both values back from the object's properties
    key_pem = self.private_key
    csr_pem = self.csr
    return key_pem, csr_pem
def new_account(self) ‑> None

Registers a new ACME account at the set ACME directory URL. By running this method, you are agreeing to the ACME servers terms of use.

Examples

>>> client.new_account()
Expand source code
def new_account(self) -> None:
    """
    Registers a new ACME account at the set ACME `directory` URL. By running this method, you are agreeing to the
    ACME servers terms of use.

    A new RSA 2048 account key is generated on every call, and the resulting registration is stored in
    `account`.

    Examples:
        >>> client.new_account()
    """
    # Create a fresh RSA2048 key and wrap it as the account's JWK
    self.account_key = jose.JWKRSA(
        key=rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
    )

    # Set up the network layer, fetch the directory and build the ACME client from it
    self.net = client.ClientNetwork(self.account_key, user_agent='simple_acme_dns/v2')
    directory_json = self.net.get(self.directory).json()
    self.directory_obj = messages.Directory.from_json(directory_json)
    self.acme_client = client.ClientV2(self.directory_obj, net=self.net)

    # Register the account using the configured email, agreeing to the terms of service
    new_registration = messages.NewRegistration.from_data(email=self.email, terms_of_service_agreed=True)
    self.account = self.acme_client.new_account(new_registration)
def request_certificate(self, wait: int = 0, timeout: int = 90) ‑> bytes

Requests a final verification answer from the ACME server and requests the certificate if verification was successful. If you request the certificate before DNS has propagated and verification fails, you must start the verification process over entirely by requesting new verification tokens.

Args

wait : int
The amount of time (in seconds) to wait before requesting a challenge answer from the server. This is only necessary if you are not using the check_dns_propagation() method to verify the DNS records exist and would rather wait a specific amount of time instead.
timeout : int
The amount of time (in seconds) to wait for the ACME server to respond with a certificate. If the ACME server does not respond within this time-frame, the request will be considered a failure.

Returns

bytes
The PEM encoded certificate data bytes-string. This method will update the certificate attribute with this value.

Examples

>>> client.request_certificate()
b'-----BEGIN CERTIFICATE-----\nMIIEfzCCA2egAwI...'
Expand source code
def request_certificate(self, wait: int = 0, timeout: int = 90) -> bytes:
    """
    Answers every pending DNS challenge, then asks the ACME server to finalize the order and return the
    certificate. If you request the certificate before DNS has propagated and verification fails, you must start
    the verification process over entirely by requesting new verification tokens.

    Args:
        wait (int): The amount of time (in seconds) to wait before requesting a challenge answer from the
            server. This is only necessary if you are not using the `check_dns_propagation()` method to verify the
            DNS records exist and would rather wait a specific amount of time instead.
        timeout (int): The amount of time (in seconds) to wait for the ACME server to respond with a certificate.
            If the ACME server does not respond within this time-frame, the request will be considered a failure.

    Returns:
        bytes: The PEM encoded certificate data bytes-string. This method will update the `certificate` attribute
            with this value.

    Examples:
        >>> client.request_certificate()
        b'-----BEGIN CERTIFICATE-----\\nMIIEfzCCA2egAwI...'
    """
    # Optional fixed delay before contacting the server; the deadline only starts counting afterwards
    time.sleep(wait)
    deadline = datetime.datetime.now() + datetime.timedelta(seconds=timeout)

    # Answer each challenge of each domain with the response stored under the challenge's token
    for domain_challenges in self.challenges.values():
        for pending in domain_challenges:
            self.answers.append(
                self.acme_client.answer_challenge(pending, self.responses[pending.chall.token])
            )

    # Finalize the order and keep the full certificate chain as bytes
    self.final_order = self.acme_client.poll_and_finalize(self.order, deadline=deadline)
    self.certificate = self.final_order.fullchain_pem.encode()
    return self.certificate
def request_verification_tokens(self) ‑> dict

Requests verification tokens from the ACME server for each domains value. These tokens must be uploaded as a DNS TXT record for each corresponding domain to complete verification.

Returns

dict
A dictionary where each key is the DNS name (the domain prefixed with the ACME challenge label), and each value is a list of verification token strings that must be uploaded as TXT records for that DNS name.

Examples

>>> client.request_verification_tokens()
{
    "_acme-challenge.test1.example.com": ["moY32lkdsZ3VWHM1mdM..."],
    "_acme-challenge.test2.example.com": ["asldfkjslweietj23_b...", "nMIIEvAIBA2-212_w..."]
}
Expand source code
def request_verification_tokens(self) -> dict:
    """
    Places a new order with the ACME server and collects the verification token for every challenge it issues.
    Each token must be published as a DNS TXT record under its corresponding DNS name to complete verification.

    Calling this method replaces the `order` attribute with the newly created order and rebuilds the `responses`
    attribute, which maps each challenge token to the response later used to answer that challenge.

    Returns:
        dict: A dictionary where each key is a DNS name (the domain with its wildcard removed and the ACME
            challenge label prefixed), and each value is the list of verification token strings that must be
            uploaded as TXT records for that DNS name.

    Examples:
        >>> client.request_verification_tokens()
        {
            "_acme-challenge.test1.example.com": ["moY32lkdsZ3VWHM1mdM..."],
            "_acme-challenge.test2.example.com": ["asldfkjslweietj23_b...", "nMIIEvAIBA2-212_w..."]
        }
    """
    # Start from a clean slate: discard earlier responses and open a fresh order for the current CSR
    tokens_by_record = {}
    self.responses = {}
    self.order = self.acme_client.new_order(self.csr)

    for domain, challenge_items in self.challenges.items():
        # TXT records live under the challenge label of the domain, with any wildcard removed
        record_name = f"{DNS_LABEL}.{self.strip_wildcard(domain)}"

        for challenge in challenge_items:
            # More than one challenge can end up under the same record name, so reuse its list if present
            record_tokens = tokens_by_record.setdefault(record_name, [])

            # The validation is what gets published in DNS; the response is what answers the challenge
            response, validation = challenge.response_and_validation(self.acme_client.net.key)
            record_tokens.append(validation)

            # Keyed by challenge token so the response can be found again when the challenge is answered
            self.responses[challenge.chall.token] = response

    # Store the collected tokens and hand back the public view of them
    self._verification_tokens = tokens_by_record
    return self.verification_tokens
def revoke_certificate(self, reason: int = 0) ‑> None

Attempts to revoke the existing certificate from the issuing ACME server.

Args

reason : int
The numeric identifier of the revocation reason. In most cases, this can be left as 0. For more information, refer to: https://letsencrypt.org/docs/revoking/#specifying-a-reason-code

Examples

>>> client.revoke_certificate()
Expand source code
def revoke_certificate(self, reason: int = 0) -> None:
    """
    Asks the ACME server to revoke the certificate currently stored on the `certificate` attribute.

    Args:
        reason (int): The numeric identifier of the revocation reason. In most cases, this can be left as `0`.
            For more information, refer to: https://letsencrypt.org/docs/revoking/#specifying-a-reason-code

    Examples:
        >>> client.revoke_certificate()
    """
    # Parse the stored PEM data into an OpenSSL certificate object
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, self.certificate)

    # Wrap it in the comparable type before passing it to the ACME client's revoke call
    comparable_cert = jose.ComparableX509(x509_cert)
    self.acme_client.revoke(comparable_cert, reason)