Review Board 1.7.22


QPID-3960 Performance regression in priority queue implementatino.

Review Request #4814 - Created April 19, 2012 and submitted

Alan Conway
Reviewers
gordon, kgiusti
qpid
Commit 1307582 introduced a serious regression in the performance of priority queues.

 QPID-3603: Keep acquired messages on queues for all queue types.

 Updated priority and lvq queues to keep acquired messages, and supply 
 them to browsers if requested. This is necessary so replicating
 subscriptions can back-up these queue types without message loss.

The commit replaced a o(1) algorithm for push() with an o(log(n)) algorithme, the result is increasingly bad performance as a queue gets longer.

To demonstrate the slowdown, run this on the broker before and after the commit:

qpid-send -a "test1;{create:always,node:{x-declare:{arguments:{'qpid.priorities':10}}}}" --priority 5 --content-size 259 -m 25000 --report-total

E.g. on my test I see 24112 m/s before and 4549 after. Note that if you repeatedly run the test the results get progressively worse, so it appears to be related to queue depth.

This patch uses the old multi-deque algorithm for push and consume, and adds a MessageDeque index for fast browsign. It appears to fix the performance problem but fails 2 tests, I'm hoping someone can spot my mistake.

Make check fails in 2 places:

../../../qpid/cpp/src/tests/cluster_test.cpp(1227): error in "testFairsharePriorityDelivery": check browse(c0, "q", 12) == browse(c1, "q"\
, 12) failed [msg-4 msg-6 msg-8 msg-10 msg-11 msg-12 msg-13 msg-14 msg-15 msg-16 msg-17 msg-18  != msg-4 msg-6 msg-8 msg-10 msg-12 msg-14 msg-16 msg-18 msg-9 msg-10 msg-12 msg-14 ]
FAIL: run_cluster_test


qpid_tests.broker_0_10.management.ManagementTest.test_reroute_priority_queue ........ fail
Error during test:  Traceback (most recent call last):
    File "/home/aconway/qpidha/opt/src/tests/python/commands/qpid-python-test", line 340, in run
      phase()
    File "/home/aconway/qpidha/qpid/tests/src/py/qpid_tests/broker_0_10/management.py", line 276, in test_reroute_priority_queue
      self.assertEqual(result.status, 0)
    File "/usr/lib64/python2.6/unittest.py", line 349, in failUnlessEqual
      (msg or '%r != %r' % (first, second))
  AssertionError: 7 != 0
FAIL: python_tests
/trunk/qpid/cpp/src/qpid/broker/Fairshare.h
Revision 1328126 New Change
[20] 19 lines
[+20]
20
 * specific language governing permissions and limitations
20
 * specific language governing permissions and limitations
21
 * under the License.
21
 * under the License.
22
 *
22
 *
23
 */
23
 */
24
#include "qpid/broker/PriorityQueue.h"
24
#include "qpid/broker/PriorityQueue.h"
25
#include <vector>

   
26

    
   
25

   
27
namespace qpid {
26
namespace qpid {
28
namespace framing {
27
namespace framing {
29
class FieldTable;
28
class FieldTable;
30
}
29
}
[+20] [20] 6 lines
[+20] [+] namespace broker {
37
 */
36
 */
38
class Fairshare : public PriorityQueue
37
class Fairshare : public PriorityQueue
39
{
38
{
40
  public:
39
  public:
41
    Fairshare(size_t levels, uint limit);
40
    Fairshare(size_t levels, uint limit);
42
    bool getState(qpid::framing::FieldTable& counts) const;
41
    bool getState(uint& priority, uint& count) const;
43
    bool setState(const qpid::framing::FieldTable& counts);
42
    bool setState(uint priority, uint count);
44
    void setLimit(size_t level, uint limit);
43
    void setLimit(size_t level, uint limit);
45
    bool isNull();
44
    bool isNull();
46
    bool consume(QueuedMessage&);

   
47
    static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings);
45
    static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings);
48
    static bool getState(const Messages&, qpid::framing::FieldTable& counts);
46
    static bool getState(const Messages&, uint& priority, uint& count);
49
    static bool setState(Messages&, const qpid::framing::FieldTable& counts);
47
    static bool setState(Messages&, uint priority, uint count);
50
  private:
48
  private:
51
    std::vector<uint> limits;
49
    std::vector<uint> limits;
52
    std::vector<uint> counts;

   
53

    
   
50

   
54
    bool checkLevel(uint level);
51
    uint priority;

    
   
52
    uint count;

    
   
53

   

    
   
54
    uint currentLevel();

    
   
55
    uint nextLevel();

    
   
56
    bool limitReached();

    
   
57
    bool findFrontLevel(uint& p, PriorityLevels&);
55
};
58
};
56
}} // namespace qpid::broker
59
}} // namespace qpid::broker
57

    
   
60

   
58
#endif  /*!QPID_BROKER_FAIRSHARE_H*/
61
#endif  /*!QPID_BROKER_FAIRSHARE_H*/
/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp
Revision 1328126 New Change
 
/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h
Revision 1328126 New Change
 
/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp
Revision 1328126 New Change
 
/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h
Revision 1328126 New Change
 
/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
Revision 1328126 New Change
 
/trunk/qpid/cpp/src/qpid/cluster/Connection.h
Revision 1328126 New Change
 
/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
Revision 1328126 New Change
 
/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
Revision 1328126 New Change
 
/trunk/qpid/cpp/src/tests/ha_tests.py
Revision 1328126 New Change
 
/trunk/qpid/cpp/xml/cluster.xml
Revision 1328126 New Change
 
  1. /trunk/qpid/cpp/src/qpid/broker/Fairshare.h: Loading...
  2. /trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp: Loading...
  3. /trunk/qpid/cpp/src/qpid/broker/MessageDeque.h: Loading...
  4. /trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp: Loading...
  5. /trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h: Loading...
  6. /trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp: Loading...
  7. /trunk/qpid/cpp/src/qpid/cluster/Connection.h: Loading...
  8. /trunk/qpid/cpp/src/qpid/cluster/Connection.cpp: Loading...
  9. /trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp: Loading...
  10. /trunk/qpid/cpp/src/tests/ha_tests.py: Loading...
  11. /trunk/qpid/cpp/xml/cluster.xml: Loading...