Review Board 1.7.22


QPID-3950: Allow browsing of queues with exclusive subscriptions

Review Request #4744 - Created April 16, 2012 and submitted

Alan Conway
Reviewers
qpid
gordon
qpid
The C++ broker already allows browsing subscriptons to exclusive queues. To be consistent we need to extend this to queues with exclusive subscritions as well. This is required by the new HA plugin (QPID-3603) so that replicating subscriptions can browse from exclusive queues in order to replicate their messages to a backup broker.
make check
/trunk/qpid/cpp/src/qpid/broker/Queue.h
Revision 1326757 New Change
1
#ifndef _broker_Queue_h
1
#ifndef _broker_Queue_h
2
#define _broker_Queue_h
2
#define _broker_Queue_h
3

    
   
3

   
4
/*
4
/*
5
 *
5
 *
6
 * Licensed to the Apache Software Foundation (ASF) under one
6
 * Licensed to the Apache Software Foundation (ASF) under one
7
 * or more contributor license agreements.  See the NOTICE file
7
 * or more contributor license agreements.  See the NOTICE file
8
 * distributed with this work for additional information
8
 * distributed with this work for additional information
9
 * regarding copyright ownership.  The ASF licenses this file
9
 * regarding copyright ownership.  The ASF licenses this file
10
 * to you under the Apache License, Version 2.0 (the
10
 * to you under the Apache License, Version 2.0 (the
11
 * "License"); you may not use this file except in compliance
11
 * "License"); you may not use this file except in compliance
12
 * with the License.  You may obtain a copy of the License at
12
 * with the License.  You may obtain a copy of the License at
13
 *
13
 *
14
 *   http://www.apache.org/licenses/LICENSE-2.0
14
 *   http://www.apache.org/licenses/LICENSE-2.0
15
 *
15
 *
16
 * Unless required by applicable law or agreed to in writing,
16
 * Unless required by applicable law or agreed to in writing,
17
 * software distributed under the License is distributed on an
17
 * software distributed under the License is distributed on an
18
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
18
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
19
 * KIND, either express or implied.  See the License for the
19
 * KIND, either express or implied.  See the License for the
20
 * specific language governing permissions and limitations
20
 * specific language governing permissions and limitations
21
 * under the License.
21
 * under the License.
22
 *
22
 *
23
 */
23
 */
24

    
   
24

   
25
#include "qpid/broker/BrokerImportExport.h"
25
#include "qpid/broker/BrokerImportExport.h"
26
#include "qpid/broker/OwnershipToken.h"
26
#include "qpid/broker/OwnershipToken.h"
27
#include "qpid/broker/Consumer.h"
27
#include "qpid/broker/Consumer.h"
28
#include "qpid/broker/Message.h"
28
#include "qpid/broker/Message.h"
29
#include "qpid/broker/Messages.h"
29
#include "qpid/broker/Messages.h"
30
#include "qpid/broker/PersistableQueue.h"
30
#include "qpid/broker/PersistableQueue.h"
31
#include "qpid/broker/QueuePolicy.h"
31
#include "qpid/broker/QueuePolicy.h"
32
#include "qpid/broker/QueueBindings.h"
32
#include "qpid/broker/QueueBindings.h"
33
#include "qpid/broker/QueueListeners.h"
33
#include "qpid/broker/QueueListeners.h"
34
#include "qpid/broker/QueueObserver.h"
34
#include "qpid/broker/QueueObserver.h"
35

    
   
35

   
36
#include "qpid/framing/FieldTable.h"
36
#include "qpid/framing/FieldTable.h"
37
#include "qpid/sys/AtomicValue.h"
37
#include "qpid/sys/AtomicValue.h"
38
#include "qpid/sys/Monitor.h"
38
#include "qpid/sys/Monitor.h"
39
#include "qpid/sys/Timer.h"
39
#include "qpid/sys/Timer.h"
40
#include "qpid/management/Manageable.h"
40
#include "qpid/management/Manageable.h"
41
#include "qmf/org/apache/qpid/broker/Queue.h"
41
#include "qmf/org/apache/qpid/broker/Queue.h"
42
#include "qmf/org/apache/qpid/broker/Broker.h"
42
#include "qmf/org/apache/qpid/broker/Broker.h"
43
#include "qpid/framing/amqp_types.h"
43
#include "qpid/framing/amqp_types.h"
44

    
   
