Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
expose Stat for writes
  • Loading branch information
jzillmann committed May 2, 2012
1 parent 15a1464 commit e72d8fd
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/I0Itec/zkclient/IZkConnection.java
Expand Up @@ -39,7 +39,7 @@ public interface IZkConnection {

public byte[] readData(String path, Stat stat, boolean watch) throws KeeperException, InterruptedException;

public void writeData(String path, byte[] data, int expectedVersion) throws KeeperException, InterruptedException;
public Stat writeData(String path, byte[] data, int expectedVersion) throws KeeperException, InterruptedException;

public States getZookeeperState();

Expand Down
37 changes: 30 additions & 7 deletions src/main/java/org/I0Itec/zkclient/InMemoryConnection.java
Expand Up @@ -43,8 +43,23 @@

public class InMemoryConnection implements IZkConnection {

/**
 * Immutable pairing of a node's payload with the version number stored
 * alongside it in the in-memory node map.
 *
 * <p>The byte array is stored and returned by reference (no defensive copy),
 * so callers that mutate the array will affect the stored payload.
 */
public static class DataAndVersion {
    // Both fields are assigned exactly once in the constructor; declaring them
    // final makes that explicit and lets the compiler enforce it.
    private final byte[] _data;
    private final int _version;

    /**
     * @param data    the node payload; may be {@code null}
     * @param version the version number to associate with {@code data}
     */
    public DataAndVersion(byte[] data, int version) {
        _data = data;
        _version = version;
    }

    /**
     * @return the payload passed to the constructor (same array instance)
     */
    public byte[] getData() {
        return _data;
    }

    /**
     * @return the version number passed to the constructor
     */
    public int getVersion() {
        return _version;
    }
}

private Lock _lock = new ReentrantLock(true);
private Map<String, byte[]> _data = new HashMap<String, byte[]>();
private Map<String, DataAndVersion> _data = new HashMap<String, DataAndVersion>();
private Map<String, Long> _creationTime = new HashMap<String, Long>();
private final AtomicInteger sequence = new AtomicInteger(0);

Expand Down Expand Up @@ -130,7 +145,7 @@ public String create(String path, byte[] data, CreateMode mode) throws KeeperExc
if (exists(path, false)) {
throw new KeeperException.NodeExistsException();
}
_data.put(path, data);
_data.put(path, new DataAndVersion(data, 0));
_creationTime.put(path, System.currentTimeMillis());
checkWatch(_nodeWatches, path, EventType.NodeCreated);
// we also need to send a child change event for the parent
Expand Down Expand Up @@ -235,32 +250,40 @@ public byte[] readData(String path, Stat stat, boolean watch) throws KeeperExcep
}
_lock.lock();
try {
byte[] bs = _data.get(path);
if (bs == null) {
DataAndVersion dataAndVersion = _data.get(path);
if (dataAndVersion == null) {
throw new ZkNoNodeException(new KeeperException.NoNodeException());
}
byte[] bs = dataAndVersion.getData();
if (stat != null)
stat.setVersion(dataAndVersion.getVersion());
return bs;
} finally {
_lock.unlock();
}
}

/**
 * Replaces the payload stored for {@code path} in the in-memory node map and
 * returns a {@link Stat} whose version is the previously stored version plus one.
 *
 * <p>This span is a rendered diff hunk: the {@code void} signature, the first
 * {@code _lock.lock()} and the plain {@code _data.put(path, data)} appear to be
 * the pre-change lines; the {@code Stat}-returning lines are the post-change version.
 *
 * <p>NOTE(review): {@code expectedVersion} is never read in this method, so no
 * version comparison takes place here - confirm whether a bad-version failure
 * should be emulated.
 *
 * @param path            the node whose data is replaced
 * @param data            the new payload
 * @param expectedVersion accepted but not consulted by this implementation
 * @return a new {@code Stat} on which only the version is set
 * @throws KeeperException.NoNodeException if {@code exists(path, false)} is false
 */
@Override
public void writeData(String path, byte[] data, int expectedVersion) throws KeeperException, InterruptedException {
_lock.lock();
public Stat writeData(String path, byte[] data, int expectedVersion) throws KeeperException, InterruptedException {
// Placeholder; overwritten inside the try block before the method can return normally.
int newVersion=-1;
_lock.lock();
try {
// NOTE(review): the NodeDataChanged watch check runs before the existence
// check below, i.e. also for paths that turn out not to exist - confirm intent.
checkWatch(_dataWatches, path, EventType.NodeDataChanged);
if (!exists(path, false)) {
throw new KeeperException.NoNodeException();
}
_data.put(path, data);
// New version = version currently stored for the path, incremented by one.
newVersion = _data.get(path).getVersion() + 1;
_data.put(path, new DataAndVersion(data, newVersion));
String parentPath = getParentPath(path);
if (parentPath != null) {
// A data write also runs a NodeChildrenChanged watch check against the parent path.
checkWatch(_nodeWatches, parentPath, EventType.NodeChildrenChanged);
}
} finally {
// Always release the lock, including when NoNodeException is thrown.
_lock.unlock();
}
// Only the version field of the returned Stat is populated here.
Stat stat = new Stat();
stat.setVersion(newVersion);
return stat;
}

private void checkWatch(Set<String> watches, String path, EventType eventType) {
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/org/I0Itec/zkclient/ZkClient.java
Expand Up @@ -773,8 +773,8 @@ public byte[] call() throws Exception {
return (T) derializable(data);
}

/**
 * Writes {@code object} to {@code path} by delegating to the three-argument
 * overload with an expected version of {@code -1}.
 *
 * <p>This span is a rendered diff hunk: the {@code void} signature and the bare
 * delegating call appear to be the pre-change lines; the {@code Stat}-returning
 * lines are the post-change version, which passes the overload's result through.
 *
 * <p>NOTE(review): {@code -1} is presumably ZooKeeper's "match any version"
 * value - confirm against the ZooKeeper {@code setData} documentation.
 */
public void writeData(String path, Object object) {
writeData(path, object, -1);
public Stat writeData(String path, Object object) {
return writeData(path, object, -1);
}

/**
Expand Down Expand Up @@ -804,14 +804,14 @@ public <T extends Object> void updateDataSerialized(String path, DataUpdater<T>
} while (retry);
}

/**
 * Serializes {@code datat} and writes the resulting bytes to {@code path} through
 * the underlying connection, wrapped in {@code retryUntilConnected}.
 *
 * <p>This span is a rendered diff hunk: the {@code void} signature, the un-returned
 * {@code retryUntilConnected} call and the {@code return null} callable body appear
 * to be the pre-change lines; the post-change lines return the connection's
 * {@code Stat} from the callable and cast the overall result to {@code Stat}.
 *
 * @param path            the node to write to
 * @param datat           the object to serialize and store
 * @param expectedVersion passed unchanged to {@code _connection.writeData}
 */
public void writeData(final String path, Object datat, final int expectedVersion) {
public Stat writeData(final String path, Object datat, final int expectedVersion) {
// Serialization happens once, outside the callable passed to retryUntilConnected.
final byte[] data = serialize(datat);
retryUntilConnected(new Callable<Object>() {
// The callable is typed Callable<Object>, hence the cast of its result to Stat.
return (Stat) retryUntilConnected(new Callable<Object>() {

@Override
public Object call() throws Exception {
_connection.writeData(path, data, expectedVersion);
return null;
Stat stat =_connection.writeData(path, data, expectedVersion);
return stat;
}
});
}
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/org/I0Itec/zkclient/ZkConnection.java
Expand Up @@ -103,12 +103,12 @@ public byte[] readData(String path, Stat stat, boolean watch) throws KeeperExcep
return _zk.getData(path, watch, stat);
}

/**
 * Writes {@code data} to {@code path} by delegating to the three-argument
 * overload with a version of {@code -1}.
 *
 * <p>This span is a rendered diff hunk: the {@code void} signature and the bare
 * delegating call appear to be the pre-change lines; the post-change lines
 * return the overload's {@code Stat}.
 *
 * <p>NOTE(review): {@code -1} is presumably ZooKeeper's "match any version"
 * value - confirm against the ZooKeeper {@code setData} documentation.
 */
public void writeData(String path, byte[] data) throws KeeperException, InterruptedException {
writeData(path, data, -1);
public Stat writeData(String path, byte[] data) throws KeeperException, InterruptedException {
return writeData(path, data, -1);
}

/**
 * Writes {@code data} to {@code path} by calling {@code _zk.setData} with the
 * given {@code version}.
 *
 * <p>This span is a rendered diff hunk: the {@code void} signature and the bare
 * {@code setData} call appear to be the pre-change lines; the post-change lines
 * return the {@code Stat} that {@code _zk.setData} produces.
 */
public void writeData(String path, byte[] data, int version) throws KeeperException, InterruptedException {
_zk.setData(path, data, version);
public Stat writeData(String path, byte[] data, int version) throws KeeperException, InterruptedException {
return _zk.setData(path, data, version);
}

public States getZookeeperState() {
Expand Down

0 comments on commit e72d8fd

Please sign in to comment.