Implementing timed concurrent execution of tasks with contextual failure reporting

Overview

The natural progression in implementing task execution is normally as follows, although one may skip steps if well-informed.

  • Implement sequential execution of tasks.
  • Realise that sequential execution is too slow, so implement basic concurrent execution using manually created and managed background threads with manual completion checking.
  • Realise that java.util.concurrent.ExecutorService exists, migrate the above to it and check completion manually using futures.
  • Realise that java.util.concurrent.CompletionService exists, migrate to it and receive results automatically as tasks complete.
  • Realise that two problems still exist.
    • Certain tasks fail or hang, holding up the invoking method indefinitely.
    • When tasks fail or time out there is no contextual information, i.e. no way to tell which specific tasks failed or timed out.

    What now?
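To make the CompletionService stage and its two problems concrete, here is a minimal self-contained sketch. CompletionServiceStage is a hypothetical name, and simulating each task by a sleep duration (a negative duration meaning the task fails) is an assumption of the sketch. It shows that take() waits without any time limit, and that a returned Future carries no indication of which task it belongs to, so the caller has to keep its own map from Future to name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the CompletionService stage.
class CompletionServiceStage {

    // Maps a task name to a simulated duration in milliseconds;
    // a negative duration (an assumption of this sketch) makes the task fail.
    static List<String> run(Map<String, Long> durations) {
        ExecutorService executor = Executors.newCachedThreadPool();
        CompletionService<String> completion = new ExecutorCompletionService<>(executor);
        // Futures come back in completion order and carry no name,
        // so the caller keeps its own Future -> name map for context.
        Map<Future<String>, String> names = new HashMap<>();
        List<String> results = new ArrayList<>();
        try {
            for (Map.Entry<String, Long> entry : durations.entrySet()) {
                String name = entry.getKey();
                long millis = entry.getValue();
                Future<String> future = completion.submit(() -> {
                    if (millis < 0) {
                        throw new IllegalStateException(name + " failed");
                    }
                    Thread.sleep(millis);
                    return name + ":OK";
                });
                names.put(future, name);
            }
            for (int i = 0; i < names.size(); i++) {
                // take() has no upper bound: a hung task would stall the caller here
                Future<String> done = completion.take();
                try {
                    results.add(done.get());
                } catch (ExecutionException e) {
                    results.add(names.get(done) + ":FAILED");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Long> durations = new LinkedHashMap<>();
        durations.put("SHARED_DRIVE", 300L);
        durations.put("DATABASE", 50L);
        durations.put("REMOTE_SERVICE", -1L);
        System.out.println(run(durations)); // results appear in completion order
    }
}
```

The map lookup recovers the name for failures, but nothing here bounds how long the caller waits; that is the gap the timed approach in this article closes.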

This article explores the end stage of this thought process, which has the following requirements.

  • Tasks must be executed concurrently.
  • A timeout must be imposed on task execution.
  • In the event of a task timeout or failure, contextual information must be accessible for informative error reporting, i.e. to tell which tasks failed or timed out.

The design of the domain is what makes contextual information available in the event of a timeout or failure, giving precise information to the calling client.

Domain description

Here follows a description of the domain used in the example implementation. Essentially, the application code implements a monitoring service along with its associated domain. The idea is that, as an application starts up, it needs to know that the environment it depends on is up and running, so that it can abort startup if anything is down. In this case three specific parts of the environment are checked.

  • Database
  • Remote service
  • Shared drive

The code for the above monitors follows.

Monitor interface

package monitor;
import execution.MonitorStatus;

public interface Monitor {
    String subject();
    MonitorStatus monitor();
}

Monitor implementations

The following base class encapsulates the name of the monitor, which it reports as the monitor's subject.

package monitor;

public abstract class AbstractMonitor implements Monitor {

    protected final String name;

    public AbstractMonitor(final String name) {
        this.name = name;
    }

    @Override
    public String subject() {
        return name;
    }

}

Database monitor

package monitor;

import execution.MonitorStatus;
import execution.MonitorStatus.Outcome;

public class DatabaseMonitor extends AbstractMonitor {

    public DatabaseMonitor(String name) {
        super(name);
    }

    @Override
    public MonitorStatus monitor() {
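        try {
            // simulate a database check that takes about 1 second
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // restore the interrupt flag so that cancellation is not swallowed
            Thread.currentThread().interrupt();
        }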
        return new MonitorStatus(name, Outcome.UP);
    }

}

Remote service monitor

package monitor;

import execution.MonitorStatus;
import execution.MonitorStatus.Outcome;

public class RemoteServiceMonitor extends AbstractMonitor {

    public RemoteServiceMonitor(String name) {
        super(name);
    }

    @Override
    public MonitorStatus monitor() {
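        try {
            // simulate a remote service check that takes about 1 second
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // restore the interrupt flag so that cancellation is not swallowed
            Thread.currentThread().interrupt();
        }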
        return new MonitorStatus(name, Outcome.DOWN);
    }

}

Shared drive monitor

The shared drive monitor is deliberately slow so that it times out, for demonstration purposes.

package monitor;

import execution.MonitorStatus;
import execution.MonitorStatus.Outcome;

public class SharedDriveMonitor extends AbstractMonitor {

    public SharedDriveMonitor(String name) {
        super(name);
    }

    @Override
    public MonitorStatus monitor() {
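        try {
            // simulate a check that takes about 3 seconds, longer than the 2 second timeout used below
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            // restore the interrupt flag so that cancellation is not swallowed
            Thread.currentThread().interrupt();
        }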
        return new MonitorStatus(name, Outcome.DOWN);
    }

}
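
The monitors above restore the interrupt flag when Thread.sleep is interrupted. This matters because, in the JDK's AbstractExecutorService (which the cached thread pool inherits invokeAll from), a timed invokeAll cancels unfinished tasks with Future.cancel(true), which interrupts the worker thread. The following self-contained sketch, using a hypothetical class name and a 200 ms timeout chosen purely for illustration, shows a task shaped like SharedDriveMonitor observing that interrupt.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: checks that a timed invokeAll interrupts a task it cancels.
class CancellationInterruptSketch {

    static boolean timedOutTaskWasInterrupted() {
        ExecutorService executor = Executors.newCachedThreadPool();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch finished = new CountDownLatch(1);
        // Shaped like SharedDriveMonitor: runs longer than the timeout below.
        Callable<String> slowCheck = () -> {
            try {
                Thread.sleep(3000);
                return "DOWN";
            } catch (InterruptedException e) {
                interrupted.set(true);
                Thread.currentThread().interrupt(); // same idiom as the monitors
                return "INTERRUPTED";
            } finally {
                finished.countDown();
            }
        };
        try {
            List<Future<String>> futures = executor.invokeAll(
                    Collections.singletonList(slowCheck), 200, TimeUnit.MILLISECONDS);
            boolean cancelled = futures.get(0).isCancelled();
            // wait briefly for the worker thread to react to the interrupt
            finished.await(1, TimeUnit.SECONDS);
            return cancelled && interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(timedOutTaskWasInterrupted()); // prints true
    }
}
```

A monitor that ignored interrupts (for example a busy loop that never checks Thread.interrupted()) would still be reported as CANCELLED, but its worker thread would stay busy until the loop ended on its own.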

Execution domain

package execution;

public class MonitorStatus {

    public enum Outcome {
        UP, DOWN, FAILED, CANCELLED;
    }

    private final String  subject;

    private final Outcome type;

    public MonitorStatus(final String subject, final Outcome type) {
        this.subject = subject;
        this.type = type;
    }

    public Outcome type() {
        return type;
    }

    @Override
    public String toString() {
        return String.format("{subject='%s',type='%s'}", subject, type);
    }

}

Monitor task

package execution;

import java.util.concurrent.Callable;

import monitor.Monitor;

public class MonitorTask implements Callable<MonitorStatus> {

    private final Monitor monitor;

    public MonitorTask(final Monitor monitor) {
        this.monitor = monitor;
    }

    @Override
    public MonitorStatus call() throws Exception {
        return monitor.monitor();
    }

    public String name() {
        return monitor.subject();
    }

}
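
MonitorTask pairs each Callable with the monitor's name, but a failure still reaches the caller as a bare ExecutionException. One possible variation, sketched here with a hypothetical SafeNamedTask class and plain strings standing in for MonitorStatus, converts exceptions inside the task so that the name and the failure cause travel together.

```java
import java.util.concurrent.Callable;

// Hypothetical variant of MonitorTask: pairs a name with a check and turns
// any exception into a labelled result instead of letting it escape.
class SafeNamedTask implements Callable<String> {

    private final String name;
    private final Callable<String> check;

    SafeNamedTask(String name, Callable<String> check) {
        this.name = name;
        this.check = check;
    }

    @Override
    public String call() {
        try {
            return name + ":" + check.call();
        } catch (InterruptedException e) {
            // keep the cancellation signal for the executor
            Thread.currentThread().interrupt();
            return name + ":CANCELLED";
        } catch (Exception e) {
            // the failure is reported together with the task's name and cause
            return name + ":FAILED(" + e.getMessage() + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(new SafeNamedTask("DATABASE", () -> "UP").call()); // prints DATABASE:UP
        System.out.println(new SafeNamedTask("SHARED_DRIVE", () -> {
            throw new java.io.IOException("mount point missing");
        }).call()); // prints SHARED_DRIVE:FAILED(mount point missing)
    }
}
```

A timeout is still reported through the Future: once invokeAll cancels a task, Future.get() throws CancellationException regardless of what the task later returns, so a CANCELLED branch in the caller remains necessary.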

Monitor service

Monitor service interface

package service;

import java.util.List;
import java.util.concurrent.TimeUnit;

import monitor.Monitor;
import execution.MonitorStatus;

public interface MonitorService {

    List<MonitorStatus> monitor(final List<Monitor> monitors, final long timeout,
            final TimeUnit unit) throws InterruptedException;

}
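
The domain description says that startup should be aborted if anything in the environment is down. A caller of MonitorService could make that decision from the returned statuses roughly as follows. This is a self-contained sketch: StartupGate is a hypothetical name, and the nested Outcome enum and Status class stand in for MonitorStatus (which, as written, exposes type() but no subject accessor).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical startup gate; Outcome and Status stand in for MonitorStatus in this sketch.
class StartupGate {

    enum Outcome { UP, DOWN, FAILED, CANCELLED }

    static final class Status {
        final String subject;
        final Outcome outcome;

        Status(String subject, Outcome outcome) {
            this.subject = subject;
            this.outcome = outcome;
        }
    }

    // Subjects whose outcome is anything other than UP; DOWN, FAILED and CANCELLED all block startup.
    static List<String> blockers(List<Status> statuses) {
        List<String> blocking = new ArrayList<>();
        for (Status status : statuses) {
            if (status.outcome != Outcome.UP) {
                blocking.add(status.subject + "=" + status.outcome);
            }
        }
        return blocking;
    }

    // Throws if any part of the environment is not UP, aborting startup.
    static void requireAllUp(List<Status> statuses) {
        List<String> blocking = blockers(statuses);
        if (!blocking.isEmpty()) {
            throw new IllegalStateException("startup aborted, not UP: " + blocking);
        }
    }

    public static void main(String[] args) {
        List<Status> statuses = Arrays.asList(
                new Status("DATABASE", Outcome.UP),
                new Status("REMOTE_SERVICE", Outcome.DOWN),
                new Status("SHARED_DRIVE", Outcome.CANCELLED));
        System.out.println(blockers(statuses)); // prints [REMOTE_SERVICE=DOWN, SHARED_DRIVE=CANCELLED]
    }
}
```

Treating CANCELLED like DOWN is a design choice of this sketch: a check that did not finish in time gives no evidence that the dependency is available.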

Monitor service implementation using ExecutorService

package service;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import monitor.DatabaseMonitor;
import monitor.Monitor;
import monitor.RemoteServiceMonitor;
import monitor.SharedDriveMonitor;
import execution.MonitorStatus;
import execution.MonitorTask;
import execution.MonitorStatus.Outcome;

public class MonitorServiceTimedExecutorServiceImpl implements MonitorService {

    public static final long             EXECUTION_TIMEOUT      = 2000;
    public static final TimeUnit         EXECUTION_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;

    private static final ExecutorService executor               = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        List<Monitor> monitors = new ArrayList<Monitor>();
        monitors.add(new DatabaseMonitor("DATABASE"));
        monitors.add(new RemoteServiceMonitor("REMOTE_SERVICE"));
        monitors.add(new SharedDriveMonitor("SHARED_DRIVE"));
        try {
            List<MonitorStatus> statuses = new MonitorServiceTimedExecutorServiceImpl().monitor(
                    monitors, EXECUTION_TIMEOUT, EXECUTION_TIMEOUT_UNIT);
            for (MonitorStatus status : statuses) {
                System.out.println(status);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // the cached pool's threads are non-daemon; shut it down so the JVM exits promptly
            executor.shutdown();
        }
    }

    @Override
    public List<MonitorStatus> monitor(final List<Monitor> monitors, final long timeout,
            final TimeUnit timeUnit) throws InterruptedException {

        /*
         * validate arguments
         */
        if (monitors == null || monitors.isEmpty()) {
            throw new IllegalArgumentException("no monitors supplied!");
        }
        if (timeUnit == null) {
            throw new NullPointerException("time unit is null!");
        }

        /*
         * create tasks
         */
        List<MonitorTask> tasks = new ArrayList<MonitorTask>(monitors.size());
        for (Monitor monitor : monitors) {
            tasks.add(new MonitorTask(monitor));
        }

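        /*
         * execute tasks with timeout: invokeAll blocks until every task has completed
         * or the timeout expires, cancelling any task still running at that point
         */
        List<Future<MonitorStatus>> futures = executor.invokeAll(tasks, timeout, timeUnit);
        List<MonitorStatus> statuses = new ArrayList<MonitorStatus>(tasks.size());
        // invokeAll returns futures in the same order as the task list,
        // so iterating both in step pairs each future with its task's name
        Iterator<MonitorTask> taskIter = tasks.iterator();

        /*
         * collect results; every future is already done, so get() does not block
         */
        for (Future<MonitorStatus> future : futures) {

            MonitorTask task = taskIter.next();
            try {
                statuses.add(future.get());
            } catch (ExecutionException e) {
                // the monitor threw an exception
                statuses.add(new MonitorStatus(task.name(), Outcome.FAILED));
            } catch (CancellationException e) {
                // the monitor was still running when the timeout expired
                statuses.add(new MonitorStatus(task.name(), Outcome.CANCELLED));
            }
        }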

        return statuses;
    }

}

Output

{subject='DATABASE',type='UP'}
{subject='REMOTE_SERVICE',type='DOWN'}
{subject='SHARED_DRIVE',type='CANCELLED'}

Note that the first two monitors complete within the timeout, so their own results are reported (DATABASE is UP, REMOTE_SERVICE is DOWN). The third monitor, however, exceeds the specified timeout and, as a result, is automatically cancelled by the executor service. The contextual information tells you what happened in the execution of every individual callable.
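
The run above never exercises the FAILED branch. The following self-contained sketch reproduces the same invokeAll pattern with a hypothetical InvokeAllOutcomeSketch class, plain Callable<String> checks in place of the Monitor types, and one check (named BROKEN here purely for illustration) that deliberately throws.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the article's invokeAll pattern, without the Monitor types.
class InvokeAllOutcomeSketch {

    static List<String> run(List<String> names, List<Callable<String>> checks,
                            long timeout, TimeUnit unit) {
        if (names.size() != checks.size()) {
            throw new IllegalArgumentException("one name is needed per check");
        }
        ExecutorService executor = Executors.newCachedThreadPool();
        List<String> report = new ArrayList<>();
        try {
            // futures come back in the same order as checks, so names line up with them
            List<Future<String>> futures = executor.invokeAll(checks, timeout, unit);
            Iterator<String> nameIter = names.iterator();
            for (Future<String> future : futures) {
                String name = nameIter.next();
                try {
                    report.add(name + "=" + future.get());
                } catch (ExecutionException e) {
                    report.add(name + "=FAILED");
                } catch (CancellationException e) {
                    report.add(name + "=CANCELLED");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return report;
    }

    public static void main(String[] args) {
        List<Callable<String>> checks = Arrays.<Callable<String>>asList(
                () -> "UP",
                () -> { throw new IllegalStateException("connection refused"); },
                () -> { Thread.sleep(3000); return "DOWN"; });
        List<String> report = run(Arrays.asList("DATABASE", "BROKEN", "SHARED_DRIVE"),
                checks, 500, TimeUnit.MILLISECONDS);
        System.out.println(report); // prints [DATABASE=UP, BROKEN=FAILED, SHARED_DRIVE=CANCELLED]
    }
}
```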

Conclusion

The above idiom allows you to execute tasks concurrently under a safe timeout constraint while also giving you access to contextual data about each callable, so you can find out which particular callables failed or timed out. For brevity, other implementations have been omitted. An implementation using ExecutorCompletionService has deliberately not been included, as its features are not necessary here.

Download

The sample code is available for download as an Eclipse project.