44

   
45
#include <boost/shared_ptr.hpp>
45
#include <boost/shared_ptr.hpp>
46
#include <boost/intrusive_ptr.hpp>
46
#include <boost/intrusive_ptr.hpp>
47
#include <boost/enable_shared_from_this.hpp>
47
#include <boost/enable_shared_from_this.hpp>
48

    
   
48

   
49
#include <list>
49
#include <list>
50
#include <vector>
50
#include <vector>
51
#include <memory>
51
#include <memory>
52
#include <deque>
52
#include <deque>
53
#include <set>
53
#include <set>
54
#include <algorithm>
54
#include <algorithm>
55

    
   
55

   
56
namespace qpid {
56
namespace qpid {
57
namespace broker {
57
namespace broker {
58
class Broker;
58
class Broker;
59
class MessageStore;
59
class MessageStore;
60
class QueueEvents;
60
class QueueEvents;
61
class QueueRegistry;
61
class QueueRegistry;
62
class TransactionContext;
62
class TransactionContext;
63
class MessageDistributor;
63
class MessageDistributor;
64

    
   
64

   
65
/**
65
/**
66
 * The brokers representation of an amqp queue. Messages are
66
 * The brokers representation of an amqp queue. Messages are
67
 * delivered to a queue from where they can be dispatched to
67
 * delivered to a queue from where they can be dispatched to
68
 * registered consumers or be stored until dequeued or until one
68
 * registered consumers or be stored until dequeued or until one
69
 * or more consumers registers.
69
 * or more consumers registers.
70
 */
70
 */
71
class Queue : public boost::enable_shared_from_this<Queue>,
71
class Queue : public boost::enable_shared_from_this<Queue>,
72
              public PersistableQueue, public management::Manageable {
72
              public PersistableQueue, public management::Manageable {
73

    
   
73

   
74
    struct UsageBarrier
74
    struct UsageBarrier
75
    {
75
    {
76
        Queue& parent;
76
        Queue& parent;
77
        uint count;
77
        uint count;
78

    
   
78

   
79
        UsageBarrier(Queue&);
79
        UsageBarrier(Queue&);
80
        bool acquire();
80
        bool acquire();
81
        void release();
81
        void release();
82
        void destroy();
82
        void destroy();
83
    };
83
    };
84

    
   
84

   
85
    struct ScopedUse
85
    struct ScopedUse
86
    {
86
    {
87
        UsageBarrier& barrier;
87
        UsageBarrier& barrier;
88
        const bool acquired;
88
        const bool acquired;
89
        ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
89
        ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
90
        ~ScopedUse() { if (acquired) barrier.release(); }
90
        ~ScopedUse() { if (acquired) barrier.release(); }
91
    };
91
    };
92

    
   
92

   
93
    typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
93
    typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
94
    enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
94
    enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
95

    
   
95

   
96
    const std::string name;
96
    const std::string name;
97
    const bool autodelete;
97
    const bool autodelete;
98
    MessageStore* store;
98
    MessageStore* store;
99
    const OwnershipToken* owner;
99
    const OwnershipToken* owner;
100
    uint32_t consumerCount;
100
    uint32_t consumerCount;     // Actually a count of all subscriptions, acquiring or not.

    
   
101
    uint32_t browserCount;      // Count of non-acquiring subscriptions.
101
    OwnershipToken* exclusive;
102
    OwnershipToken* exclusive;
102
    bool noLocal;
103
    bool noLocal;
103
    bool persistLastNode;
104
    bool persistLastNode;
104
    bool inLastNodeFailure;
105
    bool inLastNodeFailure;
105
    std::string traceId;
106
    std::string traceId;
106
    std::vector<std::string> traceExclude;
107
    std::vector<std::string> traceExclude;
107
    QueueListeners listeners;
108
    QueueListeners listeners;
108
    std::auto_ptr<Messages> messages;
109
    std::auto_ptr<Messages> messages;
109
    std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery
110
    std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery
110
    /** messageLock is used to keep the Queue's state consistent while processing message
111
    /** messageLock is used to keep the Queue's state consistent while processing message
111
     * events, such as message dispatch, enqueue, acquire, and dequeue.  It must be held
112
     * events, such as message dispatch, enqueue, acquire, and dequeue.  It must be held
112
     * while updating certain members in order to keep these members consistent with
113
     * while updating certain members in order to keep these members consistent with
113
     * each other:
114
     * each other:
114
     *     o  messages
115
     *     o  messages
115
     *     o  sequence
116
     *     o  sequence
116
     *     o  policy
117
     *     o  policy
117
     *     o  listeners
118
     *     o  listeners
118
     *     o  allocator
119
     *     o  allocator
119
     *     o  observeXXX() methods
120
     *     o  observeXXX() methods
120
     *     o  observers
121
     *     o  observers
121
     *     o  pendingDequeues  (TBD: move under separate lock)
122
     *     o  pendingDequeues  (TBD: move under separate lock)
122
     *     o  exclusive OwnershipToken (TBD: move under separate lock)
123
     *     o  exclusive OwnershipToken (TBD: move under separate lock)
123
     *     o  consumerCount  (TBD: move under separate lock)
124
     *     o  consumerCount  (TBD: move under separate lock)
124
     *     o  Queue::UsageBarrier (TBD: move under separate lock)
125
     *     o  Queue::UsageBarrier (TBD: move under separate lock)
125
     */
126
     */
126
    mutable qpid::sys::Monitor messageLock;
127
    mutable qpid::sys::Monitor messageLock;
127
    mutable qpid::sys::Mutex ownershipLock;
128
    mutable qpid::sys::Mutex ownershipLock;
128
    mutable uint64_t persistenceId;
129
    mutable uint64_t persistenceId;
129
    framing::FieldTable settings;
130
    framing::FieldTable settings;
130
    std::auto_ptr<QueuePolicy> policy;
131
    std::auto_ptr<QueuePolicy> policy;
131
    bool policyExceeded;
132
    bool policyExceeded;
132
    QueueBindings bindings;
133
    QueueBindings bindings;
133
    std::string alternateExchangeName;
134
    std::string alternateExchangeName;
134
    boost::shared_ptr<Exchange> alternateExchange;
135
    boost::shared_ptr<Exchange> alternateExchange;
135
    framing::SequenceNumber sequence;
136
    framing::SequenceNumber sequence;
136
    qmf::org::apache::qpid::broker::Queue* mgmtObject;
137
    qmf::org::apache::qpid::broker::Queue* mgmtObject;
137
    qmf::org::apache::qpid::broker::Broker* brokerMgmtObject;
138
    qmf::org::apache::qpid::broker::Broker* brokerMgmtObject;
138
    sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
139
    sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
139
    int eventMode;
140
    int eventMode;
140
    Observers observers;
141
    Observers observers;
141
    bool insertSeqNo;
142
    bool insertSeqNo;
142
    std::string seqNoKey;
143
    std::string seqNoKey;
143
    Broker* broker;
144
    Broker* broker;
144
    bool deleted;
145
    bool deleted;
145
    UsageBarrier barrier;
146
    UsageBarrier barrier;
146
    int autoDeleteTimeout;
147
    int autoDeleteTimeout;
147
    boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
148
    boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
148
    boost::shared_ptr<MessageDistributor> allocator;
149
    boost::shared_ptr<MessageDistributor> allocator;
149

    
   
150

   
150
    void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
151
    void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
151
    void setPolicy(std::auto_ptr<QueuePolicy> policy);
152
    void setPolicy(std::auto_ptr<QueuePolicy> policy);
152
    bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
153
    bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
153
    ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
154
    ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
154
    bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
155
    bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
155
    void notifyListener();
156
    void notifyListener();
156

    
   
157

   
157
    void removeListener(Consumer::shared_ptr);
158
    void removeListener(Consumer::shared_ptr);
158

    
   
159

   
159
    bool isExcluded(boost::intrusive_ptr<Message>& msg);
160
    bool isExcluded(boost::intrusive_ptr<Message>& msg);
160

    
   
161

   
161
    /** update queue observers, stats, policy, etc when the messages' state changes.
162
    /** update queue observers, stats, policy, etc when the messages' state changes.
162
     * messageLock is held by caller */
163
     * messageLock is held by caller */
163
    void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
164
    void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
164
    void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
165
    void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
165
    void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
166
    void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
166
    void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
167
    void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
167
    void observeConsumerAdd( const Consumer&, const sys::Mutex::ScopedLock& lock);
168
    void observeConsumerAdd( const Consumer&, const sys::Mutex::ScopedLock& lock);
168
    void observeConsumerRemove( const Consumer&, const sys::Mutex::ScopedLock& lock);
169
    void observeConsumerRemove( const Consumer&, const sys::Mutex::ScopedLock& lock);
169

    
   
170

   
170
    bool popAndDequeue(QueuedMessage&);
171
    bool popAndDequeue(QueuedMessage&);
171
    bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg);
172
    bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg);
172
    void forcePersistent(QueuedMessage& msg);
173
    void forcePersistent(QueuedMessage& msg);
173
    int getEventMode();
174
    int getEventMode();
174
    void configureImpl(const qpid::framing::FieldTable& settings);
175
    void configureImpl(const qpid::framing::FieldTable& settings);
175
    void checkNotDeleted(const Consumer::shared_ptr& c);
176
    void checkNotDeleted(const Consumer::shared_ptr& c);
176
    void notifyDeleted();
177
    void notifyDeleted();
177

    
   
178

   
178
  public:
179
  public:
179

    
   
180

   
180
    typedef boost::shared_ptr<Queue> shared_ptr;
181
    typedef boost::shared_ptr<Queue> shared_ptr;
181

    
   
182

   
182
    typedef std::vector<shared_ptr> vector;
183
    typedef std::vector<shared_ptr> vector;
183

    
   
184

   
184
    QPID_BROKER_EXTERN Queue(const std::string& name,
185
    QPID_BROKER_EXTERN Queue(const std::string& name,
185
                             bool autodelete = false,
186
                             bool autodelete = false,
186
                             MessageStore* const store = 0,
187
                             MessageStore* const store = 0,
187
                             const OwnershipToken* const owner = 0,
188
                             const OwnershipToken* const owner = 0,
188
                             management::Manageable* parent = 0,
189
                             management::Manageable* parent = 0,
189
                             Broker* broker = 0);
190
                             Broker* broker = 0);
190
    QPID_BROKER_EXTERN ~Queue();
191
    QPID_BROKER_EXTERN ~Queue();
191

    
   
192

   
192
    /** allow the Consumer to consume or browse the next available message */
193
    /** allow the Consumer to consume or browse the next available message */
193
    QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
194
    QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
194

    
   
195

   
195
    /** allow the Consumer to acquire a message that it has browsed.
196
    /** allow the Consumer to acquire a message that it has browsed.
196
     * @param msg - message to be acquired.
197
     * @param msg - message to be acquired.
197
     * @return false if message is no longer available for acquire.
198
     * @return false if message is no longer available for acquire.
198
     */
199
     */
199
    QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer);
200
    QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer);
200

    
   
201

   
201
    /**
202
    /**
202
     * Used to configure a new queue and create a persistent record
203
     * Used to configure a new queue and create a persistent record
203
     * for it in store if required.
204
     * for it in store if required.
204
     */
205
     */
205
    QPID_BROKER_EXTERN void create(const qpid::framing::FieldTable& settings);
206
    QPID_BROKER_EXTERN void create(const qpid::framing::FieldTable& settings);
206

    
   
207

   
207
    /**
208
    /**
208
     * Used to reconfigure a recovered queue (does not create
209
     * Used to reconfigure a recovered queue (does not create
209
     * persistent record in store).
210
     * persistent record in store).
210
     */
211
     */
211
    QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings);
212
    QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings);
