File: //lib/python3/dist-packages/awscli/customizations/eks/update_kubeconfig.py
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
import logging
from botocore.compat import OrderedDict
from awscli.customizations.commands import BasicCommand
from awscli.customizations.utils import uni_print
from awscli.customizations.eks.exceptions import EKSClusterError
from awscli.customizations.eks.kubeconfig import (Kubeconfig,
KubeconfigError,
KubeconfigLoader,
KubeconfigWriter,
KubeconfigValidator,
KubeconfigAppender)
from awscli.customizations.eks.ordered_yaml import ordered_yaml_dump
LOG = logging.getLogger(__name__)
DEFAULT_PATH = os.path.expanduser("~/.kube/config")
# Use the endpoint for kubernetes 1.10
# To get the most recent endpoint we will need to
# Do a check on the cluster's version number
API_VERSION = "client.authentication.k8s.io/v1alpha1"
class UpdateKubeconfigCommand(BasicCommand):
NAME = 'update-kubeconfig'
DESCRIPTION = BasicCommand.FROM_FILE(
'eks',
'update-kubeconfig',
'_description.rst'
)
ARG_TABLE = [
{
'name': 'name',
'help_text': ("The name of the cluster for which "
"to create a kubeconfig entry. "
"This cluster must exist in your account and in the "
"specified or configured default Region "
"for your AWS CLI installation."),
'required': True
},
{
'name': 'kubeconfig',
'help_text': ("Optionally specify a kubeconfig file to append "
"with your configuration. "
"By default, the configuration is written to the "
"first file path in the KUBECONFIG "
"environment variable (if it is set) "
"or the default kubeconfig path (.kube/config) "
"in your home directory."),
'required': False
},
{
'name': 'role-arn',
'help_text': ("To assume a role for cluster authentication, "
"specify an IAM role ARN with this option. "
"For example, if you created a cluster "
"while assuming an IAM role, "
"then you must also assume that role to "
"connect to the cluster the first time."),
'required': False
},
{
'name': 'dry-run',
'action': 'store_true',
'default': False,
'help_text': ("Print the merged kubeconfig to stdout instead of "
"writing it to the specified file."),
'required': False
},
{
'name': 'verbose',
'action': 'store_true',
'default': False,
'help_text': ("Print more detailed output "
"when writing to the kubeconfig file, "
"including the appended entries.")
},
{
'name': 'alias',
'help_text': ("Alias for the cluster context name. "
"Defaults to match cluster ARN."),
'required': False
}
]
def _display_entries(self, entries):
"""
Display entries in yaml format
:param entries: a list of OrderedDicts to be printed
:type entries: list
"""
uni_print("Entries:\n\n")
for entry in entries:
uni_print(ordered_yaml_dump(entry))
uni_print("\n")
def _run_main(self, parsed_args, parsed_globals):
client = EKSClient(self._session,
parsed_args.name,
parsed_args.role_arn,
parsed_globals)
new_cluster_dict = client.get_cluster_entry()
new_user_dict = client.get_user_entry()
config_selector = KubeconfigSelector(
os.environ.get("KUBECONFIG", ""),
parsed_args.kubeconfig
)
config = config_selector.choose_kubeconfig(
new_cluster_dict["name"]
)
updating_existing = config.has_cluster(new_cluster_dict["name"])
appender = KubeconfigAppender()
new_context_dict = appender.insert_cluster_user_pair(config,
new_cluster_dict,
new_user_dict,
parsed_args.alias)
if parsed_args.dry_run:
uni_print(config.dump_content())
else:
writer = KubeconfigWriter()
writer.write_kubeconfig(config)
if updating_existing:
uni_print("Updated context {0} in {1}\n".format(
new_context_dict["name"], config.path
))
else:
uni_print("Added new context {0} to {1}\n".format(
new_context_dict["name"], config.path
))
if parsed_args.verbose:
self._display_entries([
new_context_dict,
new_user_dict,
new_cluster_dict
])
class KubeconfigSelector(object):
def __init__(self, env_variable, path_in, validator=None,
loader=None):
"""
Parse KUBECONFIG into a list of absolute paths.
Also replace the empty list with DEFAULT_PATH
:param env_variable: KUBECONFIG as a long string
:type env_variable: string
:param path_in: The path passed in through the CLI
:type path_in: string or None
"""
if validator is None:
validator = KubeconfigValidator()
self._validator = validator
if loader is None:
loader = KubeconfigLoader(validator)
self._loader = loader
if path_in is not None:
# Override environment variable
self._paths = [self._expand_path(path_in)]
else:
# Get the list of paths from the environment variable
if env_variable == "":
env_variable = DEFAULT_PATH
self._paths = [self._expand_path(element)
for element in env_variable.split(os.pathsep)
if len(element.strip()) > 0]
if len(self._paths) == 0:
self._paths = [DEFAULT_PATH]
def choose_kubeconfig(self, cluster_name):
"""
Choose which kubeconfig file to read from.
If name is already an entry in one of the $KUBECONFIG files,
choose that one.
Otherwise choose the first file.
:param cluster_name: The name of the cluster which is going to be added
:type cluster_name: String
:return: a chosen Kubeconfig based on above rules
:rtype: Kubeconfig
"""
# Search for an existing entry to update
for candidate_path in self._paths:
try:
loaded_config = self._loader.load_kubeconfig(candidate_path)
if loaded_config.has_cluster(cluster_name):
LOG.debug("Found entry to update at {0}".format(
candidate_path
))
return loaded_config
except KubeconfigError as e:
LOG.warning("Passing {0}:{1}".format(candidate_path, e))
# No entry was found, use the first file in KUBECONFIG
#
# Note: This could raise KubeconfigErrors if paths[0] is corrupted
return self._loader.load_kubeconfig(self._paths[0])
def _expand_path(self, path):
""" A helper to expand a path to a full absolute path. """
return os.path.abspath(os.path.expanduser(path))
class EKSClient(object):
def __init__(self, session, cluster_name, role_arn, parsed_globals=None):
self._session = session
self._cluster_name = cluster_name
self._role_arn = role_arn
self._cluster_description = None
self._globals = parsed_globals
def _get_cluster_description(self):
"""
Use an eks describe-cluster call to get the cluster description
Cache the response in self._cluster_description.
describe-cluster will only be called once.
"""
if self._cluster_description is None:
if self._globals is None:
client = self._session.create_client("eks")
else:
client = self._session.create_client(
"eks",
region_name=self._globals.region,
endpoint_url=self._globals.endpoint_url,
verify=self._globals.verify_ssl
)
full_description = client.describe_cluster(name=self._cluster_name)
self._cluster_description = full_description["cluster"]
if "status" not in self._cluster_description:
raise EKSClusterError("Cluster not found")
if self._cluster_description["status"] not in ["ACTIVE", "UPDATING"]:
raise EKSClusterError("Cluster status is {0}".format(
self._cluster_description["status"]
))
return self._cluster_description
def get_cluster_entry(self):
"""
Return a cluster entry generated using
the previously obtained description.
"""
cert_data = self._get_cluster_description().get("certificateAuthority",
{"data": ""})["data"]
endpoint = self._get_cluster_description().get("endpoint")
arn = self._get_cluster_description().get("arn")
return OrderedDict([
("cluster", OrderedDict([
("certificate-authority-data", cert_data),
("server", endpoint)
])),
("name", arn)
])
def get_user_entry(self):
"""
Return a user entry generated using
the previously obtained description.
"""
region = self._get_cluster_description().get("arn").split(":")[3]
generated_user = OrderedDict([
("name", self._get_cluster_description().get("arn", "")),
("user", OrderedDict([
("exec", OrderedDict([
("apiVersion", API_VERSION),
("args",
[
"--region",
region,
"eks",
"get-token",
"--cluster-name",
self._cluster_name,
]),
("command", "aws")
]))
]))
])
if self._role_arn is not None:
generated_user["user"]["exec"]["args"].extend([
"--role",
self._role_arn
])
if self._session.profile:
generated_user["user"]["exec"]["env"] = [OrderedDict([
("name", "AWS_PROFILE"),
("value", self._session.profile)
])]
return generated_user