ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

java – 按顺序处理异步事件和发布结果

2019-06-24 20:57:31  阅读:301  来源: 互联网

标签:java asynchronous event-handling multithreading priority-queue


我的目标是按顺序发布异步事件,这些事件也按顺序到达并花费任意时间进行处理.所以下面是我目前的实现只使用wait和notify. MyThread处理事件,通过id将结果放入哈希表中,如果在按顺序发布此事件之前被阻塞,则通知Scheduler线程.

使用java.util.concurrent包实现此功能会有什么更好,更简洁的方法?

import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;


public class AsyncHandler {
    private final Map<Integer, Object> locks = new ConcurrentHashMap<Integer, Object>();
    private final Map<Integer, Result> results = new ConcurrentHashMap<Integer, Result>();
    private static final Random rand = new Random();

    public AsyncHandler () {
        new Scheduler(this, locks, results).start();
    }

    public void handleEvent(Event event) {
        System.out.println("handleEvent(" + event.id + ")");
        new MyThread(this, event, locks, results).start();
    }

    public Result processEvent (Event event) {
        System.out.println("processEvent(" + event.id + ")");
        locks.put(event.id, new Object());

        try {
            Thread.sleep(rand.nextInt(10000));
        } catch (InterruptedException e) {
            System.out.println(e);
        }

        return new Result(event.id);
    }

    public void postProcessEvent (Result result) {
        System.out.println(result.id);
    }

    public static void main (String[] args) {
        AsyncHandler async = new AsyncHandler();

        for (int i = 0; i < 100; i++) {
            async.handleEvent(new Event(i));
        }
    }
}

class Event {
    int id;

    public Event (int id) {
        this.id = id;
    }
}

class Result {
    int id;

    public Result (int id) {
        this.id = id;
    }
}

class MyThread extends Thread {
    private final Event event;
    private final Map<Integer, Object> locks;
    private final Map<Integer, Result> results;
    private final AsyncHandler async;

    public MyThread (AsyncHandler async, Event event, Map<Integer, Object> locks, Map<Integer, Result> results) {
        this.async = async;
        this.event = event;
        this.locks = locks;
        this.results = results;
    }

    @Override
    public void run () {
        Result res = async.processEvent(event);
        results.put(event.id, res);

        Object lock = locks.get(event.id);

        synchronized (lock) {
            lock.notifyAll();
        }
    }
}

class Scheduler extends Thread {
    private int curId = 0;
    private final AsyncHandler async;
    private final Map<Integer, Object> locks;
    private final Map<Integer, Result> results;

    public Scheduler (AsyncHandler async, Map<Integer, Object> locks, Map<Integer, Result> results) {
        this.async = async;
        this.locks = locks;
        this.results = results;
    }

    @Override
    public void run () {
        while (true) {
            Result res = results.get(curId);
            if (res == null) {
                Object lock = locks.get(curId);

                //TODO: eliminate busy waiting
                if (lock == null) {
                    continue;
                }

                synchronized (lock) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        System.out.println(e);
                        System.exit(1);
                    }
                }
                res = results.get(curId);
            }

            async.postProcessEvent(res);
            results.remove(curId);
            locks.remove(curId);
            curId++;
        }
    }
}

解决方法:

是的并发库会简单得多.

ExecutorService旨在包装线程池和队列,以便为每个任务返回Future,并提供等待结果的任何线程.

如果您想按顺序处理结果,请按顺序处理未来结果的线程.

按顺序处理异步结果

public class Main {
    public static void main(String[] args) {
        Main main = new Main();
        for (int i = 0; i < 1000; i++) {
            final int finalI = i;
            main.submitTask(new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    long millis = (long) (Math.pow(2000, Math.random()));
                    Thread.sleep(millis);
                    return millis;
                }
            }, new ResultHandler<Long>() {
                @Override
                public void onFuture(Future<Long> future) throws ExecutionException, InterruptedException {
                    System.out.println(new Date() + ": " + finalI + " - Slept for " + future.get() + " millis");
                }
            });
        }
        main.shutdown();
    }


    public interface ResultHandler<T> {
        void onFuture(Future<T> future) throws Exception;
    }

    private final ExecutorService pool = Executors.newFixedThreadPool(10);
    private final ExecutorService result = Executors.newSingleThreadExecutor();

    public synchronized <T> void submitTask(Callable<T> callable, final ResultHandler<T> resultHandler) {
        final Future<T> future = pool.submit(callable);
        result.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    resultHandler.onFuture(future);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public void shutdown() {
        pool.shutdown();
        result.shutdown();
    }
}

版画

Wed Oct 02 16:32:07 CEST 2013: 0 - Slept for 1 millis
Wed Oct 02 16:32:07 CEST 2013: 1 - Slept for 1899 millis
Wed Oct 02 16:32:09 CEST 2013: 2 - Slept for 32 millis
Wed Oct 02 16:32:09 CEST 2013: 3 - Slept for 32 millis
Wed Oct 02 16:32:09 CEST 2013: 4 - Slept for 214 millis
Wed Oct 02 16:32:09 CEST 2013: 5 - Slept for 366 millis
... many deleted ...
Wed Oct 02 16:32:09 CEST 2013: 82 - Slept for 6 millis
Wed Oct 02 16:32:09 CEST 2013: 83 - Slept for 1636 millis
Wed Oct 02 16:32:10 CEST 2013: 84 - Slept for 44 millis
Wed Oct 02 16:32:10 CEST 2013: 85 - Slept for 1 millis

您可以看到,虽然某些任务比其他任务花费的时间长,但输出的顺序是添加任务的顺序.您还可以看到它在同一秒内处理许多任务(同时)

标签:java,asynchronous,event-handling,multithreading,priority-queue
来源: https://codeday.me/bug/20190624/1282079.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有