Commit 1aea32a6 by 姚主播

Initial commit

parents
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
.static_storage/
.media/
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
# idea
.idea/
\ No newline at end of file
# back_base
Python backend base library: shared tornado request handlers, a DB connection pool, redis-backed sessions and unified error codes.
### 使用时,将back_base项目文件关联到新项目的base文件夹(作为子模块)
# coding: utf-8
import os
import socket
import sys
# Hostname of the current machine.
MACHINE_NAME = socket.gethostname()
def current_path():
    """Return the absolute directory this code is running from.

    If ``sys.path[0]`` points at a file, that file's directory is used;
    otherwise the caller's source-file directory is returned.
    """
    base = os.path.realpath(sys.path[0])
    if os.path.isfile(base):
        return os.path.abspath(os.path.dirname(base))
    import inspect
    caller_file = inspect.stack()[1][1]
    return os.path.abspath(os.path.dirname(caller_file))
ROOT_PATH = os.path.dirname(os.path.dirname(current_path()))
# 只加载某几个模块,为空则全部加载
LOAD_MODULE = []
# 不加载哪几个模块
REJECT_MODULE = []
BUSINESS_REDIS = {
"host": "localhost",
"port": 6379,
"db": 1,
}
# =================
# tornado资源配置
settings = {
"cookie_secret": "e446976943b4e8442f099fed1f3fea28462d5832f483a0ed9a3d5d3859f==08d",
"session_secret": "3cdcb1f00803b6e78ab50b466a40b9977db396840c28307f428b25e2277f0bcc",
"session_timeout": 60 * 30 * 2 * 24 * 30,
"store_options": {
'redis_host': BUSINESS_REDIS["host"],
'redis_port': BUSINESS_REDIS["port"],
'redis_db': '0',
},
"static_path": os.path.join(ROOT_PATH, "static"),
"template_path": os.path.join(ROOT_PATH, "templates"),
"gzip": True,
"debug": False,
}
# 日志路径设置
LOGFILE = ''
# 消息服务器
MSG_HOST = 'http://msg.kfw.net'
# =================
# 数据库连接装载
BASE_DB = "test"
SPATIAL_DB = "kfw_spatial"
# 单库
database = {
BASE_DB: {
'engine': 'mysql',
'db': 'test',
'host': '123.206.26.57',
'port': 3306,
'user': 'kfw',
'passwd': 'kfw123',
'charset': 'utf8mb4',
'conn': 5
}
}
# 主从分离
'''
DATABASE = {
'master':{
'test': {
'engine': 'mysql',
'db': 'db_name',
'host': 'localhost',
'port': 3306,
'user': 'bozyao',
'passwd': 'bozyao',
'charset': 'utf8mb4',
'conn': 20
},
'slave':[
'test': {
'engine': 'mysql',
'db': 'db_name',
'host': 'localhost',
'port': 3306,
'user': 'bozyao',
'passwd': 'bozyao',
'charset': 'utf8mb4',
'conn': 20
},
]
}
'''
# coding: utf-8
import json
import logging
import random
import string
import time
from datetime import datetime, date
from functools import partial, wraps
import tornado.web
from concurrent.futures import ThreadPoolExecutor
from tornado.escape import json_decode, to_unicode
from .error_code import ERROR_CODE, ERROR_MSG, ERROR_MSG_CODE
from .tools.session import Session
# TODO: drive these from start-up parameters / a config file.
USE_GEVENT = 0
# Task executor used by the async_ decorator (gevent pool or thread pool).
EXECUTOR = None
if USE_GEVENT == 1:
    from gevent import monkey
    monkey.patch_all()
    from gevent.pool import Pool
    from concurrent.futures import Future
    EXECUTOR = Pool(size=200)
elif USE_GEVENT == 0:
    from concurrent.futures import ThreadPoolExecutor
    EXECUTOR = ThreadPoolExecutor(max_workers=100)
# Prefix prepended to URL patterns derived by @route.
URL_PREFIX = ""
def format_date(obj):
    """``json.dumps`` fallback serializer.

    @param obj: object json could not serialize natively
    @return: datetime -> 'YYYY-MM-DD HH:MM:SS', date -> 'YYYY-MM-DD',
        Decimal -> float; any other type yields None (serialized as null).
    """
    import decimal
    if isinstance(obj, datetime):
        return obj.strftime('%Y-%m-%d %H:%M:%S')
    if isinstance(obj, date):
        return obj.strftime('%Y-%m-%d')
    if isinstance(obj, decimal.Decimal):
        return float(obj)
    return None
class Application(tornado.web.Application):
    """tornado Application with module-based handler loading and a host
    matcher that returns every matching handler spec."""

    def load_handler_module(self, handler_module, perfix=".*"):
        """Register every RequestHandler in *handler_module* that carries
        a non-empty ``url_pattern``.

        @param handler_module: module object to scan
        @param perfix: host pattern forwarded to add_handlers (name kept
            for backward compatibility; it is a host regex)
        """
        is_handler = lambda cls: isinstance(cls, type) \
                                 and issubclass(cls, RequestHandler)
        has_pattern = lambda cls: hasattr(cls, 'url_pattern') \
                                  and cls.url_pattern
        handlers = []
        for attr in dir(handler_module):
            cls = getattr(handler_module, attr)
            if is_handler(cls) and has_pattern(cls):
                # Register the exact pattern and a trailing-path variant.
                handlers.append((cls.url_pattern, cls))
                handlers.append((cls.url_pattern + "/.*", cls))
                logging.info("Load url_pattern: %s:%s" % (cls.url_pattern, cls))
        self.add_handlers(perfix, handlers)

    def _get_host_handlers(self, request):
        """Return ALL handler specs whose host pattern matches.

        Fix: the original returned a generator and then tested
        ``not handlers`` — a generator object is always truthy, so the
        default-host fallback could never trigger. A list keeps the
        "iterate every match" contract while making the emptiness test
        meaningful.
        """
        host = request.host.lower().split(':')[0]
        handlers = [i for p, h in self.handlers for i in h if p.match(host)]
        if not handlers and "X-Real-Ip" not in request.headers:
            handlers = [i for p, h in self.handlers for i in h if p.match(self.default_host)]
        return handlers
class RequestHandler(tornado.web.RequestHandler):
    """Base handler: JSON body parsing, per-request id, optional session,
    unified JSON responses and API logging.

    Subclasses set ``url_pattern`` (directly or via @route) to be
    auto-loaded by Application.load_handler_module.
    """

    def __init__(self, *argc, **argkw):
        super(RequestHandler, self).__init__(*argc, **argkw)
        # Treat any non-form request body as JSON and expose it through
        # body_arguments; non-JSON bodies are left untouched.
        if self.request.body and self.request.headers.get('Content-Type', '') \
                not in ['application/x-www-form-urlencoded', 'multipart/form-data']:
            try:
                self.request.body_arguments = json_decode(self.request.body)
                self.request.headers['Content-Type'] = 'application/json'
            except Exception:
                # Was a bare except, which also swallowed BaseException.
                pass
        self.session = None
        self.data = self.request.arguments
        # Short per-request id (6 unique digits) for log correlation.
        self.rid = ''.join(random.sample(string.digits, 6))
        self.start_time = time.time()
        if self.application.use_session:
            self.session = Session(self.application.session_manager, self)
        self.set_header('Content-Type', 'application/json')
        self.set_header('R-ID', self.rid)
        if self.settings.get("debug", False):
            # CORS headers are only emitted in debug mode.
            self.access_control_allow()

    def access_control_allow(self):
        """Allow cross-origin JS calls (debug/CORS helper)."""
        self.set_header("Access-Control-Allow-Methods", "GET,PUT,POST,DELETE,OPTIONS")
        self.set_header("Access-Control-Allow-Headers", "Content-Type, Depth, User-Agent, X-File-Size, "
                                                        "X-Requested-With, X-Requested-By, If-Modified-Since, "
                                                        "X-File-Name, Cache-Control, Token")
        self.set_header('Access-Control-Allow-Origin', '*')

    def check_session(self, crypt=False, html=False):
        """Deprecated, unencrypted session check; prefer the module-level
        @check_session decorator."""
        if self.session:
            return True
        self.set_status(403)
        self.ret_error("SESSIONERR")
        return False

    def get(self):
        # Unrouted methods answer 404 with a unified JSON error body.
        self.set_status(404)
        self.ret_error("NOT_FOUND", "404")

    def post(self):
        self.set_status(404)
        self.ret_error("NOT_FOUND", "404")

    def delete(self):
        self.set_status(404)
        self.ret_error("NOT_FOUND", "404")

    def put(self):
        self.set_status(404)
        self.ret_error("NOT_FOUND", "404")

    def write(self, chunk):
        """Log every response (truncated) before delegating the write."""
        logging.info(self.ret_log(chunk)[:100])
        if self._finished:
            return
        super(RequestHandler, self).write(chunk)

    def write_error(self, status_code, **kwargs):
        """Catch-all for unhandled exceptions -> unified JSON error."""
        self.set_status(status_code)
        error_info = ["请求出了点问题,攻城狮正在火速解决~", "SERVERERR"]
        try:
            tmp_error_info = kwargs.get("exc_info", ("", ("", "")))[1]
            error_info[0] = tmp_error_info.args[0]
            # Expose the second exception arg only in debug mode.
            if self.settings.get("debug", False) and len(tmp_error_info.args) > 1:
                error_info[0] += ":" + tmp_error_info.args[1]
        except Exception as e:
            logging.error("Return error error, info:%s" % e)
        logging.error("status:%s error_info:%s" % (status_code, kwargs))
        return self.ret_error(error_info[1], error_info[0])

    def ret_log(self, data):
        """Build the pipe-separated API log line for this request."""
        end_time = time.time()
        cost = int((end_time - self.start_time) * 1000000)  # microseconds
        s = [self.rid, 'apilog', self.request.remote_ip, str(self.get_status()), self.request.method, self.request.path,
             str(cost), json.dumps(self.data, default=format_date)[:1024], json.dumps(data, default=format_date)[:2048]]
        return '|'.join(s)

    def ret_data(self, data=None, max_age=0):
        """Write *data* (dict, or a JSON string which is re-parsed) as the
        JSON response.

        Fix: the original used a mutable default argument (data={});
        None is now the sentinel with identical semantics.
        """
        if data is None:
            data = {}
        if not isinstance(data, dict):
            try:
                data = json.loads(data)
            except Exception as e:
                logging.error("Load json error: %s->%s" % (e, data))
                return self.ret_error("CONTENTERR", "服务器开小差了,攻城狮正在火速解决~")
        if max_age and not self.session:
            # Fix: Cache-Control directives are comma-separated; the
            # original "public max-age=N" was invalid header syntax.
            self.set_header("Cache-Control", "public, max-age=%s" % max_age)
        self.write(json.dumps(data, default=format_date))
        return

    def ret_error(self, error_info="", msg=''):
        """Write the unified JSON error body {error_code, error_msg}."""
        data = {"error_code": ERROR_CODE.get(error_info, 11111), "error_msg": msg}
        self.write(json.dumps(data, default=format_date))
        return