212
    void destroyed();
213
    void destroyed();
213
    QPID_BROKER_EXTERN void bound(const std::string& exchange,
214
    QPID_BROKER_EXTERN void bound(const std::string& exchange,
214
                                  const std::string& key,
215
                                  const std::string& key,
215
                                  const qpid::framing::FieldTable& args);
216
                                  const qpid::framing::FieldTable& args);
216
    //TODO: get unbind out of the public interface; only there for purposes of one unit test
217
    //TODO: get unbind out of the public interface; only there for purposes of one unit test
217
    QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges);
218
    QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges);
218
    /**
219
    /**
219
     * Bind self to specified exchange, and record that binding for unbinding on delete.
220
     * Bind self to specified exchange, and record that binding for unbinding on delete.
220
     */
221
     */
221
    QPID_BROKER_EXTERN bool bind(
222
    QPID_BROKER_EXTERN bool bind(
222
        boost::shared_ptr<Exchange> exchange, const std::string& key,
223
        boost::shared_ptr<Exchange> exchange, const std::string& key,
223
        const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
224
        const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
224

    
   
225

   
225
    /** Acquire the message at the given position if it is available for acquire.  Not to
226
    /** Acquire the message at the given position if it is available for acquire.  Not to
226
     * be used by clients, but used by the broker for queue management.
227
     * be used by clients, but used by the broker for queue management.
227
     * @param message - set to the acquired message if true returned.
228
     * @param message - set to the acquired message if true returned.
228
     * @return true if the message has been acquired.
229
     * @return true if the message has been acquired.
229
     */
230
     */
230
    QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
231
    QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
231

    
   
232

   
232
    /**
233
    /**
233
     * Delivers a message to the queue. Will record it as
234
     * Delivers a message to the queue. Will record it as
234
     * enqueued if persistent then process it.
235
     * enqueued if persistent then process it.
235
     */
236
     */
236
    QPID_BROKER_EXTERN void deliver(boost::intrusive_ptr<Message> msg);
237
    QPID_BROKER_EXTERN void deliver(boost::intrusive_ptr<Message> msg);
237
    /**
238
    /**
238
     * Dispatches the messages immediately to a consumer if
239
     * Dispatches the messages immediately to a consumer if
239
     * one is available or stores it for later if not.
240
     * one is available or stores it for later if not.
240
     */
241
     */
241
    QPID_BROKER_EXTERN void process(boost::intrusive_ptr<Message>& msg);
242
    QPID_BROKER_EXTERN void process(boost::intrusive_ptr<Message>& msg);
242
    /**
243
    /**
243
     * Returns a message to the in-memory queue (due to lack
244
     * Returns a message to the in-memory queue (due to lack
244
     * of acknowledegement from a receiver). If a consumer is
245
     * of acknowledegement from a receiver). If a consumer is
245
     * available it will be dispatched immediately, else it
246
     * available it will be dispatched immediately, else it
246
     * will be returned to the front of the queue.
247
     * will be returned to the front of the queue.
247
     */
248
     */
248
    QPID_BROKER_EXTERN void requeue(const QueuedMessage& msg);
249
    QPID_BROKER_EXTERN void requeue(const QueuedMessage& msg);
249
    /**
250
    /**
250
     * Used during recovery to add stored messages back to the queue
251
     * Used during recovery to add stored messages back to the queue
251
     */
252
     */
252
    QPID_BROKER_EXTERN void recover(boost::intrusive_ptr<Message>& msg);
253
    QPID_BROKER_EXTERN void recover(boost::intrusive_ptr<Message>& msg);
253

    
   
254

   
254
    QPID_BROKER_EXTERN void consume(Consumer::shared_ptr c,
255
    QPID_BROKER_EXTERN void consume(Consumer::shared_ptr c,
255
                                    bool exclusive = false);
256
                                    bool exclusive = false);
256
    QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
257
    QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
257

    
   
258

   
258
    QPID_BROKER_EXTERN uint32_t purge(const uint32_t purge_request=0,  //defaults to all messages
259
    QPID_BROKER_EXTERN uint32_t purge(const uint32_t purge_request=0,  //defaults to all messages
259
                   boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(),
260
                   boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(),
260
                   const ::qpid::types::Variant::Map *filter=0);
261
                   const ::qpid::types::Variant::Map *filter=0);
261
    QPID_BROKER_EXTERN void purgeExpired(sys::Duration);
262
    QPID_BROKER_EXTERN void purgeExpired(sys::Duration);
262

    
   
263

   
263
    //move qty # of messages to destination Queue destq
264
    //move qty # of messages to destination Queue destq
264
    QPID_BROKER_EXTERN uint32_t move(
265
    QPID_BROKER_EXTERN uint32_t move(
265
        const Queue::shared_ptr destq, uint32_t qty,
266
        const Queue::shared_ptr destq, uint32_t qty,
266
        const qpid::types::Variant::Map *filter=0);
267
        const qpid::types::Variant::Map *filter=0);
267

    
   
268

   
268
    QPID_BROKER_EXTERN uint32_t getMessageCount() const;
269
    QPID_BROKER_EXTERN uint32_t getMessageCount() const;
269
    QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
270
    QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
270
    QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
271
    QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
271
    inline const std::string& getName() const { return name; }
272
    inline const std::string& getName() const { return name; }
272
    QPID_BROKER_EXTERN bool isExclusiveOwner(const OwnershipToken* const o) const;
273
    QPID_BROKER_EXTERN bool isExclusiveOwner(const OwnershipToken* const o) const;
273
    QPID_BROKER_EXTERN void releaseExclusiveOwnership();
274
    QPID_BROKER_EXTERN void releaseExclusiveOwnership();
274
    QPID_BROKER_EXTERN bool setExclusiveOwner(const OwnershipToken* const o);
275
    QPID_BROKER_EXTERN bool setExclusiveOwner(const OwnershipToken* const o);
275
    QPID_BROKER_EXTERN bool hasExclusiveConsumer() const;
276
    QPID_BROKER_EXTERN bool hasExclusiveConsumer() const;
276
    QPID_BROKER_EXTERN bool hasExclusiveOwner() const;
277
    QPID_BROKER_EXTERN bool hasExclusiveOwner() const;
277
    inline bool isDurable() const { return store != 0; }
278
    inline bool isDurable() const { return store != 0; }
278
    inline const framing::FieldTable& getSettings() const { return settings; }
279
    inline const framing::FieldTable& getSettings() const { return settings; }
279
    inline bool isAutoDelete() const { return autodelete; }
280
    inline bool isAutoDelete() const { return autodelete; }
280
    QPID_BROKER_EXTERN bool canAutoDelete() const;
281
    QPID_BROKER_EXTERN bool canAutoDelete() const;
281
    const QueueBindings& getBindings() const { return bindings; }
282
    const QueueBindings& getBindings() const { return bindings; }
282

    
   
283

   
283
    /**
284
    /**
284
     * used to take messages from in memory and flush down to disk.
285
     * used to take messages from in memory and flush down to disk.
285
     */
286
     */
286
    QPID_BROKER_EXTERN void setLastNodeFailure();
287
    QPID_BROKER_EXTERN void setLastNodeFailure();
287
    QPID_BROKER_EXTERN void clearLastNodeFailure();
288
    QPID_BROKER_EXTERN void clearLastNodeFailure();
288

    
   
289

   
289
    QPID_BROKER_EXTERN bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
290
    QPID_BROKER_EXTERN bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
290
    QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg);
291
    QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg);
