ZooKeeper Example

https://zookeeper.apache.org/doc/r3.5.2-alpha/javaExample.html

Executor

public class Executor
        implements Watcher, Runnable,
        DataMonitor.DataMonitorListener

The class declaration comes first. Executor implements ZooKeeper's Watcher,
Runnable, and DataMonitorListener, an interface nested inside DataMonitor.


try {
    new Executor(hostPort, znode, filename, exec).run();
} catch (Exception e) {
    e.printStackTrace();
}

The arguments are, in order:

  1. host:port of the ZooKeeper server
  2. znode path
  3. output file path
  4. command to execute, with its arguments
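
The argument order above can be sketched as a small parser. This is only an illustration: the class name ExecutorArgs and its parse method are hypothetical and do not appear in the original example. The sketch assumes the first three arguments are fixed and everything after them is the command to run.

```java
import java.util.Arrays;

// Hypothetical helper (not part of the original example): splits a
// main(String[] args) array into the four pieces Executor expects.
final class ExecutorArgs {
    final String hostPort;  // 1. host:port of the ZooKeeper server
    final String znode;     // 2. znode path
    final String filename;  // 3. output file path
    final String[] exec;    // 4. command and its arguments

    private ExecutorArgs(String hostPort, String znode, String filename, String[] exec) {
        this.hostPort = hostPort;
        this.znode = znode;
        this.filename = filename;
        this.exec = exec;
    }

    static ExecutorArgs parse(String[] args) {
        if (args.length < 4) {
            throw new IllegalArgumentException(
                    "expected: hostPort znode filename program [args ...]");
        }
        // Everything after the third argument belongs to the command.
        String[] exec = Arrays.copyOfRange(args, 3, args.length);
        return new ExecutorArgs(args[0], args[1], args[2], exec);
    }

    public static void main(String[] args) {
        ExecutorArgs parsed = parse(new String[] {
            "192.168.3.11:2181", "/key", "./example/leon",
            "sh", "./example/feedbackTarget.sh"
        });
        System.out.println("command: " + String.join(" ", parsed.exec));
    }
}
```

The parsed fields line up with the constructor parameters hostPort, znode, filename and exec.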

public Executor(String hostPort, String znode, String filename,
        String exec[]) throws KeeperException, IOException {
    this.filename = filename;
    this.exec = exec;
    zk = new ZooKeeper(hostPort, 3000, this);
    dm = new DataMonitor(zk, znode, null, this);
}

In the constructor, Executor passes itself (this) as an argument twice.
ZooKeeper takes a Watcher as its third parameter; when an event fires, it calls
Executor's Watcher implementation, which in turn triggers the monitor.

DataMonitor takes a DataMonitorListener so that it knows which
operations to call when an event happens.


public void run() {
    try {
        synchronized (this) {
            while (!dm.dead) {
                wait();
            }
        }
    } catch (InterruptedException e) {
    }
}

This is the Runnable implementation.
When the Executor is created and run from main, it checks the monitor's status.
Right after initialization the monitor's dead flag should be false, so the main thread waits until the ZooKeeper client wakes it up.
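
The wait-until-dead loop can be tried without a ZooKeeper server. The following is a minimal sketch under stated assumptions: WaitUntilDead, awaitDead and markDead are hypothetical names, and a plain background thread stands in for the ZooKeeper client thread that would normally wake the waiter.

```java
// Hypothetical stand-in for the Executor/DataMonitor pair; no ZooKeeper involved.
final class WaitUntilDead {
    private boolean dead = false; // guarded by the lock on "this"

    // Mirrors run(): block until another thread marks the monitor dead.
    synchronized void awaitDead() throws InterruptedException {
        while (!dead) { // the loop also protects against spurious wakeups
            wait();
        }
    }

    // Plays the role of the callback that ends the session:
    // set the flag, then wake every waiting thread.
    synchronized void markDead() {
        dead = true;
        notifyAll();
    }

    // Runs the scenario once; returns true if the waiter was woken normally.
    static boolean runDemo() {
        WaitUntilDead state = new WaitUntilDead();
        Thread closer = new Thread(() -> {
            try {
                Thread.sleep(100); // pretend the session expires a bit later
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            state.markDead();
        });
        closer.start();
        try {
            state.awaitDead();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("woken up: " + runDemo());
    }
}
```

Checking the flag inside a while loop rather than an if is what makes the pattern safe: the waiter re-checks the condition every time it wakes.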


try {
    System.out.println("Starting child");
    child = Runtime.getRuntime().exec(exec);
    new StreamWriter(child.getInputStream(), System.out);
    new StreamWriter(child.getErrorStream(), System.err);
} catch (IOException e) {
    e.printStackTrace();
}

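The local process is started here. Its standard output and standard error are handed to StreamWriter, which forwards them to System.out and System.err; nothing in this snippet writes to the output file path.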
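
StreamWriter itself is not shown in this post. A minimal sketch of what such a class might look like follows, assuming it is a thread that copies bytes from an input stream to an output stream and starts itself on construction (the snippet above never calls start()). The name StreamCopier and the helper copyToString are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Sketch of a StreamWriter-style thread. The shape is an assumption,
// since the post does not include the real class.
final class StreamCopier extends Thread {
    private final InputStream in;
    private final OutputStream out;

    StreamCopier(InputStream in, OutputStream out) {
        this.in = in;
        this.out = out;
        start(); // fields are assigned before the thread starts
    }

    @Override
    public void run() {
        byte[] buf = new byte[80];
        int n;
        try {
            // read() returns -1 at end of stream, which ends the loop.
            while ((n = in.read(buf)) > 0) {
                out.write(buf, 0, n);
            }
            out.flush();
        } catch (IOException e) {
            // The stream was closed, for example because the child exited.
        }
    }

    // Demo helper: copies a string through the thread and returns the result.
    static String copyToString(String text) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        StreamCopier copier = new StreamCopier(
                new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)), sink);
        try {
            copier.join(); // wait until everything has been copied
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(copyToString("hello zookeeper"));
    }
}
```

With a real child process, the input stream would be child.getInputStream() or child.getErrorStream(), and the output stream would be System.out or System.err.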


feedbackTarget.sh

#!/bin/bash
while true
do
    echo "hello zookeeper" >&1
    echo "hello error" >&2
    sleep 3s
done
exit 0

Example arguments for running Executor with this script:

192.168.3.11:2181 /key ./example/leon sh ./example/feedbackTarget.sh

DataMonitor

public class DataMonitor implements Watcher, StatCallback

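DataMonitor implements Watcher, but it is not itself registered as the watcher with the ZooKeeper client; Executor is, and it hands events on to the monitor.
It also implements StatCallback, so the ZooKeeper client can call back into the monitor when the node's status changes.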


public void process(WatchedEvent event) {
    String path = event.getPath();
    if (event.getType() == Event.EventType.None) {
        switch (event.getState()) {
        case SyncConnected:
            break;
        case Expired:
            dead = true;
            listener.closing(KeeperException.Code.SessionExpired);
            break;
        }
    } else {
        if (path != null && path.equals(znode)) {
            zk.exists(znode, true, this, null);
        }
    }
    if (chainedWatcher != null) {
        chainedWatcher.process(event);
    }
}

The monitor reports what it has seen to the listener,
and passes the event on to the next chainedWatcher if one exists.


public void processResult(int rc, String path, Object ctx, Stat stat) {
    ...
    b = zk.getData(znode, false, null);
    ...
    listener.exists(b);
}

This hands the result of the ZooKeeper client's callback over to the Executor.
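
The callback wiring between the two classes can be modeled in plain Java. This is a simplified sketch, not the ZooKeeper API: Listener, Monitor and Exec are hypothetical stand-ins for DataMonitorListener, DataMonitor and Executor, and only the exists(byte[]) call from the snippet above is modeled. The elided parts of processResult are not reproduced.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class CallbackWiring {
    // Stand-in for DataMonitor.DataMonitorListener.
    interface Listener {
        void exists(byte[] data);
    }

    // Stand-in for DataMonitor: holds a listener and forwards data to it.
    static final class Monitor {
        private final Listener listener;

        Monitor(Listener listener) {
            this.listener = listener;
        }

        // Plays the role of processResult(); in the post the data
        // comes from zk.getData(...), here the caller supplies it.
        void onData(byte[] data) {
            listener.exists(data);
        }
    }

    // Stand-in for Executor: passes itself to the monitor,
    // like "new DataMonitor(zk, znode, null, this)".
    static final class Exec implements Listener {
        final List<String> seen = new ArrayList<>();
        final Monitor monitor = new Monitor(this);

        @Override
        public void exists(byte[] data) {
            // Record what arrived; a real listener would act on it.
            seen.add(data == null ? "<no data>" : new String(data, StandardCharsets.UTF_8));
        }
    }

    static List<String> demo() {
        Exec exec = new Exec();
        exec.monitor.onData("v1".getBytes(StandardCharsets.UTF_8));
        exec.monitor.onData(null);
        return exec.seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the monitor only knows the listener interface, it stays independent of whatever the Executor decides to do with the data.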