# Routing decorator
def route(url_pattern=""):
    """Class decorator for RequestHandler subclasses that assigns a
    ``url_pattern``.

    When no pattern is supplied, one is derived from the class's dotted
    path: ``str(cls)`` looks like ``"<class 'pkg.mod.FooHandler'>"``;
    slicing off ``"<class '"`` (8 chars) and ``"Handler'>"`` (9 chars),
    lowercasing and replacing dots with slashes yields ``/pkg/mod/foo``;
    a trailing ``/__init__`` segment is dropped.

    Args:
        url_pattern: explicit URL pattern, or "" to derive one
    """
    def handler_wapper(cls):
        assert (issubclass(cls, RequestHandler))
        if url_pattern:
            cls.url_pattern = url_pattern
        else:
            derived = str(cls)[8:-9].lower().replace(".", "/").replace("/__init__", "")
            cls.url_pattern = URL_PREFIX + "/" + derived
        return cls
    return handler_wapper
def check_session(permission="login"):
    """Decorator: require a logged-in session before running the handler.

    Args:
        permission: "login" (default), "admin" (session role must be
            100) or "benefit" (session role must be 1).
    Returns:
        The decorated method. On failure it writes a SESSIONERR response
        and returns None without invoking the handler; on success it
        populates self.user_id / self.current_user / self.open_id.
    """
    def fc(func):
        @wraps(func)  # fix: preserve the wrapped method's identity
        def _(self, *args, **argitems):
            if permission == "admin":
                if not self.session or self.session.get("role", 0) != 100:
                    self.ret_error("SESSIONERR", "需要登录后才可以操作哦")
                    return
                else:
                    return func(self, *args, **argitems)
            if not self.session or not self.session.get("user_id", 0):
                cookie_session_key = "sid"
                if self.get_cookie(cookie_session_key, ""):
                    # Stale cookie with no backing session: clear it.
                    self.clear_all_cookies()
                self.ret_error("SESSIONERR", "需要登录后才可以操作哦")
                return
            elif permission == "benefit" and self.session.get("role", 0) != 1:
                self.ret_error("SESSIONERR", "没有分润权限")
                return
            else:
                self.user_id = self.session.get("user_id", 0)
                self.current_user = self.session.get("user_id", 0)
                self.open_id = self.session.get("open_id", "")
                logging.info("User session checked by user_id: %s" % self.session.get("user_id", 0))
                return func(self, *args, **argitems)
        return _
    return fc
def async_(fun):
    """Run the handler body on EXECUTOR and finish the request when the
    future resolves.

    NOTE(review): tornado.web.asynchronous was removed in tornado 6 —
    confirm the pinned tornado version before upgrading.
    """
    @tornado.web.asynchronous
    @wraps(fun)
    def __(*args, **kwargs):
        self = args[0]

        def callback_(ret):
            # .result() re-raises any exception from the worker.
            result = ret.result()
            if result:
                self.ret_data(result)
            if not self._finished:
                try:
                    self.finish()
                except Exception as e:
                    # Fix: logging.log requires a level as first arg;
                    # logging.log(e) raised a TypeError.
                    logging.error(e)

        future = EXECUTOR.submit(fun, *args, **kwargs)
        tornado.ioloop.IOLoop.current().add_future(future, callback_)
    return __
# coding: utf-8
import datetime
import logging
import os
import random
import threading
import time
import types
# Module-level debug flag (not referenced elsewhere in this module).
debug = True
# Global registry: pool name -> DBPool/RWDBPool (populated by install()).
dbpool = {}
def timeit(func):
    """Decorator for DBConnection methods: logs pool name, target DB,
    elapsed microseconds, truncated SQL and any error for every call.

    Expects the wrapped callable's first positional argument to be the
    connection (reads .name and .param in the finally block) and the
    second to be the SQL statement.
    """
    from functools import wraps

    @wraps(func)  # fix: keep the wrapped function's name/docstring
    def _(*args, **kwargs):
        start = time.time()
        err = ''
        try:
            return func(*args, **kwargs)
        except Exception as e:
            err = str(e)
            raise
        finally:
            end = time.time()
            conn = args[0]
            dbcf = conn.param
            logging.info('name=%s|user=%s|addr=%s:%d|db=%s|time=%d|sql=%s|err=%s',
                         conn.name, dbcf.get('user', ''),
                         dbcf.get('host', ''), dbcf.get('port', 0),
                         os.path.basename(dbcf.get('db', '')),
                         int((end - start) * 1000000),
                         repr(args[1])[:100], err)
    return _
class DBPoolBase:
    # Abstract pool interface; concrete pools implement acquire/release.
    def acquire(self, name):
        pass
    def release(self, name, conn):
        pass
class DBResult:
    """Lightweight result-set wrapper pairing column names with row
    tuples, with dict-style access helpers."""

    def __init__(self, fields, data):
        self.fields = fields
        self.data = data

    def todict(self):
        """Return every row as a {field: value} dict."""
        return [dict(zip(self.fields, row)) for row in self.data]

    def __iter__(self):
        return (dict(zip(self.fields, row)) for row in self.data)

    def row(self, i, isdict=True):
        """Return row *i* as a dict (default) or as the raw tuple."""
        return dict(zip(self.fields, self.data[i])) if isdict else self.data[i]

    def __getitem__(self, i):
        return dict(zip(self.fields, self.data[i]))
class DBFunc:
    # Wraps a raw SQL expression (e.g. "now()") so value2sql emits it
    # verbatim instead of quoting it as a string.
    def __init__(self, data):
        self.value = data
class DBConnection:
    """Engine-agnostic DB connection wrapper with dict-based SQL helpers.

    Subclasses implement connect/close/alive/last_insert_id for a
    concrete driver. SQL fragments are built by naive string quoting —
    see value2sql; do NOT feed untrusted input through these helpers.
    """

    def __init__(self, param, lasttime, status):
        self.name = None          # pool name, filled in by the pool layer
        self.param = param        # driver config dict (host/port/user/...)
        self.conn = None          # underlying driver connection
        self.status = status      # 0 = idle, 1 = checked out
        self.lasttime = lasttime  # last checkout timestamp
        self.pool = None

    def is_available(self):
        """True when the connection is idle (not checked out)."""
        return self.status == 0

    def useit(self):
        """Mark checked out and stamp the checkout time."""
        self.status = 1
        self.lasttime = time.time()

    def releaseit(self):
        """Mark idle again."""
        self.status = 0

    def connect(self):
        pass

    def close(self):
        pass

    def alive(self):
        pass

    def cursor(self):
        return self.conn.cursor()

    @timeit
    def execute(self, sql, param=None):
        """Run *sql* (optionally parameterized) and return fetchall().

        Fix: the original caught the exception, re-raised it, and then
        carried unreachable reconnect code after the raise; the dead
        code is removed (exceptions still propagate unchanged).
        """
        cur = self.conn.cursor()
        if param:
            cur.execute(sql, param)
        else:
            cur.execute(sql)
        ret = cur.fetchall()
        cur.close()
        return ret

    @timeit
    def executemany(self, sql, param):
        """Run executemany; on failure reconnect once and retry."""
        cur = self.conn.cursor()
        try:
            ret = cur.executemany(sql, param)
        except Exception:
            # Fix: retry must use a cursor from the NEW connection —
            # the old cursor was bound to the dead one.
            self.connect()
            cur = self.conn.cursor()
            ret = cur.executemany(sql, param)
        cur.close()
        return ret

    @timeit
    def query(self, sql, param=None, isdict=True):
        """Run a SELECT and return all rows (dicts by default)."""
        cur = self.conn.cursor()
        try:
            if not param:
                cur.execute(sql)
            else:
                cur.execute(sql, param)
        except Exception:
            self.connect()
            cur = self.conn.cursor()  # fix: fresh cursor after reconnect
            if not param:
                cur.execute(sql)
            else:
                cur.execute(sql, param)
        res = cur.fetchall()
        if res and isdict:
            # Capture column names before closing the cursor.
            xkeys = [i[0] for i in cur.description]
            cur.close()
            return [dict(zip(xkeys, item)) for item in res]
        cur.close()
        return res

    @timeit
    def get(self, sql, param=None, isdict=True):
        """Run a SELECT and return only the first row (dict by default)."""
        cur = self.conn.cursor()
        try:
            if not param:
                cur.execute(sql)
            else:
                cur.execute(sql, param)
        except Exception:
            self.connect()
            cur = self.conn.cursor()  # fix: fresh cursor after reconnect
            if not param:
                cur.execute(sql)
            else:
                cur.execute(sql, param)
        res = cur.fetchone()
        if res and isdict:
            xkeys = [i[0] for i in cur.description]
            cur.close()
            return dict(zip(xkeys, res))
        cur.close()
        return res

    def value2sql(self, v, charset='utf-8'):
        """Render a Python value as a SQL literal string.

        NOTE(review): values are NOT escaped (the escape() call is
        commented out) — this path is vulnerable to SQL injection if
        given untrusted input.
        """
        if isinstance(v, str):
            # Pass SQL function calls through verbatim.
            if v.startswith(('now()', 'md5(')):
                return v
            return "'%s'" % v if not v or v[0] != "'" else v
        elif isinstance(v, datetime.datetime):
            return "'%s'" % str(v)
        elif isinstance(v, list):
            import json
            return "'%s'" % json.dumps(v)
        elif isinstance(v, DBFunc):
            return v.value
        else:
            if v is None:
                return 'NULL'
            return str(v)

    def dict2sql(self, d, sp=','):
        """Build a *sp*-joined SQL fragment from a dict.

        Values may be plain ({name: value}) or ({name: (operator,
        value)}); quoting char depends on the engine ('"' for PG).
        (Fix: the docstring was previously misplaced after the first
        statement, so it was not a docstring at all.)
        """
        space_char = '"' if isinstance(self, PGConnection) else '`'
        x = []
        for k, v in d.items():
            if isinstance(v, tuple):
                x.append('%s' % self.exp2sql(k, v[0], v[1]))
            else:
                x.append('%s%s%s=%s' % (space_char,
                                        k.strip(' %s' % space_char).replace('.', '%s.%s' % (space_char, space_char)),
                                        space_char, self.value2sql(v)))
        return sp.join(x)

    def exp2sql(self, key, op, value):
        """Render one (key, operator, value) condition, handling the
        'in'/'not in' and 'between' operators specially."""
        space_char = '"' if isinstance(self, PGConnection) else '`'
        item = '(%s%s%s %s ' % (space_char, key.strip(space_char).replace('.', '%s.%s' % (space_char, space_char)),
                                space_char, op.strip())
        if op in ['in', 'not in']:
            item += '(%s))' % ','.join([self.value2sql(x) for x in value])
        elif op == 'between':
            item += ' %s and %s)' % (self.value2sql(value[0]), self.value2sql(value[1]))
        else:
            item += self.value2sql(value) + ')'
        return item

    def dict2insert(self, d):
        """Return (quoted column list, rendered value list) for INSERT."""
        space_char = '"' if isinstance(self, PGConnection) else '`'
        keys = d.keys()
        vals = []
        for k in keys:
            vals.append('%s' % self.value2sql(d[k]))
        new_keys = [space_char + k.strip(space_char) + space_char for k in keys]
        return ','.join(new_keys), ','.join(vals)

    def insert(self, table, values):
        """INSERT *values* into *table*; returns last_insert_id on success."""
        keys, vals = self.dict2insert(values)
        sql = "insert into %s(%s) values (%s)" % (table, keys, vals)
        ret = self.execute(sql)
        if ret:
            ret = self.last_insert_id()
        return ret

    def insert_ignore(self, table, values):
        """INSERT IGNORE variant of insert() (MySQL syntax)."""
        keys, vals = self.dict2insert(values)
        sql = "insert ignore into %s(%s) values (%s)" % (table, keys, vals)
        ret = self.execute(sql)
        if ret:
            ret = self.last_insert_id()
        return ret

    def update(self, table, values, where=None):
        """UPDATE *table* SET *values* [WHERE *where*]."""
        sql = "update %s set %s" % (table, self.dict2sql(values))
        if where:
            sql += " where %s" % self.dict2sql(where, ' and ')
        return self.execute(sql)

    def delete(self, table, where):
        """DELETE FROM *table* [WHERE *where*]."""
        sql = "delete from %s" % table
        if where:
            sql += " where %s" % self.dict2sql(where, ' and ')
        return self.execute(sql)

    def select(self, table, where=None, fields='*', other=None, isdict=True):
        """SELECT returning all rows; *other* is appended verbatim."""
        sql = "select %s from %s" % (fields, table)
        if where:
            sql += " where %s" % self.dict2sql(where, ' and ')
        if other:
            sql += ' ' + other
        return self.query(sql, None, isdict=isdict)

    def select_one(self, table, where=None, fields='*', other=None, isdict=True):
        """SELECT returning only the first row."""
        sql = "select %s from %s" % (fields, table)
        if where:
            sql += " where %s" % self.dict2sql(where, ' and ')
        if other:
            sql += ' ' + other
        return self.get(sql, None, isdict=isdict)

    def get_page_data(self, table, where=None, fields='*', other=None, isdict=True, page_num=1, page_count=20):
        """Paged SELECT.

        Returns (all_count, page_num, data):
            all_count: total matching rows
            page_num: the page actually returned
            data: rows for that page (possibly [])
        """
        page_num = int(page_num)
        page_count = int(page_count)
        count_data = self.select_one(table, where, "count(*) as count", other, isdict=isdict)
        if count_data and count_data['count']:
            all_count = count_data["count"]
        else:
            all_count = 0
        if all_count == 0:
            return 0, page_num, []
        offset = page_num * page_count - page_count
        if offset > all_count:
            data = []
        else:
            # Fix: `other` defaults to None; the original `other += ...`
            # raised TypeError on the default path.
            other = (other or '') + " limit %d offset %d" % (page_count, offset)
            data = self.select(table, where, fields, other, isdict=isdict)
        return all_count, page_num, data

    def last_insert_id(self):
        pass

    def start(self):
        """Begin a transaction (driver-specific; see subclasses)."""
        pass

    def commit(self):
        self.conn.commit()

    def rollback(self):
        self.conn.rollback()

    def escape(self, s):
        """Default escaping is a no-op; subclasses override."""
        return s
