首页
Python配置文件加密

加密文件读取类

加密及配置文件读取类如下:

# -*- coding: utf-8 -*-
import StringIO
import binascii
import codecs
import locale
import re
import token
import tokenize
from base64 import b64decode
from base64 import b64encode
from hashlib import md5
from traceback import print_exc

from Crypto import Random
from Crypto.Cipher import AES
from Crypto.Cipher import PKCS1_v1_5
from Crypto.PublicKey import RSA
try:
    from Crypto.Util.Padding import pad, unpad
except ImportError:
    from Crypto.Util.py3compat import bchr, bord

    def pad(data_to_pad, block_size):
        padding_len = block_size - len(data_to_pad) % block_size
        padding = bchr(padding_len) * padding_len
        return data_to_pad + padding

    def unpad(padded_data, block_size):
        pdata_len = len(padded_data)
        if pdata_len % block_size:
            raise ValueError("Input data is not padded")
        padding_len = bord(padded_data[-1])
        if padding_len < 1 or padding_len > min(block_size, pdata_len):
            raise ValueError("Padding is incorrect.")
        if padded_data[-padding_len:] != bchr(padding_len) * padding_len:
            raise ValueError("PKCS#7 padding is incorrect.")
        return padded_data[:-padding_len]

__all__ = ['RsaCrypto', 'AESCrypto', 'Settings']


class RsaCrypto(object):
    """RSA 加解密
    Usage:
        RsaCrypto().create_rsa_key()
    """

    def create_rsa_key(self):
        """生成RSA秘钥对"""
        try:
            key = RSA.generate(2048)
            encrypted_key = key.exportKey(pkcs=8)

            public_key = key.publickey().exportKey().decode('utf-8')
            private_key = encrypted_key.decode('utf-8')

            return {'state': 1, 'message': {'public_key': public_key, 'private_key': private_key}}
        except Exception as err:
            return {'state': 0, 'message': str(err)}

    def encrypt(self, public_key, plaintext):
        """加密方法"""
        try:
            recipient_key = RSA.import_key(public_key)
            cipher_rsa = PKCS1_v1_5.new(recipient_key)

            en_data = cipher_rsa.encrypt(plaintext.encode('utf-8'))
            hex_data = binascii.hexlify(en_data).decode('utf-8')

            return {'state': 1, 'message': hex_data}
        except Exception as err:
            return {'state': 0, 'message': str(err)}

    def decrypt(self, private_key, hex_data):
        """解密方法"""
        try:
            private_key = RSA.import_key(private_key)
            cipher_rsa = PKCS1_v1_5.new(private_key)

            en_data = binascii.unhexlify(hex_data.encode('utf-8'))
            data = cipher_rsa.decrypt(en_data, None).decode('utf-8')

            return {'state': 1, 'message': data}
        except Exception as err:
            return {'state': 0, 'message': str(err)}


class AESCrypto(object):
    """
    Usage:
        c = AESCrypto('password').encrypt('message')
        m = AESCrypto('password').decrypt(c)
    """

    def __init__(self, key):
        self.key = md5(key.encode('utf8')).hexdigest()

    def encrypt(self, raw):
        raw = pad(raw, AES.block_size)
        iv = Random.new().read(AES.block_size)
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        return b64encode(iv + cipher.encrypt(raw))

    def decrypt(self, enc):
        enc = b64decode(enc)
        iv = enc[:16]
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        return unpad(cipher.decrypt(enc[16:]), AES.block_size).decode('utf8')


r_encoding = re.compile(r'\s*coding\s*[=:]\s*([-\w.]+)')


class Settings(object):

    def __init__(self, filename, decrypt=None, commentchar='#'):
        """
        对配置文件配置项进行解密处理
        """
        self._commentchar = commentchar
        self.__decrypt = decrypt or str
        self.__data = {}
        self.__read(filename)

    def _get_data(self):
        return self.__data

    data = property(_get_data)

    def __read(self, filename=''):
        encoding = None

        f = open(filename, 'rb')
        text = f.read()
        f.close()

        text = text + '\n'
        begin = 0
        if text.startswith(codecs.BOM_UTF8):
            begin = 3
            encoding = 'UTF-8'
        elif text.startswith(codecs.BOM_UTF16):
            begin = 2
            encoding = 'UTF-16'

        if not encoding:
            try:
                unicode(text, 'UTF-8')
                encoding = 'UTF-8'
            except BaseException:
                try:
                    encoding = locale.getdefaultlocale()[1]
                except BaseException:
                    encoding = None

                if not encoding:
                    encoding = 'UTF-8'
                try:
                    codecs.lookup(encoding)
                except BaseException:
                    encoding = 'UTF-8'

        self._encoding = encoding

        f = StringIO.StringIO(text)
        f.seek(begin)
        lineno = 0
        comments = []
        while True:
            lastpos = f.tell()
            line = f.readline()
            lineno += 1
            if not line:
                break
            line = line.strip()
            if line:
                if line.startswith(self._commentchar):
                    if lineno == 1:  # first comment line
                        b = r_encoding.search(line[1:])
                        if b:
                            self._encoding = b.groups()[0]
                            continue

                elif '=' in line:

                    pos = line.find('=')
                    begin, end = pos, pos + 1
                    keyname = line[:begin].strip()
                    rest = line[end:].strip()
                    # if key= then value will be set ''
                    if rest == '':
                        value = 'None'
                    else:
                        f.seek(lastpos + end)
                        try:
                            value, iden_existed, _lineno = self.__read_line(f)
                            _lineno -= 1
                        except Exception:
                            print_exc()
                            raise Exception("Parsing ini file error in %s:%d:%s" % (filename, lineno, line))

                    try:
                        txt = '#coding=%s\n%s' % (self._encoding, value)
                        # 解密函数被定义为'_'
                        v = eval(txt, {'_': self.__decrypt}, dict(self.data))
                        self.__data[keyname] = v
                    except Exception:
                        print_exc()
                        raise Exception("Converting value (%s) error in %s:%d:%s" % (value, filename, lineno, line))
                    lineno += _lineno

    def __read_line(self, f):
        g = tokenize.generate_tokens(f.readline)
        buf = []
        time = 0
        iden_existed = False
        while True:
            v = g.next()
            tokentype, t, (lineno, start), end, line = v
            if tokentype == 54:
                continue
            if tokentype in (token.INDENT, token.DEDENT, tokenize.COMMENT):
                continue
            if tokentype == token.NAME:
                iden_existed = True
            if tokentype == token.NEWLINE:
                return ''.join(buf), iden_existed, lineno
            else:
                if t == '=' and time == 0:
                    time += 1
                    continue
                buf.append(t)

使用例子

以django为例,读取local_settings配置。

修改django的settings.py文件,读取加密配置。

# from config.local_settings import *  # noqa
# 包含加密信息的配置文件
local_settings_file = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                                   'config',
                                                   'local_settings.py'))
# 解密使用的函数
decrypt_func = AESCrypto(key='secret key').decrypt
local_setting = Settings(local_settings_file, decrypt=decrypt_func)
# 读取更新配置
locals().update(local_setting.data)

配置文件例子:

# 加密后的配置项内容,需要使用'_'来定义才能被正确解密。加密内容可使用如下方式获得
# encrypt_str = AESCrypto('secret key').encrypt('hello world')
TEST_CONFIG = _('v7onhJeOJjImxnXkjspcMetB/wYhOEnWTFgNAuSM4pc=')