How to implement a producer consumer pattern with multiple consumers and a common queue?
TLDR:
How do I set up a producer-consumer pattern with one producer and multiple consumers that receive items from a common queue? The consumers don't have to communicate with each other or send any information to the client (although returning some status information might be nice in the future).
Long version:
My function takes several minutes (up to hours) to complete. In its current implementation it doesn't return anything but writes its results to disk. I want to process many (~10k) inputs, so this takes a while, but I have a remote machine with 24 cores.
I've been after this for a long time, never reaching a final solution. The obvious way to go is to use a parfor-loop. But since it is blocking, I cannot submit new inputs while the loop is running. Also, I cannot cancel the execution, change the order of upcoming calls, etc.
At first, I was pleased with a solution using parfeval:
% Pool is initialized
pool = parpool("IdleTimeout", inf);
% Settings are received ...
n_settings = numel(settings);
futures(n_settings) = parallel.FevalFuture;
for i_set = 1:n_settings
    futures(i_set) = parfeval( ...
        pool, @my_slow_function, 0, settings(i_set));
end
In the real code this is packed into a bunch of methods that have access to the ever-running pool. This would be a nice starting point. But those futures tend to occupy a lot of RAM, and they are all created at once. After a while I run out of memory, I guess because of finished futures that I no longer need (since the results were saved to disk).
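One way to keep the number of live futures bounded is to submit work in a sliding window and drop the handles of completed futures. The sketch below is not from the original post: `submit_throttled`, `max_in_flight`, and the stand-in `settings` and `slow_fcn` are hypothetical names used for illustration. Note that the helper blocks the client while it runs, so it only addresses the memory issue, not the wish to submit or reorder work while the pool is busy.

```matlab
% Stand-ins for illustration only (not from the original post).
settings = 1:8;
slow_fcn = @(s) pause(0.1);

pool = gcp();   % reuse the current pool or start one
submit_throttled(pool, slow_fcn, settings, 2 * pool.NumWorkers);

function submit_throttled(pool, slow_fcn, settings, max_in_flight)
% Submit one parfeval call per setting, but never keep more than
% max_in_flight future handles alive at the same time.
    active = parallel.FevalFuture.empty;
    for i_set = 1:numel(settings)
        % Block while the window is full, dropping completed futures.
        while numel(active) >= max_in_flight
            done = ismember({active.State}, ...
                {'finished', 'failed', 'unavailable'});
            active(done) = [];
            if ~any(done)
                pause(0.5);   % nothing completed yet; check again shortly
            end
        end
        active(end + 1) = parfeval(pool, slow_fcn, 0, settings(i_set)); %#ok<AGROW>
    end
    if ~isempty(active)
        wait(active);   % let the last window finish
    end
end
```

Whether this actually cures the out-of-memory problem depends on what is holding the memory; it assumes the finished future objects themselves are the cause, as guessed above.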
I would very much like to use a producer consumer pattern. Like this:
function producer(queue_to_consumer, settings)
    for setting = settings
        queue_to_consumer.send(setting);
    end
end
function consumer(queue_to_consumer)
    while true
        setting = queue_to_consumer.poll();
        % Some checking for escaping the loop.
        my_slow_function(setting);
    end
end
queue = parallel.pool.PollableDataQueue;
n_consumer = 24;
for i = 1:n_consumer
    consumers(i) = parfeval(@consumer, 0, queue);
end
producer(queue, settings)
Please don't point out syntax problems here; it's just to get my idea across. It won't work because the queue wasn't created on the worker. I can create a queue on each worker and send it back to the client, but then I end up with 24 queues, not one.
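For reference, the per-worker-queue workaround described above could look roughly like the sketch below. It is not from the original post: `consumer_with_own_queue`, the `[]` stop sentinel, the round-robin dispatch, the 60-second handshake timeout, and the stand-in `settings` and `slow_fcn` are all assumptions made for illustration.

```matlab
% Stand-ins for illustration only (not from the original post).
settings = 1:8;
slow_fcn = @(s) pause(0.1);

pool = gcp();                       % reuse the current pool or start one
n_consumer = pool.NumWorkers;
queue_to_client = parallel.pool.PollableDataQueue;

% Start the consumers; each one sends its own inbound queue back.
for i = n_consumer:-1:1
    consumers(i) = parfeval(pool, @consumer_with_own_queue, 0, ...
        queue_to_client, slow_fcn);
end
worker_queues = cell(1, n_consumer);
for i = 1:n_consumer
    [worker_queues{i}, did_receive] = poll(queue_to_client, 60);
    assert(did_receive, "A consumer did not report its queue in time.");
end

% Round-robin dispatch: the client has to pick a queue for every item.
for k = 1:numel(settings)
    send(worker_queues{mod(k - 1, n_consumer) + 1}, settings(k));
end

% Send the stop sentinel to every consumer, then wait for them to return.
for i = 1:n_consumer
    send(worker_queues{i}, []);
end
wait(consumers);

function consumer_with_own_queue(queue_to_client, slow_fcn)
    % Created on the worker, so this worker is the one that can poll it.
    queue_to_worker = parallel.pool.PollableDataQueue;
    send(queue_to_client, queue_to_worker);
    while true
        [setting, did_receive] = poll(queue_to_worker, 1);
        if ~did_receive
            continue            % nothing arrived within 1 s; poll again
        end
        if isnumeric(setting) && isempty(setting)
            break               % stop sentinel received
        end
        slow_fcn(setting);
    end
end
```

The drawback is visible in the dispatch loop: items are bound to a worker when they are sent, so one consumer can sit idle while another still has a backlog. That is exactly what a single shared queue would avoid.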
I read through the documentation several times and consulted AI, both without success. Did I miss something? Maybe a human has an idea.
Answers (1)
Thomas Falch
on 5 Feb 2025
Edited: Thomas Falch
on 15 May 2025
Starting in R2025a, you can use the new "any-destination" PollableDataQueue to solve this problem. The any-destination PollableDataQueue makes it much easier to send data from the client to a worker, or from one worker to another.
The new queue is documented here: PollableDataQueue - Send and poll data between client and workers - MATLAB
With the new any-destination PollableDataQueue, your producer consumer pattern can be written like this:
function producer(queue_to_consumer, settings)
    for setting = settings
        queue_to_consumer.send(setting);
    end
    % Close queue to make all consumers stop
    queue_to_consumer.close();
end
function consumer(queue_to_consumer)
    while true
        % Keep polling until producer closes queue
        [setting, didReceive] = queue_to_consumer.poll(Inf);
        if ~didReceive
            break
        end
        my_slow_function(setting);
    end
end
pool = parpool();
n_consumer = pool.NumWorkers - 1;
queue = parallel.pool.PollableDataQueue(Destination="any");
for i = 1:n_consumer
    consumers(i) = parfeval(@consumer, 0, queue);
end
producer(queue, settings)
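The question also mentions that status information from the consumers might be useful later. One possible extension, sketched here under assumptions, adds a second `parallel.pool.DataQueue` that carries messages from the workers back to the client, where `afterEach` displays them. The names `consumer_with_status`, `status_queue`, and `work_queue`, the message texts, and the stand-in `settings` and `slow_fcn` are hypothetical and not part of the answer above; like the answer, the sketch needs R2025a or later.

```matlab
% Stand-ins for illustration only (not from the original post).
settings = 1:8;
slow_fcn = @(s) pause(0.1);

pool = gcp();                                   % reuse or start a pool
status_queue = parallel.pool.DataQueue;         % worker -> client messages
afterEach(status_queue, @disp);                 % callback runs on the client
work_queue = parallel.pool.PollableDataQueue(Destination="any");  % R2025a+

for i = pool.NumWorkers:-1:1
    consumers(i) = parfeval(pool, @consumer_with_status, 0, ...
        work_queue, status_queue, slow_fcn);
end

for k = 1:numel(settings)
    send(work_queue, settings(k));
end
close(work_queue);   % as in the answer: closing makes the consumers stop
wait(consumers);     % optional: block until every consumer has returned

function consumer_with_status(work_queue, status_queue, slow_fcn)
    while true
        [setting, did_receive] = poll(work_queue, Inf);
        if ~did_receive
            break
        end
        try
            slow_fcn(setting);
            send(status_queue, "item finished");
        catch err
            % Report the failure and keep this consumer alive.
            send(status_queue, "item failed: " + string(err.message));
        end
    end
end
```

Wrapping the call in `try`/`catch` is a design choice rather than a requirement: without it, a single failing input would end that consumer's loop and leave one worker idle for the rest of the run.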