def with_mysql_reconnect(func):
    """Retry wrapper for MySQL methods: on client-side OperationalError
    (errno >= 2000) close/reconnect and retry up to 3 times, then
    re-raise. Server-side errors re-raise immediately."""
    def _(self, *args, **argitems):
        try:
            import MySQLdb
            mysqllib = MySQLdb
        except ImportError:  # was a bare except
            logging.info("MySQLdb load error! Load pymysql...")
            import pymysql
            mysqllib = pymysql
        trycount = 3
        while True:
            try:
                x = func(self, *args, **argitems)
            except mysqllib.OperationalError as e:
                # Fix: `e[0]` — exceptions are not subscriptable on
                # Python 3; the errno lives in e.args[0].
                if e.args[0] >= 2000:  # client error -> reconnect
                    self.conn.close()
                    self.connect()
                    trycount -= 1
                    if trycount > 0:
                        continue
                raise
            else:
                return x
    return _
def with_pg_reconnect(func):
    """Retry wrapper for PostgreSQL methods: on OperationalError
    close/reconnect and retry up to 3 times, then re-raise.

    Fix: the original tested ``e[0] >= 2000`` — a MySQL errno
    convention copied over; psycopg2's OperationalError args carry a
    message string, so that comparison raised TypeError on Python 3.
    Any OperationalError is now treated as reconnectable.
    """
    def _(self, *args, **argitems):
        import psycopg2
        trycount = 3
        while True:
            try:
                x = func(self, *args, **argitems)
            except psycopg2.OperationalError:
                self.conn.close()
                self.connect()
                trycount -= 1
                if trycount > 0:
                    continue
                raise
            else:
                return x
    return _
class PGConnection(DBConnection):
    """PostgreSQL connection (psycopg2) with autocommit enabled."""
    name = "pg"

    def __init__(self, param, lasttime, status):
        DBConnection.__init__(self, param, lasttime, status)
        self.connect()

    def useit(self):
        self.status = 1
        self.lasttime = time.time()

    def releaseit(self):
        self.status = 0

    def connect(self):
        engine = self.param['engine']
        if engine == 'pg':
            import psycopg2
            self.conn = psycopg2.connect(host=self.param['host'],
                                         port=self.param['port'],
                                         user=self.param['user'],
                                         password=self.param['passwd'],
                                         database=self.param['db']
                                         )
            self.conn.autocommit = 1
        else:
            raise ValueError('engine error:' + engine)

    def close(self):
        self.conn.close()
        self.conn = None

    @with_pg_reconnect
    def alive(self):
        """Ping the server with a trivial statement (idle conns only).

        Fix: psycopg2 cursors have execute(), not query(), and psycopg2
        connections have no ping(); the original 'show tables;' is also
        MySQL syntax and fails on PostgreSQL.
        """
        if self.is_available():
            cur = self.conn.cursor()
            cur.execute("select 1")
            cur.close()

    @with_pg_reconnect
    def execute(self, sql, param=None):
        return DBConnection.execute(self, sql, param)

    @with_pg_reconnect
    def executemany(self, sql, param):
        return DBConnection.executemany(self, sql, param)

    @with_pg_reconnect
    def query(self, sql, param=None, isdict=True):
        return DBConnection.query(self, sql, param, isdict)

    @with_pg_reconnect
    def get(self, sql, param=None, isdict=True):
        return DBConnection.get(self, sql, param, isdict)

    def escape(self, s, enc='utf-8'):
        """Escape a string for SQL embedding (py3-safe).

        Fix: the original used py2-only types.UnicodeType / unicode().
        NOTE(review): still relies on psycopg2's private _param_escape —
        confirm against the installed psycopg2 version.
        """
        if isinstance(s, bytes):
            s = s.decode(enc)
        import psycopg2
        return psycopg2._param_escape(s)

    def last_insert_id(self):
        # NOTE(review): last_insert_id() is a MySQL function; on
        # PostgreSQL prefer insert()'s RETURNING id path below.
        ret = self.query('select last_insert_id()', isdict=False)
        return ret[0][0]

    def start(self):
        sql = "start transaction"
        return self.execute(sql)

    def insert(self, table, values):
        """INSERT ... RETURNING id; returns the new id, or 0 on failure."""
        ret = 0
        try:
            keys, vals = self.dict2insert(values)
            sql = "insert into %s(%s) values (%s) RETURNING id" % (table, keys, vals)
            data = self.query(sql)
            if data:
                ret = data[0].get('id')
        except Exception as e:
            logging.error(e)
        return ret

    def insert_ignore(self, table, values):
        # PostgreSQL has no INSERT IGNORE; fall back to a plain insert.
        return self.insert(table, values)
class MySQLConnection(DBConnection):
    """MySQL connection (MySQLdb with pymysql fallback), autocommit on."""
    name = "mysql"

    def __init__(self, param, lasttime, status):
        DBConnection.__init__(self, param, lasttime, status)
        self.connect()

    def useit(self):
        self.status = 1
        self.lasttime = time.time()

    def releaseit(self):
        self.status = 0

    def connect(self):
        engine = self.param['engine']
        if engine == 'mysql':
            try:
                import MySQLdb
                mysqllib = MySQLdb
            except ImportError:  # was a bare except
                logging.info("MySQLdb load error! Load pymysql...")
                import pymysql
                mysqllib = pymysql
            self.conn = mysqllib.connect(host=self.param['host'],
                                         port=self.param['port'],
                                         user=self.param['user'],
                                         passwd=self.param['passwd'],
                                         db=self.param['db'],
                                         charset=self.param['charset'],
                                         connect_timeout=self.param.get('timeout', 20),
                                         )
            self.conn.autocommit(1)
        else:
            raise ValueError('engine error:' + engine)

    def close(self):
        self.conn.close()
        self.conn = None

    @with_mysql_reconnect
    def alive(self):
        """Ping the server (idle connections only)."""
        if self.is_available():
            cur = self.conn.cursor()
            cur.execute("show tables;")
            cur.close()
            self.conn.ping()

    @with_mysql_reconnect
    def execute(self, sql, param=None):
        return DBConnection.execute(self, sql, param)

    @with_mysql_reconnect
    def executemany(self, sql, param):
        return DBConnection.executemany(self, sql, param)

    @with_mysql_reconnect
    def query(self, sql, param=None, isdict=True):
        return DBConnection.query(self, sql, param, isdict)

    @with_mysql_reconnect
    def get(self, sql, param=None, isdict=True):
        return DBConnection.get(self, sql, param, isdict)

    def escape(self, s, enc='utf-8'):
        """Driver-level string escaping (py3-safe).

        Fix: the original used py2-only types.UnicodeType / unicode(),
        which raise NameError/AttributeError on Python 3.
        """
        if isinstance(s, bytes):
            s = s.decode(enc)
        ns = self.conn.escape_string(s)
        if isinstance(ns, bytes):
            ns = ns.decode(enc)
        return ns

    def last_insert_id(self):
        ret = self.query('select last_insert_id()', isdict=False)
        return ret[0][0]

    def start(self):
        sql = "start transaction"
        return self.execute(sql)
