Ижевск, 4 мая 2018
Activiti — это движок бизнес-процессов, который можно встроить в свое приложение. Бизнес-процессы задаются в стандарте BPMN 2.0 с некоторыми расширениями:
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:activiti="http://activiti.org/bpmn"
targetNamespace=""
>
<process id="sample">
<startEvent id="startEvent"/>
<sequenceFlow id="flow1" sourceRef="startEvent" targetRef="serviceTask"/>
<serviceTask id="serviceTask"
activiti:async="true"
activiti:class="urbanowicz.activiti.delegates.SampleDelegate"
/>
<sequenceFlow id="flow2" sourceRef="serviceTask" targetRef="endEvent"/>
<endEvent id="endEvent"/>
</process>
</definitions>
Здесь activiti:async="true"
означает, что движок сохранит в БД
состояние процесса, а исполнение кода отложит в очередь.
Проще всего встроить Activiti в Spring Boot приложение. Достаточно указать родительский проект в pom.xml:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
И добавить Activiti в зависимости:
<dependency>
<groupId>org.activiti</groupId>
<artifactId>activiti-spring-boot-starter-basic</artifactId>
<version>6.0.0</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
Последние две зависимости позволят запускать движок в юнит-тестах.
По умолчанию, Activiti делает откат состояния и повторение шагов диаграммы еще
два раза с интервалом в десять секунд, если в процессе исполнения происходит
ошибка. К сожалению, если вставить выброс исключения в
urbanowicz.activiti.delegates.SampleDelegate
и написать тест, то
никаких повторов наблюдаться не будет.
public class ModTest {
@Rule
public final ActivitiRule activitiRule = new ActivitiRule(
new StandaloneInMemProcessEngineConfiguration()
.setAsyncExecutorActivate(true)
.buildProcessEngine()
);
@Test
@Deployment(resources = "processes/sample.bpmn20.xml")
public void test() throws InterruptedException {
activitiRule.getRuntimeService()
.startProcessInstanceByKey("sample");
Thread.sleep(100000);
}
}
Вероятно, это баг в DefaultJobManager#executeMessageJob
: код
сначала готовит план, а затем до начала его исполнения удаляет
JobEntity
. К моменту исполнения плана обработчики ошибок
не могут поставить ту же JobEntity
на повторное исполнение по
причине ее отсутствия. В качестве быстрой заплатки, мы можем вставить это
удаление в конец плана. Свой джоб-менеджер мы можем задать с помощью
метода конфигурации setJobManager(new ModJobManager())
.
public class ModJobManager extends DefaultJobManager {
@Override
protected void executeMessageJob(JobEntity jobEntity) {
executeJobHandler(jobEntity);
if (jobEntity.getId() != null) {
Context.getAgenda()
.planOperation(() -> Context.getCommandContext().getJobEntityManager().delete(jobEntity));
}
}
}
Обработка ошибок по умолчанию — не всегда то, чего мы хотим. Например, могут
потребоваться разные интервалы в зависимости от номера попытки (1 минута,
2 минута, 4 минуты и так далее). Или мы можем хотеть запретить повторы
некоторых видов ошибок. Для этого мы используем свою фабрику команд повтора
ошибочных запусков и регистрируем ее в конфигурации методом
setFailedJobCommandFactory(new ModFailedJobCommandFactory())
.
public class ModFailedJobCommandFactory implements FailedJobCommandFactory {
@Override
public Command<Object> getCommand(String jobId, Throwable exception) {
return new ModJobRetryCmd(jobId, exception);
}
}
Для настройки поведения можно ввести интерфейс для стратегии, в котором три метода. Первый будет возвращать время задержки после данного запуска, второй — следует ли повторять еще, и третий — реакция на попадание операции в мертвую очередь. Если стратегия говорит, что повтор не требуется, то мы сбрасываем количество оставшихся попыток. Факт попадания операции в метрвую очередь мы проверяем прямым запросом (Activiti использует старый идентификатор операции).
public class ModJobRetryCmd extends JobRetryCmd {
ModJobRetryCmd(String jobId, Throwable exception) {
super(jobId, exception);
}
@Override
public Object execute(CommandContext commandContext) {
Optional.ofNullable(commandContext.getJobEntityManager().findById(jobId))
.filter(job -> retryStrategy.shouldStopRetry(job, exception))
.ifPresent(job -> job.setRetries(1));
Object result = super.execute(commandContext);
Optional.ofNullable(commandContext.getDeadLetterJobEntityManager().findById(jobId))
.ifPresent(job -> retryStrategy.movedToDeadLetter(job, exception));
return result;
}
@Override
protected Date calculateDueDate(CommandContext commandContext, int originalWaitTimeInSeconds, Date oldDate) {
return super.calculateDueDate(
commandContext,
Optional.ofNullable(commandContext.getJobEntityManager().findById(jobId))
.flatMap(job -> retryStrategy.delayFor(job, exception))
.map(Duration::getSeconds)
.map(Long::intValue)
.orElse(originalWaitTimeInSeconds),
oldDate
);
}
}
Мертвая очередь доступна только через отдельное API. Если же мы хотим, чтобы она была представлена в списке обычных задач, можно сделать это с помощью нашей стратегии:
@Override
public void movedToDeadLetter(DeadLetterJobEntity job, Throwable exception) {
Task task = activitiRule.getTaskService().newTask();
activitiRule.getTaskService().saveTask(task);
activitiRule.getTaskService().setVariable(task.getId(), "jobId", job.getId());
}
Этот код создает пользовательскую задачу, которая не привязана к процессам. Для реакции на завершение таких задач понадобится регистрация слушателя. Его можно зарегистрировать глобально в конфигурации:
.setTypedEventListeners(Collections.singletonMap(
ActivitiEventType.TASK_COMPLETED.name(),
Collections.singletonList(new TaskEntityEventListener() {
@Override
protected void onTaskEntityEvent(TaskEntity taskEntity) {
String jobId = (String) taskEntity.getVariable("jobId");
activitiRule.getManagementService().moveDeadLetterJobToExecutableJob(jobId, 3);
}
})
))
Разумеется, нужно добавить в него проверки на тип задачи, так как этот слушатель ловит все события завершения пользовательских задач.
Полный код примера доступен на гитхабе: https://github.com/hun10/activiti-mod.