File: //usr/lib/python2.7/site-packages/azurelinuxagent/common/protocol/goal_state.py
# Microsoft Azure Linux Agent
#
# Copyright 2020 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
import os
import re
import time
import azurelinuxagent.common.conf as conf
import azurelinuxagent.common.logger as logger
from azurelinuxagent.common.AgentGlobals import AgentGlobals
from azurelinuxagent.common.datacontract import set_properties
from azurelinuxagent.common.exception import IncompleteGoalStateError
from azurelinuxagent.common.exception import ProtocolError
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.protocol.extensions_goal_state_factory import ExtensionsGoalStateFactory
from azurelinuxagent.common.protocol.restapi import Cert, CertList, RemoteAccessUser, RemoteAccessUsersList
from azurelinuxagent.common.utils import fileutil
from azurelinuxagent.common.utils.cryptutil import CryptUtil
from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, findtext, getattrib
GOAL_STATE_URI = "http://{0}/machine/?comp=goalstate"
CERTS_FILE_NAME = "Certificates.xml"
P7M_FILE_NAME = "Certificates.p7m"
PEM_FILE_NAME = "Certificates.pem"
TRANSPORT_CERT_FILE_NAME = "TransportCert.pem"
TRANSPORT_PRV_FILE_NAME = "TransportPrivate.pem"
_NUM_GS_FETCH_RETRIES = 6
class GoalState(object):
def __init__(self, wire_client):
"""
Fetches the goal state using the given wire client.
__init__ fetches only the goal state itself, not including inner properties such as ExtensionsConfig; to fetch the entire goal state
use the fetch_full_goal_state().
"""
uri = GOAL_STATE_URI.format(wire_client.get_endpoint())
for _ in range(0, _NUM_GS_FETCH_RETRIES):
self.xml_text = wire_client.fetch_config(uri, wire_client.get_header())
xml_doc = parse_doc(self.xml_text)
self.incarnation = findtext(xml_doc, "Incarnation")
role_instance = find(xml_doc, "RoleInstance")
if role_instance:
break
time.sleep(0.5)
else:
raise IncompleteGoalStateError("Fetched goal state without a RoleInstance [incarnation {inc}]".format(inc=self.incarnation))
try:
self.role_instance_id = findtext(role_instance, "InstanceId")
role_config = find(role_instance, "Configuration")
self.role_config_name = findtext(role_config, "ConfigName")
container = find(xml_doc, "Container")
self.container_id = findtext(container, "ContainerId")
AgentGlobals.update_container_id(self.container_id)
# these properties are populated by fetch_full_goal_state()
self._hosting_env_uri = findtext(xml_doc, "HostingEnvironmentConfig")
self.hosting_env = None
self._shared_conf_uri = findtext(xml_doc, "SharedConfig")
self.shared_conf = None
self._certs_uri = findtext(xml_doc, "Certificates")
self.certs = None
self._remote_access_uri = findtext(container, "RemoteAccessInfo")
self.remote_access = None
# TODO: extensions_config is an instance member only temporarily. Once we stop comparing extensionsConfig with
# vmSettings, it will be replaced with the extensions goal state
self.extensions_config = None
self._extensions_config_uri = findtext(xml_doc, "ExtensionsConfig")
except Exception as exception:
# We don't log the error here since fetching the goal state is done every few seconds
raise ProtocolError(msg="Error fetching goal state", inner=exception)
def fetch_full_goal_state(self, wire_client):
try:
logger.info('Fetching goal state [incarnation {0}]', self.incarnation)
xml_text = wire_client.fetch_config(self._hosting_env_uri, wire_client.get_header())
self.hosting_env = HostingEnv(xml_text)
xml_text = wire_client.fetch_config(self._shared_conf_uri, wire_client.get_header())
self.shared_conf = SharedConfig(xml_text)
if self._certs_uri is not None:
xml_text = wire_client.fetch_config(self._certs_uri, wire_client.get_header_for_cert())
self.certs = Certificates(xml_text)
if self._remote_access_uri is not None:
xml_text = wire_client.fetch_config(self._remote_access_uri, wire_client.get_header_for_cert())
self.remote_access = RemoteAccess(xml_text)
if self._extensions_config_uri is None:
self.extensions_config = ExtensionsGoalStateFactory.create_empty()
else:
xml_text = wire_client.fetch_config(self._extensions_config_uri, wire_client.get_header())
self.extensions_config = ExtensionsGoalStateFactory.create_from_extensions_config(self.incarnation, xml_text, wire_client)
except Exception as exception:
logger.warn("Fetching the goal state failed: {0}", ustr(exception))
raise ProtocolError(msg="Error fetching goal state", inner=exception)
finally:
logger.info('Fetch goal state completed')
class HostingEnv(object):
def __init__(self, xml_text):
self.xml_text = xml_text
xml_doc = parse_doc(xml_text)
incarnation = find(xml_doc, "Incarnation")
self.vm_name = getattrib(incarnation, "instance")
role = find(xml_doc, "Role")
self.role_name = getattrib(role, "name")
deployment = find(xml_doc, "Deployment")
self.deployment_name = getattrib(deployment, "name")
class SharedConfig(object):
def __init__(self, xml_text):
self.xml_text = xml_text
class Certificates(object):
def __init__(self, xml_text):
self.cert_list = CertList()
# Save the certificates
local_file = os.path.join(conf.get_lib_dir(), CERTS_FILE_NAME)
fileutil.write_file(local_file, xml_text)
# Separate the certificates into individual files.
xml_doc = parse_doc(xml_text)
data = findtext(xml_doc, "Data")
if data is None:
return
# if the certificates format is not Pkcs7BlobWithPfxContents do not parse it
certificateFormat = findtext(xml_doc, "Format")
if certificateFormat and certificateFormat != "Pkcs7BlobWithPfxContents":
logger.warn("The Format is not Pkcs7BlobWithPfxContents. Format is " + certificateFormat)
return
cryptutil = CryptUtil(conf.get_openssl_cmd())
p7m_file = os.path.join(conf.get_lib_dir(), P7M_FILE_NAME)
p7m = ("MIME-Version:1.0\n" # pylint: disable=W1308
"Content-Disposition: attachment; filename=\"{0}\"\n"
"Content-Type: application/x-pkcs7-mime; name=\"{1}\"\n"
"Content-Transfer-Encoding: base64\n"
"\n"
"{2}").format(p7m_file, p7m_file, data)
fileutil.write_file(p7m_file, p7m)
trans_prv_file = os.path.join(conf.get_lib_dir(), TRANSPORT_PRV_FILE_NAME)
trans_cert_file = os.path.join(conf.get_lib_dir(), TRANSPORT_CERT_FILE_NAME)
pem_file = os.path.join(conf.get_lib_dir(), PEM_FILE_NAME)
# decrypt certificates
cryptutil.decrypt_p7m(p7m_file, trans_prv_file, trans_cert_file, pem_file)
# The parsing process use public key to match prv and crt.
buf = []
begin_crt = False # pylint: disable=W0612
begin_prv = False # pylint: disable=W0612
prvs = {}
thumbprints = {}
index = 0
v1_cert_list = []
with open(pem_file) as pem:
for line in pem.readlines():
buf.append(line)
if re.match(r'[-]+BEGIN.*KEY[-]+', line):
begin_prv = True
elif re.match(r'[-]+BEGIN.*CERTIFICATE[-]+', line):
begin_crt = True
elif re.match(r'[-]+END.*KEY[-]+', line):
tmp_file = Certificates._write_to_tmp_file(index, 'prv', buf)
pub = cryptutil.get_pubkey_from_prv(tmp_file)
prvs[pub] = tmp_file
buf = []
index += 1
begin_prv = False
elif re.match(r'[-]+END.*CERTIFICATE[-]+', line):
tmp_file = Certificates._write_to_tmp_file(index, 'crt', buf)
pub = cryptutil.get_pubkey_from_crt(tmp_file)
thumbprint = cryptutil.get_thumbprint_from_crt(tmp_file)
thumbprints[pub] = thumbprint
# Rename crt with thumbprint as the file name
crt = "{0}.crt".format(thumbprint)
v1_cert_list.append({
"name": None,
"thumbprint": thumbprint
})
os.rename(tmp_file, os.path.join(conf.get_lib_dir(), crt))
buf = []
index += 1
begin_crt = False
# Rename prv key with thumbprint as the file name
for pubkey in prvs:
thumbprint = thumbprints[pubkey]
if thumbprint:
tmp_file = prvs[pubkey]
prv = "{0}.prv".format(thumbprint)
os.rename(tmp_file, os.path.join(conf.get_lib_dir(), prv))
logger.info("Found private key matching thumbprint {0}".format(thumbprint))
else:
# Since private key has *no* matching certificate,
# it will not be named correctly
logger.warn("Found NO matching cert/thumbprint for private key!")
# Log if any certificates were found without matching private keys
# This can happen (rarely), and is useful to know for debugging
for pubkey in thumbprints:
if not pubkey in prvs:
msg = "Certificate with thumbprint {0} has no matching private key."
logger.info(msg.format(thumbprints[pubkey]))
for v1_cert in v1_cert_list:
cert = Cert()
set_properties("certs", cert, v1_cert)
self.cert_list.certificates.append(cert)
@staticmethod
def _write_to_tmp_file(index, suffix, buf):
file_name = os.path.join(conf.get_lib_dir(), "{0}.{1}".format(index, suffix))
fileutil.write_file(file_name, "".join(buf))
return file_name
class RemoteAccess(object):
"""
Object containing information about user accounts
"""
#
# <RemoteAccess>
# <Version/>
# <Incarnation/>
# <Users>
# <User>
# <Name/>
# <Password/>
# <Expiration/>
# </User>
# </Users>
# </RemoteAccess>
#
def __init__(self, xml_text):
self.xml_text = xml_text
self.version = None
self.incarnation = None
self.user_list = RemoteAccessUsersList()
if self.xml_text is None or len(self.xml_text) == 0:
return
xml_doc = parse_doc(self.xml_text)
self.version = findtext(xml_doc, "Version")
self.incarnation = findtext(xml_doc, "Incarnation")
user_collection = find(xml_doc, "Users")
users = findall(user_collection, "User")
for user in users:
remote_access_user = RemoteAccess._parse_user(user)
self.user_list.users.append(remote_access_user)
@staticmethod
def _parse_user(user):
name = findtext(user, "Name")
encrypted_password = findtext(user, "Password")
expiration = findtext(user, "Expiration")
remote_access_user = RemoteAccessUser(name, encrypted_password, expiration)
return remote_access_user