class SQLiteConnection(DBConnection):
    """SQLite connection; (re)opened lazily on checkout, closed on release."""
    name = "sqlite"

    def __init__(self, param, lasttime, status):
        DBConnection.__init__(self, param, lasttime, status)

    def connect(self):
        engine = self.param['engine']
        if engine == 'sqlite':
            import sqlite3
            # isolation_level=None -> autocommit mode.
            self.conn = sqlite3.connect(self.param['db'], isolation_level=None)
        else:
            raise ValueError('engine error:' + engine)

    def useit(self):
        DBConnection.useit(self)
        if not self.conn:
            self.connect()

    def releaseit(self):
        DBConnection.releaseit(self)
        self.conn.close()
        self.conn = None

    def escape(self, s, enc='utf-8'):
        """Escape single quotes SQL-style.

        Fix: SQLite does not interpret backslash escapes; a quote inside
        a string literal must be doubled. The original backslash
        replacement produced broken literals.
        """
        return s.replace("'", "''")

    def last_insert_id(self):
        ret = self.query('select last_insert_rowid()', isdict=False)
        return ret[0][0]

    def start(self):
        sql = "BEGIN"
        return self.conn.execute(sql)
class DBPool(DBPoolBase):
    # Thread-safe pool of DBConnection objects for one database config.
    def __init__(self, dbcf):
        # Idle/using lists hold DBConnection instances.
        self.dbconn_idle = []
        self.dbconn_using = []
        self.dbcf = dbcf
        self.max_conn = 20
        self.min_conn = 1
        if 'conn' in self.dbcf:
            self.max_conn = self.dbcf['conn']
        # Map engine name -> connection class, discovered from globals().
        self.connection_class = {}
        x = globals()
        for v in x.values():
            try:
                # py2 old-style classes; py3 falls back to `type`.
                class_type = types.ClassType
            except:
                class_type = type
            if type(v) == class_type and v != DBConnection and issubclass(v, DBConnection):
                self.connection_class[v.name] = v
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.open(self.min_conn)
    def synchronize(func):
        # Decorator (note: no self — used only inside this class body)
        # serializing method access via self.lock.
        def _(self, *args, **argitems):
            self.lock.acquire()
            x = None
            try:
                x = func(self, *args, **argitems)
            finally:
                self.lock.release()
            return x
        return _
    def open(self, n=1):
        # Create n new connections and append them to the idle list;
        # connection failures are logged, not raised.
        param = self.dbcf
        newconns = []
        for i in range(0, n):
            try:
                myconn = self.connection_class[param['engine']](param, time.time(), 0)
                newconns.append(myconn)
            except Exception as e:
                logging.info(e)
                logging.error("%s connection error!" % param)
        self.dbconn_idle += newconns
    def clear_timeout(self):
        # Close idle connections unused for >10s, always keeping at
        # least one connection alive overall.
        now = time.time()
        dels = []
        allconn = len(self.dbconn_idle) + len(self.dbconn_using)
        for c in self.dbconn_idle:
            if allconn == 1:
                break
            if now - c.lasttime > 10:
                dels.append(c)
                allconn -= 1
        logging.warn('close timeout db conn:%d', len(dels))
        for c in dels:
            c.close()
            self.dbconn_idle.remove(c)
    @synchronize
    def acquire(self, timeout=None):
        # Borrow an idle connection, growing the pool up to max_conn and
        # waiting on the condition when saturated. May return None after
        # 10 failed attempts.
        try_count = 10
        while len(self.dbconn_idle) == 0:
            try_count -= 1
            if not try_count:
                break
            if len(self.dbconn_idle) + len(self.dbconn_using) < self.max_conn:
                self.open()
                continue
            self.cond.wait(timeout)
        if not self.dbconn_idle:
            return None
        conn = self.dbconn_idle.pop(0)
        conn.useit()
        self.dbconn_using.append(conn)
        # Opportunistically (p ~ 0.2) reap stale idle connections.
        if random.randint(0, 100) > 80:
            self.clear_timeout()
        return conn
    @synchronize
    def release(self, conn):
        # Return a connection to the front of the idle list and wake one
        # waiter.
        self.dbconn_using.remove(conn)
        conn.releaseit()
        self.dbconn_idle.insert(0, conn)
        self.cond.notify()
    @synchronize
    def alive(self):
        # Ping every idle connection.
        for conn in self.dbconn_idle:
            conn.alive()
    def size(self):
        # (idle_count, using_count)
        return len(self.dbconn_idle), len(self.dbconn_using)
class DBConnProxy:
    """Pairs a master and a slave connection behind one object.

    Write-style methods are routed to the master; ``*w``-suffixed read
    methods are forced onto the master; everything else is served by
    the slave.
    """

    def __init__(self, master_conn, slave_conn):
        self.master = master_conn
        self.slave = slave_conn
        # Methods that must always run on the master.
        self._modify_methods = {
            'execute', 'executemany', 'last_insert_id', 'insert',
            'update', 'delete', 'insert_list',
        }
        # Master-forced read aliases -> real method names.
        self._master_methods = {
            'selectw_one': 'select_one',
            'selectw': 'select',
            'queryw': 'query',
            'getw': 'get',
        }

    def __getattr__(self, name):
        if name in self._modify_methods:
            return getattr(self.master, name)
        if name in self._master_methods:
            return getattr(self.master, self._master_methods[name])
        return getattr(self.slave, name)
class RWDBPool:
    # Read/write-splitting pool: writes go to the master pool, reads are
    # distributed over the slave pools (round-robin is the only policy).
    def __init__(self, dbcf):
        self.dbcf = dbcf
        self.name = ''
        self.policy = dbcf.get('policy', 'round_robin')
        self.master = DBPool(dbcf.get('master', None))
        self.slaves = []
        self._slave_current = -1
        for x in dbcf.get('slave', []):
            self.slaves.append(DBPool(x))
    def get_slave(self):
        # Round-robin pick. NOTE(review): raises ZeroDivisionError when
        # no slaves are configured — confirm configs always define some.
        if self.policy == 'round_robin':
            size = len(self.slaves)
            self._slave_current = (self._slave_current + 1) % size
            return self.slaves[self._slave_current]
        else:
            raise ValueError('policy not support')
    def get_master(self):
        return self.master
    def acquire(self, timeout=None):
        # Borrow one master and one slave connection; on failure release
        # whichever half was already obtained before re-raising.
        master_conn = None
        slave_conn = None
        try:
            master_conn = self.master.acquire(timeout)
            slave_conn = self.get_slave().acquire(timeout)
            return DBConnProxy(master_conn, slave_conn)
        except:
            if master_conn:
                master_conn.pool.release(master_conn)
            if slave_conn:
                slave_conn.pool.release(slave_conn)
            raise
    def release(self, conn):
        # Release both halves of the proxy back to their owning pools.
        conn.master.pool.release(conn.master)
        conn.slave.pool.release(conn.slave)
    def size(self):
        # Master size plus per-slave ('user@host:port', size) pairs.
        ret = {'master': self.master.size(), 'slave': []}
        for x in self.slaves:
            key = '%s@%s:%d' % (x.dbcf['user'], x.dbcf['host'], x.dbcf['port'])
            ret['slave'].append((key, x.size()))
        return ret
def checkalive(name=None):
    # Daemon loop: every 300s ping the idle connections of one pool
    # (when *name* is given) or of every registered pool, so long-lived
    # connections are not silently dropped by the server.
    global dbpool
    while True:
        if name is None:
            checknames = dbpool.keys()
        else:
            checknames = [name]
        for k in checknames:
            pool = dbpool[k]
            pool.alive()
        time.sleep(300)
def install(cf, force=False):
    """Build the global pool registry from config *cf*.

    A config entry containing a 'master' key becomes an RWDBPool,
    otherwise a plain DBPool. An already-populated registry is reused
    unless *force* is true.
    """
    global dbpool
    if dbpool and not force:
        return dbpool
    dbpool = {}
    for name, item in cf.items():
        pool_cls = RWDBPool if 'master' in item else DBPool
        dbpool[name] = pool_cls(item)
    return dbpool
def acquire(name, timeout=None):
    """Borrow a connection from the pool registered under *name*.

    Returns None when the pool is unknown or exhausted; a returned
    connection is tagged with the pool name so release() can route it.
    """
    global dbpool
    pool = dbpool.get(name, None)
    if not pool:
        return None
    conn = pool.acquire(timeout)
    if conn:
        conn.name = name
    return conn
def release(conn):
    """Return *conn* to its owning pool (no-op for a falsy conn)."""
    global dbpool
    if not conn:
        return None
    return dbpool[conn.name].release(conn)
def execute(db, sql, param=None):
    # Module-level convenience wrapper around an acquired connection.
    return db.execute(sql, param)
def executemany(db, sql, param):
    # Batch variant of execute().
    return db.executemany(sql, param)
def query(db, sql, param=None, isdict=True):
    # SELECT wrapper; isdict controls dict-vs-tuple row format.
    return db.query(sql, param, isdict)
def with_database(name, errfunc=None, errstr=''):
    """Decorator: bind a pooled DB connection to ``self.db`` for the call.

    Acquires from pool *name* before the wrapped method runs and always
    releases it afterwards. On error, calls ``self.<errfunc>(error=errstr)``
    when *errfunc* is given, otherwise re-raises.
    """
    from functools import wraps

    def f(func):
        @wraps(func)  # fix: preserve the wrapped method's identity
        def _(self, *args, **argitems):
            self.db = acquire(name)
            x = None
            try:
                x = func(self, *args, **argitems)
            except Exception:
                # Was a bare except: KeyboardInterrupt/SystemExit are no
                # longer diverted into the error handler.
                if errfunc:
                    return getattr(self, errfunc)(error=errstr)
                else:
                    raise
            finally:
                release(self.db)
                self.db = None
            return x
        return _
    return f
def with_database_class(name):
    # Class decorator: attaches a pooled connection as cls.db.
    # NOTE(review): the connection is released in `finally` immediately
    # after being acquired, so cls.db ends up holding an already
    # released connection — verify this is intentional before use.
    def _(cls):
        try:
            cls.db = acquire(name)
        except:
            cls.db = None
        finally:
            release(cls.db)
        return cls
    return _
# coding: utf-8
class GeneralError(Exception):
    """Generic business exception carrying a human-readable message."""

    def __init__(self, error_msg):
        super(GeneralError, self).__init__()
        self.error_msg = error_msg

    def __str__(self):
        return self.error_msg


if __name__ == "__main__":
    # Smoke test: printing a raised instance shows its message.
    try:
        raise GeneralError('客户异常')
    except Exception as e:
        print(e)
