Friday 15 February 2013

python 2.7 - How to add a callback that writes to a file while subscribing


This is probably as much a question about Python callbacks as it is about using pika. I am trying to develop some code that subscribes to a queue in RabbitMQ, processes the payload of each delivered message, and then writes that payload to a series of (disk) files. Starting from the simple "Hello World" example, I have added a callback function (called "callback") as an argument so that the payload of any received message can be written to a file.

The main problem: I want to add some code so that once a certain period has elapsed, for example 300 seconds (5 minutes), the process closes the current file, creates a new one, and writes any messages received after that to the new file. And so on ...

But, as I see it, the issue is that the callback function is only called when a message arrives on the queue. I think I need some process outside the callback function that checks how much time has passed ....

The reasoning is that I want to create a set of disk files (with unique names based on timestamps). If messages arrive on the MQ queue only slowly, I still want to close the currently open file (so that it can be processed downstream) and open another.

I have also noticed that once the consume call (channel.start_consuming) has been issued, no code after it is executed. Why?

I have played with Python's multiprocessing module, but with no luck so far.

Here is some skeleton code with pseudocode comments:

    #!/usr/bin/env python
    import pika

    # Connect to the local RabbitMQ broker and make sure the queue exists
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')

    print ' [*] Waiting for messages. To exit press CTRL+C'

    def callback(ch, method, properties, body):
        print " [x] Received %r" % (body,)
        # code to write the message payload to a file (unique name)
        # if n seconds have elapsed, close the file and create a new one

    # Register the callback, then block while waiting for messages
    channel.basic_consume(callback, queue='hello', no_ack=True)
    channel.start_consuming()

Thanks!

It may be worth looking at an alternative implementation, because the pika connection used here is blocking by nature, which makes this kind of thing difficult. You would need another thread to watch the file I/O, essentially checking whether anything has been written in the last five minutes and closing the file if not.
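The watcher-thread idea could look roughly like the sketch below. It is only an illustration under assumptions: the class name `IdleClosingWriter`, the file name pattern and the `poll_interval` parameter are hypothetical and not part of pika or of the original code. The callback writes under a lock, while a daemon thread periodically checks how long ago the last write happened and closes the file once it has been idle for `idle_limit` seconds.

```python
import os
import tempfile
import threading
import time


class IdleClosingWriter(object):
    """Hypothetical helper: appends payloads to a file and lets a
    background thread close that file after a period without writes."""

    def __init__(self, directory, idle_limit=300, poll_interval=1.0):
        self.directory = directory
        self.idle_limit = idle_limit        # seconds without writes before closing
        self.poll_interval = poll_interval  # how often the watcher wakes up
        self.lock = threading.Lock()        # protects the fields below
        self.current = None
        self.last_write = None
        self.closed_files = []              # names of files ready for processing
        self._counter = 0
        self._stop_event = threading.Event()
        self._watcher = threading.Thread(target=self._watch)
        self._watcher.daemon = True
        self._watcher.start()

    def write(self, body):
        """Called from the consumer callback; opens a new file if needed."""
        with self.lock:
            if self.current is None:
                self._counter += 1
                name = "payload-%d-%04d.log" % (int(time.time()), self._counter)
                self.current = open(os.path.join(self.directory, name), "ab")
            self.current.write(body + b"\n")
            self.current.flush()
            self.last_write = time.time()

    def _close_current(self):
        # The caller must already hold self.lock.
        if self.current is not None:
            self.current.close()
            self.closed_files.append(self.current.name)
            self.current = None

    def _watch(self):
        # Wake up every poll_interval seconds until stop() is called.
        while not self._stop_event.wait(self.poll_interval):
            with self.lock:
                idle = self.current is not None and \
                    time.time() - self.last_write >= self.idle_limit
                if idle:
                    self._close_current()

    def stop(self):
        """Stop the watcher thread and close any file that is still open."""
        self._stop_event.set()
        self._watcher.join()
        with self.lock:
            self._close_current()


# Assumption: the directory is a throwaway temp dir chosen for illustration.
writer = IdleClosingWriter(tempfile.mkdtemp(), idle_limit=300)


def callback(ch, method, properties, body):
    # Same signature as the callback in the question.
    writer.write(body)
```

This version closes the file after five minutes without writes, as described above. Comparing against the time the file was opened, rather than the time of the last write, would instead give the fixed five-minute period the question asks for.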

You could also store a timestamp and check it each time you receive a new callback. Once enough time has passed, you close the file and create a new one. The drawback is that the file can stay open for a long time: if messages stop arriving, more than five minutes may pass before it is closed.
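A minimal sketch of that timestamp check, assuming a hypothetical helper class `RotatingPayloadWriter` (the name, the file name pattern and the injectable `clock` argument are illustrative and do not come from pika or from the original code). The age of the current file is checked only when a message arrives, so an idle queue leaves the file open, which is exactly the drawback described above.

```python
import os
import tempfile
import time


class RotatingPayloadWriter(object):
    """Hypothetical helper: starts a new file once the current one is
    older than max_age seconds, checked on every write."""

    def __init__(self, directory, max_age=300, clock=time.time):
        self.directory = directory
        self.max_age = max_age
        self.clock = clock          # injectable so the logic can be tested
        self.current = None
        self.opened_at = None
        self.closed_files = []      # names of files ready for processing
        self._counter = 0

    def _open_new(self):
        now = self.clock()
        self._counter += 1
        # Timestamp plus a counter keeps the names unique.
        name = "payload-%d-%04d.log" % (int(now), self._counter)
        self.current = open(os.path.join(self.directory, name), "ab")
        self.opened_at = now

    def close(self):
        if self.current is not None:
            self.current.close()
            self.closed_files.append(self.current.name)
            self.current = None

    def write(self, body):
        # Rotate first if the current file has been open for too long.
        expired = self.current is not None and \
            self.clock() - self.opened_at >= self.max_age
        if expired:
            self.close()
        if self.current is None:
            self._open_new()
        self.current.write(body + b"\n")
        self.current.flush()


# Assumption: the directory is a throwaway temp dir chosen for illustration.
writer = RotatingPayloadWriter(tempfile.mkdtemp(), max_age=300)


def callback(ch, method, properties, body):
    # Same signature as the callback in the question.
    writer.write(body)
```

Because everything happens inside the callback, no extra thread or lock is needed, at the cost of never rotating while the queue is silent.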

However, I suggest that you take a look at pika's non-blocking option instead, which will allow you to solve your problem easily.
