Своя обработка ошибок в Activiti

Дмитрий Урбанович

Ижевск, 4 мая 2018


Activiti — это движок бизнес-процессов, который можно встроить в свое приложение. Бизнес-процессы задаются в стандарте BPMN 2.0 с некоторыми расширениями:


<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
             xmlns:activiti="http://activiti.org/bpmn"
             targetNamespace=""
>
    <process id="sample">
        <startEvent id="startEvent"/>

        <sequenceFlow id="flow1" sourceRef="startEvent" targetRef="serviceTask"/>

        <serviceTask id="serviceTask"
                     activiti:async="true"
                     activiti:class="urbanowicz.activiti.delegates.SampleDelegate"
        />

        <sequenceFlow id="flow2" sourceRef="serviceTask" targetRef="endEvent"/>

        <endEvent id="endEvent"/>
    </process>
</definitions>

Здесь activiti:async="true" означает, что движок сохранит в БД состояние процесса, а исполнение кода отложит в очередь.

Проще всего встроить Activiti в Spring Boot приложение. Достаточно указать родительский проект в pom.xml:


<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>

И добавить Activiti в зависимости:


<dependency>
    <groupId>org.activiti</groupId>
    <artifactId>activiti-spring-boot-starter-basic</artifactId>
    <version>6.0.0</version>
</dependency>

<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

Последние две зависимости позволят запускать движок в юнит-тестах.

По умолчанию, Activiti делает откат состояния и повторение шагов диаграммы еще два раза с интервалом в десять секунд, если в процессе исполнения происходит ошибка. К сожалению, если вставить выброс исключения в urbanowicz.activiti.delegates.SampleDelegate и написать тест, то никаких повторов наблюдаться не будет.


public class ModTest {
    @Rule
    public final ActivitiRule activitiRule = new ActivitiRule(
            new StandaloneInMemProcessEngineConfiguration()
                    .setAsyncExecutorActivate(true)
                    .buildProcessEngine()
    );

    @Test
    @Deployment(resources = "processes/sample.bpmn20.xml")
    public void test() throws InterruptedException {
        activitiRule.getRuntimeService()
                .startProcessInstanceByKey("sample");

        Thread.sleep(100000);
    }
}

Вероятно, это баг в DefaultJobManager#executeMessageJob: код сначала готовит план, а затем до начала его исполнения удаляет JobEntity. К моменту исполнения плана обработчики ошибок не могут поставить ту же JobEntity на повторное исполнение по причине ее отсутствия. В качестве быстрой заплатки, мы можем вставить это удаление в конец плана. Свой джоб-менеджер мы можем задать с помощью метода конфигурации setJobManager(new ModJobManager()).


public class ModJobManager extends DefaultJobManager {
    @Override
    protected void executeMessageJob(JobEntity jobEntity) {
        executeJobHandler(jobEntity);
        if (jobEntity.getId() != null) {
            Context.getAgenda()
                    .planOperation(() -> Context.getCommandContext().getJobEntityManager().delete(jobEntity));
        }
    }
}

Обработка ошибок по умолчанию — не всегда то, чего мы хотим. Например, могут потребоваться разные интервалы в зависимости от номера попытки (1 минута, 2 минута, 4 минуты и так далее). Или мы можем хотеть запретить повторы некоторых видов ошибок. Для этого мы используем свою фабрику команд повтора ошибочных запусков и регистрируем ее в конфигурации методом setFailedJobCommandFactory(new ModFailedJobCommandFactory()).


public class ModFailedJobCommandFactory implements FailedJobCommandFactory {
    @Override
    public Command<Object> getCommand(String jobId, Throwable exception) {
        return new ModJobRetryCmd(jobId, exception);
    }
}

Для настройки поведения можно ввести интерфейс для стратегии, в котором три метода. Первый будет возвращать время задержки после данного запуска, второй — следует ли повторять еще, и третий — реакция на попадание операции в мертвую очередь. Если стратегия говорит, что повтор не требуется, то мы сбрасываем количество оставшихся попыток. Факт попадания операции в метрвую очередь мы проверяем прямым запросом (Activiti использует старый идентификатор операции).


public class ModJobRetryCmd extends JobRetryCmd {
    ModJobRetryCmd(String jobId, Throwable exception) {
        super(jobId, exception);
    }

    @Override
    public Object execute(CommandContext commandContext) {
        Optional.ofNullable(commandContext.getJobEntityManager().findById(jobId))
                .filter(job -> retryStrategy.shouldStopRetry(job, exception))
                .ifPresent(job -> job.setRetries(1));

        Object result = super.execute(commandContext);

        Optional.ofNullable(commandContext.getDeadLetterJobEntityManager().findById(jobId))
                .ifPresent(job -> retryStrategy.movedToDeadLetter(job, exception));

        return result;
    }

    @Override
    protected Date calculateDueDate(CommandContext commandContext, int originalWaitTimeInSeconds, Date oldDate) {
        return super.calculateDueDate(
                commandContext,
                Optional.ofNullable(commandContext.getJobEntityManager().findById(jobId))
                        .flatMap(job -> retryStrategy.delayFor(job, exception))
                        .map(Duration::getSeconds)
                        .map(Long::intValue)
                        .orElse(originalWaitTimeInSeconds),
                oldDate
        );
    }
}

Мертвая очередь доступна только через отдельное API. Если же мы хотим, чтобы она была представлена в списке обычных задач, можно сделать это с помощью нашей стратегии:


@Override
public void movedToDeadLetter(DeadLetterJobEntity job, Throwable exception) {
    Task task = activitiRule.getTaskService().newTask();
    activitiRule.getTaskService().saveTask(task);
    activitiRule.getTaskService().setVariable(task.getId(), "jobId", job.getId());
}

Этот код создает пользовательскую задачу, которая не привязана к процессам. Для реакции на завершение таких задач понадобится регистрация слушателя. Его можно зарегистрировать глобально в конфигурации:


.setTypedEventListeners(Collections.singletonMap(
        ActivitiEventType.TASK_COMPLETED.name(),
        Collections.singletonList(new TaskEntityEventListener() {
            @Override
            protected void onTaskEntityEvent(TaskEntity taskEntity) {
                String jobId = (String) taskEntity.getVariable("jobId");
                activitiRule.getManagementService().moveDeadLetterJobToExecutableJob(jobId, 3);
            }
        })
))

Разумеется, нужно добавить в него проверки на тип задачи, так как этот слушатель ловит все события завершения пользовательских задач.

Полный код примера доступен на гитхабе: https://github.com/hun10/activiti-mod.