# coding: utf-8
# ::::::::::::::::::
# Error messages and code tables. The three dicts must stay in sync:
# ERROR_CODE maps symbolic name -> code, ERROR_MSG maps name -> message,
# ERROR_MSG_CODE maps message -> code.
# NOTE(review): "UNKOWNERR" is a typo for UNKNOWNERR, but it is a
# runtime lookup key — renaming it would break existing callers.
ERROR_CODE = {
    # Third-party / other service errors
    "DBERR": "2000",
    "THIRDERR": "2001",
    "DATAERR": "2003",
    "IOERR": "2004",
    "REDISERR": "2005",
    "SERVERERR": "2006",
    # Response-content errors
    "NODATA": "2300",
    "DATAEXIST": "2301",
    "CONTENTERR": "2302",
    "NOT_FOUND": "404",
    # Unknown error
    "UNKOWNERR": "2400",
    # Parameter error
    "PARAMERR": "2401",
    # User-related
    "SESSIONERR": "2100",
    "USERERR": "2102",
    "ROLEERR": "2103",
    "PWDERR": "2104",
    "BALANCEERR": "2110",
    # Verification failure
    "VERIFYERR": "2105",
}
ERROR_MSG = {
    # Third-party / other service errors
    "DBERR": "数据库查询错误",
    "THIRDERR": "第三方系统错误",
    "DATAERR": "数据错误",
    "IOERR": "文件读写错误",
    "REDISERR": "REDIS错误",
    "SERVERERR": "内部错误",
    # Response-content errors
    "NODATA": "无数据",
    "DATAEXIST": "数据已存在",
    "CONTENTERR": "内容错误",
    "NOT_FOUND": "未知请求",
    # Unknown error
    "UNKOWNERR": "未知错误",
    # Parameter error
    "PARAMERR": "参数错误",
    # User-related
    "SESSIONERR": "未登录",
    "USERERR": "用户不存在",
    "ROLEERR": "用户身份错误",
    "PWDERR": "密码错误",
    "BALANCEERR": "钱包账号余额不足",
    # Verification failure
    "VERIFYERR": "验证码错误",
}
ERROR_MSG_CODE = {
    # Third-party / other service errors
    "数据库查询错误": "2000",
    "第三方系统错误": "2001",
    "数据错误": "2003",
    "文件读写错误": "2004",
    "REDIS错误": "2005",
    "内部错误": "2006",
    # Response-content errors
    "无数据": "2300",
    "数据已存在": "2301",
    "内容错误": "2302",
    "未知请求": "404",
    # Unknown error
    "未知错误": "2400",
    # Parameter error
    "参数错误": "2401",
    # User-related
    "未登录": "2100",
    "用户不存在": "2102",
    "用户身份错误": "2103",
    "密码错误": "2104",
    "钱包账号余额不足": "2110",
    # Verification failure
    "验证码错误": "2105",
}
#coding: utf-8
__author__ = 'yaobo'
def check_format(email):
    """Loosely validate an email address.

    Pattern: 2+ non-'@' chars, '@', 2+ word chars, '.', 2+ word chars.

    @param email: candidate address string
    @return: list of full matches — empty list (falsy) when invalid
    """
    import re
    # Fix: raw string — '\w' in a plain literal is an invalid escape
    # sequence (DeprecationWarning today, an error in future Pythons).
    return re.findall(r'^[^@]{2,}@[\w\d]{2,}\.[\w]{2,}$', email)


if __name__ == "__main__":
    print(check_format("afbd@dd"))
    print(check_format("sdnv@dd.co"))
\ No newline at end of file
# coding: utf-8
# __author__ = 'bozyao'
import hashlib
import hmac
import json
import logging
import uuid
from datetime import datetime, date
import redis
def format_date(obj):
    """``json.dumps`` fallback: datetime -> 'YYYY-MM-DD HH:MM:SS',
    date -> 'YYYY-MM-DD'; any other type yields None (-> null).

    @param obj: object json could not serialize
    @return: formatted string, or None
    """
    if isinstance(obj, datetime):
        fmt = '%Y-%m-%d %H:%M:%S'
    elif isinstance(obj, date):
        fmt = '%Y-%m-%d'
    else:
        return None
    return obj.strftime(fmt)
class SessionData(dict):
    """Session payload dict carrying its id and HMAC key as attributes."""
    def __init__(self, session_id, hmac_key):
        # The dict payload starts empty; keys are assigned by callers.
        self.session_id = session_id
        self.hmac_key = hmac_key
class Session(SessionData):
    """Per-request session view bound to a handler.

    Loads existing data through the session manager; on an invalid
    session (HMAC mismatch) a fresh empty session is generated instead.
    """
    def __init__(self, session_manager, request_handler):
        self.session_manager = session_manager
        self.request_handler = request_handler
        try:
            current_session = session_manager.get(request_handler)
        except InvalidSessionException:
            # Tampered or unknown session id: start a fresh session.
            current_session = session_manager.get()
        for key, data in current_session.items():
            self[key] = data
        self.session_id = current_session.session_id
        self.hmac_key = current_session.hmac_key
    def save(self, raw_data):
        # Merge raw_data (must be a dict) into this session and persist.
        if not isinstance(raw_data, dict):
            return
        for key, data in raw_data.items():
            self[key] = data
        self.session_manager.set(self, self.request_handler)
        return self.session_id
    def remove(self):
        # Delete this session from the backing store.
        self.session_manager.remove(self)
    def get_all(self):
        return self.session_manager.get_all()
class SessionManager(object):
    """Redis-backed session store with optional MySQL (``m_db``) persistence.

    Sessions are JSON blobs keyed by a sha1 session id; a per-user redis set
    (``s<user_id>``) caps concurrent sessions at 20.  An HMAC of the session
    id (keyed with it, over the shared secret) is stored in a secure cookie
    to detect tampering.
    """

    def __init__(self, secret, store_options, session_timeout, m_db=None):
        """
        @param secret: HMAC secret shared by all instances
        @param store_options: dict with redis_host / redis_port / redis_db
        @param session_timeout: TTL in seconds for session keys
        @param m_db: optional DB handle used as a write-through backup
        """
        self.secret = secret
        self.session_timeout = session_timeout
        self.m_db = m_db
        try:
            self.redis = redis.StrictRedis(host=store_options['redis_host'],
                                           port=store_options['redis_port'],
                                           db=store_options['redis_db'])
        except Exception as e:
            print(e)
            logging.error("Session redis connect error! %s" % e)

    def _fetch(self, session_id):
        """Load the session dict for *session_id*; {} on any miss/error."""
        try:
            session_data = raw_data = self.redis.get(session_id)
            # if not raw_data:
            #     session_data = raw_data = self.get_m_db_data(session_id)
            if raw_data:
                # sliding expiration: reading a session renews its TTL
                self.redis.setex(session_id, self.session_timeout, raw_data)
                try:
                    session_data = json.loads(raw_data)
                except (ValueError, TypeError):
                    # corrupt payload -- treat as empty rather than crash
                    session_data = {}
            if isinstance(session_data, dict):
                return session_data
            return {}
        except IOError:
            return {}

    def get_session_id(self, request_handler, session_key="sid"):
        """Read the plain (non-secure) session-id cookie; '' when absent."""
        session_id = request_handler.get_cookie(session_key, "")
        return session_id

    def _gen_session(self):
        """Create and return a fresh ``(session_id, hmac_key)`` pair.

        BUG fix: the original computed both values and discarded them
        (no return statement).
        """
        session_id = self._generate_id()
        hmac_key = self._generate_hmac(session_id)
        return session_id, hmac_key

    def get(self, request_handler=None):
        """Return the SessionData for this request, creating one if needed.

        @raises InvalidSessionException: when the cookie HMAC does not match
        """
        if not request_handler:
            session_id = None
            hmac_key = ''
        else:
            session_id = self.get_session_id(request_handler)
            hmac_key = request_handler.get_secure_cookie("verification")
            # tornado returns secure-cookie values as bytes; normalise to
            # str so the comparison against the hex digest below can succeed
            if isinstance(hmac_key, bytes):
                hmac_key = hmac_key.decode("utf-8")
        if not session_id:
            session_exists = False
            session_id = self._generate_id()
            hmac_key = self._generate_hmac(session_id)
        else:
            session_exists = True
            check_hmac = self._generate_hmac(session_id)
            if hmac_key != check_hmac:
                raise InvalidSessionException()
        session = SessionData(session_id, hmac_key)
        if session_exists:
            session_data = self._fetch(session_id)
            for key, data in session_data.items():
                session[key] = data
        return session

    def set(self, session, request_handler=None, session_key="sid"):
        """Persist *session* to redis (and m_db) and refresh its cookies."""
        request_handler.set_cookie(session_key, session.session_id)
        request_handler.set_secure_cookie("verification", session.hmac_key)
        session_dict = dict(session.items())
        session_data = json.dumps(session_dict, default=format_date)
        self.set_m_db_data(session.session_id, session_data)
        if 'user_id' in session_dict:
            # track this user's sessions; evict oldest beyond 20 concurrent
            s_k = 's%s' % session_dict['user_id']
            if not self.redis.sismember(s_k, session.session_id):
                c = self.redis.scard(s_k)
                while c >= 20:
                    self.redis.delete(self.redis.spop(s_k))
                    c -= 1
                self.redis.sadd(s_k, session.session_id)
        self.redis.setex(session.session_id, self.session_timeout, session_data)

    def clear(self, user_id):
        """Drop every session belonging to *user_id*.

        @return: number of sessions removed, or None when there were none
        """
        s_k = 's%s' % user_id
        session_set = self.redis.smembers(s_k)
        if not session_set:
            return
        for session_id in session_set:
            self.redis.delete(session_id)
        return len(session_set)

    def remove(self, session):
        """Delete one session and unlink it from its user's session set."""
        # self.rm_m_db_data(session.session_id)
        if 'user_id' in session:
            s_k = 's%s' % session['user_id']
            self.redis.srem(s_k, session.session_id)
        self.redis.delete(session.session_id)

    def get_all(self):
        """Rough count of stored keys (includes non-session keys)."""
        return len(self.redis.keys())

    def _generate_id(self):
        """Random, unguessable 40-char hex session id."""
        new_id = hashlib.sha1((self.secret + str(uuid.uuid4())).encode("utf8"))
        return new_id.hexdigest()

    def _generate_hmac(self, session_id):
        """HMAC-SHA256 keyed with the session id over the shared secret.

        BUG fix: ``hmac.new`` requires bytes on Python 3; the original
        passed str and raised TypeError on every call.
        """
        return hmac.new(session_id.encode("utf8"), self.secret.encode("utf8"),
                        hashlib.sha256).hexdigest()

    def set_m_db_data(self, k, v):
        """Write-through the session blob to the backup DB; 0 when disabled.

        NOTE(review): values are interpolated straight into SQL -- safe only
        because k is a generated hex id and v is json.dumps output; do not
        reuse for arbitrary input.
        """
        if not self.m_db:
            return 0
        if not v:
            return 0
        # data = self.m_db.get("select * from user_session where sid = '%s'" % k)
        data = self.get_m_db_data(k)
        if data:
            flag = self.m_db.execute("update user_session set vl = '%s' where sid = '%s'" % (v, k))
        else:
            flag = self.m_db.insert("user_session", {
                "sid": k,
                "vl": v
            })
        return flag

    def get_m_db_data(self, k):
        """Read the backup-DB session blob; None when m_db is disabled."""
        if not self.m_db:
            return None
        sql = "select vl from user_session where sid = '%s'" % k
        data = self.m_db.get(sql)
        if data:
            data = data["vl"]
        return data

    def rm_m_db_data(self, k):
        """Delete the backup-DB session row; 0 when m_db is disabled."""
        if not self.m_db:
            return 0
        return self.m_db.execute("delete from user_session where sid='%s'" % k)
