我在使用 Gunicorn 服务器运行的 Falcon 框架中编写了一个应用程序。服务器启动时,应用首先训练随机森林模型:
forest = sklearn.ensemble.ExtraTreesClassifier(n_estimators=150, n_jobs=-1)
forest.fit(x, t)
然后返回发布到它的请求的概率。当我在 iPython 中运行代码时,这在我的服务器上运行良好(训练这个模型需要 15 秒,在 12 个内核上运行)。
当我编写应用程序时,我设置n_estimators=10
并且一切正常。当我完成对应用程序的调整后,我把 n_estimators
改回了 150。但是,当我用 gunicorn -c ./app.conf app:app 运行 Gunicorn 后
,从 htop 我可以看到 forest.fit(x, t)
在所有内核上运行了几秒钟,之后所有内核的使用率下降到 0。在那之后,该方法会无限期地挂起,直到 Gunicorn worker 进程在 10 分钟后超时。
这是我第一次使用 Gunicorn 和 Falcon 或任何 WSGI 技术,我不知道可能导致问题的原因或如何解决问题。
编辑:
gunicorn 的设置文件:
# app.conf
# Gunicorn configuration file.
# Run with: gunicorn -c ./app.conf app:app
import sys

# Make the project module importable inside the worker processes.
sys.path.append('/home/user/project/Module')

# Serve on localhost only.
bind = "127.0.0.1:8123"

# Kill a worker that is unresponsive for more than 20 minutes
# (model training at startup is slow).
# BUG FIX: the original line ended with a stray backtick, which would be a
# syntax error when Gunicorn exec()s this config file.
timeout = 60 * 20
Falcon 代码:
class Processor(object):
    """Train the classification models, build features for incoming
    requests and compute probability predictions.

    NOTE(review): ``data_statistics``, ``targets``, ``model_1`` and
    ``model_2`` are *class* attributes, shared by every instance; the
    methods below assign through ``self``, partially shadowing that shared
    state — confirm the sharing is intentional.
    NOTE(review): ``n_jobs=-1`` makes joblib parallelize fit/predict over
    all cores. Under a pre-forking server such as Gunicorn this is a known
    cause of hangs (worker pools created around fork()) — consider
    ``n_jobs=1`` when serving, or training before the workers fork.
    TODO confirm against the deployment setup.
    """
    # Data statistics used in feature calculations
    # (populated by calculate_data_statistics).
    data_statistics = {}
    # Classification target labels (assigned in prepare_datasets).
    targets = ()
    # Feature subsets for the two models (contents elided in this snippet).
    cols1 = [ #...
    ]
    cols2 = [ #...
    ]
    # Two forests over different feature subsets; a request is scored by
    # whichever model matches its available features (see predict_proba).
    model_1 = ExtraTreesClassifier(n_estimators=150, n_jobs=-1)
    model_2 = ExtraTreesClassifier(n_estimators=150, n_jobs=-1)

    def __init__(self, features_dataset, tr_prepro):
        """Build both datasets, fit both models and precompute the
        statistics needed for request-time feature generation."""
        # Get the datasets
        da_1, da_2 = self.prepare_datasets(features_dataset)
        # Train models
        # ----THIS IS WHERE THE PROGRAM HANGS -----------------------------------
        self.model_1.fit(da_1.x, utils.vectors_to_labels(da_1.t))
        # -----------------------------------------------------------------------
        self.model_2.fit(da_2.x, utils.vectors_to_labels(da_2.t))
        # Generate data statistics for feature calculations
        self.calculate_data_statistics(tr_prepro)

    def prepare_datasets(self, features_dataset):
        """Return the pair ``(da_1, da_2)`` of training datasets, one per
        model, built from the selected columns of *features_dataset*.
        Side effect: sets ``self.targets`` to the binarized target labels."""
        sel_cols = [ #...
        ]
        # Build dataset: keep the selected columns, drop incomplete rows,
        # standard-scale the features.
        d = features_dataset[sel_cols].dropna()
        da, scalers = ft.build_dataset(d, scaling='std', target_feature='outcome')
        # Binarize the data ("binirize" spelling comes from the helper's name).
        da_bin = utils.binirize_dataset(da)
        # What are the classification targets
        self.targets = da_bin.t_labels
        # Prepare one dataset per model from each feature subset.
        da_1 = da_bin.select_attributes(self.cols1)
        da_2 = da_bin.select_attributes(self.cols2)
        return da_1, da_2

    def calculate_data_statistics(self, tr_prepro):
        """Populate the data/feature statistics used at request time
        (body elided in this snippet)."""
        logger.info('Getting data and feature statistics...')
        #...
        logger.info('Done.')

    def import_data(self, data):
        """Convert a dict decoded from JSON into a pandas DataFrame
        (body elided in this snippet)."""
        # convert dictionary generated from json to Pandas DataFrame
        return tr

    def generate_features(self, tr):
        """Preprocessing, feature calculations, imputation
        (body elided in this snippet)."""
        # Preprocessing, Feature calculations, imputation
        return tr

    def predict_proba(self, data):
        """Score request *data*: returns the input rows with an added
        'prob' column holding P('POSITIVE') from the matching model."""
        # Convert Data
        tr = self.import_data(data)
        # Generate features
        tr = self.generate_features(tr)
        # Select model based on missing values - either no. 1 or no. 2
        tr_1 = #...
        tr_2 = #...
        # Get the probabilities from different models
        if tr_1.shape[0] > 0:
            tr_1.loc[:, 'prob'] = self.model_1.predict_proba(tr_1.loc[:, self.cols1])[:, self.targets.index('POSITIVE')]
        if tr_2.shape[0] > 0:
            tr_2.loc[:, 'prob'] = self.model_2.predict_proba(tr_2.loc[:, self.cols2])[:, self.targets.index('POSITIVE')]
        return pd.concat([tr_1, tr_2], axis=0)

    @staticmethod
    def export_single_result(tr):
        """Extract the first row of *tr* as a plain dict with keys
        'sample_id', 'batch_id' and 'prob'."""
        result = {'sample_id': tr.loc[0, 'sample_id'],
                  'batch_id': tr.loc[0, 'batch_id'],
                  'prob': tr.loc[0, 'prob']
                  }
        return result
