Example of how a simple queue (channel) can be processed concurrently in Perl:
#!/usr/bin/perl
#use threads; # threads are buggy... Use forks.
#use threads::shared;
use forks;
use forks::shared; # deadlock => ( detect=>0, period=>2, resolve=>0 );
use feature ':5.24';
use strict;
use warnings;
use English qw( -no_match_vars );
use Const::Fast; ## libconst-fast-perl
use Thread::Queue;
use Time::HiRes qw(sleep);
## SIGINT handler. It acts only when threads->tid is 0 (the main process):
## it says "Meow" and exits with status 2. In any other thread it returns
## without doing anything.
use sigtrap 'handler' => sub {
    if ( threads->tid == 0 ) {
        say q{Meow};
        exit 2;
    }
    return;
}, qw(INT);
## Number of CPUs: ask Sys::CpuAffinity when it can be loaded, otherwise take
## the output of getconf, and settle on 4 when neither yields a true value.
my $cpu_count = eval {
    require Sys::CpuAffinity;    ## libsys-cpuaffinity-perl
    Sys::CpuAffinity::getNumCpus();
};
$cpu_count ||= qx{getconf _NPROCESSORS_ONLN};
$cpu_count ||= 4;

local $OUTPUT_AUTOFLUSH = 1;    # local $|=1;

## Number of items that will be pushed through the queue.
const my $JOBSIZE => 17 * 8;

## Size of the worker pool, derived from the CPU count.
my $workers = 3 * $cpu_count - 1;

## Shared between the main process and the workers: the processed values
## and the number of values processed so far.
my @data :shared;
my $data_count :shared = 0;

my $q :shared = Thread::Queue->new();    # Make a new empty queue.

## Set queue capacity. Random depth is just for fun. :)
my $queue_depth = int rand(22);
warn "I: queue depth is $queue_depth\n";
$q->limit = $queue_depth;
## Takse one argument - Thread::Queue to fetch data from.
sub generator {
my ($q) = @_;
while ( my $val = $q->dequeue_timed(11) ) {
say "T: got ", $val;
## crunch-crunch
sleep rand(3); ## rate limit; simulating slow processing.
$val = "qqq" . $val;
## Locking this is very-very important;
## If already locked by another thread it will wait until lock can be acquired.
lock($data_count);
push @data, $val;
$data_count += 1;
} # Lock implicitly released at end of scope
warn "T: thread "
,threads->tid
," is finished.\n"
;
return;
}
## Start the workers; each one runs generator() against the shared queue.
for ( 1 .. $workers ) {
    threads->create( \&generator, $q ) or die("E: can't fork\n");
}
#say Dumper ":::", forks->list;

## Push the job items 1 .. $JOBSIZE into the queue.
for my $item ( 1 .. $JOBSIZE ) {
    $q->enqueue($item);
}

warn "I: ending queue; no more items to add.\n";
$q->end();    ## have to do this so threads could finish.

## pending() returns undef once an ended queue has been emptied; report 0
## in that case instead of triggering an "uninitialized value" warning.
warn "I: pending items in the queue: ", $q->pending() // 0, "\n";
warn "I: waiting for results\n";
## Wait for the workers by joining every one of them. join() blocks until
## the thread has finished, so no polling of $data_count is needed: the
## main process cannot spin forever when a worker ends early, and no worker
## is left unjoined because it was still returning when it was inspected.
print "I: joining threads ";
for my $worker ( threads->list() ) {
    $worker->join();    ## un-fork workers: finished threads should be joined too.
    print $worker->tid, " ";
}
say q{};
## Report the outcome. The number of collected results is checked against
## the number of items that were queued ($JOBSIZE); comparing it only with
## $data_count proves nothing, because both are updated together under the
## same lock. Exits with status 1 on a mismatch.
my $processed_count = scalar @data;
if ( $processed_count == $JOBSIZE && $data_count == $processed_count ) {
    say "OK: processed ", $processed_count, " items.";
}
else {
    say "ERR: processed ", $processed_count, " of ", $JOBSIZE, ".";
    exit 1;
}
__END__