# Source code for hammers.mycnf
# coding: utf-8
import os
import codecs
import glob
import logging
import stat
import configparser
# Public names exported by a star-import of this module.
__all__ = ['MyCnf', 'MYCNF_PATHS']

# configparser lookup failures that mean "no such section/option". They are
# caught in MyCnf.__getitem__ and re-raised as KeyError so callers can treat
# the object like a plain mapping.
KEYERROR_LIKE_OPTIONERRORS = (
    configparser.NoSectionError,
    configparser.NoOptionError,
)
# Default option-file locations, in the order they are processed. MyCnf
# reverses this list into a stack and pops from the end, so entries listed
# first are loaded first and later files are merged on top of them. A leading
# ``~`` is expanded with os.path.expanduser when each path is processed.
#
# https://dev.mysql.com/doc/refman/5.7/en/option-files.html
# https://mariadb.com/kb/en/mariadb/configuring-mariadb-with-mycnf/
MYCNF_PATHS = [
    '/etc/my.cnf',
    '/etc/mysql/my.cnf',
    # Locations from the references above that are not handled here:
    # 'SYSCONFDIR/my.cnf',
    # 'defaults-extra-file' # The file specified with --defaults-extra-file, if any
    '~/.my.cnf',
    # NOTE(review): the MySQL manual describes .mylogin.cnf as an obfuscated
    # file written by mysql_config_editor, not plain text -- confirm that
    # MyCnf can actually parse it when it exists.
    '~/.mylogin.cnf',
]
class MyCnf(object):
    """
    MySQL configuration fetcher. Attempts to emulate the behavior of
    the MySQL client to the best of its ability, falling through multiple
    locations where configuration could exist to determine its value.

    Files are merged in the order they are processed, so a value read from a
    later file replaces the same option read from an earlier one. Sections
    are exposed through the mapping protocol: iterating yields section names
    and ``mycnf[section]`` returns a plain ``dict`` of that section's options.

    .. seealso::

       * `MySQL 5.7 Reference Manual, 4.2.6 Using Option Files <https://dev.mysql.com/doc/refman/5.7/en/option-files.html>`_
       * `Configuring MariaDB with my.cnf <https://mariadb.com/kb/en/mariadb/configuring-mariadb-with-mycnf/>`_
    """

    L = logging.getLogger('.'.join([__name__, 'MyCnf']))

    def __init__(self, paths=None):
        """
        Build the parser and immediately load every usable option file.

        :param paths: candidate option files, in the order they should be
            processed; defaults to ``MYCNF_PATHS``.
        """
        if paths is None:
            paths = MYCNF_PATHS
        # Stored reversed so that list.pop() hands back the first path first.
        self.path_stack = list(reversed(paths))
        self.cp = configparser.ConfigParser(
            allow_no_value=True,
            # Option files are not Python format strings: a literal '%'
            # (e.g. in a password) must come back untouched instead of
            # raising an interpolation error on access.
            interpolation=None,
            # Repeated sections/options inside one file are accepted and the
            # last occurrence wins, rather than aborting the whole load.
            strict=False,
        )
        self.load()

    def valid_path(self, path):
        """
        Return True if *path* can be stat'ed, is not a directory, and is not
        world-writable; otherwise log the reason and return False.
        """
        self.L.debug('checking path {} for validity'.format(path))
        try:
            cnf_stat = os.stat(path)
        except (IOError, OSError):
            self.L.debug('failed to stat path {} (can\'t read/doesn\'t exist?)'.format(path))
            return False
        if stat.S_ISDIR(cnf_stat.st_mode):
            # A directory (e.g. one named "*.cnf" matched by !includedir)
            # cannot be opened as an option file.
            self.L.debug('path {} is a directory, ignoring'.format(path))
            return False
        if cnf_stat.st_mode & stat.S_IWOTH:
            self.L.debug('path {} is world-writable, ignoring'.format(path))
            return False
        return True

    def read(self, path):
        """
        Yield the lines of *path* that should be handed to the parser.

        Lines starting with ``!`` are directives: they are dispatched to
        :meth:`magic` and not yielded.
        """
        # Builtin open() with the default encoding; codecs.open() without an
        # explicit encoding returned exactly this file object.
        with open(path) as f:
            for line in f:
                if line.startswith('!'):
                    self.L.debug('found magic directive, line: "{}"'.format(line))
                    self.magic(path, line)
                else:
                    yield line

    def magic(self, sourcefile, line):
        """
        Dispatch a ``!directive argument`` line found in *sourcefile*.

        :raises ValueError: if the directive has no argument.
        :raises KeyError: if the directive is not ``include``/``includedir``.
        """
        parts = line.split(None, 1)
        if len(parts) != 2:
            raise ValueError('directive without an argument in "{}": {!r}'.format(
                sourcefile, line))
        directive, args = parts
        directive = directive.lstrip('!')
        # split() with a maxsplit keeps trailing whitespace on the last
        # field, so the line's newline would otherwise end up in the path.
        args = args.strip()
        handlers = {
            'include': self.include,
            'includedir': self.includedir,
        }
        if directive not in handlers:
            raise KeyError('unknown directive "{}" in "{}"'.format(
                directive, sourcefile))
        return handlers[directive](sourcefile, args)

    def include(self, source_file, include):
        """
        Queue the file *include*, resolved relative to the directory of
        *source_file*, for loading.
        """
        new_path = os.path.join(os.path.dirname(source_file), include)
        self.L.debug('adding path from source file "{}": {}'.format(
            source_file, new_path))
        self.path_stack.append(new_path)

    def includedir(self, source_file, includedir):
        """
        Queue every ``*.cnf`` file in directory *includedir*, resolved
        relative to the directory of *source_file*, for loading.
        """
        pattern = os.path.join(os.path.dirname(source_file), includedir, '*.cnf')
        # glob order is arbitrary; sort so the merge result is reproducible.
        new_paths = sorted(glob.iglob(pattern))
        self.L.debug('adding paths from source file "{}" found in dir "{}": {}'.format(
            source_file, includedir, new_paths))
        self.path_stack.extend(new_paths)

    def load(self):
        """
        Drain ``path_stack``, merging every valid file into the parser.

        Files queued by directives are processed right after the file that
        named them, in the order the directives appeared.
        """
        while self.path_stack:
            path = self.path_stack.pop()
            self.L.debug('processing possible path "{}"'.format(path))
            path = os.path.expanduser(path)
            if not self.valid_path(path):
                continue
            self.L.debug('loading/merging file "{}"'.format(path))
            mark = len(self.path_stack)
            self.read_file(path)
            # Directives appended their targets in order of appearance; flip
            # that tail so pop() returns them in the same order and a later
            # include overrides an earlier one.
            self.path_stack[mark:] = self.path_stack[mark:][::-1]

    def read_file(self, path):
        """Parse *path* (minus directive lines) and merge it into ``cp``."""
        self.cp.read_file(self.read(path))

    def __iter__(self):
        """Iterate over the section names known to the parser."""
        return iter(self.cp)

    def __getitem__(self, key):
        """
        Return the options of section *key* as a plain dict.

        :raises KeyError: if the section (or an option in it) is missing.
        """
        try:
            d = dict(self.cp[key])
        except KEYERROR_LIKE_OPTIONERRORS as e:
            raise KeyError(str(e))
        return d
if __name__ == '__main__':
    # Debugging aid: load the default option files with debug logging on and
    # dump every section as indented JSON.
    import json

    logging.basicConfig(level=logging.DEBUG)
    mycnf = MyCnf(MYCNF_PATHS)
    sections = {}
    for sec in mycnf:
        sections[sec] = mycnf[sec]
    print(json.dumps(sections, indent=4))