class InvalidSessionException(Exception):
    """Raised when a session cookie fails HMAC verification (tampering)."""
    pass
# coding=utf-8
# __author__ = 'yaobo'
import inspect
import os
import random
import sys
_letter_cases = "abcdefhjkmnpqrtuvwxy"  # lowercase letters, excluding easily-confused i, l, o, z, g, s
_upper_cases = _letter_cases.upper()  # uppercase letters
_numbers = ''.join(map(str, range(3, 10)))  # digits 3-9 (0/1/2 look like O/l/Z)
init_chars = ''.join((_letter_cases, _upper_cases, _numbers))  # full captcha alphabet
def current_path():
    """Best-effort absolute directory of the running program.

    Uses the directory of ``sys.path[0]`` when it points at a file;
    otherwise falls back to the directory of the calling module's file.
    """
    resolved = os.path.realpath(sys.path[0])
    if not os.path.isfile(resolved):
        caller_file = inspect.stack()[1][1]
        return os.path.abspath(os.path.dirname(caller_file))
    return os.path.abspath(os.path.dirname(resolved))
# directory used to resolve the captcha font file at import time
cur_path = current_path()
def create_validate_code(size=(120, 30),
                         chars=init_chars,
                         img_type="GIF",
                         mode="RGB",
                         bg_color=(255, 255, 255),
                         fg_color=(0, 0, 125),
                         font_size=20,
                         font_type="fb.ttf",
                         length=4,
                         draw_lines=True,
                         n_line=(1, 5),
                         draw_points=True,
                         point_chance=4):
    """Generate a CAPTCHA image.

    @param size: image size as (width, height), default (120, 30)
    @param chars: allowed character set, as a string
    @param img_type: image save format, default GIF (GIF/JPEG/TIFF/PNG)
    @param mode: image mode, default RGB
    @param bg_color: background colour, default white
    @param fg_color: foreground (code text) colour, default dark blue
    @param font_size: font size of the code characters
    @param font_type: font file of the code characters (looked up in cur_path)
    @param length: number of characters in the code
    @param draw_lines: whether to draw interference lines
    @param n_line: (min, max) count of interference lines, default (1, 5);
                   only used when draw_lines is True
    @param draw_points: whether to draw interference points
    @param point_chance: probability of a point per pixel, range [0, 100]
    @return: [0]: PIL Image instance
    @return: [1]: the code string rendered in the image
    """
    from PIL import Image, ImageDraw, ImageFont, ImageFilter
    width, height = size  # width, height
    img = Image.new(mode, size, bg_color)  # create the canvas
    draw = ImageDraw.Draw(img)  # create the pen

    def get_chars():
        """Pick ``length`` random characters; returns a list."""
        return random.sample(chars, length)

    def create_lines():
        """Draw the interference lines."""
        line_num = random.randint(*n_line)  # number of lines
        for i in range(line_num):
            # start point
            begin = (random.randint(0, size[0]), random.randint(0, size[1]))
            # end point
            end = (random.randint(0, size[0]), random.randint(0, size[1]))
            draw.line([begin, end], fill=(0, 0, 0))

    def create_points():
        """Draw the interference points."""
        chance = min(100, max(0, int(point_chance)))  # clamp to [0, 100]
        # BUG fix: xrange does not exist on Python 3 -- use range
        for w in range(width):
            for h in range(height):
                tmp = random.randint(0, 100)
                if tmp > 100 - chance:
                    draw.point((w, h), fill=(0, 0, 0))

    def create_strs():
        """Render the code characters; returns the code string."""
        c_chars = get_chars()
        strs = ' %s ' % ' '.join(c_chars)  # pad each character with spaces
        font = ImageFont.truetype(os.path.join(cur_path, font_type), font_size)
        # NOTE(review): font.getsize was removed in Pillow 10 -- confirm the
        # pinned Pillow version or migrate to font.getbbox
        font_width, font_height = font.getsize(strs)
        draw.text(((width - font_width) / 3, (height - font_height) / 3),
                  strs, font=font, fill=fg_color)
        return ''.join(c_chars)

    if draw_lines:
        create_lines()
    if draw_points:
        create_points()
    strs = create_strs()
    # perspective-warp parameters for a slight random distortion
    params = [1 - float(random.randint(1, 2)) / 100,
              0,
              0,
              0,
              1 - float(random.randint(1, 10)) / 100,
              float(random.randint(1, 2)) / 500,
              0.001,
              float(random.randint(1, 2)) / 500
              ]
    img = img.transform(size, Image.PERSPECTIVE, params)  # apply distortion
    img = img.filter(ImageFilter.EDGE_ENHANCE_MORE)  # edge-enhance filter
    return img, strs
if __name__ == "__main__":
    # manual smoke test: render one captcha next to this script
    code_img = create_validate_code()[0]
    code_img.save("validate.gif", "GIF")
# coding: utf-8
import json
import redis
import time
import copy
import datetime
import types
import random
import logging
# Prefer the embedding project's redis settings; fall back to the bundled
# base defaults when the project does not provide conf.settings.
try:
    from conf.settings import BUSINESS_REDIS
    logging.info("Redis config by local......")
except ImportError:
    from base_conf.settings import BUSINESS_REDIS
    logging.info("Redis config by base......")
class BaseCache(object):
    """Small typed cache facade over redis, driven by a key-config dict.

    Example ``kc`` (keys_config) entry::

        'cache_1': {
            'db': 0,          # redis db to use (optional, defaults to ``db``)
            'key': '%s_key1', # actual storage key pattern
            'type': 'str',    # storage type: str/hash/list/set/sortedset
            'timeout': 86400, # expiry in seconds (required; no eternal keys)
            'postpone': 0,    # renew the timeout on read, 0=no 1=yes (opt.)
            'random': 43200,  # add random 0..N seconds to timeout on write so
                              # mass-refreshed keys do not all expire at once
        }
    """

    def __init__(self, db=None, host=None, port=None, kc=None):
        """Connect to redis and keep the key configuration.

        Defaults resolve from BUSINESS_REDIS lazily (at call time, not at
        class-definition time), and the mutable-default ``kc={}`` pitfall
        is avoided.
        """
        self.db = BUSINESS_REDIS['db'] if db is None else db
        host = BUSINESS_REDIS['host'] if host is None else host
        port = BUSINESS_REDIS['port'] if port is None else port
        self.r = redis.StrictRedis(host=host, port=port, db=self.db)
        self.kc = {} if kc is None else kc

    def get_conf(self, ckey, rkey, charset='utf-8'):
        """Resolve the config entry for *ckey*, filling its key pattern.

        @param ckey: key into the config dict
        @param rkey: value substituted into the key pattern (when it has %)
        @return: resolved config dict, or None when invalid
        """
        if not ckey:
            return None
        tmp_kc = copy.deepcopy(self.kc)
        # keys without an expiry (or without a key pattern) are not allowed
        if ckey not in tmp_kc or 'timeout' not in tmp_kc[ckey] or 'key' not in tmp_kc[ckey]:
            return None
        conf = tmp_kc[ckey]
        # fill the storage-key pattern with rkey
        if '%' in conf['key']:
            if not rkey:
                return None
            # BUG fix: types.UnicodeType is Python 2 only; on Python 3
            # normalise bytes to str instead
            if isinstance(rkey, bytes):
                rkey = rkey.decode(charset, 'ignore')
            conf['key'] = conf['key'] % rkey
        # spread expiries so mass-written keys do not all expire together
        if conf.get('random', 0):
            conf['timeout'] = conf['timeout'] + random.randint(0, conf.get('random', 0))
        # switch redis database when this entry asks for a different one
        if 'db' in conf and conf['db'] != self.db:
            self.r.select(conf['db'])
        return conf

    def str2dict(self, val):
        """
        @param val: stored JSON string
        @return: decoded data
        """
        return json.loads(val)

    def dict2str(self, val):
        """
        @param val: data dict (datetimes are rendered as strings) or any
                    other value, returned unchanged
        @return: JSON string (for dicts) or the original value
        """
        if isinstance(val, dict):
            for k, v in val.items():
                if isinstance(v, datetime.datetime):
                    val[k] = v.strftime("%Y-%m-%d %H:%M:%S")
            return json.dumps(val)
        else:
            return val

    def set(self, ckey, value, rkey='', charset='utf-8'):
        """Write *value* under the configured key.

        @param ckey: config-dict key
        @param rkey: key-pattern substitution value
        @param value: data to store; falsy value deletes the key instead
        @return: True on success, None when the config is invalid
        """
        c = self.get_conf(ckey, rkey)
        if not c:
            return
        k = c['key']
        t = c['type']
        # an empty value means "invalidate": drop any existing key
        if not value and self.r.exists(k):
            self.r.delete(k)
            return True
        if t == 'str':
            value = self.dict2str(value)
            self.r.set(k, value)
        elif t == 'hash':
            self.r.hmset(k, value)
        elif t == 'list':
            # BUG fix: types.TupleType/ListType are Python 2 only
            if isinstance(value, (tuple, list)):
                for d in value:
                    self.r.rpush(k, d)
            else:
                self.r.rpush(k, value)
        elif t == 'set':
            # BUG fix: the original called redis['write'].sadd, indexing the
            # redis *module* (a TypeError at runtime) instead of the client
            if isinstance(value, (tuple, list)):
                for d in value:
                    self.r.sadd(k, d)
            else:
                self.r.sadd(k, value)
        elif t == 'sortedset':
            # NOTE(review): this is the redis-py 2.x zadd(name, score, value)
            # calling convention -- confirm the pinned redis-py version
            self.r.zadd(k, value, int(time.time()))
        if self.r.exists(k):
            self.r.expire(k, c['timeout'])
        return True

    def get(self, ckey, rkey, ext=None, charset='utf-8'):
        """Read the value stored under the configured key.

        @param ckey: config-dict key
        @param rkey: key-pattern substitution value
        @param ext: optional dict with 'min'/'max' range for list/sortedset
        @return: the decoded value, or None
        """
        c = self.get_conf(ckey, rkey)
        if not c:
            return
        k = c['key']
        t = c['type']
        # 'postpone' is documented as optional -- do not KeyError on it
        if c.get('postpone', 0) and self.r.exists(k):
            self.r.expire(k, c['timeout'])
        ext = ext or {}
        s = ext.get('min', 0)
        e = ext.get('max', -1)
        if t == 'str':
            raw = self.r.get(k)
            # missing key: return None instead of json.loads(None) crashing
            return self.str2dict(raw) if raw is not None else None
        elif t == 'hash':
            return self.r.hgetall(k)
        elif t == 'list':
            return self.r.lrange(k, s, e)
        elif t == 'set':
            # BUG fix: was self.smembers (no such method on BaseCache)
            return self.r.smembers(k)
        elif t == 'sortedset':
            return self.r.zrangebyscore(k, s, e, withscores=True)
        return

    def refresh(self, ckey, value, rkey, charset='utf-8'):
        """Drop the stored key and rewrite it with *value*.

        @return: result of set(), or None when the config is invalid
        """
        c = self.get_conf(ckey, rkey)
        if not c:
            return
        # BUG fix: the original referenced an undefined local ``k`` here
        k = c['key']
        if self.r.exists(k):
            self.r.delete(k)
        return self.set(ckey, value, rkey, charset)
