Review Board 1.7.22


QPID-3401 changes to the core client

Review Request #2364 - Created Oct. 12, 2011 and discarded

rajith attapattu
QPID-3401
Reviewers
qpid
gordon, k-wall, robbie, wprice
qpid
The following is a patch that illustrates the changes made to the core client namely the session, message consumer and producer classes.
(Please note that in order to compile and run the tests you need to get apply the QPID-3401.patch attached to the JIRA.)

Most of the code removed from the AMQSession_0_10.java have been included in the new class structure posted as a separate review [ https://reviews.apache.org/r/2366/ ] to ensure clarity.

In summary the changes are,
1. The code now uses AddressBasedDestination if the syntax is ADDR.
2. For address destinations the code now delegates the creation, assertion, deletion actions to the underlying QpidDestination class via the AddressBasedDestination.
3. The code also delegates creating of subscriptions.

TODO.
1. Delegate the deleting of subscriptions (minor change which will follow once this patch is approved)
2. Currently Durable Subscribers want work with AddressBasedDestinations (This will be done in a follow up patch that will be posted soon).

(AddressBasedDestination, AddressBasedTopic and AddressBasedQueue classes are included along with the new class structure patch as a separate review).
All existing tests in AddressBasedDestination test pass (with the exception of the Durable subscription test).
http://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Revision 1182391 New Change
[20] 95 lines
[+20]
96
import org.apache.qpid.jms.Session;
96
import org.apache.qpid.jms.Session;
97
import org.apache.qpid.protocol.AMQConstant;
97
import org.apache.qpid.protocol.AMQConstant;
98
import org.apache.qpid.thread.Threading;
98
import org.apache.qpid.thread.Threading;
99
import org.apache.qpid.transport.SessionException;
99
import org.apache.qpid.transport.SessionException;
100
import org.apache.qpid.transport.TransportException;
100
import org.apache.qpid.transport.TransportException;

    
   
101
import org.apache.qpid.url.AMQBindingURL;
101
import org.slf4j.Logger;
102
import org.slf4j.Logger;
102
import org.slf4j.LoggerFactory;
103
import org.slf4j.LoggerFactory;
103

    
   
104

   
104
/**
105
/**
105
 * <p/><table id="crc"><caption>CRC Card</caption>
106
 * <p/><table id="crc"><caption>CRC Card</caption>
[+20] [20] 864 lines
[+20] [+] protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
970

    
   
971

   
971
    public MessageConsumer createConsumer(Destination destination) throws JMSException
972
    public MessageConsumer createConsumer(Destination destination) throws JMSException
972
    {
973
    {
973
        checkValidDestination(destination);
974
        checkValidDestination(destination);
974

    
   
975

   
975
        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
976
        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, isTopic(destination), null, null,
976
                                  isBrowseOnlyDestination(destination), false);
977
                                  isBrowseOnlyDestination(destination), false);
977
    }
978
    }
978

    
   
979

   
979
    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
980
    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
980
    {
981
    {
981
        checkValidDestination(destination);
982
        checkValidDestination(destination);
982

    
   
983

   
983
        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
984
        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, isTopic(destination),
984
                                  messageSelector, null, isBrowseOnlyDestination(destination), false);
985
                                  messageSelector, null, isBrowseOnlyDestination(destination), false);
985
    }
986
    }
986

    
   
987

   
987
    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
988
    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
988
            throws JMSException
989
            throws JMSException
989
    {
990
    {
990
        checkValidDestination(destination);
991
        checkValidDestination(destination);
991

    
   
992

   
992
        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
993
        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, isTopic(destination),
993
                                  messageSelector, null, isBrowseOnlyDestination(destination), false);
994
                                  messageSelector, null, isBrowseOnlyDestination(destination), false);
994
    }
995
    }
995

    
   
996

   
996
    public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
997
    public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
997
                                          String selector) throws JMSException
998
                                          String selector) throws JMSException
[+20] [20] 30 lines
[+20] [+] public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
1028
            throws JMSException
1029
            throws JMSException
1029
    {
1030
    {
1030
        checkNotClosed();
1031
        checkNotClosed();
1031
        Topic origTopic = checkValidTopic(topic, true);
1032
        Topic origTopic = checkValidTopic(topic, true);
1032
        
1033
        
1033
        AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
1034
        // The check valid Topic will throw an exception if topic is not an instanceof 
1034
        if (dest.getDestSyntax() == DestSyntax.ADDR &&
1035
        // AMQTopic or AddressBasedTopc.
1035
            !dest.isAddressResolved())
1036
        Topic dest;

    
   
1037
        if (topic instanceof AMQTopic)
1036
        {
1038
        {
1037
            try
1039
            dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
1038
            {

   
1039
                handleAddressBasedDestination(dest,false,true);

   
1040
                if (dest.getAddressType() !=  AMQDestination.TOPIC_TYPE)

   
1041
                {

   
1042
                    throw new JMSException("Durable subscribers can only be created for Topics");

   
1043
                }

   
1044
                dest.getSourceNode().setDurable(true);

   
1045
            }

   
1046
            catch(AMQException e)

   
1047
            {

   
1048
                JMSException ex = new JMSException("Error when verifying destination");

   
1049
                ex.initCause(e);

   
1050
                ex.setLinkedException(e);

   
1051
                throw ex;

   
1052
            }
1040
        }
1053
            catch(TransportException e)
1041
        else
1054
            {
1042
        {
1055
                throw toJMSException("Error when verifying destination", e);
1043
            dest = AddressBasedTopic.createDurableTopic(origTopic, name, _connection, this);
1056
            }

   
1057
        }
1044
        }
1058
        
1045
        
1059
        String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
1046
        String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
1060
        
1047
        
1061
        _subscriberDetails.lock();
1048
        _subscriberDetails.lock();
1062
        try
1049
        try
1063
        {
1050
        {
1064
            TopicSubscriberAdaptor<C> subscriber = _subscriptions.get(name);
1051
            TopicSubscriberAdaptor<C> subscriber = _subscriptions.get(name);
1065
            
1052
            
1066
            // Not subscribed to this name in the current session
1053
            // Not subscribed to this name in the current session
1067
            if (subscriber == null)
1054
            if (subscriber == null)
1068
            {
1055
            {
1069
                // After the address is resolved routing key will not be null.
1056
                // After the address is resolved routing key will not be null.
1070
                AMQShortString topicName = dest.getRoutingKey();
1057
                AMQShortString topicName = new AMQShortString(dest.getTopicName());
1071
                
1058
                
1072
                if (_strictAMQP)
1059
                if (_strictAMQP)
1073
                {
1060
                {
1074
                    if (_strictAMQPFATAL)
1061
                    if (_strictAMQPFATAL)
1075
                    {
1062
                    {
1076
                        throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
1063
                        throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
1077
                    }
1064
                    }
1078
                    else
1065
                    else
1079
                    {
1066
                    {
1080
                        _logger.warn("Unable to determine if subscription already exists for '" + topicName
1067
                        _logger.warn("Unable to determine if subscription already exists for '" + topicName
1081
                                        + "' for creation durableSubscriber. Requesting queue deletion regardless.");
1068
                                        + "' for creation durableSubscriber. Requesting queue deletion regardless.");
1082
                    }
1069
                    }
1083

    
   
1070

   
1084
                    deleteQueue(dest.getAMQQueueName());
1071
                    if (topic instanceof AMQTopic)

    
   
1072
                    {

    
   
1073
                        deleteQueue(((AMQTopic)dest).getAMQQueueName());

    
   
1074
                    }

    
   
1075
                    else

    
   
1076
                    {

    
   
1077
                        ((AddressBasedTopic)topic).deleteSubscription(this);

    
   
1078
                    }
1085
                }
1079
                }
1086
                else
1080
                else
1087
                {
1081
                {
1088
                    Map<String,Object> args = new HashMap<String,Object>();
1082
                    Map<String,Object> args = new HashMap<String,Object>();
1089
                    
1083
                    
1090
                    // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a 
1084
                    // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a 
1091
                    // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
1085
                    // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
1092
                    // possible to determine  when querying the broker whether there are no arguments or just a non-matching selector
1086
                    // possible to determine  when querying the broker whether there are no arguments or just a non-matching selector
1093
                    // argument, as specifying null for the arguments when querying means they should not be checked at all
1087
                    // argument, as specifying null for the arguments when querying means they should not be checked at all
1094
                    args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
1088
                    args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
1095
                    
1089
                    
1096
                    // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
1090
                    // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
1097
                    // says we must trash the subscription.
1091
                    // says we must trash the subscription.
1098
                    boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());
1092
                    AMQShortString exchangeName;

    
   
1093
                    AMQShortString queueName;

    
   
1094
                                        

    
   
1095
                    if (topic instanceof AMQTopic)

    
   
1096
                    {

    
   
1097
                        exchangeName = ((AMQTopic)dest).getExchangeName();

    
   
1098
                        queueName = ((AMQTopic)dest).getAMQQueueName();

    
   
1099
                                            

    
   
1100
                    }

    
   
1101
                    else

    
   
1102
                    {

    
   
1103
                        exchangeName = new AMQShortString(((AddressBasedTopic)topic).getAddress().getName());

    
   
1104
                        queueName = new AMQShortString(((AddressBasedTopic)topic).getAddress().getSubject());

    
   
1105
                    }                    

    
   
1106
                    boolean isQueueBound = isQueueBound(exchangeName, queueName);
1099
                    boolean isQueueBoundForTopicAndSelector = 
1107
                    boolean isQueueBoundForTopicAndSelector = 
1100
                                isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args);
1108
                        isQueueBound(exchangeName.asString(), queueName.asString(), topicName.asString(), args);
1101

    
   
1109

   
1102
                    if (isQueueBound && !isQueueBoundForTopicAndSelector)
1110
                    if (isQueueBound && !isQueueBoundForTopicAndSelector)
1103
                    {
1111
                    {
1104
                        deleteQueue(dest.getAMQQueueName());
1112
                        if (topic instanceof AMQTopic)

    
   
1113
                        {

    
   
1114
                            deleteQueue(((AMQTopic)dest).getAMQQueueName());

    
   
1115
                        }

    
   
1116
                        else

    
   
1117
                        {

    
   
1118
                            ((AddressBasedTopic)topic).deleteSubscription(this);

    
   
1119
                        }
1105
                    }
1120
                    }
1106
                }
1121
                }
1107
            }
1122
            }
1108
            else 
1123
            else 
1109
            {
1124
            {
[+20] [20] 101 lines
[+20] [+] public TopicPublisher createPublisher(Topic topic) throws JMSException
1211
    }
1226
    }
1212

    
   
1227

   
1213
    public Queue createQueue(String queueName) throws JMSException
1228
    public Queue createQueue(String queueName) throws JMSException
1214
    {
1229
    {
1215
        checkNotClosed();
1230
        checkNotClosed();
1216
        try

   
1217
        {

   
1218
            if (queueName.indexOf('/') == -1 && queueName.indexOf(';') == -1)

   
1219
            {

   
1220
                DestSyntax syntax = AMQDestination.getDestType(queueName);
1231
        DestSyntax syntax = AMQDestination.getDestType(queueName);

    
   
1232
        queueName = AMQDestination.stripSyntaxPrefix(queueName);
1221
                if (syntax == AMQDestination.DestSyntax.BURL)
1233
        if (syntax == AMQDestination.DestSyntax.BURL)
1222
                {
1234
        {
1223
                    // For testing we may want to use the prefix
1235
            if (queueName.indexOf('/') == -1)

    
   
1236
            {
1224
                    return new AMQQueue(getDefaultQueueExchangeName(), 
1237
                return new AMQQueue(getDefaultQueueExchangeName(), 
1225
                                        new AMQShortString(AMQDestination.stripSyntaxPrefix(queueName)));
1238
                                   new AMQShortString(queueName));                
1226
                }
1239
            }
1227
                else
1240
            else
1228
                {
1241
            {
1229
                    AMQQueue queue = new AMQQueue(queueName);
1242
                try
1230
                    return queue;

   
1231
                    

   
1232
                }

   
1233
            }

   
1234
            else

   
1235
            {
1243
                {
1236
                return new AMQQueue(queueName);            
1244
                    return new AMQQueue(new AMQBindingURL(queueName));
1237
            }
1245
                }    
1238
        }

   
1239
        catch (URISyntaxException urlse)
1246
                catch (URISyntaxException urlse)
1240
        {
1247
                {
1241
            _logger.error("", urlse);
1248
                    _logger.error("", urlse);
1242
            JMSException jmse = new JMSException(urlse.getReason());
1249
                    JMSException jmse = new JMSException(urlse.getReason());
1243
            jmse.setLinkedException(urlse);
1250
                    jmse.setLinkedException(urlse);
1244
            jmse.initCause(urlse);
1251
                    jmse.initCause(urlse);
1245
            throw jmse;
1252
                    throw jmse;
1246
        }
1253
                }

    
   
1254
            }

    
   
1255
        }

    
   
1256
        else

    
   
1257
        {

    
   
1258
            return new AddressBasedQueue(queueName);

    
   
1259
        }
1247

    
   
1260

   
1248
    }
1261
    }
1249

    
   
1262

   
1250
    /**
1263
    /**
1251
     * Declares the named queue.
1264
     * Declares the named queue.
[+20] [20] 253 lines
[+20] [+] public TextMessage createTextMessage(String text) throws JMSException
1505
    }
1518
    }
1506

    
   
1519

   
1507
    public Topic createTopic(String topicName) throws JMSException
1520
    public Topic createTopic(String topicName) throws JMSException
1508
    {
1521
    {
1509
        checkNotClosed();
1522
        checkNotClosed();
1510
        try

   
1511
        {

   
1512
            if (topicName.indexOf('/') == -1 && topicName.indexOf(';') == -1)

   
1513
            {

   
1514
                DestSyntax syntax = AMQDestination.getDestType(topicName);
1523
        DestSyntax syntax = AMQDestination.getDestType(topicName);
1515
                // for testing we may want to use the prefix to indicate our choice.

   
1516
                topicName = AMQDestination.stripSyntaxPrefix(topicName);
1524
        topicName = AMQDestination.stripSyntaxPrefix(topicName);
1517
                if (syntax == AMQDestination.DestSyntax.BURL)
1525
        if (syntax == AMQDestination.DestSyntax.BURL)
1518
                {
1526
        {

    
   
1527
            if (topicName.indexOf('/') == -1)

    
   
1528
            {
1519
                    return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
1529
                return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
1520
                }
1530
            }
1521
                else
1531
            else
1522
                {
1532
            {
1523
                    return new AMQTopic("ADDR:" + getDefaultTopicExchangeName() + "/" + topicName);
1533
                try
1524
                }

   
1525
            }

   
1526
            else

   
1527
            {
1534
                {
1528
                return new AMQTopic(topicName);            
1535
                    return new AMQTopic(new AMQBindingURL(topicName));
1529
            }

   
1530
        

   
1531
        }
1536
                }
1532
        catch (URISyntaxException urlse)
1537
                catch (URISyntaxException urlse)
1533
        {
1538
                {
1534
            _logger.error("", urlse);
1539
                    _logger.error("", urlse);
1535
            JMSException jmse = new JMSException(urlse.getReason());
1540
                    JMSException jmse = new JMSException(urlse.getReason());
1536
            jmse.setLinkedException(urlse);
1541
                    jmse.setLinkedException(urlse);
1537
            jmse.initCause(urlse);
1542
                    jmse.initCause(urlse);
1538
            throw jmse;
1543
                    throw jmse;
1539
        }
1544
                }
1540
    }
1545
            }

    
   
1546
        }

    
   
1547
        else

    
   
1548
        {

    
   
1549
            if (topicName.indexOf('/') == -1 && topicName.indexOf(';') == -1)

    
   
1550
            {

    
   
1551
                topicName = getDefaultTopicExchangeName() + "/" + topicName;

    
   
1552
            }

    
   
1553
            

    
   
1554
            return new AddressBasedTopic(topicName);

    
   
1555
        }        

    
   
1556
    }
1541

    
   
1557

   
1542
    public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
1558
    public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
1543
    {
1559
    {
1544
        declareExchange(name, type, getProtocolHandler(), nowait);
1560
        declareExchange(name, type, getProtocolHandler(), nowait);
1545
    }
1561
    }
[+20] [20] 913 lines
[+20] [+] protected Topic checkValidTopic(Topic topic, boolean durable) throws JMSException
2459
        {
2475
        {
2460
            throw new javax.jms.InvalidDestinationException
2476
            throw new javax.jms.InvalidDestinationException
2461
                ("Cannot create a durable subscription with a temporary topic: " + topic);
2477
                ("Cannot create a durable subscription with a temporary topic: " + topic);
2462
        }
2478
        }
2463

    
   
2479

   
2464
        if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic))
2480
        if (!(topic instanceof AMQTopic || topic instanceof AddressBasedTopic))
2465
        {
2481
        {
2466
            throw new javax.jms.InvalidDestinationException(
2482
            throw new javax.jms.InvalidDestinationException(
2467
                    "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
2483
                    "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
2468
                    + topic.getClass().getName());
2484
                    + topic.getClass().getName());
2469
        }
2485
        }
[+20] [20] 398 lines
[+20] [+] private void markClosedProducersAndConsumers()
2868
    {
2884
    {
2869
        AMQDestination amqd = consumer.getDestination();
2885
        AMQDestination amqd = consumer.getDestination();
2870

    
   
2886

   
2871
        AMQProtocolHandler protocolHandler = getProtocolHandler();
2887
        AMQProtocolHandler protocolHandler = getProtocolHandler();
2872

    
   
2888

   
2873
        if (amqd.getDestSyntax() == DestSyntax.ADDR)
2889
        if (amqd.getDestSyntax() == DestSyntax.BURL)
2874
        {

   
2875
            handleAddressBasedDestination(amqd,true,nowait);            

   
2876
        }

   
2877
        else

   
2878
        {
2890
        {
2879
            if (DECLARE_EXCHANGES)
2891
            if (DECLARE_EXCHANGES)
2880
            {
2892
            {
2881
                declareExchange(amqd, protocolHandler, nowait);
2893
                declareExchange(amqd, protocolHandler, nowait);
2882
            }
2894
            }
[+20] [20] 45 lines
[+20] private void markClosedProducersAndConsumers()
2928
        catch (FailoverException e)
2940
        catch (FailoverException e)
2929
        {
2941
        {
2930
            throw new AMQException(null, "Fail-over exception interrupted basic consume.", e);
2942
            throw new AMQException(null, "Fail-over exception interrupted basic consume.", e);
2931
        }
2943
        }
2932
    }
2944
    }
2933

    
   

   
2934
    public abstract void handleAddressBasedDestination(AMQDestination dest, 

   
2935
                                                       boolean isConsumer,

   
2936
                                                       boolean noWait) throws AMQException;

   
2937
    
2945
    
2938
    private void registerProducer(long producerId, MessageProducer producer)
2946
    private void registerProducer(long producerId, MessageProducer producer)
2939
    {
2947
    {
2940
        _producers.put(new Long(producerId), producer);
2948
        _producers.put(new Long(producerId), producer);
2941
    }
2949
    }
[+20] [20] 598 lines
[+20] [+] private int getErrorCode(TransportException e)
3540

    
   
3548

   
3541
    private boolean isBrowseOnlyDestination(Destination destination)
3549
    private boolean isBrowseOnlyDestination(Destination destination)
3542
    {
3550
    {
3543
        return ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly());
3551
        return ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly());
3544
    }
3552
    }

    
   
3553
    

    
   
3554
    private boolean isTopic(Destination dest)

    
   
3555
    {

    
   
3556
        if (dest instanceof AMQDestination)

    
   
3557
        {

    
   
3558
           return ((AMQDestination)dest).isTopic();

    
   
3559
        }

    
   
3560
        else

    
   
3561
        {

    
   
3562
           return dest instanceof Topic;

    
   
3563
        }

    
   
3564
    }
3545
}
3565
}
http://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Revision 1182391 New Change
 
http://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Revision 1182391 New Change
 
http://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
Revision 1182391 New Change
 
http://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
Revision 1182391 New Change
 
  1. http://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java: Loading...
  2. http://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java: Loading...
  3. http://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java: Loading...
  4. http://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java: Loading...
  5. http://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java: Loading...