Skip to content

Commit

Permalink
Initial incomplete checkin of the event loop API
Browse files Browse the repository at this point in the history
  • Loading branch information
trustin committed Apr 3, 2012
1 parent d66cf2c commit 116054a
Show file tree
Hide file tree
Showing 7 changed files with 462 additions and 275 deletions.
30 changes: 25 additions & 5 deletions transport/src/main/java/io/netty/channel/Channel.java
Expand Up @@ -15,16 +15,17 @@
*/
package io.netty.channel;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectionKey;

import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannelConfig;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectionKey;


/**
* A nexus to a network socket or a component which is capable of I/O
Expand Down Expand Up @@ -136,6 +137,8 @@ public interface Channel extends Comparable<Channel> {
*/
Integer getId();

EventLoop eventLoop();

/**
* Returns the {@link ChannelFactory} which created this channel.
*/
Expand Down Expand Up @@ -372,4 +375,21 @@ public interface Channel extends Comparable<Channel> {
* Attaches an object to this {@link Channel} to store a stateful information
*/
void setAttachment(Object attachment);

Unsafe unsafe();

public interface Unsafe {

This comment has been minimized.

Copy link
@normanmaurer

normanmaurer Apr 3, 2012

Member

Unsafe is a bit strange name for it..,,

This comment has been minimized.

Copy link
@trustin

trustin Apr 3, 2012

Author Member

It provides a public set of operations that accesses JDK Channels directly. Things will go wrong if a user calls them. So, I named it unsafe. :-) As you might already noticed, it will replace JdkChannel. Let me know if you have better name.

This comment has been minimized.

Copy link
@normanmaurer

normanmaurer Apr 3, 2012

Member

Yeah I noticed ;) I wonder if we can hide it a bit more from the user.. Like just specify it in an abstract class and not the interface directly. I will think a bit about a better name.

void setEventLoop(EventLoop eventLoop);
void clearEventLoop();
java.nio.channels.Channel ch();

void bind(SocketAddress local) throws IOException;
void connect(SocketAddress remote) throws IOException;
boolean finishConnect() throws IOException;
boolean read() throws IOException;
boolean write() throws IOException;
void unbind() throws IOException;
void disconnect() throws IOException;
void close() throws IOException;
}
}
9 changes: 9 additions & 0 deletions transport/src/main/java/io/netty/channel/EventLoop.java
@@ -0,0 +1,9 @@
package io.netty.channel;

import java.util.concurrent.ExecutorService;

public interface EventLoop extends ExecutorService {
ChannelFuture attach(Channel channel);
void attach(Channel channel, ChannelFuture future);

This comment has been minimized.

Copy link
@normanmaurer

normanmaurer Apr 3, 2012

Member

isn't one of these methods in the interface enough ? Just to keep it "small"

This comment has been minimized.

Copy link
@trustin

trustin Apr 3, 2012

Author Member

The second one is provided for the two purposes:

  1. allow a user to specify one's own future implementation
  2. allow a user to specify null to avoid GC overhead

Removing the first one will make user code so bloated, so .. I just left all of them.

This comment has been minimized.

Copy link
@normanmaurer

normanmaurer Apr 3, 2012

Member

I see.. Thanks for explaination..

boolean inEventLoop();
}
206 changes: 206 additions & 0 deletions transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java
@@ -0,0 +1,206 @@
package io.netty.channel;

import io.netty.util.internal.QueueFactory;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public abstract class SingleThreadEventLoop extends AbstractExecutorService implements EventLoop {

private final BlockingQueue<Runnable> taskQueue = QueueFactory.createQueue(Runnable.class);
private final Thread thread;
private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0);
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */
private volatile int state;

protected SingleThreadEventLoop() {
this(Executors.defaultThreadFactory());
}

protected SingleThreadEventLoop(ThreadFactory threadFactory) {
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
try {
SingleThreadEventLoop.this.run();
} finally {
synchronized (stateLock) {
state = 3;
}
try {
cleanup();
} finally {
threadLock.release();
assert taskQueue.isEmpty();
}
}
}
});
}

public ChannelFuture attach(Channel channel) {
ChannelFuture future = new DefaultChannelFuture(channel, false);
attach(channel, future);
return future;
}

protected void interruptThread() {
thread.interrupt();
}

protected Runnable pollTask() {
assert inEventLoop();
return taskQueue.poll();
}

protected Runnable takeTask() throws InterruptedException {
assert inEventLoop();
return taskQueue.take();
}

protected Runnable peekTask() {
assert inEventLoop();
return taskQueue.peek();
}

protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}

protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (isShutdown()) {
reject();
}
taskQueue.add(task);
}

protected boolean removeTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
return taskQueue.remove(task);
}

protected abstract void run();

protected void cleanup() {
// Do nothing. Subclasses will override.
}

protected abstract void wakeup(boolean inEventLoop);

@Override
public boolean inEventLoop() {
return Thread.currentThread() == thread;
}

@Override
public void shutdown() {
boolean inEventLoop = inEventLoop();
boolean wakeup = false;
if (inEventLoop) {
synchronized (stateLock) {
assert state == 1;
state = 2;
wakeup = true;
}
} else {
synchronized (stateLock) {
switch (state) {
case 0:
state = 3;
try {
cleanup();
} finally {
threadLock.release();
}
break;
case 1:
state = 2;
wakeup = true;
break;
}
}
}

if (wakeup) {
wakeup(inEventLoop);
}
}

@Override
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}

@Override
public boolean isShutdown() {
return state >= 2;
}

@Override
public boolean isTerminated() {
return state == 3;
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
if (unit == null) {
throw new NullPointerException("unit");
}

if (inEventLoop()) {
throw new IllegalStateException("cannot await termination of the current thread");
}

if (threadLock.tryAcquire(timeout, unit)) {
threadLock.release();
}

return isTerminated();
}

@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}

if (inEventLoop()) {
if (isShutdown()) {
reject();
}
addTask(task);
wakeup(true);
} else {
synchronized (stateLock) {
if (state == 0) {
state = 1;
thread.start();
}
}
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
wakeup(false);
}
}

private static void reject() {
throw new RejectedExecutionException("event loop shut down");
}
}
31 changes: 0 additions & 31 deletions transport/src/main/java/io/netty/channel/socket/Worker.java

This file was deleted.

0 comments on commit 116054a

Please sign in to comment.