老衲是一个 python 的脑残粉(可以看我的昵称),然后遇到一个有脑的问题
SpringBoot 搭建的 web 框架,里面集成了 rabbitmq 组建如下:
<!-- RabbitMQ 集成 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然后将一些消息发送到 rabbitmq,后端用 celery 起了一堆 worker 来消费这些消息;
本来感觉这个架构很简单,实现起来很 easy,但是 celery 貌似对 json 对象处理有一些不兼容 JAVA 的 JSONobject ; 比如我将对象用 JSONObject 的 fromObject 方法转成 json 对象,然后发送到 MQ 中,celery 的 worker 就会报错:
[2018-11-19 14:21:52,895: CRITICAL/MainProcess] Can't decode message body: ContentDisallowed(u'Refusing to deserialize untrusted content of type application/x-java-serialized-object (application/x-java-serialized-object)',) [type:'application/x-java-serialized-object' encoding:None headers:{}]
...
Traceback (most recent call last):
File "/Users/cnbraid/.virtualenvs/celerytest/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 553, in on_task_received
payload = message.decode()
File "/Users/cnbraid/.virtualenvs/celerytest/lib/python2.7/site-packages/kombu/message.py", line 192, in decode
self._decoded_cache = self._decode()
File "/Users/cnbraid/.virtualenvs/celerytest/lib/python2.7/site-packages/kombu/message.py", line 197, in _decode
self.content_encoding, accept=self.accept)
File "/Users/cnbraid/.virtualenvs/celerytest/lib/python2.7/site-packages/kombu/serialization.py", line 253, in loads
raise self._for_untrusted_content(content_type, 'untrusted')
ContentDisallowed: Refusing to deserialize untrusted content of type application/x-java-serialized-object (application/x-java-serialized-object)
我理解的意思是 celery 无法处理(application/x-java-serialized-object)这种对象吧?
这种情况该怎么解决呢? 我其实还是想用 celery 的,好处是起 worker 简单,easy,但是就是报上面的错误
1
linbiaye 2018-11-19 14:36:43 +08:00 1
这个 application/x-java-serialized-object 一看就是 java 专用的,你需要塞进去的时候转成 json.
|
2
pythonCoder OP @linbiaye 嗯,也用过 Gson 将对象转成 json 对象,java 这边将类型打印出来是 string,如下:
消息已发送,应用的其它操作 {"user_id":111,"username":"lili","mobile":"110","msg":"系统信息"} class java.lang.String 但是 celery 会报另外一种错误: [2018-11-19 14:38:22,055: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!? The full contents of the message body was: body: u'{"user_id":111,"username":"lili","mobile":"110","msg":"系统信息"}' (80b) {content_type:'text/plain' content_encoding:'UTF-8' delivery_info:{u'consumer_tag': u'None4', u'redelivered': False, u'routing_key': u'feifei.blog', u'delivery_tag': 4, u'exchange': u'feifei'} headers={}} 这里看到接收的实际类型应该是 content_type:'text/plain' 但是 celery 报错意思是不认识这个 message |
3
neoblackcap 2018-11-19 15:05:08 +08:00 1
你改成 application/json 就可以了,content_type:"text/plain"当然是不认识,纯文本,没法用
|
4
pythonCoder OP 是这个问题,看来理想很丰满
|
5
pythonCoder OP @neoblackcap 发现还是不行,报错内容变了:
[2018-11-19 15:27:36,288: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!? The full contents of the message body was: body: {u'username': u'libin', u'mobile': u'15210832508', u'user_id': 111, u'device_id': u'88889999'} (80b) {content_type:'application/json' content_encoding:None delivery_info:{u'consumer_tag': u'None4', u'redelivered': False, u'routing_key': u'feifei.blog', u'delivery_tag': 2, u'exchange': u'feifei'} headers={}} |
6
cmonkey 2018-11-19 15:47:44 +08:00 1
|
7
pythonCoder OP @cmonkey 谢谢,试过了,但是不是这个原因导致的,我看了下我都没安装过 librabbitmq 这个组件。
|
8
baocaixiong 2018-11-19 17:51:25 +08:00
你发到 rabbitmq 的数据格式要遵守 celery 的数据格式呀,随便传一个肯定不行呀
|
9
NeverSayNever 2020-03-06 21:42:13 +08:00
要么消息推送的时候跟 celery 推送的消息格式一样
要么就是避免这种情况,中间层,将消息推送给 API ( api 入 mq,go 不操作 mq )。此 API 只做 tasks.*.apply_async() / delay...... 我的场景是 Go RabbitMq Celery Python https://blog.thinking.mobi/articles/2020/03/06/1583496791208.html |