class JSONTranslator(object):
    """Falcon middleware: decodes request bodies into req.context['data']
    and serializes req.context['result_json'] into the response body."""

    def process_request(self, req, resp):
        """Parse the request body as UTF-8 JSON into req.context['data'].

        Throws
        ------
        HTTP 400 (Bad Request)
        HTTP 753 ('Syntax Error')
        """
        # Nothing to do for bodiless requests.
        if req.content_length in (None, 0):
            return

        raw = req.stream.read()
        if not raw:
            raise falcon.HTTPBadRequest('Empty request body',
                                        'A valid JSON document is required.')

        try:
            req.context['data'] = json.loads(raw.decode('utf-8'))
        except (ValueError, UnicodeDecodeError):
            raise falcon.HTTPError(falcon.HTTP_753,
                                   'Malformed JSON',
                                   'Could not decode the request body. The '
                                   'JSON was incorrect or not encoded as '
                                   'UTF-8.')

    def process_response(self, req, resp, resource):
        """Serialize req.context['result_json'] into resp.body, if present;
        otherwise leave the response untouched."""
        try:
            payload = req.context['result_json']
        except KeyError:
            return
        resp.body = json.dumps(payload)
class ProbResource(object):
    """Falcon resource: validates an incoming JSON request against the
    broadcast schema and returns the model's probability prediction."""

    def __init__(self, processor):
        """Load the JSON schema once at startup and keep a reference to the
        Processor used for predictions."""
        # BUG FIX: the original used a bare open() and never closed the file
        # handle; a context manager releases it deterministically.
        with open(config.__ROOT__ + "app_docs/broadcast_schema.json") as f:
            self.schema_raw = f.read()
        self.schema = json.loads(self.schema_raw)
        self.processor = processor

    def validate_request(self, req):
        """ Validate the request json against the schema.
        Throws
        ------
        HTTP 753 ('Syntax Error')
        """
        data = req.context['data']
        # validate the json
        try:
            v = jsonschema.Draft4Validator(self.schema)  # using jsonschema draft 4
            # Collect every validation error into one message (sorted for a
            # deterministic order) instead of failing on the first.
            err_msg = str()
            for error in sorted(v.iter_errors(data), key=str):
                err_msg += str(error)
            if len(err_msg) > 0:
                raise falcon.HTTPError(falcon.HTTP_753,
                                       'JSON failed validation',
                                       err_msg)
        except jsonschema.ValidationError as e:
            # The schema itself is unusable; dump it for debugging and re-raise.
            print("Failed to use schema:\n" + str(self.schema_raw))
            raise e

    def on_get(self, req, resp):
        """Handles GET requests
        Throws
        ------
        HTTP 404 (Not Found)
        """
        self.validate_request(req)
        data = req.context['data']
        try:
            # get probability
            tr = self.processor.predict_proba(data)
            # convert pandas dataframe to dictionary
            result = self.processor.export_single_result(tr)
            # send the dictionary away
            req.context['result_json'] = result
        except Exception as ex:
            # BUG FIX: Exception.message was removed in Python 3, so the
            # original `ex.message` raised AttributeError and masked the real
            # error; str(ex) works on both Python 2 and 3.
            raise falcon.HTTPError(falcon.HTTP_404, 'Error', str(ex))
        resp.status = falcon.HTTP_200
# Startup code: runs once at import time — i.e. in every Gunicorn worker
# process unless preload_app is enabled.
# Get data
features_data = fastserialize.load(config.__ROOT__ + 'data/samples.s')
prepro_data = fastserialize.load(config.__ROOT__ + 'data/prepro/samples_preprocessed.s')
# Get the models - this is where the code hangs
# NOTE(review): `SampleProcessor` is not defined in this snippet — the class
# above is named `Processor`; presumably they are the same and the name was
# changed while editing. Verify the definition/import.
# NOTE(review): training happens here with n_jobs=-1; a joblib worker pool
# created in a forked WSGI worker is a known deadlock source — TODO confirm.
sp = SampleProcessor(features_data, prepro_data)
app = falcon.API(middleware=[JSONTranslator()])
prob = ProbResource(sp)
app.add_route('/prob', prob)