How to integrate timed batch tasks gracefully (glory Collection Edition)
2022-07-22 12:08:00 【Dragon back ride Shi】
Preface
Recently I had an urgent requirement: a PC web page triggers a device-upgrade record (figure below), and the backend has to update the devices in batches on a schedule. Quartz handles the scheduling and Spring Batch handles the batch processing; combining the two fulfils the requirement.
I had never used Spring Batch before, and searching online turned up few references, so I worked through the official documentation:
https://docs.spring.io/spring-batch/4.1.x/reference/html
I ran into quite a few problems along the way, so I'm recording them here.
I. Code implementation
1. pom dependencies
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- Note: the original post listed spring-boot-starter-batch twice; the duplicate
     is removed here. Since QuartzManager is used below for scheduling, a Quartz
     dependency (e.g. spring-boot-starter-quartz) is presumably also required. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
</dependencies>
2. application.yaml

spring:
  datasource:
    username: thinklink
    password: thinklink
    url: jdbc:postgresql://172.16.205.54:5432/thinklink
    driver-class-name: org.postgresql.Driver
  batch:
    job:
      enabled: false   # do not auto-run jobs at startup; we launch them manually
server:
  port: 8073
#upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/
upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/
# number of records processed per chunk; defaults to 5000
batch-size: 5000
3. Service implementation class
This is the entry point that triggers the batch task: it launches one Job.
@Service("batchService")
public class BatchServiceImpl implements BatchService {
// Injected by Spring Batch
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job updateDeviceJob;
/**
 * Create and launch a Job for the given taskId.
 * The random uuid parameter makes every run a distinct JobInstance,
 * so the same taskId can be executed again.
 * @param taskId id of the upgrade task
 * @throws Exception if the job launch fails
 */
@Override
public void createBatchJob(String taskId) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("taskId", taskId)
.addString("uuid", UUID.randomUUID().toString().replace("-",""))
.toJobParameters();
// Launch the Job with its parameters
jobLauncher.run(updateDeviceJob, jobParameters);
}
}
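Spring Batch will not re-run a completed JobInstance with identical JobParameters, so the `uuid` parameter above is what allows the same `taskId` to be launched repeatedly. A minimal plain-Java sketch of that idea, with no Spring on the classpath (`JobParamsDemo` and `buildParams` are illustrative names, not part of the original code):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

public class JobParamsDemo {
    // Mimics the parameter map built in createBatchJob: same taskId every time,
    // plus a random uuid that makes each parameter set unique.
    static Map<String, String> buildParams(String taskId) {
        Map<String, String> p = new LinkedHashMap<>();
        p.put("taskId", taskId);
        p.put("uuid", UUID.randomUUID().toString().replace("-", ""));
        return p;
    }

    public static void main(String[] args) {
        Map<String, String> first = buildParams("task-1");
        Map<String, String> second = buildParams("task-1");
        // Same taskId, but the uuid makes each run a distinct "JobInstance" key.
        System.out.println(first.equals(second)); // false
    }
}
```

Without the uuid, a second call with the same taskId would be rejected as an already-completed instance.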
4. Spring Batch configuration class
Note: this is the heart of the integration.
@Configuration
public class BatchConfiguration {
private static final Logger log = LoggerFactory.getLogger(BatchConfiguration.class);
@Value("${batch-size:5000}")
private int batchSize;
// Injected by Spring Batch
@Autowired
public JobBuilderFactory jobBuilderFactory;
// Injected by Spring Batch
@Autowired
public StepBuilderFactory stepBuilderFactory;
// Processor applied to each row read from the database
@Autowired
public TaskItemProcessor taskItemProcessor;
// Job parameters, captured by the JobListener before the step runs
public Map<String, JobParameter> parameters;
public Object taskId;
@Autowired
private JdbcTemplate jdbcTemplate;
// Reader: streams rows from the database with a JDBC cursor
@Bean
@StepScope
public JdbcCursorItemReader<DispatchRequest> itemReader(DataSource dataSource) {
String querySql = " SELECT " +
" e.id AS taskId, " +
" e.user_id AS userId, " +
" e.timing_startup AS startTime, " +
" u.device_id AS deviceId, " +
" d.app_name AS appName, " +
" d.compose_file AS composeFile, " +
" e.failure_retry AS failureRetry, " +
" e.tetry_times AS retryTimes, " + // (sic: the column really is named tetry_times)
" e.device_managered AS deviceManagered " +
" FROM " +
" eiot_upgrade_task e " +
" LEFT JOIN eiot_upgrade_device u ON e.id = u.upgrade_task_id " +
" LEFT JOIN eiot_app_detail d ON e.app_id = d.id " +
" WHERE " +
" ( " +
" u.device_upgrade_status = 0 " +
" OR u.device_upgrade_status = 2" +
" )" +
" AND e.tetry_times > u.retry_times " +
" AND e.id = ?";
return new JdbcCursorItemReaderBuilder<DispatchRequest>()
.name("itemReader")
.sql(querySql)
.dataSource(dataSource)
.queryArguments(new Object[]{parameters.get("taskId").getValue()})
.rowMapper(new DispatchRequest.DispatchRequestRowMapper())
.build();
}
// Write the results back to the database
@Bean
@StepScope
public ItemWriter<ProcessResult> itemWriter() {
return new ItemWriter<ProcessResult>() {
private int updateTaskStatus(DispatchRequest dispatchRequest, int status) {
log.info("update taskId: {}, deviceId: {} to status {}", dispatchRequest.getTaskId(), dispatchRequest.getDeviceId(), status);
Integer retryTimes = jdbcTemplate.queryForObject(
"select retry_times from eiot_upgrade_device where device_id = ? and upgrade_task_id = ?",
new Object[]{ dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()}, Integer.class
);
retryTimes += 1;
int updateCount = jdbcTemplate.update("update eiot_upgrade_device set device_upgrade_status = ?, retry_times = ? " +
"where device_id = ? and upgrade_task_id = ?", status, retryTimes, dispatchRequest.getDeviceId(), dispatchRequest.getTaskId());
if (updateCount <= 0) {
log.warn("no task updated");
} else {
log.info("count of {} task updated", updateCount);
}
// Last retry
if (status == STATUS_DISPATCH_FAILED && retryTimes == dispatchRequest.getRetryTimes()) {
log.info("the last retry of {} failed, inc deviceManagered", dispatchRequest.getTaskId());
return 1;
} else {
return 0;
}
}
@Override
@Transactional // note: chunk writes already run inside the step's transaction managed by Spring Batch, so this annotation is effectively redundant here
public void write(List<? extends ProcessResult> list) throws Exception {
Map taskMap = jdbcTemplate.queryForMap(
"select device_managered, device_count, task_status from eiot_upgrade_task where id = ?",
list.get(0).getDispatchRequest().getTaskId() // every item in a chunk shares the same taskId
);
int deviceManagered = (int)taskMap.get("device_managered");
Integer deviceCount = (Integer) taskMap.get("device_count");
if (deviceCount == null) {
log.warn("deviceCount of task {} is null", list.get(0).getDispatchRequest().getTaskId());
}
int taskStatus = (int)taskMap.get("task_status");
for (ProcessResult result: list) {
deviceManagered += updateTaskStatus(result.getDispatchRequest(), result.getStatus());
}
if (deviceCount != null && deviceManagered == deviceCount) {
taskStatus = 2; // task status: 0 = pending, 1 = upgrading, 2 = completed
}
jdbcTemplate.update("update eiot_upgrade_task set device_managered = ?, task_status = ? " +
"where id = ?", deviceManagered, taskStatus, list.get(0).getDispatchRequest().getTaskId());
}
};
}
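The writer's bookkeeping boils down to two rules: count a device toward `deviceManagered` only when its final retry has failed, and mark the task completed once every device is accounted for. A plain-Java sketch of the completion decision (`TaskStatusDemo` and `newTaskStatus` are illustrative names; the status codes are the ones used in the article):

```java
public class TaskStatusDemo {
    static final int STATUS_COMPLETED = 2;

    // Returns the new task status: completed once all devices are accounted for,
    // otherwise the current status is kept. A null deviceCount (as guarded
    // against in the writer) also keeps the current status.
    static int newTaskStatus(int deviceManagered, Integer deviceCount, int currentStatus) {
        if (deviceCount != null && deviceManagered == deviceCount) {
            return STATUS_COMPLETED;
        }
        return currentStatus;
    }

    public static void main(String[] args) {
        System.out.println(newTaskStatus(5, 5, 1));    // all devices done -> 2
        System.out.println(newTaskStatus(3, 5, 1));    // still in progress -> 1
        System.out.println(newTaskStatus(3, null, 1)); // unknown count -> unchanged
    }
}
```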
/**
 * Define the Job that dispatches device updates.
 * @return the update Job
 */
@Bean
public Job updateDeviceJob(Step updateDeviceStep) {
return jobBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
.listener(new JobListener()) // register the Job listener
.flow(updateDeviceStep) // run the update-dispatch Step
.end()
.build();
}
/**
 * Define the Step that dispatches device updates.
 * @return the update Step
 */
@Bean
public Step updateDeviceStep(JdbcCursorItemReader<DispatchRequest> itemReader,ItemWriter<ProcessResult> itemWriter) {
return stepBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
.<DispatchRequest, ProcessResult> chunk(batchSize)
.reader(itemReader) // read the devices to update for the given taskId
.processor(taskItemProcessor) // for each device, call the dispatch interface
.writer(itemWriter)
.build();
}
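The `chunk(batchSize)` call above is what gives the step its read-process-write rhythm: items are read and processed one at a time, but written as a whole chunk. A minimal plain-Java simulation of that flow (`ChunkDemo` and `runChunks` are illustrative names, not Spring Batch API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ChunkDemo {
    // Simulates chunk-oriented processing: each item is processed individually,
    // but the writer receives a whole chunk of results at once.
    static <I, O> List<List<O>> runChunks(List<I> items, Function<I, O> processor, int chunkSize) {
        List<List<O>> written = new ArrayList<>(); // each element = one writer invocation
        List<O> buffer = new ArrayList<>();
        for (I item : items) {
            buffer.add(processor.apply(item));     // "processor" step, item by item
            if (buffer.size() == chunkSize) {
                written.add(new ArrayList<>(buffer)); // "writer" step, whole chunk
                buffer.clear();
            }
        }
        if (!buffer.isEmpty()) {
            written.add(buffer);                   // final partial chunk
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(runChunks(List.of(1, 2, 3, 4, 5), x -> x * 10, 2));
        // [[10, 20], [30, 40], [50]]
    }
}
```

This is why, as the summary notes, nothing can be written mid-chunk: the `ItemWriter` only runs once each batch of `batchSize` items has been processed.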
// Job listener
public class JobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
log.info(jobExecution.getJobInstance().getJobName() + " before... ");
parameters = jobExecution.getJobParameters().getParameters();
taskId = parameters.get("taskId").getValue();
log.info("job param taskId : " + parameters.get("taskId"));
}
@Override
public void afterJob(JobExecution jobExecution) {
log.info(jobExecution.getJobInstance().getJobName() + " after... ");
// After the whole job finishes, count the devices that still failed;
// if any remain, schedule another job run after the retry interval
String sql = " SELECT " +
" count(*) " +
" FROM " +
" eiot_upgrade_device d " +
" LEFT JOIN eiot_upgrade_task u ON d.upgrade_task_id = u.id " +
" WHERE " +
" u.id = ? " +
" AND d.retry_times < u.tetry_times " +
" AND ( " +
" d.device_upgrade_status = 0 " +
" OR d.device_upgrade_status = 2 " +
" ) ";
// Get the number of failed devices
Integer count = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
log.info("update device failure count : " + count);
// Use Quartz to schedule the retry run.
// The interval is hard-coded here for easy testing; in production it should be
// read from the database for this taskId (unit: seconds, per the original note).
Integer millSecond = 10;
if(count != null && count > 0){
String jobName = "UpgradeTask_" + taskId;
String reTaskId = taskId.toString();
Map<String,Object> params = new HashMap<>();
params.put("jobName",jobName);
params.put("taskId",reTaskId);
if (QuartzManager.checkNameNotExist(jobName))
{
QuartzManager.scheduleRunOnceJob(jobName, RunOnceJobLogic.class,params,millSecond);
}
}
}
}
}
5. Processor
Each item read from the database is processed here; this is also where items can be filtered out.
@Component("taskItemProcessor")
public class TaskItemProcessor implements ItemProcessor<DispatchRequest, ProcessResult> {
public static final int STATUS_DISPATCH_FAILED = 2;
public static final int STATUS_DISPATCH_SUCC = 1;
private static final Logger log = LoggerFactory.getLogger(TaskItemProcessor.class);
@Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")
private String dispatchUrl;
@Autowired
JdbcTemplate jdbcTemplate;
/**
 * Dispatch the update command for a single device.
 * @param dispatchRequest the row read for this device
 * @return the dispatch result with a success/failure status
 */
@Override
public ProcessResult process(final DispatchRequest dispatchRequest) {
// Call the dispatch interface to issue the update command
String url = dispatchUrl + dispatchRequest.getDeviceId() + "/" + dispatchRequest.getUserId();
log.info("request url:" + url);
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
JSONObject jsonOuter = new JSONObject();
JSONObject jsonInner = new JSONObject();
try {
jsonInner.put("jobId",dispatchRequest.getTaskId());
jsonInner.put("name",dispatchRequest.getName());
jsonInner.put("composeFile", Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));
jsonInner.put("policy",new JSONObject().put("startTime",dispatchRequest.getPolicy()));
jsonInner.put("timestamp",dispatchRequest.getTimestamp());
jsonOuter.put("method","updateApp");
jsonOuter.put("params",jsonInner);
} catch (JSONException e) {
log.info("JSON convert Exception :" + e);
}catch (IOException e) {
log.info("Base64Util bytesToBase64Str :" + e);
}
log.info("request body json :" + jsonOuter);
HttpEntity<String> requestEntity = new HttpEntity<String>(jsonOuter.toString(),headers);
int status;
try {
ResponseEntity<String> response = restTemplate.postForEntity(url,requestEntity,String.class);
log.info("response :" + response);
if (response.getStatusCode() == HttpStatus.OK) {
status = STATUS_DISPATCH_SUCC;
} else {
status = STATUS_DISPATCH_FAILED;
}
}catch (Exception e){
status = STATUS_DISPATCH_FAILED;
}
return new ProcessResult(dispatchRequest, status);
}
}
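The request the processor sends is just the base URL with `deviceId` and `userId` appended, and a JSON body whose `composeFile` bytes are Base64-encoded. A self-contained sketch of those two steps using only the JDK (`DispatchDemo` is an illustrative name; `Base64Util.bytesToBase64Str` in the article presumably wraps `java.util.Base64` in a similar way):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class DispatchDemo {
    // Mirrors the URL concatenation in TaskItemProcessor.process.
    static String buildUrl(String baseUrl, String deviceId, String userId) {
        return baseUrl + deviceId + "/" + userId;
    }

    // Mirrors what Base64Util.bytesToBase64Str presumably does.
    static String toBase64(byte[] composeFile) {
        return Base64.getEncoder().encodeToString(composeFile);
    }

    public static void main(String[] args) {
        String url = buildUrl("http://localhost/api/v2/rpc/dispatch/command/", "dev-1", "user-9");
        System.out.println(url); // http://localhost/api/v2/rpc/dispatch/command/dev-1/user-9
        System.out.println(toBase64("version: '3'".getBytes(StandardCharsets.UTF_8)));
    }
}
```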
6. Entity bean
The entity that wraps a row returned by the query; note the static inner RowMapper class.
public class DispatchRequest {
private String taskId;
private String deviceId;
private String userId;
private String name;
private byte[] composeFile;
private String policy;
private String timestamp;
private String md5;
private int failureRetry;
private int retryTimes;
private int deviceManagered;
// constructor, setters/getters and toString omitted
//......
public static class DispatchRequestRowMapper implements RowMapper<DispatchRequest> {
@Override
public DispatchRequest mapRow(ResultSet resultSet, int i) throws SQLException {
DispatchRequest dispatchRequest = new DispatchRequest();
dispatchRequest.setTaskId(resultSet.getString("taskId"));
dispatchRequest.setUserId(resultSet.getString("userId"));
dispatchRequest.setPolicy(resultSet.getString("startTime"));
dispatchRequest.setDeviceId(resultSet.getString("deviceId"));
dispatchRequest.setName(resultSet.getString("appName"));
dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));
dispatchRequest.setTimestamp(DateUtil.DateToString(new Date()));
dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));
dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));
dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));
return dispatchRequest;
}
}
}
7. Annotate the startup class
@SpringBootApplication
@EnableBatchProcessing
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
Summary
Spring Batch is not as easy to use as I expected. After each batch of 5000 rows is fetched from the database, the processor handles them one by one; nothing can be written during that phase. Only once all 5000 rows have been processed is the ItemWriter invoked, once per chunk.
The trickiest parts to get right are the ItemReader and the ItemWriter; for how to run custom SQL in them, refer to the code above.
The Quartz side is simple by comparison: it just re-creates the Spring Batch Job on a schedule and launches it.