Add asynchronous task processing model task to flask framework
2022-07-22 17:16:00 【junjunzai123】
- When deploying the model, batch prediction is a time-consuming step in the background business flow, so it needs to run asynchronously. The background also runs a scheduled task that polls whether the prediction has finished and then fetches the results. The logic code for this part is as follows:
# -*- coding: utf-8 -*-
# @Time    : 2022/7/20 5:19 PM
# @Author  : junzai
# @File    : test_cpv_predict.py
# @Software: PyCharm
import json
import time  # used by the mock task below

from celery import Celery
from celery.result import AsyncResult
from flask import Flask, jsonify, request

from rule_ner import *
from cpv_predict import main, load_cls_model, load_entity_model_config, load_entity_model
from conf import save_path, conf_path, cls_pred_model_path, cpv_result_path, oss_up_path, save_dir, data_dir

flask_app = Flask(__name__)
flask_app.config.update(
    CELERY_BROKER_URL='redis://127.0.0.1:6379/0',
    CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/0'
)
celery = Celery(flask_app.name, broker=flask_app.config['CELERY_BROKER_URL'],
                backend=flask_app.config['CELERY_RESULT_BACKEND'])

redis_uuid_list = []

# Load the category prediction model
cls_model = load_cls_model(cls_pred_model_path)
# Load the entity recognition model
label_vocab, tokenizer, trans_func, batchify_fn = load_entity_model_config(data_dir)
rank, entity_model = load_entity_model(label_vocab, save_dir)
# Load the rule dictionary
r_ner = Rule_NER('./userdict/bcat_3', sep='\002')
@celery.task()
def cpv_server(task_id, oss_download_path):
    print("cpv: start model prediction")
    json_data = main(task_id, oss_download_path, cls_model, label_vocab, trans_func, batchify_fn, rank, entity_model,
                     r_ner)
    print("cpv: prediction finished, returning the result")
    return json_data


# Simplified demo version of the task above (renamed so it does not override cpv_server)
@celery.task()
def cpv_server_mock(task_id, oss_download_path):
    print("cpv: start model prediction")
    time.sleep(10)  # simulate the time-consuming model task
    print("cpv: prediction finished, returning the result")
    return 'processing result'
@flask_app.route('/ner', methods=['POST'])
def ner():
    task_id = request.form['task_id']
    oss_download_path = request.form['oss_download_path']
    print("Preparing the time-consuming task ===")
    task = cpv_server.delay(task_id, oss_download_path)
    print('Time-consuming task submitted; the result will be fetched later')
    print(str(task))
    redis_uuid_list.append(str(task))
    return 'success'
@flask_app.route('/result')
def cpv_result():
    result = {}
    if redis_uuid_list:
        id = redis_uuid_list[0]
        a = AsyncResult(id=id, app=celery)
        if a.successful():
            result = a.get()  # the data returned by the task
            data_dict = json.loads(result)
            data_dict['status'] = 0
            data_dict['message'] = 'success'
            result = json.dumps(data_dict)
            redis_uuid_list.pop(0)
            print('Task completed, uuid removed from the list')
            return result
        elif a.failed():
            result['status'] = 1
            result['message'] = 'failure'
            return json.dumps(result)
        elif a.status == 'PENDING':
            result['status'] = 2
            result['message'] = 'waiting for execution'
            return json.dumps(result)
        elif a.status == 'RETRY':
            result['status'] = 3
            result['message'] = 'retrying after a task exception'
            return json.dumps(result)
        elif a.status == 'STARTED':
            result['status'] = 4
            result['message'] = 'task has started'
            return json.dumps(result)
    else:
        result['status'] = 5
        result['message'] = 'no task result'
        return json.dumps(result)


if __name__ == '__main__':
    flask_app.run(host='0.0.0.0', port=5000)
- Explanation: once this code is in place, two services need to be started when deploying. The first is the logic service of this program, started with **python xxx.py**, or with **gunicorn -w 1 -b 0.0.0.0:5000 xxx:flask_app**; once it is up, it serves the background in real time. The second is the Celery service, started with: celery -A cpv_celery worker --loglevel=info -P eventlet
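The background scheduled task mentioned above could look roughly like the following minimal sketch, which submits a job and polls for the result (the endpoint names and status codes follow the code above, but the host, port, form values and polling interval are illustrative assumptions):
# submit_and_poll.py - hypothetical client for the service above
import time
import requests

# submit a prediction task (the form fields match the /ner route)
resp = requests.post('http://127.0.0.1:5000/ner',
                     data={'task_id': '1001', 'oss_download_path': 'oss://bucket/input.csv'})
print(resp.text)  # 'success'

# poll /result until the task has succeeded or failed (interval is illustrative)
while True:
    result = requests.get('http://127.0.0.1:5000/result').json()
    if result.get('status') in (0, 1):  # 0 = success, 1 = failure, per the route above
        print(result)
        break
    time.sleep(5)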
Startup notes:
- If gunicorn is not installed, install it in advance. Installation command:
pip install gunicorn
# The command to start the program
# -w: the number of worker processes
# -b: the IP address and port to bind
# In app:app, the first app is the file (module) name and the second app is the Flask app instance; if your file name or app variable are different, adjust accordingly.
gunicorn -w 1 -b 0.0.0.0:5000 app:app
- Before starting the asynchronous service, first install the eventlet and celery packages
pip install eventlet
pip install celery
# Start the asynchronous service after the installation
# Here cpv_celery is the name of the file (module) that contains the asynchronous service
celery -A cpv_celery worker --loglevel=info -P eventlet
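Optionally, once the worker is up you can check that the task was registered, using Celery's standard inspect command (the module name cpv_celery follows the example above):
celery -A cpv_celery inspect registered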
Because Celery here uses the Redis database as its middleware (broker and result backend), Redis needs to be installed before starting celery.
Redis database installation:
wget http://download.redis.io/releases/redis-6.0.8.tar.gz
tar xzf redis-6.0.8.tar.gz
cd redis-6.0.8
make
After the make command finishes, the compiled Redis server program redis-server appears in the src directory of redis-6.0.8, together with the client program redis-cli used for testing.
Next, start the Redis service:
cd src
./redis-server
Note that starting Redis this way uses the default configuration. You can also tell Redis to start with a specified configuration file:
cd src
./redis-server ../redis.conf
redis.conf is the default configuration file; we can substitute our own configuration file as needed.
After the Redis service process has started, you can interact with it using the test client program redis-cli. For example:
cd src
./redis-cli
redis> set foo bar
OK
redis> get foo
"bar"
When Redis is started as part of a project deployment, it usually runs as a daemon, so we can modify the configuration file redis.conf:
# On a fresh install the default is no; change it to yes so Redis starts as a daemon (around line 136 of redis.conf)
daemonize yes
Then restart the Redis service:
cd src
./redis-server ../redis.conf
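As a quick sanity check before starting the Celery worker, you can confirm that the broker is reachable. This is a minimal sketch using the redis-py client (assumed to be installed with pip install redis), pointing at the same URL configured in the Flask app above:
# check_broker.py - optional connectivity check for the Celery broker (illustrative)
import redis

# same host/port/db as CELERY_BROKER_URL ('redis://127.0.0.1:6379/0')
r = redis.Redis(host='127.0.0.1', port=6379, db=0)
print(r.ping())  # prints True if the Redis server is up and reachable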