if __name__ == "__main__":
    # no standalone behaviour; this module is import-only
    pass
# coding: utf-8
import importlib
import logging
import os
import socket
import sys
import time
import tornado.httpserver
import tornado.ioloop
import tornado.web
from logging.handlers import TimedRotatingFileHandler
from tornado.options import define, options
from .base_lib.app_route import Application, URL_PREFIX
# make this package's own directory importable
path = os.path.dirname(os.path.abspath(__file__))
if path not in sys.path:
    sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# global default timeout (seconds) for every socket created by the process
socket.setdefaulttimeout(10)
default_encoding = 'utf-8'
# NOTE(review): reload(sys)/setdefaultencoding exist only on Python 2; on
# Python 3 the default encoding is already utf-8, so this branch never runs.
if sys.getdefaultencoding() != default_encoding:
    reload(sys)
    sys.setdefaultencoding(default_encoding)
try:
    print("Load local setting...")
    # prefer the embedding project's conf package over the bundled defaults
    new_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    new_path = os.path.dirname(new_path)
    if new_path not in sys.path:
        sys.path.append(new_path)
    from conf.settings import settings, LOAD_MODULE, REJECT_MODULE, LOGFILE
except ImportError as e:
    print(e)
    print("Load local setting error, load base settings...")
    from base_conf.settings import settings, LOAD_MODULE, REJECT_MODULE, LOGFILE
class MyHandler(TimedRotatingFileHandler):
    """Date-rotating file handler that writes to ``<base>.<date>`` files and
    keeps the bare base filename as a symlink to the current log file."""

    def __init__(self, filename, when='MIDNIGHT', interval=1, backup_count=10, encoding=None, delay=False, utc=False):
        """
        @param filename: base log path; actual files get a date suffix
        @param when: rollover interval: M / H / D / MIDNIGHT
        @raises ValueError: for any other ``when`` value
        """
        self.delay = delay
        self.bfn = filename  # base file name (path of the symlink)
        when = when.upper()
        if when == 'M':
            self.suffix = "%Y-%m-%d_%H-%M"
        elif when == 'H':
            self.suffix = "%Y-%m-%d_%H"
        elif when in ('D', 'MIDNIGHT'):
            self.suffix = "%Y-%m-%d"
        else:
            raise ValueError("Invalid rollover interval specified: %s" % when)
        self.cfn = self.get_cfn()  # current (dated) file name
        # NOTE: the parent __init__ re-assigns self.suffix for the same
        # interval values, so get_cfn keeps producing the formats above
        TimedRotatingFileHandler.__init__(self, filename, when=when, interval=interval, backupCount=backup_count,
                                          encoding=encoding, delay=delay, utc=utc)

    def get_cfn(self):
        """Return the dated file name for the current local time."""
        return self.bfn + "." + time.strftime(self.suffix, time.localtime())

    def doRollover(self):
        """Close the old stream, prune backups, reopen, schedule next roll."""
        if self.stream:
            self.stream.close()
            self.stream = None
        cur_time = int(time.time())
        dst_now = time.localtime(cur_time)[-1]
        self.cfn = self.get_cfn()
        if self.backupCount > 0:
            for s in self.getFilesToDelete():
                os.remove(s)
        if not self.delay:
            self.stream = self._open()
        new_rollover_at = self.computeRollover(cur_time)
        while new_rollover_at <= cur_time:
            new_rollover_at = new_rollover_at + self.interval
        # compensate when a DST transition happens between rollovers
        if (self.when == 'MIDNIGHT' or self.when.startswith('W')) and not self.utc:
            dst_at_rollover = time.localtime(new_rollover_at)[-1]
            if dst_now != dst_at_rollover:
                if not dst_now:
                    addend = -3600
                else:
                    addend = 3600
                new_rollover_at += addend
        self.rolloverAt = new_rollover_at

    def _open(self):
        """Open the dated file and refresh the base-name symlink."""
        self.cfn = self.get_cfn()
        if self.encoding is None:
            stream = open(self.cfn, self.mode)
        else:
            import codecs
            # BUG fix: was self.current_file_name (undefined attribute),
            # which crashed whenever an encoding was configured
            stream = codecs.open(self.cfn, self.mode, self.encoding)
        if os.path.exists(self.bfn):
            try:
                os.remove(self.bfn)
            except OSError:
                pass
        try:
            # best effort: symlinking can fail (e.g. unsupported filesystem)
            os.symlink(self.cfn, self.bfn)
        except OSError:
            pass
        return stream
def current_path(paths=sys.path[0]):
    """Absolute directory for *paths* (default: the script location).

    When *paths* does not resolve to a file, falls back to the directory
    of the calling module's file.
    NOTE: the default argument binds ``sys.path[0]`` at import time.
    """
    resolved = os.path.realpath(paths)
    if not os.path.isfile(resolved):
        import inspect
        caller_file = inspect.stack()[1][1]
        return os.path.abspath(os.path.dirname(caller_file))
    return os.path.abspath(os.path.dirname(resolved))
# load every handler module found under *path* into the application
def load_module(app, path):
    """Scan *path* for .py files and register their *Handler classes.

    A module is registered only when it defines classes whose names end in
    "Handler" (the bare RequestHandler/Handler names are ignored), and when
    the LOAD_MODULE / REJECT_MODULE filters allow it.
    """
    logging.info("Load module path:%s" % path)
    for file_name in scan_dir(path):
        rel = file_name.replace(path, "")
        module_name = rel[1:-3].replace("/", ".").replace("\\", ".")
        module = importlib.import_module(module_name)
        # only concrete ...Handler classes make a module interesting;
        # note: url auto-generation currently requires the "Handler" suffix
        handler_names = [attr for attr in dir(module)
                         if attr.endswith("Handler") and attr not in ("RequestHandler", "Handler")]
        if not handler_names:
            continue
        wanted = (not LOAD_MODULE or rel in LOAD_MODULE) and rel not in REJECT_MODULE
        if wanted:
            logging.info("Load handler file: %s" % file_name)
            app.load_handler_module(module)
        else:
            logging.info("Miss handler file: %s" % file_name)
    return app
# scan a directory tree and collect all .py file paths
def scan_dir(path, hfs=None):
    """Recursively collect paths of all .py files under *path*.

    @param path: directory to scan
    @param hfs: accumulator list used by the recursion; BUG fix: the
                original used a mutable default (``hfs=[]``) that was
                mutated, so results leaked across top-level calls
    @return: list of .py file paths
    """
    if hfs is None:
        hfs = []
    for entry in os.listdir(path):
        entry = os.path.join(path, entry)
        if entry[-3:] == ".py":
            hfs.append(entry)
        elif os.path.isdir(entry):
            hfs = scan_dir(entry, hfs)
    return hfs
def config_logger(options):
    """Configure the root logger from tornado's parsed ``options``.

    Installs a date-rotating file handler when ``log_file_prefix`` is set
    and/or a stderr handler, all sharing one compact formatter.  Does
    nothing when ``options.logging`` is unset or 'none'.
    """
    import logging
    # from tornado.options import options
    if options.logging is None or options.logging.lower() == 'none':
        return
    logger = logging.getLogger()
    logger.setLevel(getattr(logging, options.logging.upper()))
    formatter = logging.Formatter(
        fmt='%(asctime)s.%(msecs)03d %(levelname)1.1s %(process)5d:%(threadName)-7.7s '
            '%(module)10.10s:%(lineno)04d $$ %(message)s',
        datefmt='%y%m%d %H:%M:%S'
    )
    # drop any handlers installed earlier (e.g. by parse_command_line)
    logger.handlers = []
    if options.log_file_prefix:
        print("Set logging config with file at %s" % options.log_file_prefix)
        channel = MyHandler(filename=options.log_file_prefix, backup_count=30)
        if logger.handlers:
            del logger.handlers[:]
        logger.addHandler(channel)
    # stderr handler: explicitly requested, or as a fallback when no file
    if options.log_to_stderr or (options.log_to_stderr is None and not logger.handlers):
        print("Set logging config with stdout.")
        channel = logging.StreamHandler()
        logger.addHandler(channel)
    if logger.handlers:
        for l in logger.handlers:
            l.setFormatter(formatter)
def run(path="", port=8800, url_prefix=URL_PREFIX, use_session=True, debug=False):
    """Boot the tornado service: load handler modules under *path* and serve.

    @param path: directory scanned for handler modules (default: caller dir)
    @param port: default listen port (overridable with --port)
    @param url_prefix: prefix applied to the auto-generated routes
    @param use_session: attach a redis/MySQL-backed SessionManager
    @param debug: force tornado debug mode and DEBUG-level logging
    """
    import base_lib.app_route
    base_lib.app_route.URL_PREFIX = url_prefix
    define("port", default=port, help="run on the given port", type=int)
    if debug:
        settings["debug"] = True
    application = Application(None, **settings)
    tornado.options.parse_command_line(final=True)
    # a LOGFILE from settings only applies when no --log-file-prefix is given
    if LOGFILE and not options.log_file_prefix:
        options.log_file_prefix = LOGFILE
    if settings["debug"]:
        options.logging = "DEBUG"
    config_logger(options)
    if not path:
        path = current_path()
    load_module(application, path)
    # xheaders=True: trust X-Real-Ip/X-Forwarded-For from a fronting proxy
    http_server = tornado.httpserver.HTTPServer(application, xheaders=True)
    from base_lib.tools import session
    from base_lib.dbpool import acquire
    if use_session:
        # NOTE(review): "sessiion_db" is misspelled but is only a local name
        sessiion_db = settings.get("session_db", "")
        application.session_manager = session.SessionManager(
            settings["session_secret"],
            settings["store_options"],
            settings["session_timeout"],
            m_db=acquire(sessiion_db)
        )
    application.use_session = use_session
    http_server.listen(options.port)
    logging.info('Server start , debug: %s, port: %s' % (settings["debug"], options.port))
    tornado.ioloop.IOLoop.instance().start()
