How to implement a producer consumer pattern with multiple consumers and a common queue?

TL;DR:
How do I set up a producer-consumer pattern with one producer and multiple consumers that receive items from a common queue? The consumers don't have to communicate with each other or send any information to the client (though returning some status information might be nice in the future).
Long version:
My function takes several minutes (up to hours) to complete. In its current implementation it doesn't return anything but writes the results to disk. I want to process many (~10k) inputs. So this takes a while, but I have a remote machine with 24 cores.
I've been after this for a long time, never reaching a final solution. The obvious way to go is to use a parfor-loop. But since it is blocking, I cannot submit new inputs while the loop is running. Also, I cannot cancel the execution, change the order of upcoming calls, etc.
At first, I was pleased with a solution using parfeval:
% Pool is initialized
pool = parpool("IdleTimeout", inf);
% Settings are received ...
n_settings = numel(settings);
futures(n_settings) = parallel.FevalFuture;
for i_set = 1:n_settings
    futures(i_set) = parfeval(...
        pool, @my_slow_function, 0, settings(i_set));
end
In the real code, this is wrapped in a set of methods that have access to the always-running pool. This would be a nice starting point. But those futures tend to occupy a lot of RAM, and they are all created at once. After a while I run out of memory, I guess because of finished futures that I no longer need (since the results were saved to disk).
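One way to limit the memory held by futures, without changing the parfeval approach, is to cap how many futures exist at once and to drop the handles of completed ones. The following is only a minimal sketch under assumptions: run_bounded, work_fcn and max_in_flight are hypothetical names, and the helper blocks the client while the window is full, so it does not by itself solve the wish to submit new inputs at any time.

```matlab
function n_submitted = run_bounded(pool, work_fcn, settings, max_in_flight)
% Hypothetical helper: submit one task per setting, but keep at most
% max_in_flight futures alive on the client at any time.
in_flight = parallel.FevalFuture.empty;
n_submitted = 0;
for i_set = 1:numel(settings)
    % Wait for a free slot, dropping handles of completed futures.
    while numel(in_flight) >= max_in_flight
        states = string({in_flight.State});
        is_done = ismember(states, ["finished", "failed"]);
        in_flight(is_done) = [];
        if ~any(is_done)
            pause(0.5);  % nothing finished yet, check again shortly
        end
    end
    in_flight(end+1) = parfeval(pool, work_fcn, 0, settings(i_set)); %#ok<AGROW>
    n_submitted = n_submitted + 1;
end
% Let the remaining tasks finish before returning.
if ~isempty(in_flight)
    wait(in_flight);
end
end
```

With the names from the question, a call could look like run_bounded(pool, @my_slow_function, settings, 2 * pool.NumWorkers). The window size is a guess to be tuned, not a recommendation.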
I would very much like to use a producer-consumer pattern, like this:
function producer(queue_to_consumer, settings)
    for setting = settings
        queue_to_consumer.send(setting);
    end
end

function consumer(queue_to_consumer)
    while true
        setting = queue_to_consumer.poll();
        % Some checking for escaping the loop.
        my_slow_function(setting);
    end
end

queue = parallel.pool.PollableDataQueue;
n_consumer = 24;
for i = 1:n_consumer
    consumers(i) = parfeval(@consumer, 0, queue);
end
producer(queue, settings);
Please don't point out syntax problems here; the code is only meant to get my idea across. It won't work because the queue wasn't created on the worker. I can create a queue on each worker and send it back to the client, but then I end up with 24 queues, not one.
I read through the documentation several times and consulted AI, both without success. Did I miss something? Maybe a human has an idea.

Answers (1)

Thomas Falch on 5 Feb 2025
Edited: Thomas Falch on 15 May 2025
Starting from R2025a you can use the new "any-destination" PollableDataQueue to solve this problem. The any-destination PollableDataQueue makes it much easier to send data from the client to a worker, or from one worker to another.
With the new any-destination PollableDataQueue, your producer consumer pattern can be written like this:
function producer(queue_to_consumer, settings)
    for setting = settings
        queue_to_consumer.send(setting);
    end
    % Close queue to make all consumers stop
    queue_to_consumer.close();
end

function consumer(queue_to_consumer)
    while true
        % Keep polling until producer closes queue
        [setting, didReceive] = queue_to_consumer.poll(Inf);
        if ~didReceive
            break
        end
        my_slow_function(setting);
    end
end

pool = parpool();
n_consumer = pool.NumWorkers - 1;
queue = parallel.pool.PollableDataQueue(Destination="any");
for i = 1:n_consumer
    consumers(i) = parfeval(@consumer, 0, queue);
end
producer(queue, settings);
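The question also mentions that returning status information might be useful later. One possible way to do that is a second queue pointing back to the client. This is only a sketch under assumptions: status_queue, consumer_with_status, work_fcn and the message text are made-up names, the settings and the work function are placeholders, and the any-destination queue requires R2025a or later as described above.

```matlab
% Placeholder inputs so the sketch is self-contained (assumptions).
settings = 1:6;
work_fcn = @(s) pause(0.1);   % stands in for my_slow_function

pool = gcp();                 % reuse the current pool or start one
n_consumer = max(1, pool.NumWorkers - 1);

queue = parallel.pool.PollableDataQueue(Destination="any");  % client -> workers
status_queue = parallel.pool.DataQueue;                       % workers -> client
afterEach(status_queue, @(msg) disp(msg));                    % runs on the client

for i = 1:n_consumer
    consumers(i) = parfeval(pool, @consumer_with_status, 0, ...
        queue, status_queue, work_fcn);
end

% Producer: send all settings, then close the queue to stop the consumers.
for setting = settings
    send(queue, setting);
end
close(queue);
wait(consumers);

function consumer_with_status(queue_to_consumer, status_queue, work_fcn)
    while true
        % Keep polling until the producer closes the queue.
        [setting, didReceive] = poll(queue_to_consumer, Inf);
        if ~didReceive
            break
        end
        work_fcn(setting);
        send(status_queue, "finished one setting");  % illustrative message
    end
end
```

The status messages are handled asynchronously by the afterEach callback, so the consumers themselves still return nothing.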

Version

R2024a