291
    /**
292
    /**
292
     * dequeue from store (only done once messages is acknowledged)
293
     * dequeue from store (only done once messages is acknowledged)
293
     */
294
     */
294
    QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg);
295
    QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg);
295
    /**
296
    /**
296
     * Inform the queue that a previous transactional dequeue
297
     * Inform the queue that a previous transactional dequeue
297
     * committed.
298
     * committed.
298
     */
299
     */
299
    QPID_BROKER_EXTERN void dequeueCommitted(const QueuedMessage& msg);
300
    QPID_BROKER_EXTERN void dequeueCommitted(const QueuedMessage& msg);
300

    
   
301

   
301
    /**
302
    /**
302
     * Inform queue of messages that were enqueued, have since
303
     * Inform queue of messages that were enqueued, have since
303
     * been acquired but not yet accepted or released (and
304
     * been acquired but not yet accepted or released (and
304
     * thus are still logically on the queue) - used in
305
     * thus are still logically on the queue) - used in
305
     * clustered broker.
306
     * clustered broker.
306
     */
307
     */
307
    QPID_BROKER_EXTERN void updateEnqueued(const QueuedMessage& msg);
308
    QPID_BROKER_EXTERN void updateEnqueued(const QueuedMessage& msg);
308

    
   
309

   
309
    /**
310
    /**
310
     * Test whether the specified message (identified by its
311
     * Test whether the specified message (identified by its
311
     * sequence/position), is still enqueued (note this
312
     * sequence/position), is still enqueued (note this
312
     * doesn't mean it is available for delivery as it may
313
     * doesn't mean it is available for delivery as it may
313
     * have been delievered to a subscriber who has not yet
314
     * have been delievered to a subscriber who has not yet
314
     * accepted it).
315
     * accepted it).
315
     */
