ZK queue

Queue

  • Define

static public class Queue extends SyncPrimitive

class define in SyncPrimitive, it’s inner static class.

same as the previous article ZK barrier

  • Properties
    no properties

  • Constructor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    Queue(String address, String name) {
    super(address);
    this.root = name;
    // Create ZK node name
    if (zk != null) {
    try {
    Stat s = zk.exists(root, false);
    if (s == null) {
    zk.create(root, new byte[0],
    Ids.OPEN_ACL_UNSAFE,
    CreateMode.PERSISTENT);
    }
    } catch (KeeperException e) {
    ...
    } catch (InterruptedException e) {
    ...
    }
    }
    }

the constructor is not same as barrier, it no need common path.

  • push
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    boolean produce(int i) throws KeeperException, InterruptedException{
    ByteBuffer b = ByteBuffer.allocate(4);
    byte[] value;

    // Add child with value i
    b.putInt(i);
    value = b.array();
    zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
    CreateMode.PERSISTENT_SEQUENTIAL);
    return true;
    }

create a node with value.

  • pop
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    int consume() throws KeeperException, InterruptedException{
    int retvalue = -1;
    Stat stat = null;

    // Get the first element available
    while (true) {
    synchronized (mutex) {
    List<String> list = zk.getChildren(root, true);
    if (list.size() == 0) {
    System.out.println("Going to wait");
    mutex.wait();
    } else {
    String min = list.get(0).substring(7);
    for(String s : list){
    String tempValue = s.substring(7);
    min = tempValue.compareTo(min) < 0? tempValue: min;
    }

    System.out.println("Temporary value: " +
    root + "/element" + min);
    byte[] b = zk.getData(root +
    "/element" + min, false, stat);
    zk.delete(root + "/element" + min, 0);
    ByteBuffer buffer = ByteBuffer.wrap(b);
    retvalue = buffer.getInt();

    return retvalue;
    }
    }
    }
    }

Synchronously pop the minimum node name’s node.

Test queue

  • a producer

    1
    2
    3
    4
    5
    6
    7
    final String[] args = {"qTest", "192.168.3.11:2181", "7", "p"};
    new Thread(new Runnable(){
    @Override
    public void run() {
    queueTest(args);
    }
    }).start();
  • few customer

1
2
3
4
5
6
7
8
9
final String[] args = {"qTest", "192.168.3.11:2181", "3", "c"};
for (int i=0; i<Integer.valueOf(args[2]); i++) {
new Thread(new Runnable(){
@Override
public void run() {
queueTest(args);
}
}).start();
}

Thinking

Queue’s sequential rely on the node name’s sequential.
If nodes’ name sequential messed up, the queue is messed as well.

The queue rely on name form, so the data’s key must all be fetch from zookeeper.
If the key quantity is big, the I/O cost well be appreciable.