java - Having trouble handling exceptions in Spring Integration
I'm new to Spring Integration and confused about how to send error messages to a designated error queue. I want the error message to be a header on the original message and to end up in a separate queue. I read that this can be done with a header enricher, and I tried to implement it, but nothing is showing up in the error queue.
Also, do I need a separate exception handling class in order for error messages to make it to the error queue, or can I just throw exceptions in my transforming methods?
Here is my XML config:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <rabbit:connection-factory id="connectionFactory" host="bigdata-rdp" username="myuser" password="mypass" />
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    <rabbit:admin connection-factory="connectionFactory" />

    <rabbit:queue name="first" auto-delete="false" durable="true" />
    <rabbit:queue name="second" auto-delete="false" durable="true" />
    <rabbit:queue name="errorQueue" auto-delete="false" durable="true" />

    <int:poller default="true" fixed-rate="100" />

    <rabbit:fanout-exchange name="second-exchange" auto-delete="true" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="second" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <rabbit:fanout-exchange name="error-exchange" auto-delete="true" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="errorQueue" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <int-amqp:outbound-channel-adapter channel="messageOutputChannel"
        exchange-name="second-exchange" amqp-template="amqpTemplate" />

    <int-amqp:inbound-channel-adapter channel="messageInputChannel"
        error-channel="errorInputChannel" queue-names="first"
        connection-factory="connectionFactory" concurrent-consumers="20" />

    <int-amqp:outbound-channel-adapter channel="errorOutputChannel"
        exchange-name="error-exchange" amqp-template="amqpTemplate" />

    <int:channel id="messageInputChannel" />
    <int:channel id="messageOutputChannel" />
    <int:channel id="errorInputChannel" />

    <int:service-activator input-channel="errorInputChannel"
        output-channel="errorOutputChannel" method="handleError">
        <bean class="firstattempt.MessageErrorHandler" />
    </int:service-activator>

    <int:chain input-channel="messageInputChannel" output-channel="messageOutputChannel">
        <int:header-enricher>
            <int:error-channel ref="errorInputChannel" />
        </int:header-enricher>
        <int:transformer method="convert">
            <bean class="firstattempt.JsonObjectConverter" />
        </int:transformer>
        <int:service-activator method="transform">
            <bean class="firstattempt.Transformer" />
        </int:service-activator>
        <int:object-to-string-transformer />
    </int:chain>
</beans>
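Error class:
import org.springframework.messaging.MessageHandlingException;

public class ErrorHandler {
    // Returns the text of the exception that reached the error flow.
    public String errorHandle(MessageHandlingException exception) {
        return exception.getMessage();
    }
}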
QualityScorer class (called by Transformer):
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Hashtable;
import org.json.JSONObject;

public class QualityScorer {
    private Hashtable<String, String> table;
    private final static String csvFile = "c:\\users\\john\\test.csv";

    public QualityScorer() throws Exception {
        table = new Hashtable<String, String>();
        initializeTable();
    }

    // Loads the employer lookup table from the CSV file.
    private void initializeTable() throws Exception {
        BufferedReader br = null;
        String line = "";
        String cvsSplitBy = ",";
        try {
            br = new BufferedReader(new FileReader(csvFile));
            while ((line = br.readLine()) != null) {
                String[] data = line.split(cvsSplitBy);
                if (data.length > 6 && data[1].equals("1") && data[4].equals("0") && data[5].equals("1"))
                    table.putIfAbsent(data[3], data[1]);
            }
        } catch (FileNotFoundException e) {
            throw new Exception("No file found");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Scores 1 when the employer is known and the source is not "packageone", otherwise -1.
    public float getScore(JSONObject object) throws Exception {
        float score;
        if (object == null) {
            throw new IllegalArgumentException("object");
        }
        if (!object.has("source")) {
            throw new Exception("Object does not have source");
        }
        if (!object.has("employer")) {
            throw new Exception("Object does not have employer");
        }
        String source = object.getString("source");
        String employer = object.getString("employer");
        if (table.containsKey(employer) && !source.equals("packageone")) {
            score = 1;
        } else {
            score = -1;
        }
        return score;
    }
}
Right now, the message being loaded has no source, so the program should be throwing a MessagingException to MessageErrorHandler.
Transformer code:
import org.json.JSONObject;

public class Transformer {
    private QualityScorer qualityScorer;

    public Transformer() throws Exception {
        qualityScorer = new QualityScorer();
    }

    // Adds the computed score to the incoming JSON object and passes it on.
    public JSONObject transform(JSONObject object) throws Exception {
        float score = qualityScorer.getScore(object);
        object.put("score", score);
        return object;
    }
}
All together, the program should receive a pre-loaded message from the first queue, transform it, and send it on to the second queue if a source is provided in the pre-loaded message. I'm trying to handle errors and have them sent to the error queue with the error as a message header. This issue has been frustrating me for a while, so any help is appreciated!
The error shown in the stack trace is:
java.lang.NoSuchMethodError: org.springframework.messaging.MessageHandlingException: method <init>(Lorg/springframework/messaging/Message;Ljava/lang/Throwable;)V not found
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:96)
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:110)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:56)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:246)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
    at java.lang.Thread.run(Thread.java:748)
But nothing is going to the error queue.
When an exception is thrown, it is wrapped together with the requestMessage in a MessagingException. Your own business exception is in the cause, and you can access the requestMessage through the MessagingException.failedMessage property.
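Since the business exception sits in the cause chain of the wrapper, a small helper that walks `getCause()` is enough to recover its text. The sketch below relies only on `java.lang.Throwable`, so it should accept a `MessagingException` unchanged; the class name `RootCause` and its two methods are hypothetical and not part of Spring.

```java
public final class RootCause {

    private RootCause() {
    }

    /** Follows getCause() until the innermost exception is reached. */
    public static Throwable of(Throwable wrapper) {
        Throwable current = wrapper;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Builds a short text from the innermost exception: simple type name plus message. */
    public static String describe(Throwable wrapper) {
        Throwable root = of(wrapper);
        return root.getClass().getSimpleName() + ": " + root.getMessage();
    }
}
```

An error handler could call `RootCause.describe(exception)` instead of `exception.getMessage()`, so the text reflects the exception thrown by the scoring code rather than the wrapper's own description.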
So, it looks like you already have what you need for your use case. The problem is that before sending to the error-exchange you should have a <transformer> in the error flow to convert the MessagingException into a proper message to send to AMQP.
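One way to picture that transformer step: copy the failed message's headers, add one header carrying the error text, and send the original payload on. The sketch below models headers as a plain `Map<String, Object>` (Spring's `MessageHeaders` implements that interface) so it runs without Spring on the classpath. The class name `ErrorHeaders` and the header name `error-message` are assumptions made for illustration only.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public final class ErrorHeaders {

    /** Hypothetical header name; use whatever the consumers of the error queue expect. */
    public static final String ERROR_HEADER = "error-message";

    private ErrorHeaders() {
    }

    /**
     * Returns a read-only copy of the original headers with the failure text added.
     * The input map is left untouched.
     */
    public static Map<String, Object> withError(Map<String, Object> original, Throwable failure) {
        // Prefer the direct cause (the business exception) over the wrapper itself.
        Throwable cause = failure.getCause() != null ? failure.getCause() : failure;
        Map<String, Object> enriched = new LinkedHashMap<>(original);
        enriched.put(ERROR_HEADER, String.valueOf(cause.getMessage()));
        return Collections.unmodifiableMap(enriched);
    }
}
```

In the error flow, the method behind the `<transformer>` would receive the `MessagingException`, take the payload and headers from `getFailedMessage()` (which may be null), and build the outgoing message from them, for example with `MessageBuilder.withPayload(...)` and `copyHeaders(...)`.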