316
     */
316
    QPID_BROKER_EXTERN bool isEnqueued(const QueuedMessage& msg);
317
    QPID_BROKER_EXTERN bool isEnqueued(const QueuedMessage& msg);
317

    
   
318

   
318
    /**
319
    /**
319
     * Acquires the next available (oldest) message
320
     * Acquires the next available (oldest) message
320
     */
321
     */
321
    QPID_BROKER_EXTERN QueuedMessage get();
322
    QPID_BROKER_EXTERN QueuedMessage get();
322

    
   
323

   
323
    /** Get the message at position pos, returns true if found and sets msg */
324
    /** Get the message at position pos, returns true if found and sets msg */
324
    QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const;
325
    QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const;
325

    
   
326

   
326
    QPID_BROKER_EXTERN const QueuePolicy* getPolicy();
327
    QPID_BROKER_EXTERN const QueuePolicy* getPolicy();
327

    
   
328

   
328
    QPID_BROKER_EXTERN void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
329
    QPID_BROKER_EXTERN void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
329
    QPID_BROKER_EXTERN boost::shared_ptr<Exchange> getAlternateExchange();
330
    QPID_BROKER_EXTERN boost::shared_ptr<Exchange> getAlternateExchange();
330
    QPID_BROKER_EXTERN bool isLocal(boost::intrusive_ptr<Message>& msg);
