
This is my first attempt at the Perl Object Environment (POE). I am trying to write a program that runs as a service, obtains a list of devices from CouchDB, and spawns child processes to ping them at 60-second intervals, while restricting the maximum number of concurrent child processes to 3.

I am able to re-queue child processes after a delay (1 min). However, I am not sure how to manage multiple alarms/delays that call the same event. I am attempting to store them in $_[HEAP]->{timers}->{$_} where $_ is a given host.

#!/usr/bin/perl

use warnings;
use strict;
use POE qw(Wheel::Run Filter::Reference);
use CouchDB::Client;
use Data::Dumper;

use constant MAX_CONCURRENT_TASKS => 3;

our $couch      = CouchDB::Client->new(uri => 'http://192.168.1.100:5984/')->newDB('devices');

POE::Session->create(
        inline_states => {
                _start      => sub {
                        push (@{$_[HEAP]->{devices}}, $couch->newDoc($_->id)->retrieve->id) for @{$couch->listDocs};
                        $_[KERNEL]->delay_set('refresh', 60);
                        $_[HEAP]->{timers}->{$_} = $_[KERNEL]->delay_set('spawn', 1, $_) for @{$_[HEAP]->{devices}};
                }, 
                refresh => sub { 
                        undef @{$_[HEAP]->{devices}};
                        $_[KERNEL]->delay_set('refresh', 60); 
                        push (@{$_[HEAP]->{devices}}, $couch->newDoc($_->id)->retrieve->id) for @{$couch->listDocs};
                        print "\nRefreshing device list.\n\n";
                }, 
                spawn   => sub { 
                        if (keys(%{$_[HEAP]->{task}}) < MAX_CONCURRENT_TASKS) { 
                                print "Starting $_[ARG0].\t # of tasks running: ". keys(%{$_[HEAP]->{task}}),$/;
                                my $host = $_[ARG0];
                                my $task = POE::Wheel::Run->new(        
                                        Program      => sub { do_stuff($host) },
                                        StdoutFilter => POE::Filter::Reference->new(),
                                        StdoutEvent  => "task_result",
                                        StderrEvent  => "task_debug",
                                        CloseEvent   => "task_done"
                                );
                                $_[HEAP]->{task}->{$task->ID} = $task;
                                $_[KERNEL]->sig_child($task->PID, "sig_child", $_[ARG0]);
                        } else { 
                                $_[KERNEL]->delay_adjust($_[HEAP]->{timers}->{$_[ARG0]}, 5);
                        }
                        print Dumper 'spawn', sort $_[HEAP]->{timers};
                },
                task_result => sub { 
                        print "Result for $_[ARG0]->{task}: $_[ARG0]->{status}\n";
                },
                task_done   => sub { 
                        delete $_[HEAP]->{task}->{$_[ARG0]};
                },
                task_debug  => sub { 
                        print "Debug: $_[ARG0]\n";
                },
                sig_child   => sub { 
                        delete $_[HEAP]->{$_[ARG1]};
                        $_[HEAP]->{timers}->{$_[ARG3]} = $_[KERNEL]->delay_set('spawn', 60, $_[ARG3]) if $_[ARG3] ~~ $_[HEAP]->{devices};  
                        $_[KERNEL]->alarm_remove($_[HEAP]->{timers}->{$_[ARG0]});
                }
        }
);
sub do_stuff {
        binmode(STDOUT);
        my $filter = POE::Filter::Reference->new();

        sleep(rand 5);

        my %result = (
                task   => shift,
                status => "complete.",
        );

        my $output = $filter->put([\%result]);
        print @$output;
}
POE::Kernel->run();
exit 0;

Any advice/strategies would be welcome.

Edit 1: I found out that $_[KERNEL]->delay wasn't setting timers on a per-child basis. I am able to get this to work by using $_[KERNEL]->delay_set instead. What I am unable to piece together now is how to restrict the program from running more than 3 processes at a given time.

I am creating the initial timers in _start. When spawn is called, $_[KERNEL]->delay_adjust should extend the delay by 5 seconds if the child process count is 3 or higher.

Apologies for the delay in responding to questions; this code lives on my work PC, and I am making this edit on Monday, my first day back.

Mose
  • What do you mean by "_child should re-fire_" ? Normally a child process exits when it completes its task. If you allow that, you can spawn another child, I guess, but you do that anyway. Should new processes continue where the old ones left off (after 10 seconds)? Can you clarify this? – zdim Dec 17 '16 at 04:50
  • I should add -- i don't use POE, so the above comment may be naive in some sense. (For example, it may keep a pool of processes and re-use them, or something like that.) If that is the case please let me know. – zdim Dec 17 '16 at 05:08
  • Once a child is completed in wheel I want $_[KERNEL]->delay() to wait 10 seconds then re-create the child process. For instance if I want to ping 10 hosts perpetually, but wait 10 seconds between pinging any given host would be the best way I can think of explaining it. – Mose Dec 17 '16 at 09:43
  • That last ping example sounds like you want to ping one, then wait. Then ping the next, then wait. Should that be program wide, or per child? The whole idea of POE is that stuff is concurrent so the children don't wait for each other. What good is that break, if I get it at all? Can you edit and explain that part a bit more? – simbabque Dec 17 '16 at 10:08
  • The next_task event creates 1 child per $_[HEAP]->{devices}. At this point they would be pinging independently of one another. As each child completes, I am trying to leverage $_[KERNEL]->delay to wait, then recreate the child. This is attempted in the sig_child handler. I am on a tablet atm, will try to clarify via edit asap once I am back at a computer – Mose Dec 17 '16 at 10:19
  • @Mose To use your example, is this the goal -- start (say) three processes, where each pings their host. As one exits, after 10 seconds start another child which will ping the host of the one that exited. As another exits, again after 10 seconds start a new process that pings that child's host. Is this what you mean by "_recreate the child_", so to create a process that continues what the previous one was doing? – zdim Dec 17 '16 at 20:23
  • @zdim yes. I want the program to continually ping the same hosts, each host is a child process, each child process has a 10 second delay before restarting. Overall I'm trying to keep a maximum child process count of 3 at any given time (so as hosts are added the program does not get out of hand as the list grows). Sorry if I'm not explaining this well, this is my first attempt at POE. The POE cookbook I linked does just about everything I need except restarting children – Mose Dec 18 '16 at 04:04
  • @Mose I don't see how one can "_restart the child_" -- once the child exits and is reaped, it's gone. It is no more. What you can do, if I get it right, is to have each child return some ID of hosts that it worked with. Then 10 seconds after it exited you start another child with this info, so it knows to keep working with the same host. You can do that with `alarm` (or `delay`). But you have new children springing up anyway, if new hosts come up. So you'd need to coordinate that, to distinguish between a new host and when you are returning to an old one. Does this fit? – zdim Dec 19 '16 at 08:48
  • @zdim That's what I am trying to do in sig_child via $_[KERNEL]->delay. Perhaps the verbiage I am using isn't correct, I do realize once the child is reaped it is gone, however I am not seeing how I can leverage the delay call to re-create it. Right now sig_child will call the spawn method, but will only do so for the last device in the list. I would have expected that the output of 'spawn' would be 1 per device. If that were the case I could pass the device name back into a POE wheel after waiting 10 seconds. I'm finally back in front of my PC today, I'll try and make a more sensible example. – Mose Dec 19 '16 at 15:33
  • @Mose I just now noticed the edit. I'll try to look into this further, time permitting. – zdim Dec 29 '16 at 21:35
  • @zdim I managed to figure it out by forgoing the delay_adjust and scheduling a new delay. Will post the solution when able, hectic with the holidays so I haven't had a chance to follow up – Mose Dec 30 '16 at 01:03
  • @Mose Oh, great :) Thank you for letting me know! It is a good idea to post it when you get around to it. – zdim Dec 30 '16 at 02:36
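
The approach mentioned in the last few comments (forgoing delay_adjust and scheduling a new delay) might look roughly like the sketch below. It is not the asker's actual solution, which was never posted. It rests on the assumption that the timer which dispatched spawn has already fired by the time the handler runs, so there is nothing left for delay_adjust to extend, and a fresh delay_set is used instead. The hard-coded host list (in place of CouchDB), the shortened intervals, the ROUNDS_PER_HOST cap that lets the demo exit, the $heap->{host} map, and the $peak and %runs counters are all hypothetical additions for illustration.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use POE qw(Wheel::Run Filter::Reference);

use constant MAX_CONCURRENT_TASKS => 3;
use constant PING_INTERVAL        => 2;  # 60 in the question; shortened for the demo
use constant RETRY_DELAY          => 1;  # 5 in the question
use constant ROUNDS_PER_HOST      => 2;  # hypothetical cap so the demo terminates

my @devices = map { "host$_" } 1 .. 5;   # stand-in for the CouchDB device list
my $peak    = 0;                         # most children seen running at once
my %runs;                                # completed runs per host

POE::Session->create(
    inline_states => {
        _start => sub {
            # delay_set() creates an independent timer per call, one per host.
            $_[KERNEL]->delay_set(spawn => 1, $_) for @devices;
        },
        spawn => sub {
            my ($kernel, $heap, $host) = @_[KERNEL, HEAP, ARG0];
            # All slots busy: schedule a brand-new delay rather than
            # adjusting the timer that has just fired.
            if (keys %{ $heap->{task} } >= MAX_CONCURRENT_TASKS) {
                $kernel->delay_set(spawn => RETRY_DELAY, $host);
                return;
            }
            my $task = POE::Wheel::Run->new(
                Program      => sub { do_stuff($host) },
                StdoutFilter => POE::Filter::Reference->new(),
                StdoutEvent  => 'task_result',
                StderrEvent  => 'task_debug',
                CloseEvent   => 'task_done',
            );
            $heap->{task}{ $task->ID } = $task;
            $heap->{host}{ $task->ID } = $host;   # remember which host this wheel serves
            $kernel->sig_child($task->PID, 'sig_child');
            my $running = keys %{ $heap->{task} };
            $peak = $running if $running > $peak;
        },
        task_result => sub {
            my $result = $_[ARG0];
            print "Result for $result->{task}: $result->{status}\n";
        },
        task_debug => sub { print "Debug: $_[ARG0]\n" },
        task_done  => sub {
            my ($kernel, $heap, $wheel_id) = @_[KERNEL, HEAP, ARG0];
            delete $heap->{task}{$wheel_id};      # frees a slot
            my $host = delete $heap->{host}{$wheel_id};
            # Re-queue the same host after the interval (capped for the demo).
            $kernel->delay_set(spawn => PING_INTERVAL, $host)
                if ++$runs{$host} < ROUNDS_PER_HOST;
        },
        sig_child => sub { },   # registered only so the child gets reaped
    },
);

sub do_stuff {
    my ($host) = @_;
    binmode(STDOUT);
    sleep 1;                                      # stand-in for the real ping
    my $filter = POE::Filter::Reference->new();
    print @{ $filter->put([ { task => $host, status => 'complete.' } ]) };
}

POE::Kernel->run();
print "Peak concurrent children: $peak\n";
```

In this sketch the host is re-queued from task_done, where the slot is freed, so no timer IDs need to be stored in the heap at all. A long-running service would drop the ROUNDS_PER_HOST cap and, as in the question, check that the host is still in the refreshed device list before re-queuing it.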

0 Answers