Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,16 +15,17 @@ | |
*/ | ||
package io.netty.channel; | ||
|
||
import java.net.InetSocketAddress; | ||
import java.net.SocketAddress; | ||
import java.nio.channels.NotYetConnectedException; | ||
import java.nio.channels.SelectionKey; | ||
|
||
import io.netty.channel.socket.DatagramChannel; | ||
import io.netty.channel.socket.ServerSocketChannel; | ||
import io.netty.channel.socket.SocketChannel; | ||
import io.netty.channel.socket.nio.NioSocketChannelConfig; | ||
|
||
import java.io.IOException; | ||
import java.net.InetSocketAddress; | ||
import java.net.SocketAddress; | ||
import java.nio.channels.NotYetConnectedException; | ||
import java.nio.channels.SelectionKey; | ||
|
||
|
||
/** | ||
* A nexus to a network socket or a component which is capable of I/O | ||
|
@@ -136,6 +137,8 @@ public interface Channel extends Comparable<Channel> { | |
*/ | ||
Integer getId(); | ||
|
||
EventLoop eventLoop(); | ||
|
||
/** | ||
* Returns the {@link ChannelFactory} which created this channel. | ||
*/ | ||
|
@@ -372,4 +375,21 @@ public interface Channel extends Comparable<Channel> { | |
* Attaches an object to this {@link Channel} to store a stateful information | ||
*/ | ||
void setAttachment(Object attachment); | ||
|
||
Unsafe unsafe(); | ||
|
||
public interface Unsafe { | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
trustin
Author
Member
|
||
void setEventLoop(EventLoop eventLoop); | ||
void clearEventLoop(); | ||
java.nio.channels.Channel ch(); | ||
|
||
void bind(SocketAddress local) throws IOException; | ||
void connect(SocketAddress remote) throws IOException; | ||
boolean finishConnect() throws IOException; | ||
boolean read() throws IOException; | ||
boolean write() throws IOException; | ||
void unbind() throws IOException; | ||
void disconnect() throws IOException; | ||
void close() throws IOException; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
package io.netty.channel; | ||
|
||
import java.util.concurrent.ExecutorService; | ||
|
||
public interface EventLoop extends ExecutorService { | ||
ChannelFuture attach(Channel channel); | ||
void attach(Channel channel, ChannelFuture future); | ||
This comment has been minimized.
Sorry, something went wrong.
normanmaurer
Member
|
||
boolean inEventLoop(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,206 @@ | ||
package io.netty.channel; | ||
|
||
import io.netty.util.internal.QueueFactory; | ||
|
||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.concurrent.AbstractExecutorService; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.RejectedExecutionException; | ||
import java.util.concurrent.Semaphore; | ||
import java.util.concurrent.ThreadFactory; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
public abstract class SingleThreadEventLoop extends AbstractExecutorService implements EventLoop { | ||
|
||
private final BlockingQueue<Runnable> taskQueue = QueueFactory.createQueue(Runnable.class); | ||
private final Thread thread; | ||
private final Object stateLock = new Object(); | ||
private final Semaphore threadLock = new Semaphore(0); | ||
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */ | ||
private volatile int state; | ||
|
||
protected SingleThreadEventLoop() { | ||
this(Executors.defaultThreadFactory()); | ||
} | ||
|
||
protected SingleThreadEventLoop(ThreadFactory threadFactory) { | ||
thread = threadFactory.newThread(new Runnable() { | ||
@Override | ||
public void run() { | ||
try { | ||
SingleThreadEventLoop.this.run(); | ||
} finally { | ||
synchronized (stateLock) { | ||
state = 3; | ||
} | ||
try { | ||
cleanup(); | ||
} finally { | ||
threadLock.release(); | ||
assert taskQueue.isEmpty(); | ||
} | ||
} | ||
} | ||
}); | ||
} | ||
|
||
public ChannelFuture attach(Channel channel) { | ||
ChannelFuture future = new DefaultChannelFuture(channel, false); | ||
attach(channel, future); | ||
return future; | ||
} | ||
|
||
protected void interruptThread() { | ||
thread.interrupt(); | ||
} | ||
|
||
protected Runnable pollTask() { | ||
assert inEventLoop(); | ||
return taskQueue.poll(); | ||
} | ||
|
||
protected Runnable takeTask() throws InterruptedException { | ||
assert inEventLoop(); | ||
return taskQueue.take(); | ||
} | ||
|
||
protected Runnable peekTask() { | ||
assert inEventLoop(); | ||
return taskQueue.peek(); | ||
} | ||
|
||
protected boolean hasTasks() { | ||
assert inEventLoop(); | ||
return !taskQueue.isEmpty(); | ||
} | ||
|
||
protected void addTask(Runnable task) { | ||
if (task == null) { | ||
throw new NullPointerException("task"); | ||
} | ||
if (isShutdown()) { | ||
reject(); | ||
} | ||
taskQueue.add(task); | ||
} | ||
|
||
protected boolean removeTask(Runnable task) { | ||
if (task == null) { | ||
throw new NullPointerException("task"); | ||
} | ||
return taskQueue.remove(task); | ||
} | ||
|
||
protected abstract void run(); | ||
|
||
protected void cleanup() { | ||
// Do nothing. Subclasses will override. | ||
} | ||
|
||
protected abstract void wakeup(boolean inEventLoop); | ||
|
||
@Override | ||
public boolean inEventLoop() { | ||
return Thread.currentThread() == thread; | ||
} | ||
|
||
@Override | ||
public void shutdown() { | ||
boolean inEventLoop = inEventLoop(); | ||
boolean wakeup = false; | ||
if (inEventLoop) { | ||
synchronized (stateLock) { | ||
assert state == 1; | ||
state = 2; | ||
wakeup = true; | ||
} | ||
} else { | ||
synchronized (stateLock) { | ||
switch (state) { | ||
case 0: | ||
state = 3; | ||
try { | ||
cleanup(); | ||
} finally { | ||
threadLock.release(); | ||
} | ||
break; | ||
case 1: | ||
state = 2; | ||
wakeup = true; | ||
break; | ||
} | ||
} | ||
} | ||
|
||
if (wakeup) { | ||
wakeup(inEventLoop); | ||
} | ||
} | ||
|
||
@Override | ||
public List<Runnable> shutdownNow() { | ||
shutdown(); | ||
return Collections.emptyList(); | ||
} | ||
|
||
@Override | ||
public boolean isShutdown() { | ||
return state >= 2; | ||
} | ||
|
||
@Override | ||
public boolean isTerminated() { | ||
return state == 3; | ||
} | ||
|
||
@Override | ||
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { | ||
if (unit == null) { | ||
throw new NullPointerException("unit"); | ||
} | ||
|
||
if (inEventLoop()) { | ||
throw new IllegalStateException("cannot await termination of the current thread"); | ||
} | ||
|
||
if (threadLock.tryAcquire(timeout, unit)) { | ||
threadLock.release(); | ||
} | ||
|
||
return isTerminated(); | ||
} | ||
|
||
@Override | ||
public void execute(Runnable task) { | ||
if (task == null) { | ||
throw new NullPointerException("task"); | ||
} | ||
|
||
if (inEventLoop()) { | ||
if (isShutdown()) { | ||
reject(); | ||
} | ||
addTask(task); | ||
wakeup(true); | ||
} else { | ||
synchronized (stateLock) { | ||
if (state == 0) { | ||
state = 1; | ||
thread.start(); | ||
} | ||
} | ||
addTask(task); | ||
if (isShutdown() && removeTask(task)) { | ||
reject(); | ||
} | ||
wakeup(false); | ||
} | ||
} | ||
|
||
private static void reject() { | ||
throw new RejectedExecutionException("event loop shut down"); | ||
} | ||
} |
This file was deleted.
"Unsafe" is a bit of a strange name for it...