if __name__ == "__main__":
    # running this module directly serves the handlers found beside it
    run()
# coding: utf-8
import json
import logging
import traceback
from datetime import datetime, date
# Prefer the embedding project's DB settings; fall back to the bundled base.
try:
    from conf.settings import database, BASE_DB, settings
    logging.info("Data config by local......")
except ImportError:
    from ..base_conf.settings import database, BASE_DB, settings
    logging.info("Data config by base......")
from ..base_lib.dbpool import with_database_class, install
from ..base_lib import dbpool
# verbose SQL / pool logging when running in debug mode
if settings.get("debug", False):
    logging.info("Running in debugging mode......")
    dbpool.debug = True
# register all configured database connections with the pool at import time
install(database)
def format_date(obj):
    """
    @todo: serializer hook for json.dumps (datetime/date for now; custom
           objects could be added later)
    @param obj: the object json could not serialize
    @return: formatted string, or None for unsupported types
    """
    # datetime first -- it is a subclass of date
    for klass, fmt in ((datetime, '%Y-%m-%d %H:%M:%S'), (date, '%Y-%m-%d')):
        if isinstance(obj, klass):
            return obj.strftime(fmt)
# TODO: redis caching of model data is not considered yet
@with_database_class(BASE_DB)
class BaseModel:
    """Lightweight active-record base: one instance mirrors one table row.

    The ``with_database_class(BASE_DB)`` decorator injects a pooled DB
    proxy as ``self.db``.  Subclasses set ``__table__``/``__fields__`` in
    their own ``__init__``.

    NOTE(review): most methods re-assign ``self.__id__ = "id"`` before
    using it, so a customised primary-key name is silently ignored --
    confirm before relying on a non-"id" primary key.
    NOTE(review): ``query``/``execute``/``add_count``/``load_info_to_list``
    build SQL by string interpolation -- never feed them untrusted input.
    """
    def __init__(self):
        """
        __table__: name of the mapped table
        __fields__: columns returned by selects (default "*")
        __id__: primary-key column name
        """
        self.__table__ = ""
        self.__fields__ = "*"
        self.__id__ = "id"
        # consider adding per-field types, defaults, value ranges, etc.?
    def get(self, key, value=""):
        # dict-style read of a column value; ``value`` is the fallback
        return self.real_dict().get(key, value)
    def set(self, key, value=""):
        # dict-style write of a column value
        self.__dict__[key] = value
    def to_json(self, data=None):
        """Serialise meaningful attributes as a JSON string.
        @param data: data to convert; defaults to this instance's own fields
        @return: JSON string (dates rendered via format_date)
        """
        if not data:
            data = self.real_dict()
        return json.dumps(data, default=format_date)
    def real_dict(self):
        """Return only the row attributes (drops dunder fields and ``db``).
        @param:
        @return: dict of column name -> value
        """
        data = self.__dict__.copy()
        for key in list(data.keys()):
            if key.find('__') == 0 or key == 'db':
                data.pop(key)
        return data
    def load(self, json_data):
        """Bulk-load attributes from a dict.
        @param json_data: dict of column name -> value
        @return: self on success, None on failure
        """
        try:
            # cur_keys = self.__fields__.replace(" ", "")
            for key in json_data.keys():
                # if key in cur_keys.split(","):
                self.__dict__[key] = json_data[key]
            return self
        except Exception as e:
            logging.error(e)
            return None
    def save(self):
        """Insert this row, or update it when the primary key is set.
        @param:
        @return: insert/update result; 0 on failure
        """
        if not self.__table__:
            return 0
        self.__id__ = "id"
        try:
            if self.real_dict().get(self.__id__, 0):
                return self.update()
            return self.db.insert(self.__table__, self.real_dict())
        except:
            logging.warning(traceback.format_exc())
            # raise
            return 0
    def insert(self):
        """Insert this row unconditionally.
        @param:
        @return: insert result; 0 on failure
        """
        if not self.__table__:
            return 0
        try:
            return self.db.insert(self.__table__, self.real_dict())
        except:
            return 0
    def update(self):
        """Update this row by its primary key.
        @param:
        @return: update result; 0 on failure
        """
        self.__id__ = "id"
        if not self.__table__ or not self.real_dict().get(self.__id__, 0):
            return 0
        try:
            return self.db.update(self.__table__, self.real_dict(), {self.__id__: self.real_dict()[self.__id__]})
        except:
            return 0
    def update_values(self, data, where=None):
        """Update specific columns.
        @param data: dict of columns to update
        @param where: condition dict; defaults to this instance's id
        @return: update result; 0 on failure
        """
        if not self.__table__:
            return 0
        self.__id__ = "id"
        if not where:
            if self.real_dict().get(self.__id__, ""):
                where = {self.__id__: self.real_dict()[self.__id__]}
            else:
                return 0
        ret = 0
        if data:
            ret = self.db.update(self.__table__, data, where)
        return ret
    def delete_by_id(self, __id__):
        """Delete a row by id.
        @param __id__: primary-key value
        @return: delete result
        """
        if not self.__table__:
            return 0
        self.__id__ = "id"
        ret = self.db.delete(self.__table__, {self.__id__: __id__})
        return ret
    def get_by_id(self, __id__):
        """Fetch a row by id.
        @param __id__: primary-key value
        @return: row as a dict; {} when not found
        """
        if not self.__table__:
            return {}
        self.__id__ = "id"
        ret = self.db.select(self.__table__, {self.__id__: __id__}, self.__fields__)
        if ret:
            # return self.to_json(ret[0])
            return ret[0]
        return {}
    def select(self, where={}, other='', fields=''):
        """Custom query.
        @param where: dict of equality conditions
        @param other: free-form SQL tail (order/limit/where ...)
        @param fields: columns to fetch; defaults to __fields__
        @return: list of row dicts
        """
        if not self.__table__:
            return []
        if not fields:
            fields = self.__fields__
        data = self.db.select(self.__table__, where, fields, other)
        return data
    def select_one(self, where={}, other='', fields=''):
        """Custom query returning a single row.
        @param where: dict of equality conditions
        @param other: free-form SQL tail
        @param fields: columns to fetch; defaults to __fields__
        @return: one row dict
        """
        if not self.__table__:
            return []
        if not fields:
            fields = self.__fields__
        data = self.db.select_one(self.__table__, where, fields, other)
        return data
    def query(self, sql):
        """Raw SQL query.
        @param sql: SQL string executed as-is
        @return: result dicts (shape may differ from this model)
        """
        if not self.__table__:
            return []
        ret = self.db.query(sql)
        return ret
    def execute(self, sql):
        """Raw SQL statement.
        @param sql: SQL string executed as-is
        @return: execution result
        """
        if not self.__table__:
            return []
        ret = self.db.execute(sql)
        return ret
    def escape(self, s):
        # delegate SQL escaping to the underlying driver
        return self.db.escape(s)
    def format_json_str_list(self, data=[]):
        # serialise each row dict to a JSON string
        ret = []
        for row in data:
            ret.append(self.to_json(row))
        return ret
    def load_info_to_list(self, datas, field="poi_id", info_field="poi"):
        """Join this model's rows into an existing result set in memory.
        Args:
            datas: rows to enrich
            field: id column in the source rows
            info_field: key under which the fetched row is attached
        Returns:
            the enriched rows
        """
        ids = ",".join([str(data[field]) for data in datas if data[field]])
        if not ids:
            return datas
        self.__id__ = "id"
        infos = self.select(other="where %s in (%s)" % (self.__id__, ids))
        for info in infos:
            for data in datas:
                if data[field] == info[self.__id__]:
                    data[info_field] = info
        return datas
    def add_count(self, field_name, count=1):
        """Increment an integer column in place.
        @param field_name: column name
        @param count: amount to add (default 1; may be negative)
        @return: True on success, False otherwise
        """
        self.__id__ = "id"
        try:
            if not self.real_dict().get(self.__id__, ""):
                logging.error("%s is null" % self.__id__)
                return False
            sql = "update %s set %s = %s + %d where %s = %d" % (
                self.__table__, field_name, field_name, count, self.__id__, self.real_dict()[self.__id__]
            )
            self.execute(sql)
            return True
        except Exception as e:
            logging.error("Add count error, info:%s" % e)
            return False
    def get_count(self, where={}, other=""):
        # COUNT(...) over rows matching where/other
        self.__id__ = "id"
        tmp_data = self.select(where, fields="count(%s) as count" % self.__id__, other=other)
        return tmp_data[0]["count"]
if __name__ == "__main__":
    # ad-hoc smoke test: exercise to_json/save on a bare model instance
    b = BaseModel()
    b.name = "121"
    print(b.to_json())
    print(b.save())
    print(b.__dict__)
--index-url https://pypi.douban.com/simple/
--extra-index-url https://mirrors.aliyun.com/pypi/simple/
--allow-external PIL
--allow-unverified PIL
tornado>=4.5
redis>=2.10.6
MySQL-python>=1.2.5
qiniu>=7.0.7
psycopg2>=2.7.1
# coding: utf-8
__author__ = "bozyao"
import sys
import os
# make the repository root importable so ``main`` can be found
path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if path not in sys.path:
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if __name__ == "__main__":
    from main import run, current_path
    # serve the handler modules next to this script; sessions disabled
    path = os.path.join(current_path())
    sys.path.append(path)
    run(path, use_session=False, debug=True)
# coding: utf-8
import sys
import os
import logging
# put both the repository root and this project's parent on the import path
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if __name__ == "__main__":
    from base.main import run, current_path
    # serve the handler modules found next to this script
    path = current_path()
    logging.info("Start path: %s" % path)
    run(path)
# coding: utf-8
from base_lib.app_route import route, check_session, RequestHandler, async
@route()
class TestHandler(RequestHandler):
    # NOTE(review): a second ``TestHandler`` is defined further below and
    # shadows this name at module level; @route() has already registered
    # this class by then, but the duplicate name is confusing -- rename one.
    def post(self):
        # simple echo endpoint used to verify the service is up
        self.ret_data(
            {"data": 'hello world'}
        )
@route("/test/(\d+)")
class TestHandler(RequestHandler):
@async
def get(self, n):
import time
time.sleep(int(n))
self.ret_data({"sleep_time": n})
# raw string: '\d' in a plain literal is an invalid escape and a
# SyntaxWarning on modern Python
@route(r"/test1/(\d+)")
class Test1Handler(RequestHandler):
    """Blocking test endpoint: sleeps *n* seconds on the IO loop thread."""
    def get(self, n):
        import time
        time.sleep(int(n))
        self.ret_data({"sleep_time": n})
@route()
class WorldHandler(RequestHandler):
    @check_session()
    # login required: the cookie session_id=xxxx must map to a user_id in redis
    def get(self):
        self.ret_data(
            {"msg": "Say 'Hello world!'"}
        )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment