# Source code for hammers.mycnf
# coding: utf-8
from __future__ import absolute_import, print_function, unicode_literals
import os
import codecs
import glob
import logging
import stat
CP_MODERN = True
try:
import configparser
except ImportError:
try:
from backports import configparser
except ImportError:
import io
import ConfigParser as configparser
CP_MODERN = False
# Public API of this module: the names exported by a star-import.
__all__ = ['MyCnf', 'MYCNF_PATHS']
# configparser errors that mean "no such section/option". MyCnf.__getitem__
# translates these into a plain KeyError.
KEYERROR_LIKE_OPTIONERRORS = (configparser.NoSectionError, configparser.NoOptionError)

# Default option-file locations, in the order they are read. MyCnf uses this
# list when it is constructed without explicit paths.
#
# References:
#   https://dev.mysql.com/doc/refman/5.7/en/option-files.html
#   https://mariadb.com/kb/en/mariadb/configuring-mariadb-with-mycnf/
#
# Locations listed by those references but not implemented here:
#   'SYSCONFDIR/my.cnf'
#   'defaults-extra-file'  (the file given with --defaults-extra-file, if any)
MYCNF_PATHS = [
    '/etc/my.cnf',
    '/etc/mysql/my.cnf',
    '~/.my.cnf',
    '~/.mylogin.cnf',
]
class MyCnf(object):
    """
    MySQL configuration fetcher. Attempts to emulate the behavior of
    the MySQL client to the best of its ability, falling through multiple
    locations where configuration could exist to determine its value.

    Files are read in the order given and merged into a single parser, so
    a value read later replaces one read earlier. ``!include`` and
    ``!includedir`` directives are followed. Indexing an instance with a
    section name returns a plain ``dict`` of that section's options;
    iterating an instance yields section names.

    .. seealso::

        * `MySQL 5.7 Reference Manual, 4.2.6 Using Option Files <https://dev.mysql.com/doc/refman/5.7/en/option-files.html>`_
        * `Configuring MariaDB with my.cnf <https://mariadb.com/kb/en/mariadb/configuring-mariadb-with-mycnf/>`_
    """

    L = logging.getLogger('.'.join([__name__, 'MyCnf']))

    def __init__(self, paths=None):
        """
        Build the parser and immediately load every readable option file.

        :param paths: option-file paths to read, in order (``~`` is
            expanded). Defaults to ``MYCNF_PATHS``.
        """
        if paths is None:
            paths = MYCNF_PATHS
        # Paths are popped from the end of the stack, so store them reversed
        # to process them in the order the caller listed them.
        self.path_stack = list(reversed(paths))
        if CP_MODERN:
            self.cp = configparser.ConfigParser(
                allow_no_value=True,
                # Option files have no %-interpolation; a literal '%' in a
                # value (e.g. a password) must be returned untouched.
                interpolation=None,
                # Repeated sections/options inside one file are merged
                # rather than rejected.
                strict=False,
            )
        else:
            # The legacy ConfigParser class always interpolates; the raw
            # parser does not.
            self.cp = configparser.RawConfigParser(allow_no_value=True)
        self.load()

    def valid_path(self, path):
        """
        Return True if ``path`` can be stat'ed and is not world-writable.

        World-writable files are ignored.
        """
        self.L.debug('checking path %s for validity', path)
        try:
            cnf_stat = os.stat(path)
        except (IOError, OSError):
            self.L.debug(
                'failed to stat path %s (can\'t read/doesn\'t exist?)', path)
            return False

        if cnf_stat.st_mode & stat.S_IWOTH:
            self.L.debug('path %s is world-writable, ignoring', path)
            return False

        return True

    def read(self, path):
        """
        Yield the lines of ``path`` that the config parser should see.

        Lines starting with ``!`` are directives: they are handed to
        :meth:`magic` and not yielded.
        """
        # Local import: the module only imports ``io`` on its legacy
        # fallback path.
        import io

        # Decode explicitly so the result does not depend on the locale
        # (Python 3) or come back as bytes (Python 2).
        with io.open(path, encoding='utf-8') as f:
            for line in f:
                if line.startswith('!'):
                    self.L.debug(
                        'found magic directive, line: "%s"', line.rstrip())
                    self.magic(path, line)
                else:
                    yield line

    def magic(self, sourcefile, line):
        """
        Dispatch a ``!directive argument`` line found in ``sourcefile``.

        :raises ValueError: if the directive has no argument.
        :raises KeyError: if the directive is not ``include``/``includedir``.
        """
        directive, args = line.split(None, 1)
        handlers = {
            'include': self.include,
            'includedir': self.includedir,
        }
        # split() with a maxsplit keeps trailing whitespace on the last
        # field, so the argument still carries the line's newline; strip it
        # or the resulting path never exists.
        return handlers[directive.lstrip('!')](sourcefile, args.strip())

    def include(self, source_file, include):
        """
        Queue the file ``include`` (relative to ``source_file``'s directory
        unless absolute) to be loaded next.
        """
        new_path = os.path.join(os.path.dirname(source_file), include)
        self.L.debug(
            'adding path from source file "%s": %s', source_file, new_path)
        self.path_stack.append(new_path)

    def includedir(self, source_file, includedir):
        """
        Queue every ``*.cnf`` file in directory ``includedir`` (relative to
        ``source_file``'s directory unless absolute) to be loaded next, in
        sorted name order.
        """
        pattern = os.path.join(
            os.path.dirname(source_file), includedir, '*.cnf')
        new_paths = sorted(glob.glob(pattern))
        self.L.debug(
            'adding paths from source file "%s" found in dir "%s": %s',
            source_file, includedir, new_paths)
        # The stack is popped from the end; push reversed so the files are
        # read in sorted order and the result is deterministic.
        self.path_stack.extend(reversed(new_paths))

    def load(self):
        """
        Drain the path stack, merging every valid file into the parser.

        Directives encountered while reading a file push further paths onto
        the stack, which are then processed before any remaining ones.
        """
        while self.path_stack:
            path = self.path_stack.pop()
            self.L.debug('processing possible path "%s"', path)
            path = os.path.expanduser(path)
            if not self.valid_path(path):
                continue

            self.L.debug('loading/merging file "%s"', path)
            self.read_file(path)

    def read_file(self, path):
        """Parse the option file at ``path`` into the shared parser."""
        if CP_MODERN:
            # Name the source so parse errors point at the offending file.
            self.cp.read_file(self.read(path), source=path)
        else:
            import io

            # Lines already end with their newline, so join without adding
            # another one.
            buf = io.StringIO(''.join(self.read(path)))
            self.cp.readfp(buf, path)

    def __iter__(self):
        """Iterate over section names, ``'DEFAULT'`` first."""
        if CP_MODERN:
            return iter(self.cp)
        else:
            return iter(['DEFAULT'] + self.cp.sections())

    def __getitem__(self, key):
        """
        Return the options of section ``key`` as a plain ``dict``.

        :raises KeyError: if there is no such section.
        """
        try:
            if CP_MODERN:
                d = dict(self.cp[key])
            else:
                d = dict(self.cp.items(key))
        except KEYERROR_LIKE_OPTIONERRORS as e:
            raise KeyError(str(e))
        return d
if __name__ == '__main__':
    # Ad-hoc command-line use: load the default option files with debug
    # logging enabled and print every section as indented JSON.
    import json

    logging.basicConfig(level=logging.DEBUG)
    mycnf = MyCnf(MYCNF_PATHS)
    sections = {name: mycnf[name] for name in mycnf}
    print(json.dumps(sections, indent=4))