331
    QPID_BROKER_EXTERN bool isLocal(boost::intrusive_ptr<Message>& msg);
331

    
   
332

   
332
    //PersistableQueue support:
333
    //PersistableQueue support:
333
    QPID_BROKER_EXTERN uint64_t getPersistenceId() const;
334
    QPID_BROKER_EXTERN uint64_t getPersistenceId() const;
334
    QPID_BROKER_EXTERN void setPersistenceId(uint64_t persistenceId) const;
335
    QPID_BROKER_EXTERN void setPersistenceId(uint64_t persistenceId) const;
335
    QPID_BROKER_EXTERN void encode(framing::Buffer& buffer) const;
336
    QPID_BROKER_EXTERN void encode(framing::Buffer& buffer) const;
336
    QPID_BROKER_EXTERN uint32_t encodedSize() const;
337
    QPID_BROKER_EXTERN uint32_t encodedSize() const;
337

    
   
338

   
338
    /**
339
    /**
339
     * Restores a queue from encoded data (used in recovery)
340
     * Restores a queue from encoded data (used in recovery)
340
     *
341
     *
341
     * Note: restored queue will be neither auto-deleted or have an
342
     * Note: restored queue will be neither auto-deleted or have an
342
     * exclusive owner
343
     * exclusive owner
343
     */
