Errai: The browser as a platform

Sunday, April 25, 2010

Introducing new asynchronous task APIs

One of the things we've come to realize while developing Errai is that when you work with a framework where push messaging is basically free, easy and awesome, you end up wanting to stream live data across the wire: stock quotes, news feeds, Twitter feeds, clocks, weather patterns, and the migration activity of pigeons.

A lot of our demos have involved creating threads and pushing data across completely asynchronously, and that code is always messy. You need to worry about managing those threads and making sure they die when the session dies or when the subject is unsubscribed. Having to handle all this by hand invites security problems and resource management issues.

Well, worry no longer! The latest commit into trunk introduces a new comprehensive (and simple) way of creating asynchronously running tasks -- as part of the standard MessageBuilder API.

Take our TimeDisplay demo, where we stream a bunch of updates from the server to the client containing System.currentTimeMillis() results.

Up until now, the demo consisted of a thread that looped around and around and built new messages to send. Through the addition of new API extensions, this demo is greatly simplified.

The first addition is a pair of helper classes for creating managed contexts to store state in the session: SessionContext and LocalContext. SessionContext lets you create session-scoped attributes, while LocalContext lets you create locally scoped (that is, page-scoped) ones. So if a user has multiple browser windows or tabs open, each window or tab gets its own LocalContext. This is a pretty powerful little tool.
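To make the two scopes concrete, here is a minimal sketch in plain Java. It is not Errai's implementation: `ScopedContext`, `ScopedContexts`, and the `sessionId`/`windowId` strings are hypothetical names made up for illustration. It only shows the idea of one attribute store per session versus one per window or tab, with attributes indexed by class type.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a scoped attribute store; not Errai's actual class. */
class ScopedContext {
    private final Map<Class<?>, Object> attributes = new ConcurrentHashMap<>();

    /** Stores a value indexed by its class type. */
    <T> void setAttribute(Class<T> type, T value) {
        attributes.put(type, value);
    }

    /** Returns the value stored under the given type, or null if there is none. */
    <T> T getAttribute(Class<T> type) {
        return type.cast(attributes.get(type));
    }

    /** Drops everything stored in this context. */
    void destroy() {
        attributes.clear();
    }
}

/** Hands out one context per session, and one per (session, window) pair. */
class ScopedContexts {
    private final Map<String, ScopedContext> contexts = new ConcurrentHashMap<>();

    ScopedContext session(String sessionId) {
        return contexts.computeIfAbsent("session:" + sessionId, k -> new ScopedContext());
    }

    ScopedContext local(String sessionId, String windowId) {
        return contexts.computeIfAbsent("local:" + sessionId + "/" + windowId, k -> new ScopedContext());
    }
}

class ScopedContextDemo {
    public static void main(String[] args) {
        ScopedContexts contexts = new ScopedContexts();

        // One value shared by the whole session, one value per tab.
        contexts.session("s1").setAttribute(String.class, "shared across tabs");
        contexts.local("s1", "tabA").setAttribute(Integer.class, 1);
        contexts.local("s1", "tabB").setAttribute(Integer.class, 2);

        System.out.println(contexts.session("s1").getAttribute(String.class)); // prints shared across tabs
        System.out.println(contexts.local("s1", "tabA").getAttribute(Integer.class)); // prints 1
        System.out.println(contexts.local("s1", "tabB").getAttribute(Integer.class)); // prints 2
    }
}
```

Each tab sees only its own value, while the session-level value is visible no matter which tab asks for it.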

The second addition is the implementation of what I'm calling provided message parts. Unlike regular message parts, these parts are resolved via providers at the time of transmission. This is a key aspect of what we're about to show below, as it creates message re-usability.
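The difference between a regular part and a provided part can be sketched without any Errai classes. In the plain-Java sketch below, `TemplateMessage` and its methods are hypothetical names, and `java.util.function.Supplier` stands in for a provider. The point is only that a provided value is computed each time the message is resolved for sending, so one message object can be sent repeatedly with fresh content.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical message template; part values may be fixed or supplied lazily. */
class TemplateMessage {
    private final Map<String, Supplier<?>> parts = new LinkedHashMap<>();

    /** A regular part: the value is fixed when the message is built. */
    TemplateMessage with(String name, Object value) {
        parts.put(name, () -> value);
        return this;
    }

    /** A provided part: the value is computed every time the message is resolved. */
    TemplateMessage withProvided(String name, Supplier<?> provider) {
        parts.put(name, provider);
        return this;
    }

    /** Resolves every part into a concrete payload, as would happen at transmission time. */
    Map<String, Object> resolve() {
        Map<String, Object> payload = new LinkedHashMap<>();
        parts.forEach((name, provider) -> payload.put(name, provider.get()));
        return payload;
    }
}

class ProvidedPartDemo {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();

        TemplateMessage message = new TemplateMessage()
                .with("Subject", "TimeChannel")
                .withProvided("Tick", counter::incrementAndGet);

        // The same message object yields a fresh "Tick" on every resolution.
        System.out.println(message.resolve()); // prints {Subject=TimeChannel, Tick=1}
        System.out.println(message.resolve()); // prints {Subject=TimeChannel, Tick=2}
    }
}
```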

The third addition is the implementation of repeating and delayed message transmission calls as part of the standard messaging API.

Let's take a look at the example:


AsyncTask task = MessageBuilder.createConversation(message)
    .toSubject("TimeChannel").signalling()
    .withProvided(TimeServerParts.TimeString, new ResourceProvider<String>() {
        public String get() {
            return String.valueOf(System.currentTimeMillis());
        }
    }).noErrorHandling().replyRepeating(TimeUnit.MILLISECONDS, 100);



In this example, we create a conversational message that replies not just once but continuously, once every 100 milliseconds. The replyRepeating(), replyDelayed(), sendRepeating() and sendDelayed() methods all return an instance of AsyncTask, which is a handle on the task being performed. You can use this object to cancel the task.

Doing so is pretty easy.

    task.cancel(true);
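If the handle-and-cancel pattern looks familiar, that's because the JDK's own scheduler works the same way. Here is a rough analogue using only `java.util.concurrent`. `RepeatingSender` and its `sendRepeating()` method are hypothetical names, and this says nothing about how Errai implements AsyncTask internally. It just shows a repeating send that hands back a cancellable handle.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sender that repeatedly pushes a freshly provided value to a sink. */
class RepeatingSender {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /** Sends provider.get() to the sink at a fixed rate and returns a handle for cancelling. */
    <T> ScheduledFuture<?> sendRepeating(Supplier<T> provider, Consumer<T> sink, TimeUnit unit, long interval) {
        return scheduler.scheduleAtFixedRate(() -> sink.accept(provider.get()), 0, interval, unit);
    }

    /** Stops the scheduler thread so the JVM can exit. */
    void shutdown() {
        scheduler.shutdownNow();
    }
}

class RepeatingSenderDemo {
    public static void main(String[] args) throws InterruptedException {
        RepeatingSender sender = new RepeatingSender();

        // Push the current time every 100 milliseconds.
        ScheduledFuture<?> task = sender.sendRepeating(
                () -> String.valueOf(System.currentTimeMillis()),
                time -> System.out.println(time),
                TimeUnit.MILLISECONDS, 100);

        Thread.sleep(350);   // let a few ticks go out
        task.cancel(true);   // stop the stream through the handle
        sender.shutdown();
    }
}
```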

Knowing all this, which isn't very much -- and that's the cool part -- let's put it all together:


@Service("TimeServer")
@RequireAuthentication
public class TimeDisplay implements MessageCallback {
    private MessageBus bus;

    @Inject
    public TimeDisplay(MessageBus bus) {
        this.bus = bus;
    }

    public void callback(final Message message) {
        if (message.getCommandType() == null) return;

        /**
         * Create a local context to store state that is unique to this client instance (not session-wide).
         */

        final LocalContext context = LocalContext.get(message);

        /**
         * Switch on the TimeServerCommand type provided
         */

        switch (TimeServerCommands.valueOf(message.getCommandType())) {
            case Start:
                /**
                 * We want to start streaming.
                 */

                AsyncTask task = MessageBuilder.createConversation(message)
                        .toSubject("TimeChannel").signalling()
                        .withProvided(TimeServerParts.TimeString, new ResourceProvider<String>() {
                            public String get() {
                                return String.valueOf(System.currentTimeMillis());
                            }
                        }).noErrorHandling().replyRepeating(TimeUnit.MILLISECONDS, 100);

                /**
                 * Store the task as an attribute uniquely identified by its class type.
                 */

                context.setAttribute(AsyncTask.class, task);

                /**
                 * Create a listener that will kill the task gracefully if the subject is unsubscribed.  This
                 * isn't 100% necessary, as the task will be auto-killed ungracefully.  But this provides
                 * an opportunity to clean up after ourselves.
                 */

                bus.addUnsubscribeListener(new UnsubscribeListener() {
                    public void onUnsubscribe(SubscriptionEvent event) {
                        if ("TimeChannel".equals(event.getSubject())) {
                            /**
                             * Delete this listener after this execution.
                             */

                            event.setDisposeListener(true);

                            /**
                             * Stop the task from running.
                             */

                            AsyncTask streaming = context.getAttribute(AsyncTask.class);
                            if (streaming != null) streaming.cancel(true); // may already be gone after a Stop

                            /**
                             * Destroy the local context.  Sort of unnecessary, but helps reduce memory usage.
                             */

                            context.destroy();
                        }
                    }
                });
                break;

            case Stop:
                /**
                 * Access our stored AsyncTask from this instance and cancel it.
                 */

                AsyncTask running = context.getAttribute(AsyncTask.class);
                if (running != null) running.cancel(true); // nothing to cancel if Start was never received

                /**
                 * Destroy the local context.  Sort of unnecessary, but helps reduce memory usage.
                 */

                context.destroy();
                break;
        }
    }
}



That's all there is to it. It's pretty sweet. This API works on both the client and the server side. All the thread scheduling is transparently managed by an executor service on the server, and by a simple Timer-based implementation in the client. I'm hoping people will find this a welcome addition to Errai-land.
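For a feel of what "managed" buys you, here is one way a framework could tie scheduled tasks to a session so they die with it. This is a sketch under my own assumptions, not Errai's actual bookkeeping: `SessionTaskRegistry`, `scheduleRepeating()` and `endSession()` are hypothetical names built on the JDK's `ScheduledExecutorService`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical registry that remembers which session started which repeating task. */
class SessionTaskRegistry {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    private final Map<String, List<ScheduledFuture<?>>> tasksBySession = new ConcurrentHashMap<>();

    /** Schedules repeating work on behalf of a session and records the handle. */
    ScheduledFuture<?> scheduleRepeating(String sessionId, Runnable work, TimeUnit unit, long interval) {
        ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(work, 0, interval, unit);
        tasksBySession.computeIfAbsent(sessionId, id -> new CopyOnWriteArrayList<>()).add(task);
        return task;
    }

    /** Cancels everything the session started and returns how many tasks were cancelled. */
    int endSession(String sessionId) {
        List<ScheduledFuture<?>> tasks = tasksBySession.remove(sessionId);
        if (tasks == null) return 0;
        int cancelled = 0;
        for (ScheduledFuture<?> task : tasks) {
            if (task.cancel(true)) cancelled++;
        }
        return cancelled;
    }

    /** Stops the scheduler threads so the JVM can exit. */
    void shutdown() {
        scheduler.shutdownNow();
    }
}

class SessionTaskRegistryDemo {
    public static void main(String[] args) {
        SessionTaskRegistry registry = new SessionTaskRegistry();
        registry.scheduleRepeating("s1", () -> {}, TimeUnit.SECONDS, 1);
        registry.scheduleRepeating("s1", () -> {}, TimeUnit.SECONDS, 1);
        ScheduledFuture<?> other = registry.scheduleRepeating("s2", () -> {}, TimeUnit.SECONDS, 1);

        System.out.println(registry.endSession("s1")); // prints 2
        System.out.println(other.isCancelled());       // prints false
        registry.shutdown();
    }
}
```

With something like this in place, application code never has to hunt down its own threads when a session goes away.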

There will be more details coming as I iron out the bugs. This code is all new, so proceed at your own risk and all that stuff.

Tuesday, April 20, 2010