Message Queue

Queue - abstraction of a message queue

Synopsis

Example:

import messaging.queue as queue

mq = queue.new({"type":"Foo", ... options ...});
# is identical too
mq = queue.foo.Foo(... options ...);

Description

This module provides an abstraction of a message queue. Its only purpose is to offer a unified method to create a new queue. The functionality is implemented in child modules such as messaging.queue.dqs.DQS.

Copyright (C) 2013-2016 CERN

messaging.queue.new(option)

Create a new message queue object; options must contain the type of queue (which is the name of the child class), see above.

Directory Queue Normal

DQN - abstraction of a dirq.queue.Queue message queue.

Synopsis

Example:

from messaging.message import Message
from messaging.queue.dqn import DQN

# create a message queue
mq = DQN(path = "/some/where")

# add a message to the queue
msg = Message(body="hello world")
print("msg added as %s" % mq.add_message(msg))

# browse the queue
for name in mq:
    if mq.lock(name):
        msg = mq.get_message(name)
        # unlock the element
        mq.unlock(name)
        # othwerwise, if you want to remove the element
        # mq.remove(name)

Description

This module provides an abstraction of a message queue. It derives from the dirq.queue.Queue module that provides a generic directory-based queue.

It uses the following dirq.queue.Queue schema to store a message:

schema = {
    "header" = "table",
    "binary" = "binary?",
    "text"   = "string?",
}

The message header is therefore stored as a table and the message body is stored either as a text or binary string.

Copyright (C) 2013-2016 CERN

class messaging.queue.dqn.DQN(**data)

Abstraction of a Normal Queue message queue.

add_message(msg)

Add the given message (a messaging.message.Message object) to the queue and return the corresponding element name.

Raise:
TypeError if the parameter is not a messaging.message.Message.
dequeue_message(element)

Dequeue the message from the given element and return a messaging.message.Message object.

Raise:
TypeError if the parameter is not a string.
get_message(element)

Get the message from the given element (which must be locked) and return a messaging.message.Message object.

Directory Queue Simple

DQS - abstraction of a dirq.QueueSimple.QueueSimple message queue.

Synopsis

Example:

from messaging.message import Message
from messaging.queue.dqs import DQS

# create a message queue
mq = DQS(path = "/some/where")

# add a message to the queue
msg = Message(body = "hello world")
print("msg added as %s" % mq.add_message(msg))

# browse the queue
for name in mq:
    if mq.lock(name):
        msg = mq.get_message(name)
        # unlock the element
        mq.unlock(name)
        # othwerwise, if you want to remove the element
        # mq.remove(name)

Description

This module provides an abstraction of a message queue. It derives from the dirq.QueueSimple.QueueSimple module that provides a generic directory-based queue.

It simply stores the serialized message (with optional compression) as a dirq.QueueSimple.QueueSimple element.

Copyright (C) 2013-2016 CERN

class messaging.queue.dqs.DQS(**data)

Abstraction of a dirq.QueueSimple.QueueSimple message queue.

add_message(msg)

Add the given message (a messaging.message.Message object) to the queue and return the corresponding element name.

Raise:
TypeError if the parameter is not a messaging.message.Message.
get_message(element)

Dequeue the message from the given element and return a messaging.message.Message object.