344
     */
344
    static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer);
345
    static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer);
345
    static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
346
    static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
346

    
   
347

   
347
    virtual void setExternalQueueStore(ExternalQueueStore* inst);
348
    virtual void setExternalQueueStore(ExternalQueueStore* inst);
348

    
   
349

   
349
    // Increment the rejected-by-consumer counter.
350
    // Increment the rejected-by-consumer counter.
350
    QPID_BROKER_EXTERN void countRejected() const;
351
    QPID_BROKER_EXTERN void countRejected() const;
351
    QPID_BROKER_EXTERN void countFlowedToDisk(uint64_t size) const;
352
    QPID_BROKER_EXTERN void countFlowedToDisk(uint64_t size) const;
352
    QPID_BROKER_EXTERN void countLoadedFromDisk(uint64_t size) const;
353
    QPID_BROKER_EXTERN void countLoadedFromDisk(uint64_t size) const;
353

    
   
354

   
354
    // Manageable entry points
355
    // Manageable entry points
355
    QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject (void) const;
356
    QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject (void) const;
356
    management::Manageable::status_t
357
    management::Manageable::status_t
357
    QPID_BROKER_EXTERN ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
358
    QPID_BROKER_EXTERN ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
358
    QPID_BROKER_EXTERN void query(::qpid::types::Variant::Map&) const;
359
    QPID_BROKER_EXTERN void query(::qpid::types::Variant::Map&) const;
359

    
   
360

   
360
    /** Apply f to each Message on the queue. */
361
    /** Apply f to each Message on the queue. */
361
    template <class F> void eachMessage(F f) {
362
    template <class F> void eachMessage(F f) {
362
        sys::Mutex::ScopedLock l(messageLock);
363
        sys::Mutex::ScopedLock l(messageLock);
363
        messages->foreach(f);
364
        messages->foreach(f);
364
    }
365
    }
365

    
   
366

   
366
    /** Apply f to each QueueBinding on the queue */
367
    /** Apply f to each QueueBinding on the queue */
367
    template <class F> void eachBinding(F f) {
368
    template <class F> void eachBinding(F f) {
368
        bindings.eachBinding(f);
369
        bindings.eachBinding(f);
369
    }
370
    }
370

    
   
371

   
371
    /** Apply f to each Observer on the queue */
372
    /** Apply f to each Observer on the queue */
372
    template <class F> void eachObserver(F f) {
373
    template <class F> void eachObserver(F f) {
373
        sys::Mutex::ScopedLock l(messageLock);
374
        sys::Mutex::ScopedLock l(messageLock);
374
        std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
375
        std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
375
    }
376
    }
376

    
   
377

   
377
    /** Set the position sequence number  for the next message on the queue.
378
    /** Set the position sequence number  for the next message on the queue.
378
     * Must be >= the current sequence number.
379
     * Must be >= the current sequence number.
379
     * Used by cluster to replicate queues.
380
     * Used by cluster to replicate queues.
380
     */
381
     */
381
    QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos);
382
    QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos);
382
    /** return current position sequence number for the next message on the queue.
383
    /** return current position sequence number for the next message on the queue.
383
     */
384
     */
384
    QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
385
    QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
385
    QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>);
386
    QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>);
386
    QPID_BROKER_EXTERN void removeObserver(boost::shared_ptr<QueueObserver>);
387
    QPID_BROKER_EXTERN void removeObserver(boost::shared_ptr<QueueObserver>);
387
    QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
388
    QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
388
    /**
389
    /**
389
     * Notify queue that recovery has completed.
390
     * Notify queue that recovery has completed.
390
     */
391
     */
391
    QPID_BROKER_EXTERN void recoveryComplete(ExchangeRegistry& exchanges);
392
    QPID_BROKER_EXTERN void recoveryComplete(ExchangeRegistry& exchanges);
392

    
   
393

   
393
    // For cluster update
394
    // For cluster update
394
    QPID_BROKER_EXTERN QueueListeners& getListeners();
395
    QPID_BROKER_EXTERN QueueListeners& getListeners();
395
    QPID_BROKER_EXTERN Messages& getMessages();
396
    QPID_BROKER_EXTERN Messages& getMessages();
396
    QPID_BROKER_EXTERN const Messages& getMessages() const;
397
    QPID_BROKER_EXTERN const Messages& getMessages() const;
397

    
   
398

   
398
    /**
399
    /**
399
     * Reserve space in policy for an enqueued message that
400
     * Reserve space in policy for an enqueued message that
400
     * has been recovered in the prepared state (dtx only)
401
     * has been recovered in the prepared state (dtx only)
401
     */
402
     */
402
    QPID_BROKER_EXTERN void recoverPrepared(boost::intrusive_ptr<Message>& msg);
403
    QPID_BROKER_EXTERN void recoverPrepared(boost::intrusive_ptr<Message>& msg);
403

    
   
404

   
404
    QPID_BROKER_EXTERN void flush();
405
    QPID_BROKER_EXTERN void flush();
405

    
   
406

   
406
    QPID_BROKER_EXTERN Broker* getBroker();
407
    QPID_BROKER_EXTERN Broker* getBroker();
407

    
   
408

   
408
    uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
409
    uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
409
    QPID_BROKER_EXTERN void setDequeueSincePurge(uint32_t value);
410
    QPID_BROKER_EXTERN void setDequeueSincePurge(uint32_t value);
410
};
411
};
411
}
412
}
412
}
413
}
413

    
   
414

   
414

    
   
415

   
415
#endif  /*!_broker_Queue_h*/
416
#endif  /*!_broker_Queue_h*/
/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
Revision 1326757 New Change
 
/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
Revision 1326757 New Change
 
/trunk/qpid/cpp/src/tests/ha_tests.py
Revision 1326757 New Change
 
  1. /trunk/qpid/cpp/src/qpid/broker/Queue.h: Loading...
  2. /trunk/qpid/cpp/src/qpid/broker/Queue.cpp: Loading...
  3. /trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp: Loading...
  4. /trunk/qpid/cpp/src/tests/ha_tests.py: Loading...