#!/usr/bin/env perl # This chunk of stuff was generated by App::FatPacker. To find the original # file's code, look for the end of this BEGIN block or the string 'FATPACK' BEGIN { my %fatpacked; $fatpacked{"App/RecordStream.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM'; use strict;use warnings;package App::RecordStream;use Module::Pluggable::Object;our$VERSION="4.0.25";sub operation_packages {sort {$a cmp $b}Module::Pluggable::Object->new(search_path=>"App::RecordStream::Operation",max_depth=>4,)->plugins}1; APP_RECORDSTREAM $fatpacked{"App/RecordStream/Accumulator.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_ACCUMULATOR'; package App::RecordStream::Accumulator;our$VERSION="4.0.25";sub accept_record {my$this=shift;$this->accumulate_record(shift)}sub accumulate_record {my$this=shift;push @{$this->get_records()},shift}sub get_records {my$this=shift;$this->{'RECORDS'}||= [];return$this->{'RECORDS'}}1; APP_RECORDSTREAM_ACCUMULATOR $fatpacked{"App/RecordStream/Aggregator.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR'; package App::RecordStream::Aggregator;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::BaseRegistry;use App::RecordStream::Site;use base ('App::RecordStream::BaseRegistry');sub make_aggregators {my$registry_class=shift;my%ret;for my$input (@_){my$spec=$input;my$name;if($spec =~ /^(.*)=(.*)$/){$name=$1;$spec=$2}if(!defined($name)){my@spec=split(/,/,$spec);$name=join("_",map {my$n=$_;$n =~ s!/!_!;$n}@spec)}$ret{$name}=$registry_class->parse_single_nameless_implementation($spec)}return \%ret}sub map_initial {my ($aggrs)=@_;my%ret;for my$name (keys(%$aggrs)){$ret{$name}=$aggrs->{$name}->initial()}return \%ret}sub map_combine {my ($aggrs,$cookies,$record)=@_;my%ret;for my$name (keys(%$aggrs)){$ret{$name}=$aggrs->{$name}->combine($cookies->{$name},$record)}return \%ret}sub map_squish {my ($aggrs,$cookies)=@_;my$return_record=App::RecordStream::Record->new();for my$name (keys(%$aggrs)){my$aggregator=$aggrs->{$name};my$value=$aggregator->squish($cookies->{$name});${$return_record->guess_key_from_spec($name)}=$value}return$return_record}sub typename {return "aggregator"}1; APP_RECORDSTREAM_AGGREGATOR $fatpacked{"App/RecordStream/Aggregator/Aggregation.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_AGGREGATION'; package App::RecordStream::Aggregator::Aggregation;our$VERSION="4.0.25";1; APP_RECORDSTREAM_AGGREGATOR_AGGREGATION $fatpacked{"App/RecordStream/Aggregator/Array.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_ARRAY'; package App::RecordStream::Aggregator::Array;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::MapReduce::Field;use App::RecordStream::Aggregator;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Aggregator::MapReduce::Field';sub map_field {my ($this,$value)=@_;return [$value]}sub reduce {my ($this,$cookie,$cookie2)=@_;return [@$cookie,@$cookie2]}sub long_usage {return <register_implementation('array',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','array','VALUATION');1; Usage: array, Collect values from specified field into an array. EOF APP_RECORDSTREAM_AGGREGATOR_ARRAY $fatpacked{"App/RecordStream/Aggregator/Average.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_AVERAGE'; package App::RecordStream::Aggregator::Average;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::Ord2Univariate;use App::RecordStream::Aggregator;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Aggregator::Ord2Univariate';sub squish {my ($this,$cookie)=@_;my ($sum1,$sumx,$sumx2)=@$cookie;return$sumx / $sum1}sub long_usage {return <register_implementation('average',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('avg',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','average','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','avg','VALUATION');1; Usage: avg, Average of specified field. EOF APP_RECORDSTREAM_AGGREGATOR_AVERAGE $fatpacked{"App/RecordStream/Aggregator/Concatenate.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_CONCATENATE'; package App::RecordStream::Aggregator::Concatenate;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::MapReduce::Field;use App::RecordStream::Aggregator;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Aggregator::MapReduce::Field';sub new {my$class=shift;my$delim=shift;my$field=shift;my$this=$class->SUPER::new($field);$this->{'delim'}=$delim;return$this}sub new_from_valuation {my$class=shift;my$delim=shift;my$valuation=shift;my$this=$class->SUPER::new_from_valuation($valuation);$this->{'delim'}=$delim;return$this}sub map_field {my ($this,$value)=@_;return [$value]}sub reduce {my ($this,$cookie,$cookie2)=@_;return [@$cookie,@$cookie2]}sub squish {my ($this,$cookie)=@_;return join($this->{'delim'},@$cookie)}sub long_usage {return <register_implementation('concatenate',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('concat',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','concatenate','SCALAR','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','concat','SCALAR','VALUATION');1; Usage: concat,, Concatenate values from specified field. EOF APP_RECORDSTREAM_AGGREGATOR_CONCATENATE $fatpacked{"App/RecordStream/Aggregator/Correlation.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_CORRELATION'; package App::RecordStream::Aggregator::Correlation;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::Ord2Bivariate;use App::RecordStream::Aggregator;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Aggregator::Ord2Bivariate';sub squish {my ($this,$cookie)=@_;my ($sum1,$sumx,$sumy,$sumxy,$sumx2,$sumy2)=@$cookie;return ($sumxy * $sum1 - $sumx * $sumy)/ sqrt(($sumx2 * $sum1 - $sumx ** 2)* ($sumy2 * $sum1 - $sumy ** 2))}sub long_usage {return <register_implementation('corr',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('correl',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('correlation',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','corr','VALUATION','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','correl','VALUATION','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','correlation','VALUATION','VALUATION');1; Usage: corr,, Correlation of specified fields. This is Cov(X, Y) / sqrt(Var(X) * Var(Y)). See help on aggregators cov and var for how Cov(...) and Var(...) are computed. Ultimately this value is in [-1, 1] where larger negative values indicate larger inverse correlation and larger positive values indicate larger positive correlation. EOF APP_RECORDSTREAM_AGGREGATOR_CORRELATION $fatpacked{"App/RecordStream/Aggregator/Count.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_COUNT'; package App::RecordStream::Aggregator::Count;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::MapReduce;use App::RecordStream::Aggregator;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Aggregator::MapReduce';sub map {return 1}sub reduce {my ($this,$cookie,$cookie2)=@_;return$cookie + $cookie2}sub argct {return 0}sub long_usage {return <register_implementation('count',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('ct',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_ctor(__PACKAGE__,'count');App::RecordStream::DomainLanguage::Registry::register_ctor(__PACKAGE__,'ct');1; Usage: count Counts number of (non-unique) records. EOF APP_RECORDSTREAM_AGGREGATOR_COUNT $fatpacked{"App/RecordStream/Aggregator/CountBy.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_COUNTBY'; package App::RecordStream::Aggregator::CountBy;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::InjectInto::Field;use App::RecordStream::DomainLanguage::Registry;use base qw(App::RecordStream::Aggregator::InjectInto::Field);sub initial {return {}}sub combine_field {my$this=shift;my$cookie=shift;my$value=shift;$cookie->{$value}++;return$cookie}sub squish {my$this=shift;my$cookie=shift;return$cookie}sub short_usage {return "counts by unique value for a field"}sub long_usage {return <register_implementation('countby',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('cb',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','countby','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','cb','VALUATION');1; Usage: cb, Returns a list of uniq values associated with their counts. Unlike most other aggregators, the value of the field returned will actually be a hash, with keys of uniq fields, and values of the counts. EOF APP_RECORDSTREAM_AGGREGATOR_COUNTBY $fatpacked{"App/RecordStream/Aggregator/Covariance.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_COVARIANCE'; package App::RecordStream::Aggregator::Covariance;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::Ord2Bivariate;use App::RecordStream::Aggregator;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Aggregator::Ord2Bivariate';sub squish {my ($this,$cookie)=@_;my ($sum1,$sumx,$sumy,$sumxy,$sumx2,$sumy2)=@$cookie;return ($sumxy / $sum1)- ($sumx / $sum1)* ($sumy / $sum1)}sub long_usage {return <register_implementation('cov',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('covar',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('covariance',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','cov','VALUATION','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','covar','VALUATION','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','covariance','VALUATION','VALUATION');1; Usage: cov,, Covariance of specified fields. EOF APP_RECORDSTREAM_AGGREGATOR_COVARIANCE $fatpacked{"App/RecordStream/Aggregator/DistinctCount.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_DISTINCTCOUNT'; package App::RecordStream::Aggregator::DistinctCount;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator;use App::RecordStream::DomainLanguage::Registry;use App::RecordStream::DomainLanguage::Valuation::KeySpec;use base qw(App::RecordStream::Aggregator::Aggregation);sub new {my$class=shift;my$field=shift;return new_from_valuation($class,App::RecordStream::DomainLanguage::Valuation::KeySpec->new($field))}sub new_from_valuation {my$class=shift;my ($valuation)=@_;my$this={'valuation'=>$valuation,};bless$this,$class;return$this}sub squish {my ($this,$cookie)=@_;return scalar(keys(%$cookie))}sub short_usage {return "count unique values from provided field"}sub long_usage {return <{'valuation'}->evaluate_record($record);$cookie->{$value}=1;return$cookie}App::RecordStream::Aggregator->register_implementation('dcount',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('dct',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('distinctcount',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('distinctct',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','dcount','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','dct','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','distinctcount','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','distinctct','VALUATION');1; Usage: dct, Finds the number of unique values for a field and returns it. Will load all values into memory. EOF APP_RECORDSTREAM_AGGREGATOR_DISTINCTCOUNT $fatpacked{"App/RecordStream/Aggregator/First.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_FIRST'; package App::RecordStream::Aggregator::First;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::InjectInto::Field;use App::RecordStream::DomainLanguage::Registry;use base qw(App::RecordStream::Aggregator::InjectInto::Field);sub combine_field {my$this=shift;my$cookie=shift;my$value=shift;return defined($cookie)? $cookie : $value}sub short_usage {return "first value for a field"}sub long_usage {return <register_implementation('first',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','first','VALUATION');1; Usage: first, First value of specified field. EOF APP_RECORDSTREAM_AGGREGATOR_FIRST $fatpacked{"App/RecordStream/Aggregator/FirstRecord.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_FIRSTRECORD'; package App::RecordStream::Aggregator::FirstRecord;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::InjectInto;use App::RecordStream::DomainLanguage::Registry;use base qw(App::RecordStream::Aggregator::InjectInto);sub combine {my$this=shift;my$cookie=shift;my$record=shift;return$record unless (defined$cookie);return$cookie}sub short_usage {return "first record"}sub long_usage {return <register_implementation('firstrecord',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('firstrec',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_ctor(__PACKAGE__,'firstrecord');App::RecordStream::DomainLanguage::Registry::register_ctor(__PACKAGE__,'firstrec');1; Usage: first Returns the first record. EOF APP_RECORDSTREAM_AGGREGATOR_FIRSTRECORD $fatpacked{"App/RecordStream/Aggregator/InjectInto.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_INJECTINTO'; package App::RecordStream::Aggregator::InjectInto;our$VERSION="4.0.25";use App::RecordStream::Aggregator;use base qw(App::RecordStream::Aggregator::Aggregation);use strict;use warnings;sub new {my$class=shift;my$this={};bless$this,$class;return$this}sub initial {return undef}sub combine {die "InjectInto subclass did not implement combine.\n"}sub squish {my ($this,$cookie)=@_;return$cookie}1; APP_RECORDSTREAM_AGGREGATOR_INJECTINTO $fatpacked{"App/RecordStream/Aggregator/InjectInto/Field.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_INJECTINTO_FIELD'; package App::RecordStream::Aggregator::InjectInto::Field;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::DomainLanguage::Registry;use App::RecordStream::DomainLanguage::Valuation::KeySpec;use App::RecordStream::Aggregator::InjectInto;use base qw(App::RecordStream::Aggregator::InjectInto);sub new {my$class=shift;my$field=shift;return new_from_valuation($class,App::RecordStream::DomainLanguage::Valuation::KeySpec->new($field))}sub new_from_valuation {my$class=shift;my$valuation=shift;my$this={'valuation'=>$valuation,};bless$this,$class;return$this}sub initial {return undef}sub combine {my$this=shift;my$cookie=shift;my$record=shift;my$value=$this->get_valuation()->evaluate_record($record);if (defined$value){return$this->combine_field($cookie,$value)}else {return$cookie}}sub get_valuation {my$this=shift;return$this->{'valuation'}}sub squish {my ($this,$cookie)=@_;return$cookie}sub argct {return 1}1; APP_RECORDSTREAM_AGGREGATOR_INJECTINTO_FIELD $fatpacked{"App/RecordStream/Aggregator/InjectInto/Subrefs.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_INJECTINTO_SUBREFS'; package App::RecordStream::Aggregator::InjectInto::Subrefs;use strict;use warnings;use App::RecordStream::Aggregator::InjectInto;use base qw(App::RecordStream::Aggregator::InjectInto);sub new {my$class=shift;my$initial=shift;my$combine=shift;my$squish=shift;my$this={'initial'=>$initial,'combine'=>$combine,'squish'=>$squish,};bless$this,$class;return$this}sub initial {my$this=shift;return$this->{'initial'}->()}sub combine {my$this=shift;return$this->{'combine'}->(@_)}sub squish {my$this=shift;return$this->{'squish'}->(@_)}1; APP_RECORDSTREAM_AGGREGATOR_INJECTINTO_SUBREFS $fatpacked{"App/RecordStream/Aggregator/Internal/Constant.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_INTERNAL_CONSTANT'; package App::RecordStream::Aggregator::Internal::Constant;use strict;use warnings;use App::RecordStream::Aggregator::Aggregation;use base 'App::RecordStream::Aggregator::Aggregation';sub new {my$class=shift;my$value=shift;my$this={'VALUE'=>$value,};bless$this,$class;return$this}sub initial {return undef}sub combine {return undef}sub squish {my$this=shift;return$this->{'VALUE'}}1; APP_RECORDSTREAM_AGGREGATOR_INTERNAL_CONSTANT $fatpacked{"App/RecordStream/Aggregator/Internal/ForField.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_INTERNAL_FORFIELD'; package App::RecordStream::Aggregator::Internal::ForField;use strict;use warnings;use App::RecordStream::Aggregator::Aggregation;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Aggregator::Aggregation';sub new {my$class=shift;my$regex=shift;my$snippet=shift;my$this={'REGEX'=>$regex,'SNIPPET'=>$snippet,};bless$this,$class;return$this}sub initial {return {}}sub combine {my$this=shift;my$cookie=shift;my$record=shift;for my$field (keys(%$record)){next unless($field =~ $this->{'REGEX'});if(!exists($cookie->{$field})){my$agg=$this->{'SNIPPET'}->evaluate_as('AGGREGATOR',{'$f'=>$field});$cookie->{$field}=[$agg,$agg->initial()]}my ($agg,$sub_cookie)=@{$cookie->{$field}};$sub_cookie=$agg->combine($sub_cookie,$record);$cookie->{$field}->[1]=$sub_cookie}return$cookie}sub squish {my$this=shift;my$cookie=shift;for my$field (keys(%$cookie)){my ($agg,$sub_cookie)=@{$cookie->{$field}};$cookie->{$field}=$agg->squish($sub_cookie)}return$cookie}App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new','for_field','SCALAR','SNIPPET');1; APP_RECORDSTREAM_AGGREGATOR_INTERNAL_FORFIELD $fatpacked{"App/RecordStream/Aggregator/Internal/ForField2.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_INTERNAL_FORFIELD2'; package App::RecordStream::Aggregator::Internal::ForField2;use strict;use warnings;use App::RecordStream::Aggregator::Aggregation;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Aggregator::Aggregation';sub new {my$class=shift;my$regex1=shift;my$regex2=shift;my$snippet=shift;my$this={'REGEX1'=>$regex1,'REGEX2'=>$regex2,'SNIPPET'=>$snippet,};bless$this,$class;return$this}sub initial {return {}}sub combine {my$this=shift;my$cookie=shift;my$record=shift;my@field1;my@field2;for my$field (keys(%$record)){push@field1,$field if($field =~ $this->{'REGEX1'});push@field2,$field if($field =~ $this->{'REGEX2'})}for my$field1 (@field1){for my$field2 (@field2){my$fieldc="$field1,$field2";if(!exists($cookie->{$fieldc})){my$agg=$this->{'SNIPPET'}->evaluate_as('AGGREGATOR',{'$f1'=>$field1,'$f2'=>$field2});$cookie->{$fieldc}=[$agg,$agg->initial()]}my ($agg,$sub_cookie)=@{$cookie->{$fieldc}};$sub_cookie=$agg->combine($sub_cookie,$record);$cookie->{$fieldc}->[1]=$sub_cookie}}return$cookie}sub squish {my$this=shift;my$cookie=shift;for my$fieldc (keys(%$cookie)){my ($agg,$sub_cookie)=@{$cookie->{$fieldc}};$cookie->{$fieldc}=$agg->squish($sub_cookie)}return$cookie}App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new','for_field','SCALAR','SCALAR','SNIPPET');1; APP_RECORDSTREAM_AGGREGATOR_INTERNAL_FORFIELD2 $fatpacked{"App/RecordStream/Aggregator/Internal/Ord2BivariateMap.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_INTERNAL_ORD2BIVARIATEMAP'; package App::RecordStream::Aggregator::Internal::Ord2BivariateMap;use strict;use warnings;use App::RecordStream::Aggregator::Ord2Bivariate;use base 'App::RecordStream::Aggregator::Ord2Bivariate';sub squish {my ($this,$cookie)=@_;my ($sum1,$sumx,$sumy,$sumxy,$sumx2,$sumy2)=@$cookie;my$corr=($sumxy * $sum1 - $sumx * $sumy)/ sqrt(($sumx2 * $sum1 - $sumx ** 2)* ($sumy2 * $sum1 - $sumy ** 2));my$cov=($sumxy / $sum1)- ($sumx / $sum1)* ($sumy / $sum1);my$varx=($sumx2 / $sum1)- ($sumx / $sum1)** 2;my$vary=($sumy2 / $sum1)- ($sumy / $sum1)** 2;return {'ct'=>$sum1,'sumx'=>$sumx,'avgx'=>$sumx / $sum1,'sumx2'=>$sumx2,'avgx2'=>$sumx2 / $sum1,'varx'=>$varx,'stddevx'=>sqrt($varx),'sumy'=>$sumy,'avgy'=>$sumy / $sum1,'sumy2'=>$sumy2,'avgy2'=>$sumy2 / $sum1,'vary'=>$vary,'stddevy'=>sqrt($vary),'sumxy'=>$sumxy,'avgxy'=>$sumxy / $sum1,'cov'=>$cov,'corr'=>$corr,}}App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','ord2map','VALUATION','VALUATION');1; APP_RECORDSTREAM_AGGREGATOR_INTERNAL_ORD2BIVARIATEMAP $fatpacked{"App/RecordStream/Aggregator/Internal/Ord2UnivariateMap.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_INTERNAL_ORD2UNIVARIATEMAP'; package App::RecordStream::Aggregator::Internal::Ord2UnivariateMap;use strict;use warnings;use App::RecordStream::Aggregator::Ord2Univariate;use base 'App::RecordStream::Aggregator::Ord2Univariate';sub squish {my ($this,$cookie)=@_;my ($sum1,$sumx,$sumx2)=@$cookie;my$var=($sumx2 / $sum1)- ($sumx / $sum1)** 2;return {'ct'=>$sum1,'sum'=>$sumx,'avg'=>$sumx / $sum1,'sum2'=>$sumx2,'avg2'=>$sumx2 / $sum1,'var'=>$var,'stddev'=>sqrt($var),}}App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','ord2map','VALUATION');1; APP_RECORDSTREAM_AGGREGATOR_INTERNAL_ORD2UNIVARIATEMAP $fatpacked{"App/RecordStream/Aggregator/Last.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_LAST'; package App::RecordStream::Aggregator::Last;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::InjectInto::Field;use App::RecordStream::DomainLanguage::Registry;use base qw(App::RecordStream::Aggregator::InjectInto::Field);sub combine_field {my$this=shift;my$cookie=shift;my$value=shift;return$value}sub short_usage {return "last value for a field"}sub long_usage {return <register_implementation('last',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','last','VALUATION');1; Usage: last, Last value of specified field. EOF APP_RECORDSTREAM_AGGREGATOR_LAST $fatpacked{"App/RecordStream/Aggregator/LastRecord.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_LASTRECORD'; package App::RecordStream::Aggregator::LastRecord;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::InjectInto;use App::RecordStream::DomainLanguage::Registry;use base qw(App::RecordStream::Aggregator::InjectInto);sub combine {my$this=shift;my$cookie=shift;my$record=shift;return$record}sub short_usage {return "last record seen"}sub long_usage {return <register_implementation('lastrecord',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('lastrec',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_ctor(__PACKAGE__,'lastrecord');App::RecordStream::DomainLanguage::Registry::register_ctor(__PACKAGE__,'lastrec');1; Usage: last_record Last record seen. EOF APP_RECORDSTREAM_AGGREGATOR_LASTRECORD $fatpacked{"App/RecordStream/Aggregator/LinearRegression.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_LINEARREGRESSION'; package App::RecordStream::Aggregator::LinearRegression;use strict;use warnings;use App::RecordStream::Aggregator::Ord2Bivariate;use App::RecordStream::Aggregator;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Aggregator::Ord2Bivariate';sub squish {my ($this,$cookie)=@_;my ($sum1,$sumx,$sumy,$sumxy,$sumx2,$sumy2)=@$cookie;my$beta=($sumxy * $sum1 - $sumx * $sumy)/ ($sumx2 * $sum1 - $sumx ** 2);my$alpha=($sumy - $beta * $sumx)/ $sum1;my$sbeta_numerator=($sumy2 + $alpha ** 2 * $sum1 + $beta ** 2 * $sumx2 - 2 * $alpha * $sumy + 2 * $alpha * $beta * $sumx - 2 * $beta * $sumxy)/ ($sum1 - 2);my$sbeta_denominator=$sumx2 - $sumx * $sumx / $sum1;my$sbeta=sqrt($sbeta_numerator / $sbeta_denominator);my$salpha=$sbeta * sqrt($sumx2 / $sum1);return {'alpha'=>$alpha,'beta'=>$beta,'beta_se'=>$sbeta,'alpha_se'=>$salpha,}}sub long_usage {return <register_implementation('linreg',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('linearregression',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','linreg','VALUATION','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','linearregression','VALUATION','VALUATION');1; Usage: linreg,, Dump various status from a linear regression of y against x. EOF APP_RECORDSTREAM_AGGREGATOR_LINEARREGRESSION $fatpacked{"App/RecordStream/Aggregator/MapReduce.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_MAPREDUCE'; package App::RecordStream::Aggregator::MapReduce;our$VERSION="4.0.25";use strict;use warnings;use base qw(App::RecordStream::Aggregator::Aggregation);sub new {my$class=shift;my$this={};bless$this,$class;return$this}sub initial {return undef}sub combine {my ($this,$cookie,$record)=@_;my$cookie2=$this->map($record);if(!defined($cookie)){return$cookie2}if(!defined($cookie2)){return$cookie}return$this->reduce($cookie,$cookie2)}sub squish {my ($this,$cookie)=@_;return$cookie}sub map {die "MapReduce subclass did not implement map.\n"}sub reduce {die "MapReduce subclass did not implement reduce.\n"}1; APP_RECORDSTREAM_AGGREGATOR_MAPREDUCE $fatpacked{"App/RecordStream/Aggregator/MapReduce/Field.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_MAPREDUCE_FIELD'; package App::RecordStream::Aggregator::MapReduce::Field;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::MapReduce;use App::RecordStream::DomainLanguage::Valuation::KeySpec;use base 'App::RecordStream::Aggregator::MapReduce';sub new {my$class=shift;my$field=shift;return new_from_valuation($class,App::RecordStream::DomainLanguage::Valuation::KeySpec->new($field))}sub new_from_valuation {my$class=shift;my$valuation=shift;my$this={'valuation'=>$valuation,};bless$this,$class;return$this}sub map {my ($this,$record)=@_;if(!defined($this->{'valuation'})){return undef}return$this->map_field($this->{'valuation'}->evaluate_record($record))}sub map_field {my ($this,$value)=@_;return$value}sub argct {return 1}1; APP_RECORDSTREAM_AGGREGATOR_MAPREDUCE_FIELD $fatpacked{"App/RecordStream/Aggregator/MapReduce/FieldSet.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_MAPREDUCE_FIELDSET'; package App::RecordStream::Aggregator::MapReduce::FieldSet;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::MapReduce;use App::RecordStream::DomainLanguage::Valuation::KeySpec;use base 'App::RecordStream::Aggregator::MapReduce';sub new {my$class=shift;my@fields=@_;return new_from_valuation($class,map {App::RecordStream::DomainLanguage::Valuation::KeySpec->new($_)}@fields)}sub new_from_valuation {my$class=shift;my@valuations=@_;my$this={'valuations'=>\@valuations,};bless$this,$class;return$this}sub map {my ($this,$record)=@_;return$this->map_fields(map {$_->evaluate_record($record)}@{$this->{'valuations'}})}sub map_fields {die "FieldSet subclass does not implement map_fields\n"}1; APP_RECORDSTREAM_AGGREGATOR_MAPREDUCE_FIELDSET $fatpacked{"App/RecordStream/Aggregator/MapReduce/Subrefs.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_MAPREDUCE_SUBREFS'; package App::RecordStream::Aggregator::MapReduce::Subrefs;use strict;use warnings;use App::RecordStream::Aggregator::MapReduce;use base 'App::RecordStream::Aggregator::MapReduce';sub new {my$class=shift;my$map=shift;my$reduce=shift;my$squish=shift;my$this={'map'=>$map,'reduce'=>$reduce,'squish'=>$squish,};bless$this,$class;return$this}sub map {my$this=shift;return$this->{'map'}->(@_)}sub reduce {my$this=shift;return$this->{'reduce'}->(@_)}sub squish {my$this=shift;return$this->{'squish'}->(@_)}1; APP_RECORDSTREAM_AGGREGATOR_MAPREDUCE_SUBREFS $fatpacked{"App/RecordStream/Aggregator/Maximum.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_MAXIMUM'; package App::RecordStream::Aggregator::Maximum;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::InjectInto::Field;use App::RecordStream::DomainLanguage::Registry;use base qw(App::RecordStream::Aggregator::InjectInto::Field);sub combine_field {my$this=shift;my$cookie=shift;my$value=shift;return$value unless (defined$cookie);if ($cookie < $value){return$value}return$cookie}sub short_usage {return "maximum value for a field"}sub long_usage {return <register_implementation('maximum',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('max',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','maximum','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','max','VALUATION');1; Usage: max, Maximum value of specified field. EOF APP_RECORDSTREAM_AGGREGATOR_MAXIMUM $fatpacked{"App/RecordStream/Aggregator/Minimum.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_MINIMUM'; package App::RecordStream::Aggregator::Minimum;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::InjectInto::Field;use App::RecordStream::DomainLanguage::Registry;use base qw(App::RecordStream::Aggregator::InjectInto::Field);sub combine_field {my$this=shift;my$cookie=shift;my$value=shift;return$value unless (defined$cookie);if ($cookie > $value){return$value}return$cookie}sub short_usage {return "minimum value for a field"}sub long_usage {return <register_implementation('minimum',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('min',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','minimum','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','min','VALUATION');1; Usage: min, Minimum value of specified field. EOF APP_RECORDSTREAM_AGGREGATOR_MINIMUM $fatpacked{"App/RecordStream/Aggregator/Mode.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_MODE'; package App::RecordStream::Aggregator::Mode;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::InjectInto::Field;use App::RecordStream::DomainLanguage::Registry;use base qw(App::RecordStream::Aggregator::InjectInto::Field);sub initial {return {}}sub combine_field {my$this=shift;my$cookie=shift;my$value=shift;$cookie->{$value}++;return$cookie}sub squish {my$this=shift;my$cookie=shift;my@keys=keys %$cookie;my$max_key=shift@keys;my$max_value=$cookie->{$max_key};for my$key (@keys){my$value=$cookie->{$key};if ($max_value < $value){$max_key=$key;$max_value=$value}}return$max_key}sub short_usage {return "most common value for a field"}sub long_usage {print <register_implementation('mode',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','mode','VALUATION');1; Usage: mode, Finds the most common value for a field and returns it. Will load all values into memory. EOF APP_RECORDSTREAM_AGGREGATOR_MODE $fatpacked{"App/RecordStream/Aggregator/Ord2Bivariate.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_ORD2BIVARIATE'; package App::RecordStream::Aggregator::Ord2Bivariate;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::MapReduce::FieldSet;use App::RecordStream::Aggregator;use base 'App::RecordStream::Aggregator::MapReduce::FieldSet';sub map_fields {my ($this,$x,$y)=@_;return [1,$x,$y,$x * $y,$x * $x,$y * $y]}sub reduce {my ($this,$cookie,$cookie2)=@_;my ($sum1_1,$sumx_1,$sumy_1,$sumxy_1,$sumx2_1,$sumy2_1)=@$cookie;my ($sum1_2,$sumx_2,$sumy_2,$sumxy_2,$sumx2_2,$sumy2_2)=@$cookie2;return [$sum1_1 + $sum1_2,$sumx_1 + $sumx_2,$sumy_1 + $sumy_2,$sumxy_1 + $sumxy_2,$sumx2_1 + $sumx2_2,$sumy2_1 + $sumy2_2]}sub squish {die "Ord2Bivariate subclass doesn't implement squish\n"}sub argct {return 2}1; APP_RECORDSTREAM_AGGREGATOR_ORD2BIVARIATE $fatpacked{"App/RecordStream/Aggregator/Ord2Univariate.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_ORD2UNIVARIATE'; package App::RecordStream::Aggregator::Ord2Univariate;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::MapReduce::Field;use App::RecordStream::Aggregator;use base 'App::RecordStream::Aggregator::MapReduce::Field';sub map_field {my ($this,$x)=@_;return [1,$x,$x * $x]}sub reduce {my ($this,$cookie,$cookie2)=@_;my ($sum1_1,$sumx_1,$sumx2_1)=@$cookie;my ($sum1_2,$sumx_2,$sumx2_2)=@$cookie2;return [$sum1_1 + $sum1_2,$sumx_1 + $sumx_2,$sumx2_1 + $sumx2_2]}sub squish {die "Ord2Univariate subclass doesn't implement squish\n"}1; APP_RECORDSTREAM_AGGREGATOR_ORD2UNIVARIATE $fatpacked{"App/RecordStream/Aggregator/Percentile.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_PERCENTILE'; package App::RecordStream::Aggregator::Percentile;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::InjectInto::Field;use App::RecordStream::DomainLanguage::Registry;use base qw(App::RecordStream::Aggregator::InjectInto::Field);sub new {my$class=shift;my$percentile=shift;my$field=shift;my$this=$class->SUPER::new($field);$this->{'percentile'}=$percentile;return$this}sub new_from_valuation {my$class=shift;my$percentile=shift;my$valuation=shift;my$this=$class->SUPER::new_from_valuation($valuation);$this->{'percentile'}=$percentile;return$this}sub initial {return []}sub combine_field {my$this=shift;my$cookie=shift;my$value=shift;push @$cookie,$value;return$cookie}sub squish {my$this=shift;my$cookie=shift;my$percentile=$this->{'percentile'};my@sorted=sort {$a <=> $b}@$cookie;my$index=int((scalar@sorted)* ($percentile / 100));if ($index==scalar@sorted){$index--}return$sorted[$index]}sub short_usage {return "value of pXX for field"}sub long_usage {print <register_implementation('percentile',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('perc',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','percentile','SCALAR','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','perc','SCALAR','VALUATION');1; Usage: per,, Finds the field value which percent of values are less than. This is computed by creating an array of all values, sorting, and indexing into it at the floor((percentile / 100) * length) point EOF APP_RECORDSTREAM_AGGREGATOR_PERCENTILE $fatpacked{"App/RecordStream/Aggregator/PercentileMap.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_PERCENTILEMAP'; package App::RecordStream::Aggregator::PercentileMap;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::InjectInto::Field;use App::RecordStream::DomainLanguage::Registry;use base qw(App::RecordStream::Aggregator::InjectInto::Field);sub _make_percentiles {my$percentiles=shift;if(ref($percentiles)eq "ARRAY"){return$percentiles}return [split(' ',$percentiles)]}sub new {my$class=shift;my$percentiles=shift;my$field=shift;my$this=$class->SUPER::new($field);$this->{'percentiles'}=_make_percentiles($percentiles);return$this}sub new_from_valuation {my$class=shift;my$percentiles=shift;my$valuation=shift;my$this=$class->SUPER::new_from_valuation($valuation);$this->{'percentiles'}=_make_percentiles($percentiles);return$this}sub initial {return []}sub combine_field {my$this=shift;my$cookie=shift;my$value=shift;push @$cookie,$value;return$cookie}sub squish {my$this=shift;my$cookie=shift;my@sorted=sort {$a <=> $b}@$cookie;my%ret;for my$percentile (@{$this->{'percentiles'}}){my$index=int((scalar@sorted)* ($percentile / 100));if($index==scalar(@sorted)){$index--}$ret{$percentile}=$sorted[$index]}return \%ret}sub short_usage {return "map of percentile values for field"}sub long_usage {print <register_implementation('percentilemap',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('percmap',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','percentilemap','SCALAR','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','percmap','SCALAR','VALUATION');1; Usage: percmap,, Finds the field values which percent of values are less than. This is computed by creating an array of all values, sorting, and indexing into it at the floor((percentile / 100) * length) point will be perl split to determine percentiles to compute. Output is a hash whose keys are percentiles and whose values are corresponding field values. EOF APP_RECORDSTREAM_AGGREGATOR_PERCENTILEMAP $fatpacked{"App/RecordStream/Aggregator/RecordForMaximum.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_RECORDFORMAXIMUM'; package App::RecordStream::Aggregator::RecordForMaximum;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::MapReduce;use App::RecordStream::DomainLanguage::Registry;use App::RecordStream::DomainLanguage::Valuation::KeySpec;use base 'App::RecordStream::Aggregator::MapReduce';sub new {my$class=shift;my$field=shift;return new_from_valuation($class,App::RecordStream::DomainLanguage::Valuation::KeySpec->new($field))}sub new_from_valuation {my$class=shift;my ($valuation)=@_;my$this={'valuation'=>$valuation,};bless$this,$class;return$this}sub map {my ($this,$record)=@_;my$value=$this->{'valuation'}->evaluate_record($record);return [$value,$record]}sub reduce {my ($this,$cookie1,$cookie2)=@_;my ($v1,$r1)=@$cookie1;my ($v2,$r2)=@$cookie2;if($v1 > $v2){return$cookie1}return$cookie2}sub squish {my ($this,$cookie)=@_;my ($v,$r)=@$cookie;return$r}sub argct {return 1}sub short_usage {return "returns the record corresponding to the maximum value for a field"}sub long_usage {return <register_implementation('recformax',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('recformaximum',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('recordformax',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('recordformaximum',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','recformax','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','recformaximum','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','recordformax','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','recordformaximum','VALUATION');1; Usage: recformax, The record corresponding to the maximum value of specified field. EOF APP_RECORDSTREAM_AGGREGATOR_RECORDFORMAXIMUM $fatpacked{"App/RecordStream/Aggregator/RecordForMinimum.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_RECORDFORMINIMUM'; package App::RecordStream::Aggregator::RecordForMinimum;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::MapReduce;use App::RecordStream::DomainLanguage::Registry;use App::RecordStream::DomainLanguage::Valuation::KeySpec;use base 'App::RecordStream::Aggregator::MapReduce';sub new {my$class=shift;my$field=shift;return new_from_valuation($class,App::RecordStream::DomainLanguage::Valuation::KeySpec->new($field))}sub new_from_valuation {my$class=shift;my ($valuation)=@_;my$this={'valuation'=>$valuation,};bless$this,$class;return$this}sub map {my ($this,$record)=@_;my$value=$this->{'valuation'}->evaluate_record($record);return [$value,$record]}sub reduce {my ($this,$cookie1,$cookie2)=@_;my ($v1,$r1)=@$cookie1;my ($v2,$r2)=@$cookie2;if($v1 < $v2){return$cookie1}return$cookie2}sub squish {my ($this,$cookie)=@_;my ($v,$r)=@$cookie;return$r}sub argct {return 1}sub short_usage {return "returns the record corresponding to the minimum value for a field"}sub long_usage {return <register_implementation('recformin',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('recforminimum',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('recordformin',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('recordforminimum',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','recformin','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','recforminimum','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','recordformin','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','recordforminimum','VALUATION');1; Usage: recformin, The record corresponding to the minimum value of specified field. EOF APP_RECORDSTREAM_AGGREGATOR_RECORDFORMINIMUM $fatpacked{"App/RecordStream/Aggregator/Records.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_RECORDS'; package App::RecordStream::Aggregator::Records;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::MapReduce;use App::RecordStream::Aggregator;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Aggregator::MapReduce';sub new {my$class=shift;my$this={};bless$this,$class;return$this}sub map {my ($this,$record)=@_;return [$record]}sub reduce {my ($this,$cookie1,$cookie2)=@_;return [@$cookie1,@$cookie2]}sub argct {return 0}sub short_usage {return "returns an arrayref of all records"}sub long_usage {return <register_implementation('records',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('recs',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_ctor(__PACKAGE__,'records');App::RecordStream::DomainLanguage::Registry::register_ctor(__PACKAGE__,'recs');1; Usage: records An arrayref of all records. EOF APP_RECORDSTREAM_AGGREGATOR_RECORDS $fatpacked{"App/RecordStream/Aggregator/StandardDeviation.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_STANDARDDEVIATION'; package App::RecordStream::Aggregator::StandardDeviation;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::Ord2Univariate;use App::RecordStream::Aggregator;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Aggregator::Ord2Univariate';sub squish {my ($this,$cookie)=@_;my ($sum1,$sumx,$sumx2)=@$cookie;return sqrt(($sumx2 / $sum1)- ($sumx / $sum1)** 2)}sub long_usage {return <register_implementation('stddev',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','stddev','VALUATION');1; Usage: stddev, Standard deviation of specified fields. This is computed as StdDev(X) = sqrt(E[(X - E[X])^2]). Standard deviation is an indication of deviation from average value. EOF APP_RECORDSTREAM_AGGREGATOR_STANDARDDEVIATION $fatpacked{"App/RecordStream/Aggregator/Sum.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_SUM'; package App::RecordStream::Aggregator::Sum;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::MapReduce::Field;use App::RecordStream::Aggregator;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Aggregator::MapReduce::Field';sub reduce {my ($this,$cookie,$cookie2)=@_;return$cookie + $cookie2}sub long_usage {return <register_implementation('sum',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','sum','VALUATION');1; Usage: sum, Sums specified field. EOF APP_RECORDSTREAM_AGGREGATOR_SUM $fatpacked{"App/RecordStream/Aggregator/UniqArray.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_UNIQARRAY'; package App::RecordStream::Aggregator::UniqArray;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator;use App::RecordStream::DomainLanguage::Registry;use App::RecordStream::DomainLanguage::Valuation::KeySpec;use base qw(App::RecordStream::Aggregator::Aggregation);sub new {my$class=shift;my$field=shift;return new_from_valuation($class,App::RecordStream::DomainLanguage::Valuation::KeySpec->new($field))}sub new_from_valuation {my$class=shift;my$valuation=shift;my$this={'valuation'=>$valuation,};bless$this,$class;return$this}sub squish {my ($this,$cookie)=@_;return [sort keys %$cookie ]}sub long_usage {return <{'valuation'}->evaluate_record($record);$cookie->{$value}=1;return$cookie}App::RecordStream::Aggregator->register_implementation('uarray',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','uarray','VALUATION');1; Usage: uarray, Collect unique values from specified field into an array. EOF APP_RECORDSTREAM_AGGREGATOR_UNIQARRAY $fatpacked{"App/RecordStream/Aggregator/UniqConcatenate.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_UNIQCONCATENATE'; package App::RecordStream::Aggregator::UniqConcatenate;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator;use App::RecordStream::DomainLanguage::Registry;use App::RecordStream::DomainLanguage::Valuation::KeySpec;use base qw(App::RecordStream::Aggregator::Aggregation);sub new {my$class=shift;my$delim=shift;my$field=shift;return new_from_valuation($class,$delim,App::RecordStream::DomainLanguage::Valuation::KeySpec->new($field))}sub new_from_valuation {my$class=shift;my$delim=shift;my$valuation=shift;my$this={'valuation'=>$valuation,'delim'=>$delim,};bless$this,$class;return$this}sub squish {my ($this,$cookie)=@_;return join($this->{'delim'},sort(keys(%$cookie)))}sub long_usage {return <{'valuation'}->evaluate_record($record);$cookie->{$value}=1;return$cookie}App::RecordStream::Aggregator->register_implementation('uconcatenate',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('uconcat',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','uconcatenate','SCALAR','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','uconcat','SCALAR','VALUATION');1; Usage: uconcat,, Concatenate unique values from specified field. EOF APP_RECORDSTREAM_AGGREGATOR_UNIQCONCATENATE $fatpacked{"App/RecordStream/Aggregator/ValuesToKeys.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_VALUESTOKEYS'; package App::RecordStream::Aggregator::ValuesToKeys;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator;use base qw(App::RecordStream::Aggregator::MapReduce::FieldSet);sub map_fields {my ($this,$key,$value)=@_;return {$key=>$value }}sub reduce {my ($this,$cookie1,$cookie2)=@_;for my$key (keys %$cookie2){my$value=$cookie2->{$key};$cookie1->{$key}=$value}return$cookie1}sub long_usage {return <register_implementation('valuestokeys',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('vk',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','valuestokeys','VALUATION','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','vk','VALUATION','VALUATION');1; Usage: valuestokeys,, Take the specified keyfield, use its value as the key for the value of value field.. For instance: { k: 'FOO', t: 2 } { k: 'BAR', t: 5 } becomes: { 'FOO': 2, 'BAR': 5 } with the aggregator 'vk,k,t'. Repeated keyfield values will clobber earlier instances EOF APP_RECORDSTREAM_AGGREGATOR_VALUESTOKEYS $fatpacked{"App/RecordStream/Aggregator/Variance.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_AGGREGATOR_VARIANCE'; package App::RecordStream::Aggregator::Variance;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Aggregator::Ord2Univariate;use App::RecordStream::Aggregator;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Aggregator::Ord2Univariate';sub squish {my ($this,$cookie)=@_;my ($sum1,$sumx,$sumx2)=@$cookie;return ($sumx2 / $sum1)- ($sumx / $sum1)** 2}sub long_usage {return <register_implementation('var',__PACKAGE__);App::RecordStream::Aggregator->register_implementation('variance',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','var','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','variance','VALUATION');1; Usage: var, Variance of specified fields. This is computed as Var(X) = E[(X - E[X])^2]. Variance is an indication of deviation from average value. EOF APP_RECORDSTREAM_AGGREGATOR_VARIANCE $fatpacked{"App/RecordStream/BaseRegistry.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_BASEREGISTRY'; package App::RecordStream::BaseRegistry;use App::RecordStream::Site;use strict;use warnings;use Module::Pluggable::Object;sub load_implementations {my$registry_class=shift;my$subtree=$registry_class->subtree();App::RecordStream::Site::bootstrap();my@sites=sort {$a->{'priority'}<=> $b->{'priority'}}App::RecordStream::Site::list_sites();Module::Pluggable::Object->new(require=>1,search_path=>["App::RecordStream::$subtree",map {"$_->{path}::$subtree"}@sites ],)->plugins}{my%class_registry;sub register_implementation {my$registry_class=shift;my$name=shift;my$class=shift;$class_registry{$registry_class}->{$name}=$class}sub parse_single_nameless_implementation {my$registry_class=shift;my$spec=shift;my@spec=split(/,/,$spec);if(!@spec){die "Bad " .$registry_class->typename()." spec: " .$spec ."\n"}my$aggr_name=shift(@spec);my$class=$class_registry{$registry_class}->{$aggr_name};if(!$class){die "Bad " .$registry_class->typename().": " .$aggr_name ."\n"}my$argct=$class->argct();if(!ref($argct)){$argct=[$argct]}if(!(grep {$_==@spec}@$argct)){print$class->long_usage();exit 1}return$class->new(@spec)}sub list_implementations {my$registry_class=shift;my$prefix=shift || '';my%reverse;my@classes;for my$name (sort(keys(%{$class_registry{$registry_class}}))){my$class=$class_registry{$registry_class}->{$name};my$ar=$reverse{$class};if(!$ar){$reverse{$class}=$ar=[];push@classes,$class}push @$ar,$name}my$ret="";for my$class (@classes){my$usage=$class->short_usage();$ret .= $prefix .join(", ",@{$reverse{$class}}).": " .$usage ."\n"}return$ret}sub show_implementation {my$registry_class=shift;my$name=shift;my$class=$class_registry{$registry_class}->{$name};if(!$class){print "Bad " .$registry_class->typename().": " .$name ."\n";exit 1}print$class->long_usage()}}sub subtree {my$registry_class=shift;return$registry_class->begin_sentence_typename()}sub begin_sentence_typename {my$registry_class=shift;my$t=$registry_class->typename();return uc(substr($t,0,1)).substr($t,1)}sub typename {my$registry_class=shift;die "BaseRegistry subclass $registry_class did not implement typename()"}1; APP_RECORDSTREAM_BASEREGISTRY $fatpacked{"App/RecordStream/Clumper.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_CLUMPER'; package App::RecordStream::Clumper;use strict;use warnings;use App::RecordStream::BaseRegistry;use base ('App::RecordStream::BaseRegistry');sub make_clumper {my$registry_class=shift;my$spec=shift;return$registry_class->parse_single_nameless_implementation($spec)}sub typename {return "clumper"}sub usage {my$ret="";$ret .= <list_implementations(" ");return$ret}1; CLUMPING: __FORMAT_TEXT__ "Clumping" defines a way of taking a stream of input records and rearranging them into to groups for consideration. The most common "consideration" for such a group of records is the application of one or more aggregators by recs-collate and the most common clumpers are those specifiable by recs-collate's normal options. However, other recs scripts can use "clumpers" and much more complex clumping is possible. A list of clumpers can be found via the --list-clumpers option on recs-collate and documentation for individual clumpers can be inspected via --show-clumper. __FORMAT_TEXT__ Examples: Group adjacent records for each host and output each such group's size. recs-collate -c keylru,host,1 -a ct Output the successive differences of the time field. recs-collate -c window,2 --dla 'time_delta=xform(recs, <<{{#1/time}} - {{#0/time}}>>)' Full list: USAGE APP_RECORDSTREAM_CLUMPER $fatpacked{"App/RecordStream/Clumper/Base.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_CLUMPER_BASE'; package App::RecordStream::Clumper::Base;1; APP_RECORDSTREAM_CLUMPER_BASE $fatpacked{"App/RecordStream/Clumper/CubeKeyPerfect.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_CLUMPER_CUBEKEYPERFECT'; package App::RecordStream::Clumper::CubeKeyPerfect;use strict;use warnings;use App::RecordStream::Clumper::KeyPerfect;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Clumper::KeyPerfect';sub long_usage {return <register_implementation('cubekeyperfect',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','cubekeyperfect','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','cubekey','VALUATION');1; Usage: cubekeyperfect, Clump records by the value for a key and additionally produce an "ALL" slice. EOF APP_RECORDSTREAM_CLUMPER_CUBEKEYPERFECT $fatpacked{"App/RecordStream/Clumper/Key.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_CLUMPER_KEY'; package App::RecordStream::Clumper::Key;use strict;use warnings;use App::RecordStream::Clumper::Key::WrappedCallback;use App::RecordStream::Clumper;use base 'App::RecordStream::Clumper';sub new {my$class=shift;my$field=shift;return new_from_valuation($class,$field,App::RecordStream::DomainLanguage::Valuation::KeySpec->new($field))}sub new_from_valuation {my$class=shift;my$name=shift;my$valuation=shift;my$this={'name'=>$name,'valuation'=>$valuation,};bless$this,$class;return$this}sub evaluate_record {my$this=shift;my$record=shift;my$v=$this->{'valuation'}->evaluate_record($record);if(!defined($v)){$v=""}return$v}sub clumper_begin {my$this=shift;my$bucket=shift;return [$bucket,$this->key_clumper_begin()]}sub clumper_push_record {my$this=shift;my$cookie=shift;my$record=shift;my$next=shift;my$name=$this->{'name'};my$value=$this->evaluate_record($record);my$wrapped_next=App::RecordStream::Clumper::Key::WrappedCallback->new($next,$cookie->[0],$name,$value);$this->key_clumper_push_record($cookie->[1],$value,$record,$wrapped_next)}sub clumper_end {my$this=shift;my$cookie=shift;my$next=shift;my$wrapped_next=App::RecordStream::Clumper::Key::WrappedCallback->new($next,undef);$this->key_clumper_end($cookie->[1],$wrapped_next)}sub argct {return 1}1; APP_RECORDSTREAM_CLUMPER_KEY $fatpacked{"App/RecordStream/Clumper/Key/WrappedCallback.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_CLUMPER_KEY_WRAPPEDCALLBACK'; package App::RecordStream::Clumper::Key::WrappedCallback;use strict;use warnings;sub new {my$class=shift;my$next=shift;my$bucket=shift;my$name=shift;my$value=shift;my$this={'next'=>$next,'bucket'=>$bucket,'name'=>$name,'value'=>$value,};bless$this,$class;return$this}sub key_clumper_callback_begin {my$this=shift;$this->key_clumper_callback_begin_value($this->{'value'})}sub key_clumper_callback_begin_value {my$this=shift;my$value=shift;my$name=$this->{'name'};my$bucket=$this->{'bucket'};if(!defined($bucket)){die "clumper_callback_begin() called in bucketless position (did you call begin() in your end()?)"}return$this->{'next'}->clumper_callback_begin({%$bucket,$name=>$value})}sub key_clumper_callback_push_record {my$this=shift;my$cookie=shift;my$record=shift;$this->{'next'}->clumper_callback_push_record($cookie,$record)}sub key_clumper_callback_end {my$this=shift;my$cookie=shift;$this->{'next'}->clumper_callback_end($cookie)}1; APP_RECORDSTREAM_CLUMPER_KEY_WRAPPEDCALLBACK $fatpacked{"App/RecordStream/Clumper/KeyLRU.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_CLUMPER_KEYLRU'; package App::RecordStream::Clumper::KeyLRU;use strict;use warnings;use App::RecordStream::Clumper::Key;use App::RecordStream::DomainLanguage::Registry;use App::RecordStream::LRUSheriff;use base 'App::RecordStream::Clumper::Key';sub new {my$class=shift;my$field=shift;my$size=shift;my$this=$class->SUPER::new($field);$this->{'size'}=$size;return$this}sub new_from_valuation {my$class=shift;my$name=shift;my$valuation=shift;my$size=shift;my$this=$class->SUPER::new_from_valuation($name,$valuation);$this->{'size'}=$size;return$this}sub long_usage {return <new()}sub key_clumper_push_record {my$this=shift;my$cookie=shift;my$value=shift;my$record=shift;my$next=shift;{my$next_cookie=$cookie->find($value);if(!defined($next_cookie)){$cookie->put($value,$next_cookie=$next->key_clumper_callback_begin())}$next->key_clumper_callback_push_record($next_cookie,$record)}for my$next_cookie ($cookie->purgenate($this->{'size'})){$next->key_clumper_callback_end($next_cookie)}}sub key_clumper_end {my$this=shift;my$cookie=shift;my$next=shift;for my$next_cookie ($cookie->purgenate(0)){$next->key_clumper_callback_end($next_cookie)}}App::RecordStream::Clumper->register_implementation('keylru',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','keylru','SCALAR','VALUATION','SCALAR');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','key','SCALAR','VALUATION','SCALAR');1; Usage: keylru,, Clump records by the value for a key, limiting number of active clumps to EOF APP_RECORDSTREAM_CLUMPER_KEYLRU $fatpacked{"App/RecordStream/Clumper/KeyPerfect.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_CLUMPER_KEYPERFECT'; package App::RecordStream::Clumper::KeyPerfect;use strict;use warnings;use App::RecordStream::Clumper::Key;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Clumper::Key';sub long_usage {return <get_values($value)){my$next_cookie=$cookie->[1]->{$value};if(!defined($next_cookie)){push @{$cookie->[0]},$value;$next_cookie=$cookie->[1]->{$value}=$next->key_clumper_callback_begin_value($value)}$next->key_clumper_callback_push_record($next_cookie,$record)}}sub get_values {my$this=shift;my$value=shift;return ($value)}sub key_clumper_end {my$this=shift;my$cookie=shift;my$next=shift;for my$value (@{$cookie->[0]}){my$next_cookie=$cookie->[1]->{$value};$next->key_clumper_callback_end($next_cookie)}}App::RecordStream::Clumper->register_implementation('keyperfect',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','keyperfect','VALUATION');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','key','VALUATION');1; Usage: keyperfect, Clump records by the value for a key EOF APP_RECORDSTREAM_CLUMPER_KEYPERFECT $fatpacked{"App/RecordStream/Clumper/Options.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_CLUMPER_OPTIONS'; package App::RecordStream::Clumper::Options;use strict;use warnings;use App::RecordStream::Clumper::CubeKeyPerfect;use App::RecordStream::Clumper::KeyLRU;use App::RecordStream::Clumper::KeyPerfect;use App::RecordStream::Clumper::WrappedClumperCallback;use App::RecordStream::Clumper;use App::RecordStream::DomainLanguage::Library;use App::RecordStream::DomainLanguage::Snippet;use App::RecordStream::DomainLanguage::Valuation::KeySpec;use App::RecordStream::DomainLanguage;use App::RecordStream::KeyGroups;sub new {my$class=shift;App::RecordStream::Clumper->load_implementations();my$this={'KEY_SIZE'=>undef,'KEY_CUBE'=>0,'TBD'=>[],'HELP_LIST'=>0,'HELP_SHOW'=>0,};bless$this,$class;return$this}sub main_options {my$this=shift;my$clumpers=$this->{'TBD'};return ("key|k=s"=>sub {push @$clumpers,['KEYGROUP',$_[1]]},"dlkey|K=s"=>sub {push @$clumpers,['VALUATION',_build_dlkey($_[1])]},"size|sz|n=i"=>\($this->{'KEY_SIZE'}),"adjacent|1"=>sub {$this->{'KEY_SIZE'}=1},"cube"=>\($this->{'KEY_CUBE'}),"clumper|c=s"=>sub {push @$clumpers,['CLUMPER',App::RecordStream::Clumper->make_clumper($_[1])]},"dlclumper|C=s"=>sub {push @$clumpers,['CLUMPER',_build_dlclumper($_[1])]},)}sub help_options {my$this=shift;return ("list-clumpers"=>\($this->{'HELP_LIST'}),"show-clumper=s"=>\($this->{'HELP_SHOW'}),)}sub check_options {my$this=shift;my$clumper_callback=shift;if($this->{'HELP_LIST'}){die sub {print App::RecordStream::Clumper->list_implementations()}}if($this->{'HELP_SHOW'}){die sub {App::RecordStream::Clumper->show_implementation($this->{'HELP_SHOW'})}}$this->{'CALLBACK'}=$clumper_callback;$this->{'CALLBACK_COOKIE'}=undef}sub _build_dlkey {my$string=shift;my$name;if($string =~ s/^([^=]*)=//){$name=$1}else {die "Bad domain language key option (missing '=' to separate name and code): " .$string}return ($name,App::RecordStream::DomainLanguage::Snippet->new($string)->evaluate_as('VALUATION'))}sub _build_dlclumper {my$string=shift;return App::RecordStream::DomainLanguage::Snippet->new($string)->evaluate_as('CLUMPER')}sub _get_cb_and_cookie {my$this=shift;my$cb=$this->{'CALLBACK'};my$cookie=$this->{'CALLBACK_COOKIE'};if(!defined($cookie)){$cookie=$this->{'CALLBACK_COOKIE'}=$cb->clumper_callback_begin({})}return ($cb,$cookie)}sub accept_record {my$this=shift;my$record=shift;my$clumpers=$this->{'TBD'};while(@$clumpers){my$clumper_tuple=pop @$clumpers;my ($type,@rest)=@$clumper_tuple;my$cb=$this->{'CALLBACK'};if(0){}elsif($type eq 'KEYGROUP'){my ($group_spec)=@rest;my$key_groups=App::RecordStream::KeyGroups->new();$key_groups->add_groups($group_spec);my$keys=$key_groups->get_keyspecs($record);for my$spec (@$keys){$cb=$this->_wrap_key_cb($spec,App::RecordStream::DomainLanguage::Valuation::KeySpec->new($spec),$cb)}}elsif($type eq 'VALUATION'){my ($name,$val)=@rest;$cb=$this->_wrap_key_cb($name,$val,$cb)}elsif($type eq 'CLUMPER'){my ($clumper)=@rest;$cb=App::RecordStream::Clumper::WrappedClumperCallback->new($clumper,$cb)}else {die "Internal error"}$this->{'CALLBACK'}=$cb}my ($cb,$cookie)=$this->_get_cb_and_cookie();$cb->clumper_callback_push_record($cookie,$record);return 1}sub _wrap_key_cb {my$this=shift;my$name=shift;my$val=shift;my$cb=shift;my$size=$this->{'KEY_SIZE'};my$cube=$this->{'KEY_CUBE'};my$clumper;if($cube){if(defined($size)){die "--cube with --size (or --adjacent) is no longer a thing (and it never made sense)"}$clumper=App::RecordStream::Clumper::CubeKeyPerfect->new_from_valuation($name,$val)}elsif(defined($size)){$clumper=App::RecordStream::Clumper::KeyLRU->new_from_valuation($name,$val,$size)}else {$clumper=App::RecordStream::Clumper::KeyPerfect->new_from_valuation($name,$val)}return App::RecordStream::Clumper::WrappedClumperCallback->new($clumper,$cb)}sub stream_done {my$this=shift;my ($cb,$cookie)=$this->_get_cb_and_cookie();$cb->clumper_callback_end($cookie)}sub main_usage {return (['key|-k ','Comma separated list of key fields. May be a key spec or key group'],['dlkey|-K ...','Specify a domain language key. See "Domain Language Integration" section in --help-more.'],['size|--sz|-n ','Number of running clumps to keep.'],['adjacent|-1','Only group together adjacent records. Avoids spooling records into memeory'],['cube','See "Cubing" section in --help-more.'],['clumper ...','Use this clumper to group records. May be specified multiple times. See --help-clumping.'],['dlclumper ...','Use this domain language clumper to group records. May be specified multiple times. See --help-clumping.'],)}sub help_usage {return (['list-clumpers','Bail and output a list of clumpers' ],['show-clumper ','Bail and output this clumper\'s detailed usage.'],)}1; APP_RECORDSTREAM_CLUMPER_OPTIONS $fatpacked{"App/RecordStream/Clumper/Window.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_CLUMPER_WINDOW'; package App::RecordStream::Clumper::Window;use strict;use warnings;use App::RecordStream::Clumper::Base;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Clumper::Base';sub new {my$class=shift;my$size=shift;my$this={'size'=>$size,};bless$this,$class;return$this}sub long_usage {return <[0];my$window=$cookie->[1];push @$window,$record;if(@$window > $this->{'size'}){shift @$window}if(@$window >= $this->{'size'}){my$next_cookie=$next->clumper_callback_begin($bucket);for my$record (@$window){$next->clumper_callback_push_record($next_cookie,$record)}$next->clumper_callback_end($next_cookie)}}sub clumper_end {}App::RecordStream::Clumper->register_implementation('window',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new','window','SCALAR');1; Usage: window, Clump records by a rolling window of size . The windows are strict (that is partial windows are not given at the beginning or end). For example, if there are five input records (1, 2, 3, 4, 5) and the window is 3, then the first clump will be (1, 2, 3), the second (2, 3, 4) and the third and final (3, 4, 5). EOF APP_RECORDSTREAM_CLUMPER_WINDOW $fatpacked{"App/RecordStream/Clumper/WrappedClumperCallback.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_CLUMPER_WRAPPEDCLUMPERCALLBACK'; package App::RecordStream::Clumper::WrappedClumperCallback;use strict;use warnings;use App::RecordStream::Aggregator;use App::RecordStream::Record;sub new {my$class=shift;my$clumper=shift;my$next=shift;my$this={'CLUMPER'=>$clumper,'NEXT'=>$next,};bless$this,$class;return$this}sub clumper_callback_begin {my$this=shift;my$bucket=shift;return$this->{'CLUMPER'}->clumper_begin($bucket)}sub clumper_callback_push_record {my$this=shift;my$cookie=shift;my$record=shift;return$this->{'CLUMPER'}->clumper_push_record($cookie,$record,$this->{'NEXT'})}sub clumper_callback_end {my$this=shift;my$cookie=shift;return$this->{'CLUMPER'}->clumper_end($cookie,$this->{'NEXT'})}1; APP_RECORDSTREAM_CLUMPER_WRAPPEDCLUMPERCALLBACK $fatpacked{"App/RecordStream/DBHandle.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DBHANDLE'; package App::RecordStream::DBHandle;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::OptionalRequire 'DBI';BEGIN {App::RecordStream::OptionalRequire::require_done()}use Data::Dumper;use Getopt::Long;my$MODES={'sqlite'=>{'dbfile'=>['=s','testDb','Local file for database'],},'mysql'=>{'host'=>['=s',undef,'Mysql Host'],'dbname'=>['=s',undef,'Database to connect to'],},'oracle'=>{'db'=>['=s',undef,'Database name (tnsname) to connect to'],},'pg'=>{'host'=>['=s',undef,'Hostname to connect to'],'db'=>['=s',undef,'Database to connect to'],},'main'=>{'type'=>['=s','sqlite','Type of database to connect to'],'user'=>['=s','','User to connect as'],'password'=>['=s','','Password to connect as'],},};my$DESCRIPTIONS={'sqlite'=>'A simple local file based db','mysql'=>'Connect to a remote mysql database','oracle'=>'Connect to a remote Oracle database','pg'=>'Connect to a remote PostgreSQL database',};my$DISPATCH_TABLE={'sqlite'=>\&sqlite_dbh,'mysql'=>\&mysql_dbh,'oracle'=>\&oracle_dbh,'pg'=>\&pg_dbh,};sub get_dbh {my$args=shift;my$options={};parse_options($options,'main',$args);my$type=$options->{'type'};parse_options($options,$type,$args);return$DISPATCH_TABLE->{$type}->($options)}sub parse_options {my$options=shift;my$mode=shift;my$args=shift || \@ARGV;my$spec=get_option_spec($mode,$options);local@ARGV=@$args;my$saved_settings=Getopt::Long::Configure();Getopt::Long::Configure("pass_through");GetOptions(%$spec);Getopt::Long::Configure($saved_settings);set_defaults($mode,$options);@$args=@ARGV}sub set_defaults {my$mode=shift;my$opts=shift;my$options=$MODES->{$mode};for my$opt (keys %$options){my$default=@{$options->{$opt}}[1];if ((not defined$default)&& (!$opts->{$opt})){die "Must define $opt for type $mode"}$opts->{$opt}=$default unless (exists$opts->{$opt})}}sub get_option_spec {my$mode=shift;my$opts=shift;my$options=$MODES->{$mode};my%spec;for my$opt (keys %$options){my ($modifier)=@{$options->{$opt}};$spec{$opt .$modifier}=sub {add_opt($opts,@_)}}return \%spec}sub mysql_dbh {my$args=shift;my$database=$args->{'dbname'};my$host=$args->{'host'};my$user=$args->{'user'};my$password=$args->{'password'};my$dbh=DBI->connect("DBI:mysql:database=$database;host=$host",$user,$password,{RaiseError=>1,PrintError=>0 });return$dbh}sub sqlite_dbh {my$args=shift;my$db_file=$args->{'dbfile'};my$user=$args->{'user'};my$password=$args->{'password'};my$dbh=DBI->connect("dbi:SQLite:dbname=$db_file",$user,$password,{RaiseError=>1,PrintError=>0 });return$dbh}sub oracle_dbh {my$args=shift;my$user=$args->{'user'};my$password=$args->{'password'};my$database=$args->{'db'};my$dbh=DBI->connect("dbi:Oracle:$database",$user,$password,{RaiseError=>1,PrintError=>1 });return$dbh}sub pg_dbh {my$args=shift;my$database=$args->{'db'};my$host=$args->{'host'};my$user=$args->{'user'};my$password=$args->{'password'};my$dbh=DBI->connect("DBI:Pg:database=$database;host=$host",$user,$password,{RaiseError=>1,PrintError=>0 });return$dbh}sub add_opt {my$options=shift;my$arg_name=shift;my$value=shift;$options->{$arg_name}=$value}sub usage {my$usage='';$usage .= "Database Options\n";$usage .= type_usage('main');$usage .= "Datbase types:\n";for my$type (sort keys %$DESCRIPTIONS){my$description=$DESCRIPTIONS->{$type};$usage .= " $type - $description\n"}$usage .= "\n";for my$type (sort keys %$MODES){next if ($type eq 'main');$usage .= "Database Options for type: $type\n";$usage .= type_usage($type)}return$usage}sub type_usage {my$type=shift;my$print_mode=shift;my$usage='';$usage .= " Usage for --type $type\n" if ($print_mode);my$options=$MODES->{$type};for my$name (sort keys %$options){my$description=@{$options->{$name}}[2];my$default=@{$options->{$name}}[1];$usage .= " $name - $description";$usage .= " - Default: $default" if (defined$default && $default ne '');$usage .= "\n"}$usage .= "\n";return$usage}1; APP_RECORDSTREAM_DBHANDLE $fatpacked{"App/RecordStream/Deaggregator.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DEAGGREGATOR'; package App::RecordStream::Deaggregator;use strict;use warnings;use App::RecordStream::BaseRegistry;use base ('App::RecordStream::BaseRegistry');sub make_deaggregator {my$registry_class=shift;my$spec=shift;return$registry_class->parse_single_nameless_implementation($spec)}sub typename {return "deaggregator"}1; APP_RECORDSTREAM_DEAGGREGATOR $fatpacked{"App/RecordStream/Deaggregator/Base.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DEAGGREGATOR_BASE'; package App::RecordStream::Deaggregator::Base;1; APP_RECORDSTREAM_DEAGGREGATOR_BASE $fatpacked{"App/RecordStream/Deaggregator/Field.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DEAGGREGATOR_FIELD'; package App::RecordStream::Deaggregator::Field;use strict;use warnings;use App::RecordStream::Deaggregator::Base;use App::RecordStream::DomainLanguage::Valuation::KeySpec;use base 'App::RecordStream::Deaggregator::Base';sub new {my$class=shift;my$field=shift;return new_from_valuation($class,App::RecordStream::DomainLanguage::Valuation::KeySpec->new($field))}sub new_from_valuation {my$class=shift;my$valuation=shift;my$this={'valuation'=>$valuation,};bless$this,$class;return$this}sub deaggregate {my$this=shift;my$record=shift;my$valuation=$this->{'valuation'};my$value=$valuation->evaluate_record($record);return$this->deaggregate_field($value)}1; APP_RECORDSTREAM_DEAGGREGATOR_FIELD $fatpacked{"App/RecordStream/Deaggregator/Split.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DEAGGREGATOR_SPLIT'; package App::RecordStream::Deaggregator::Split;use strict;use warnings;use App::RecordStream::Deaggregator::Field;use App::RecordStream::Deaggregator;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Deaggregator::Field';sub new {my$class=shift;my$old_field=shift;my$delim=shift;my$new_field=shift;my$this=$class->SUPER::new($old_field);$this->{'delim'}=make_delim($delim);$this->{'new_field'}=$new_field;return$this}sub new_from_valuation {my$class=shift;my$valuation=shift;my$delim=shift;my$new_field=shift;my$this=$class->SUPER::new_from_valuation($valuation);$this->{'delim'}=$delim;$this->{'new_field'}=$new_field;return$this}sub make_delim {my$delim=shift;if($delim =~ /^\/(.*)\/$/){return qr/$1/}elsif($delim =~ /^\/(.*)\/i$/){return qr/$1/i}else {return qr/\Q$delim\E/}}sub deaggregate_field {my$this=shift;my$values=shift;my@ret;for my$value (split($this->{'delim'},$values,-1)){push@ret,{$this->{'new_field'}=>$value}}return \@ret}sub long_usage {return <register_implementation('split',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','split','VALUATION','SCALAR','SCALAR');1; Usage: split,,, Split the old field to create a new one. EOF APP_RECORDSTREAM_DEAGGREGATOR_SPLIT $fatpacked{"App/RecordStream/Deaggregator/Unarray.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DEAGGREGATOR_UNARRAY'; package App::RecordStream::Deaggregator::Unarray;use strict;use warnings;use App::RecordStream::Deaggregator::Field;use App::RecordStream::Deaggregator;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Deaggregator::Field';sub new {my$class=shift;my$old_field=shift;my$new_field=shift;my$this=$class->SUPER::new($old_field);$this->{'new_field'}=$new_field;return$this}sub new_from_valuation {my$class=shift;my$valuation=shift;my$new_field=shift;my$this=$class->SUPER::new_from_valuation($valuation);$this->{'new_field'}=$new_field;return$this}sub deaggregate_field {my$this=shift;my$values=shift;my@ret;for my$value (@$values){push@ret,{$this->{'new_field'}=>$value}}return \@ret}sub long_usage {return <register_implementation('unarray',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','unarray','VALUATION','SCALAR');1; Usage: unarray,, Split the array into individual \"element\" records EOF APP_RECORDSTREAM_DEAGGREGATOR_UNARRAY $fatpacked{"App/RecordStream/Deaggregator/Unhash.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DEAGGREGATOR_UNHASH'; package App::RecordStream::Deaggregator::Unhash;use strict;use warnings;use App::RecordStream::Deaggregator::Field;use App::RecordStream::Deaggregator;use App::RecordStream::DomainLanguage::Registry;use base 'App::RecordStream::Deaggregator::Field';sub new {my$class=shift;my$old_field=shift;my$new_key_field=shift;my$new_value_field=shift;my$this=$class->SUPER::new($old_field);$this->{'new_key_field'}=$new_key_field;$this->{'new_value_field'}=$new_value_field;return$this}sub new_from_valuation {my$class=shift;my$valuation=shift;my$new_key_field=shift;my$new_value_field=shift;my$this=$class->SUPER::new_from_valuation($valuation);$this->{'new_key_field'}=$new_key_field;$this->{'new_value_field'}=$new_value_field;return$this}sub deaggregate_field {my$this=shift;my$hashref=shift;my@ret;for my$key (sort(keys(%$hashref))){my$record={};$record->{$this->{'new_key_field'}}=$key;if(defined($this->{'new_value_field'})){$record->{$this->{'new_value_field'}}=$hashref->{$key}}push@ret,$record}return \@ret}sub long_usage {return <register_implementation('unhash',__PACKAGE__);App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','unhash','VALUATION','SCALAR');App::RecordStream::DomainLanguage::Registry::register_vfn(__PACKAGE__,'new_from_valuation','unhash','VALUATION','SCALAR','SCALAR');1; Usage: unhash,,[,] Split the hash into key/value \"pair\" records EOF APP_RECORDSTREAM_DEAGGREGATOR_UNHASH $fatpacked{"App/RecordStream/DomainLanguage.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DOMAINLANGUAGE'; package App::RecordStream::DomainLanguage;use strict;use warnings;sub usage {return "DOMAIN LANGUAGE\n" .short_usage()._long_usage()}sub short_usage {return < argument below, a string scalar is expected, however quoting these can get fairly difficult and they can be confused with non- scalars. Example: __FORMAT_TEXT__ --dla "silly= uconcat(',', snip('{{x}} * 2'))" __FORMAT_TEXT__ To remedy this, one may use <> to inline a snippet which will be immediately understood by the typing mechanism as being code. Escaping inside this is as single quotes in Perl. Example With <> __FORMAT_TEXT__ --dla 'silly= uconcat(",", <<{{x}} * 2>>)' __FORMAT_TEXT__ Furthermore one may mark variables to be propagated in by prefixing CODE like <>: __FORMAT_TEXT__ --dla 'silly= \$f=2; uconcat(",", <>)' Function Library ii_agg(, [, ]) ii_aggregator(, [, ]) inject_into_agg(, [, ]) inject_into_aggregator(, [, ]) __FORMAT_TEXT__ Take an initial snippet, a combine snippet, and an optional squish snippet to produce an ad-hoc aggregator based on inject into. The initial snippet produces the aggregate value for an empty collection, then combine takes \$a representing the aggregate value so far and \$r representing the next record to add and returns the new aggregate value. Finally, the squish snippet takes \$a representing the final aggregate value so far and produces the final answer for the aggregator. Example(s): __FORMAT_TEXT__ Track count and sum to produce average: ii_agg(<<[0, 0]>>, <<[\$a->[0] + 1, \$a->[1] + {{ct}}]>>, <<\$a->[1] / \$a->[0]>>) for_field(qr/.../, ) __FORMAT_TEXT__ Takes a regex and a snippet of code. Creates an aggregator that creates a map. Keys in the map correspond to fields chosen by matching the regex against the fields from input records. Values in the map are produced by aggregators which the snippet must act as a factory for (\$f is the field). Example(s): __FORMAT_TEXT__ To aggregate the sums of all the fields beginning with "t" for_field(qr/^t/, <>) for_field(qr/.../, qr/.../, ) __FORMAT_TEXT__ Takes two regexes and a snippet of code. Creates an aggregator that creates a map. Keys in the map correspond to pairs of fields chosen by matching the regexes against the fields from input records. Values in the map are produced by aggregators which the snippet must act as a factory for (\$f1 is the first field, \$f2 is the second field). Example(s): __FORMAT_TEXT__ To find the covariance of all x-named fields with all y-named fields: for_field(qr/^x/, qr/^y/, <>) map_reduce_agg(, [, ]) map_reduce_aggregator(, [, ]) mr_agg(, [, ]) mr_aggregator(, [, ]) __FORMAT_TEXT__ Take a map snippet, a reduce snippet, and an optional squish snippet to produce an ad-hoc aggregator based on map reduce. The map snippet takes \$r representing a record and returns its mapped value. The reduce snippet takes \$a and \$b representing two mapped values and combines them. Finally, the squish snippet takes a mapped value \$a representing all the records and produces the final answer for the aggregator. Example(s): __FORMAT_TEXT__ Track count and sum to produce average: mr_agg(<<[1, {{ct}}]>>, <<[\$a->[0] + \$b->[0], \$a->[1] + \$b->[1]]>>, <<\$a->[1] / \$a->[0]>>) rec() record() __FORMAT_TEXT__ A valuation that just returns the entire record. __FORMAT_TEXT__ snip(snip) __FORMAT_TEXT__ Takes a snippet and returns both the snippet and the snippet as a valuation. Used to distinguished snippets from scalars in cases where it matters, e.g. min('{{x}}') interprets it is a keyspec when it was meant to be a snippet (and then a valuation), min(snip('{{x}}')) does what is intended. This is used internally by <<...>> and in fact <<...>> just translates to snip('...'). __FORMAT_TEXT__ subset_agg(, ) subset_aggregator(, ) __FORMAT_TEXT__ Takes a snippate to act as a record predicate and an aggregator and produces an aggregator that acts as the provided aggregator as run on the filtered view. Example(s): __FORMAT_TEXT__ An aggregator that counts the number of records with a time not above 6 seconds: subset_agg(<<{{time_ms}} <= 6000>>, ct()) type_agg(obj) type_scalar(obj) type_val(obj) __FORMAT_TEXT__ Force the object into a specific type. Can be used to force certain upconversions (or avoid them). __FORMAT_TEXT__ valuation(sub { ... }) val(sub { ... }) __FORMAT_TEXT__ Takes a subref, creates a valuation that represents it. The subref will get the record as its first and only argument. __FORMAT_TEXT__ Example(s): To get the square of the "x" field: val(sub{ \$[0]->{x} ** 2 }) xform(, ) __FORMAT_TEXT__ Takes an aggregator and a snippet and produces an aggregator the represents invoking the snippet on the aggregator's result. __FORMAT_TEXT__ Example(s): To take the difference between the first and second time fields of the record collection: xform(recs(), <<{{1/time}} - {{0/time}}>>) HELP APP_RECORDSTREAM_DOMAINLANGUAGE $fatpacked{"App/RecordStream/DomainLanguage/Executor.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DOMAINLANGUAGE_EXECUTOR'; package App::RecordStream::DomainLanguage::Executor;use strict;use warnings;use App::RecordStream::DomainLanguage::Registry;use App::RecordStream::Executor;my$next_id=0;sub new {my$class=shift;my$id=$next_id++;my$this={'ID'=>$id,};bless$this,$class;return$this}sub clear_vars {my$this=shift;my$id=$this->{'ID'};{no strict;no warnings;%{__PACKAGE__ ."::Sandbox" .$id ."::"}=()}}sub set_scalar {my$this=shift;my$var=shift;my$val=shift;my$id=$this->{'ID'};{no strict;no warnings;*{__PACKAGE__ ."::Sandbox" .$id ."::" .$var}=\$val}}sub get_scalar {my$this=shift;my$var=shift;my$id=$this->{'ID'};{no strict;no warnings;return ${__PACKAGE__ ."::Sandbox" .$id ."::" .$var}}}sub set_ref {my$this=shift;my$var=shift;my$ref=shift;my$id=$this->{'ID'};{no strict;no warnings;*{__PACKAGE__ ."::Sandbox" .$id ."::" .$var}=$ref}}sub import_registry {my$this=shift;for my$token (App::RecordStream::DomainLanguage::Registry::get_tokens()){my$subref=sub {my$value=App::RecordStream::DomainLanguage::Registry::evaluate($token,@_);my@pp=$value->get_possible_pairs();if(!@_ &&!@pp){$value->add_possibility('SCALAR',$token)}return$value};$this->set_ref($token,$subref);$this->set_ref("_$token",$subref)}}sub exec {my$__MY__this=shift;my$__MY__code=shift;$__MY__code=App::RecordStream::Executor->transform_code($__MY__code);my$__MY__id=$__MY__this->{'ID'};my$__MY__code_packaged="package " .__PACKAGE__ ."::Sandbox$__MY__id; $__MY__code";my$__MY__ret;{no strict;no warnings;$__MY__ret=eval$__MY__code_packaged;if($@){die $@}}return$__MY__ret}1; APP_RECORDSTREAM_DOMAINLANGUAGE_EXECUTOR $fatpacked{"App/RecordStream/DomainLanguage/Library.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DOMAINLANGUAGE_LIBRARY'; package App::RecordStream::DomainLanguage::Library;use strict;use warnings;use App::RecordStream::Aggregator::InjectInto::Subrefs;use App::RecordStream::DomainLanguage::Registry;use App::RecordStream::DomainLanguage::Snippet;use App::RecordStream::DomainLanguage::Valuation::KeySpec;use App::RecordStream::DomainLanguage::Valuation::Sub;use App::RecordStream::DomainLanguage::Value;sub _identity {return $_[0]}App::RecordStream::DomainLanguage::Registry::register_fn(\&_identity,'type_agg','AGGREGATOR');App::RecordStream::DomainLanguage::Registry::register_fn(\&_identity,'type_valuation','VALUATION');App::RecordStream::DomainLanguage::Registry::register_fn(\&_identity,'type_scalar','SCALAR');sub _snippet_upgrade {my$snippet=shift;my$ret=App::RecordStream::DomainLanguage::Value->new();$ret->add_possibility('VALUATION',App::RecordStream::DomainLanguage::Valuation::Sub->new(sub {return$snippet->evaluate_as('SCALAR',{'$r'=>$_[0]})}));$ret->add_possibility('SNIPPET',$snippet);return$ret}App::RecordStream::DomainLanguage::Registry::register_fn(\&_snippet_upgrade,'snip','SNIPPET');sub _rec_valuation {return App::RecordStream::DomainLanguage::Valuation::Sub->new(sub {return $_[0]})}App::RecordStream::DomainLanguage::Registry::register_fn(\&_rec_valuation,'record');App::RecordStream::DomainLanguage::Registry::register_fn(\&_rec_valuation,'rec');sub _raw_valuation {my$v=shift;if(ref($v)eq "CODE"){return App::RecordStream::DomainLanguage::Valuation::Sub->new($v)}return App::RecordStream::DomainLanguage::Valuation::KeySpec->new($v)}App::RecordStream::DomainLanguage::Registry::register_fn(\&_raw_valuation,'valuation','SCALAR');App::RecordStream::DomainLanguage::Registry::register_fn(\&_raw_valuation,'val','SCALAR');sub inject_into_aggregator {my$initial_snippet=shift;my$combine_snippet=shift;my$squish_snippet=shift || App::RecordStream::DomainLanguage::Snippet->new('$a');my$initial_sub=sub {return$initial_snippet->evaluate_as('SCALAR')};my$combine_sub=sub {my$cookie=shift;my$record=shift;return$combine_snippet->evaluate_as('SCALAR',{'$a'=>$cookie,'$r'=>$record})};my$squish_sub=sub {my$cookie=shift;return$squish_snippet->evaluate_as('SCALAR',{'$a'=>$cookie})};return App::RecordStream::Aggregator::InjectInto::Subrefs->new($initial_sub,$combine_sub,$squish_sub)}for my$ii_name ('ii','inject_into'){for my$agg_name ('agg','aggregator'){App::RecordStream::DomainLanguage::Registry::register_fn(\&inject_into_aggregator,$ii_name .'_' .$agg_name,'SNIPPET','SNIPPET');App::RecordStream::DomainLanguage::Registry::register_fn(\&inject_into_aggregator,$ii_name .'_' .$agg_name,'SNIPPET','SNIPPET','SNIPPET')}}sub map_reduce_aggregator {my$map_snippet=shift;my$reduce_snippet=shift;my$squish_snippet=shift || App::RecordStream::DomainLanguage::Snippet->new('$a');my$map_sub=sub {my$record=shift;return$map_snippet->evaluate_as('SCALAR',{'$r'=>$record})};my$reduce_sub=sub {my$cookie1=shift;my$cookie2=shift;return$reduce_snippet->evaluate_as('SCALAR',{'$a'=>$cookie1,'$b'=>$cookie2})};my$squish_sub=sub {my$cookie=shift;return$squish_snippet->evaluate_as('SCALAR',{'$a'=>$cookie})};return App::RecordStream::Aggregator::MapReduce::Subrefs->new($map_sub,$reduce_sub,$squish_sub)}for my$mr_name ('mr','map_reduce'){for my$agg_name ('agg','aggregator'){App::RecordStream::DomainLanguage::Registry::register_fn(\&map_reduce_aggregator,$mr_name .'_' .$agg_name,'SNIPPET','SNIPPET');App::RecordStream::DomainLanguage::Registry::register_fn(\&map_reduce_aggregator,$mr_name .'_' .$agg_name,'SNIPPET','SNIPPET','SNIPPET')}}sub _subset_agg {my$match_snippet=shift;my$aggregator=shift;my$initial_sub=sub {return$aggregator->initial()};my$combine_sub=sub {my$cookie=shift;my$record=shift;if($match_snippet->evaluate_as('SCALAR',{'$r'=>$record})){$cookie=$aggregator->combine($cookie,$record)}return$cookie};my$squish_sub=sub {my$cookie=shift;return$aggregator->squish($cookie)};return App::RecordStream::Aggregator::InjectInto::Subrefs->new($initial_sub,$combine_sub,$squish_sub)}App::RecordStream::DomainLanguage::Registry::register_fn(\&_subset_agg,'subset_aggregator','SNIPPET','AGGREGATOR');App::RecordStream::DomainLanguage::Registry::register_fn(\&_subset_agg,'subset_agg','SNIPPET','AGGREGATOR');sub _xform_agg {my$aggregator=shift;my$snippet=shift;my$initial_sub=sub {return$aggregator->initial()};my$combine_sub=sub {my$cookie=shift;my$record=shift;return$aggregator->combine($cookie,$record)};my$squish_sub=sub {my$cookie=shift;my$result=$aggregator->squish($cookie);return$snippet->evaluate_as('SCALAR',{'$r'=>$result})};return App::RecordStream::Aggregator::InjectInto::Subrefs->new($initial_sub,$combine_sub,$squish_sub)}App::RecordStream::DomainLanguage::Registry::register_fn(\&_xform_agg,'xform','AGGREGATOR','SNIPPET');1; APP_RECORDSTREAM_DOMAINLANGUAGE_LIBRARY $fatpacked{"App/RecordStream/DomainLanguage/Registry.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DOMAINLANGUAGE_REGISTRY'; package App::RecordStream::DomainLanguage::Registry;use strict;use warnings;use App::RecordStream::DomainLanguage::Value;use Scalar::Util ('blessed');my$registry={};sub new_node {return {'SUBREF'=>undef,'EPSILONS'=>{},'NORMALS'=>{},'REPEATABLE'=>undef,}}sub register_ctor {my$pkg=shift;my$token=shift;my@types=@_;return register_fn(sub {return$pkg->new(@_)},$token,@types)}sub register_vfn {my$tgt=shift;my$meth=shift;my$token=shift;my@types=@_;return register_fn(sub {return$tgt->$meth(@_)},$token,@types)}sub register_fn {my$subref=shift;my$token=shift;my@types=@_;my$p=($registry->{$token}||= new_node());for my$type (@types){if($type =~ /^(.*)\*$/){my$raw_type=$1;$p=($p->{'EPSILONS'}->{$raw_type}||= new_node());$p->{'REPEATABLE'}=$raw_type}else {$p=($p->{'NORMALS'}->{$type}||= new_node())}}if($p->{'SUBREF'}){die "Collision in type registry at $token(" .join(", ",@types).")"}$p->{'SUBREF'}=$subref}sub get_tokens {return keys(%$registry)}sub evaluate {my$token=shift;my@raw_args=@_;my@value_args;for my$arg (@raw_args){if(blessed($arg)&& $arg->isa('App::RecordStream::DomainLanguage::Value')){push@value_args,$arg;next}my$value=App::RecordStream::DomainLanguage::Value->new("");my$done=0;if(blessed($arg)&& $arg->isa('App::RecordStream::DomainLanguage::Valuation')){$value->add_possibility('VALUATION',$arg);$done=1}if(blessed($arg)&& $arg->isa('App::RecordStream::Aggregator::Aggregation')){$value->add_possibility('AGGREGATOR',$arg);$done=1}if(blessed($arg)&& $arg->isa('App::RecordStream::Deaggregator::Base')){$value->add_possibility('DEAGGREGATOR',$arg);$done=1}if(blessed($arg)&& $arg->isa('App::RecordStream::Clumper::Base')){$value->add_possibility('CLUMPER',$arg);$done=1}if($done){push@value_args,$value;next}push@value_args,App::RecordStream::DomainLanguage::Value->new_from_scalar($arg)}my@results;evaluate_aux(\@results,($registry->{$token}|| {}),[],@value_args);my$ret=App::RecordStream::DomainLanguage::Value->new($token ."(" .join(", ",map {$_->get_description()}@value_args).")");for my$result (@results){if(blessed($result)&& $result->isa('App::RecordStream::DomainLanguage::Value')){for my$pair ($result->get_possible_pairs()){my ($type,$value)=@$pair;$ret->add_possibility($type,$value)}next}my$done=0;if(blessed($result)&& $result->isa('App::RecordStream::DomainLanguage::Valuation')){$ret->add_possibility('VALUATION',$result);$done=1}if(blessed($result)&& $result->isa('App::RecordStream::Aggregator::Aggregation')){$ret->add_possibility('AGGREGATOR',$result);$done=1}if(blessed($result)&& $result->isa('App::RecordStream::Deaggregator::Base')){$ret->add_possibility('DEAGGREGATOR',$result);$done=1}if(blessed($result)&& $result->isa('App::RecordStream::Clumper::Base')){$ret->add_possibility('CLUMPER',$result);$done=1}if($done){next}$ret->add_possibility('SCALAR',$result)}return$ret}sub evaluate_aux {my$results_ref=shift;my$registry_pos=shift;my$built_args=shift;my@values_left=@_;for my$type (keys(%{$registry_pos->{'EPSILONS'}})){evaluate_aux($results_ref,$registry_pos->{'EPSILONS'}->{$type},$built_args,@values_left)}if(!@values_left){my$subref=$registry_pos->{'SUBREF'};if($subref){push @$results_ref,$subref->(@$built_args)}return}my$next_value=shift@values_left;my$repeatable_type=$registry_pos->{'REPEATABLE'};if($repeatable_type){for my$arg ($next_value->get_possibilities($repeatable_type)){push @$built_args,$arg;evaluate_aux($results_ref,$registry_pos,$built_args,@values_left);pop @$built_args}}for my$type (keys(%{$registry_pos->{'NORMALS'}})){my$registry_pos_next=$registry_pos->{'NORMALS'}->{$type};for my$arg ($next_value->get_possibilities($type)){push @$built_args,$arg;evaluate_aux($results_ref,$registry_pos_next,$built_args,@values_left);pop @$built_args}}}1; APP_RECORDSTREAM_DOMAINLANGUAGE_REGISTRY $fatpacked{"App/RecordStream/DomainLanguage/Snippet.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DOMAINLANGUAGE_SNIPPET'; package App::RecordStream::DomainLanguage::Snippet;use strict;use warnings;use App::RecordStream::DomainLanguage::Executor;use App::RecordStream::DomainLanguage::Value;use App::RecordStream::Executor;sub new {my$class=shift;my$code=shift;my$vars=shift;$code=App::RecordStream::Executor->transform_code($code);$code=_transform_angles($code);my$this={'CODE'=>$code,'VARS'=>$vars,};bless$this,$class;return$this}sub evaluate_as {my$this=shift;my$type=shift;my$vars=shift || {};my$executor=App::RecordStream::DomainLanguage::Executor->new();$executor->import_registry();for my$var (%{$this->{'VARS'}}){for my$ref (@{$this->{'VARS'}->{$var}}){$executor->set_ref($var,$ref)}}for my$var (keys(%$vars)){if(0){}elsif($var =~ /^\$(.*)$/){$executor->set_scalar($1,$vars->{$var})}else {die "Bad var for snippet: '$var'"}}my$result=$executor->exec($this->{'CODE'});return App::RecordStream::DomainLanguage::Value::cast_or_die($type,$result)}sub _transform_angles {my$code=shift;my$pos=0;my$out='';while(1){my$top_level_entrance=index($code,'<<',$pos);if($top_level_entrance==-1){$out .= substr($code,$pos);last}my$level=1;my$pos2=$top_level_entrance + 2;my$top_level_exit;while(1){my$next_enter=index($code,'<<',$pos2);my$next_exit=index($code,'>>',$pos2);if($next_enter!=-1 && ($next_exit==-1 || $next_enter < $next_exit)){++$level;$pos2=$next_enter + 2;next}if($next_exit!=-1 && ($next_enter==-1 || $next_exit < $next_enter)){--$level;if($level==0){$top_level_exit=$next_exit;last}$pos2=$next_enter + 2;next}die "Unbalanced << and >> in snippet: $code"}$out .= substr($code,$pos,$top_level_entrance - $pos);$out .= _quote_snippet(substr($code,$top_level_entrance + 2,$top_level_exit - $top_level_entrance - 2));$pos=$top_level_exit + 2}return$out}sub _quote_snippet {my$code=shift;my@vars;if($code =~ s/^([a-zA-Z_][a-zA-Z_0-9]*(,[a-zA-Z_][a-zA-Z_0-9]*)*)\|//){@vars=split(/,/,$1)}return "snip(App::RecordStream::DomainLanguage::Snippet->new('$code', {" .join(", ",map {"'$_' => [\\\$$_, \\\@$_, \\\%$_]"}@vars)."}))"}1; APP_RECORDSTREAM_DOMAINLANGUAGE_SNIPPET $fatpacked{"App/RecordStream/DomainLanguage/Valuation.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DOMAINLANGUAGE_VALUATION'; package App::RecordStream::DomainLanguage::Valuation;use strict;use warnings;1; APP_RECORDSTREAM_DOMAINLANGUAGE_VALUATION $fatpacked{"App/RecordStream/DomainLanguage/Valuation/KeySpec.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DOMAINLANGUAGE_VALUATION_KEYSPEC'; package App::RecordStream::DomainLanguage::Valuation::KeySpec;use strict;use warnings;use App::RecordStream::DomainLanguage::Valuation;use base ('App::RecordStream::DomainLanguage::Valuation');sub new {my$class=shift;my$keyspec=shift;my$this={'KEYSPEC'=>$keyspec,};bless$this,$class;return$this}sub evaluate_record {my$this=shift;my$r=shift;my$keyspec=$this->{'KEYSPEC'};return ${$r->guess_key_from_spec($keyspec)}}1; APP_RECORDSTREAM_DOMAINLANGUAGE_VALUATION_KEYSPEC $fatpacked{"App/RecordStream/DomainLanguage/Valuation/Sub.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DOMAINLANGUAGE_VALUATION_SUB'; package App::RecordStream::DomainLanguage::Valuation::Sub;use strict;use warnings;use App::RecordStream::DomainLanguage::Valuation;use base ('App::RecordStream::DomainLanguage::Valuation');sub new {my$class=shift;my$subref=shift;my$this={'SUBREF'=>$subref,};bless$this,$class;return$this}sub evaluate_record {my$this=shift;my$r=shift;my$subref=$this->{'SUBREF'};return$subref->($r)}1; APP_RECORDSTREAM_DOMAINLANGUAGE_VALUATION_SUB $fatpacked{"App/RecordStream/DomainLanguage/Value.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_DOMAINLANGUAGE_VALUE'; package App::RecordStream::DomainLanguage::Value;use strict;use warnings;use App::RecordStream::Aggregator::Internal::Constant;use App::RecordStream::DomainLanguage::Snippet;use App::RecordStream::DomainLanguage::Valuation::KeySpec;use App::RecordStream::DomainLanguage::Valuation::Sub;use Scalar::Util ('blessed');sub new {my$class=shift;my$desc=shift;my$this={'DESCRIPTION'=>$desc,'POSSIBILITIES'=>{},};bless$this,$class;return$this}sub new_from_scalar {my$class=shift;my$value=shift;my$this=$class->new("scalar($value)");$this->add_possibility('SCALAR',$value);return$this}sub add_possibility {my$this=shift;my$type=shift;my$value=shift;if($type eq 'SCALAR'){if(blessed($value)&& $value->isa('App::RecordStream::DomainLanguage::Snippet')){return$this->add_possibility('SNIPPET',$value)}}push @{$this->{'POSSIBILITIES'}->{$type}||= []},$value;if($type eq "SCALAR"){$this->add_possibility('VALUATION',App::RecordStream::DomainLanguage::Valuation::KeySpec->new($value));$this->add_possibility('SNIPPET',App::RecordStream::DomainLanguage::Snippet->new($value))}}sub get_possibilities {my$this=shift;my$type=shift;return @{$this->{'POSSIBILITIES'}->{$type}|| []}}sub get_possible_pairs {my$this=shift;my@ret;for my$type (keys(%{$this->{'POSSIBILITIES'}})){for my$value (@{$this->{'POSSIBILITIES'}->{$type}}){push@ret,[$type,$value]}}return@ret}sub get_description {my$this=shift;return$this->{'DESCRIPTION'}}sub _force {my$this=shift;my$type=shift;my$ar=$this->{'POSSIBILITIES'}->{$type}|| [];my$ct=@$ar;if($ct!=1){die "Cannot use '" .$this->{'DESCRIPTION'}."' as $type, $ct possibilities"}return$ar->[0]}sub initial {return shift->_force('AGGREGATOR')->initial(@_)}sub combine {return shift->_force('AGGREGATOR')->combine(@_)}sub squish {return shift->_force('AGGREGATOR')->squish(@_)}sub deaggregate {return shift->_force('DEAGGREGATOR')->deaggregate(@_)}sub clumper_begin {return shift->_force('CLUMPER')->clumper_begin(@_)}sub clumper_push_record {return shift->_force('CLUMPER')->clumper_push_record(@_)}sub clumper_end {return shift->_force('CLUMPER')->clumper_end(@_)}sub evaluate_record {return shift->_force('VALUATION')->evaluate_record(@_)}sub cast_or_die {my$type=shift;my$obj=shift;if($type eq 'AGGREGATOR'){return cast_agg_or_die($obj)}elsif($type eq 'DEAGGREGATOR'){return cast_deagg_or_die($obj)}elsif($type eq 'CLUMPER'){return cast_clumper_or_die($obj)}elsif($type eq 'VALUATION'){return cast_valuation_or_die($obj)}elsif($type eq 'SCALAR'){return cast_scalar_or_die($obj)}else {die "Bad type $type?"}}sub cast_valuation_or_die {my$obj=shift;if(ref($obj)&& ref($obj)eq "CODE"){return App::RecordStream::DomainLanguage::Valuation::Sub->new($obj)}if(blessed($obj)&& $obj->isa('App::RecordStream::DomainLanguage::Value')){my@val=$obj->get_possibilities('VALUATION');if(@val > 1){die "Multiple valuations for " .$obj->get_description()}if(@val==1){return$val[0]}}if(blessed($obj)&& $obj->isa('App::RecordStream::Aggregator::Aggregation')){die "Aggregation found where valuation expected"}if(blessed($obj)&& $obj->isa('App::RecordStream::DomainLanguage::Valuation')){return$obj}die "Unknown found where valuation expected"}sub cast_agg_or_die {my$obj=shift;if(blessed($obj)&& $obj->isa('App::RecordStream::DomainLanguage::Value')){my@agg=$obj->get_possibilities('AGGREGATOR');if(@agg > 1){die "Multiple aggregators for " .$obj->get_description()}if(@agg==1){return$agg[0]}my@scalar=$obj->get_possibilities('SCALAR');if(@scalar > 1){die "No aggregators and multiple scalars for " .$obj->get_description()}if(@scalar==1){$obj=$scalar[0]}else {die "No usable possibilities for " .$obj->get_description()}}if(blessed($obj)&& $obj->isa('App::RecordStream::Aggregator::Aggregation')){return$obj}if(blessed($obj)&& $obj->isa('App::RecordStream::DomainLanguage::Valuation')){die "Valuation found where aggregator expected"}return App::RecordStream::Aggregator::Internal::Constant->new($obj)}sub cast_deagg_or_die {my$obj=shift;if(blessed($obj)&& $obj->isa('App::RecordStream::DomainLanguage::Value')){my@deagg=$obj->get_possibilities('DEAGGREGATOR');if(@deagg > 1){die "Multiple deaggregators for " .$obj->get_description()}if(@deagg==1){return$deagg[0]}die "No usable possibilities for " .$obj->get_description()}if(blessed($obj)&& $obj->isa('App::RecordStream::Deaggregator::Base')){return$obj}my$s="unknown";if(blessed($obj)){$s=ref($obj)}die "Could not turn $s into a deaggregator"}sub cast_clumper_or_die {my$obj=shift;if(blessed($obj)&& $obj->isa('App::RecordStream::DomainLanguage::Value')){my@clumper=$obj->get_possibilities('CLUMPER');if(@clumper > 1){die "Multiple clumpers for " .$obj->get_description()}if(@clumper==1){return$clumper[0]}die "No usable possibilities for " .$obj->get_description()}if(blessed($obj)&& $obj->isa('App::RecordStream::Clumper::Base')){return$obj}my$s="unknown";if(blessed($obj)){$s=ref($obj)}die "Could not turn $s into a clumper"}sub cast_scalar_or_die {my$obj=shift;if(blessed($obj)&& $obj->isa('App::RecordStream::DomainLanguage::Value')){my@scalar=$obj->get_possibilities('SCALAR');if(@scalar > 1){die "Multiple scalar values for " .$obj->get_description()}if(@scalar==1){return$scalar[0]}die "No scalar possibilities for " .$obj->get_description()}if(blessed($obj)&& $obj->isa('App::RecordStream::Aggregator::Aggregation')){die "Aggregator found where scalar expected"}if(blessed($obj)&& $obj->isa('App::RecordStream::DomainLanguage::Valuation')){die "Valuation found where scalar expected"}return$obj}1; APP_RECORDSTREAM_DOMAINLANGUAGE_VALUE $fatpacked{"App/RecordStream/Executor.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_EXECUTOR'; package App::RecordStream::Executor;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Operation;my$NEXT_ID=0;my$DEFAULT_METHOD_NAME='__MY__DEFAULT';sub new {my$class=shift;my$snippets=shift;if (ref($snippets)ne 'HASH'){my$code=<{code=>$code,arg_names=>['r'],},}}my$this={ID=>$NEXT_ID,SNIPPETS=>$snippets,};$NEXT_ID++;bless$this,$class;$this->init();return$this}sub init {my$this=shift;$this->create_safe_package()}sub create_snippets {my$this=shift;my$code='';for my$name (keys %{$this->{'SNIPPETS'}}){my$arg_names=$this->{'SNIPPETS'}->{$name}->{'arg_names'};my$args_spec='';if ($arg_names){$args_spec='my (';$args_spec .= join(',',map {"\$$_"}@$arg_names);$args_spec .= ') = @_;'}my$method_name=$this->get_method_name($name);my$snippet=$this->transform_code($this->{'SNIPPETS'}->{$name}->{'code'});$code .= <{'ID'}}sub create_safe_package {my$this=shift;my$package_name=$this->get_safe_package_name();my$snippets=$this->create_snippets();my$code=<{'SNIPPETS'}}){my$method_name=$this->get_method_name($name);my$code_ref=\&{$package_name .'::' .$method_name};$this->{'SNIPPETS'}->{$name}->{'CODE_REF'}=$code_ref}}sub clear_vars {my$this=shift;my$package_name=$this->get_safe_package_name();my%method_names=map {$this->get_method_name($_)=>1}keys %{$this->{'SNIPPETS'}};{no strict;no warnings;for my$variable (keys %{$package_name .'::'}){next if (exists$method_names{$variable});undef *{$package_name .'::' .$variable};delete ${$package_name .'::'}{$variable}}}}sub set_scalar {my$this=shift;my$name=shift;my$val=shift;my$package_name=$this->get_safe_package_name();{no strict;no warnings;*{$package_name .'::' .$name}=\$val}}sub get_scalar {my$this=shift;my$name=shift;my$package_name=$this->get_safe_package_name();{no strict;no warnings;return ${$package_name .'::' .$name}}}sub set_executor_method {my$this=shift;my$name=shift;my$ref=shift;my$package_name=$this->get_safe_package_name();{no strict;no warnings;*{$package_name ."::" .$name}=$ref}}sub get_code_ref {my$this=shift;my$name=shift;$this->{'SNIPPETS'}->{$name}->{'CODE_REF'}}sub eval_safe_package {my$__MY__code=shift;my$code=<execute_method($DEFAULT_METHOD_NAME,@args)}sub execute_method {my ($this,$name,@args)=@_;return$this->get_code_ref($name)->(@args)}sub transform_code {my$this=shift;my$code=shift;while ($code =~ m/\{\{(.*?)\}\}/){my$specifier=$1;my$guessing_code='${App::RecordStream::KeySpec::find_key($r, qq{\@' .$specifier .'})}';$code =~ s/\{\{.*?\}\}/$guessing_code/}return$code}sub usage {return <{foo}->{bar 1}: (comma separate nested keys) {{zoo}} {{foo/ar 1}} # Even assign to values (set the foo key to the value 1) {{foo}} = 1 # And auto, vivify {{new_key/array_key/#0}} = 3 # creates an array within a hash within a hash # Index into an array {{array_key/#3}} # The value of index 3 of the array ref under the 'array_key' hash key. __FORMAT_TEXT__ This matching is a fuzzy keyspec matching, see --help-keyspecs for more details. __FORMAT_TEXT__ USAGE APP_RECORDSTREAM_EXECUTOR $fatpacked{"App/RecordStream/Executor/Getopt.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_EXECUTOR_GETOPT'; package App::RecordStream::Executor::Getopt;use strict;use warnings;sub new {my$class=shift;my$this={'STRINGS'=>[],'MODULES'=>[],};bless$this,$class;return$this}sub arguments {my$this=shift;return ('e=s'=>sub {$this->push_string($_[1])},'E=s'=>sub {$this->push_file($_[1])},'M=s'=>sub {$this->push_module($_[1],1)},'m=s'=>sub {$this->push_module($_[1],0)},)}sub get_strings {my$this=shift;my$args=shift;my$strings=$this->{'STRINGS'};if(!@$strings){if(!@$args){die "Missing expression.\n"}push @$strings,shift @$args}return map {@$_}$this->{'MODULES'},$strings}sub get_string {my$this=shift;return join("",$this->get_strings(@_))}sub push_string {my$this=shift;my$string=shift;push @{$this->{'STRINGS'}},$string}sub push_file {my$this=shift;my$file=shift;my$string=$this->_slurp($file);push @{$this->{'STRINGS'}},$string}sub _slurp {my$this=shift;my$file=shift;local $/;undef $/;open (my$fh,'<',$file)or die "Could not open code snippet file: $file: $!";my$code=<$fh>;close$fh;return$code}sub push_module {my$this=shift;my ($module,$import)=split /=/,shift,2;my$import_default=shift;my$statement;if (defined$import){$import =~ s/(?=[\\'])/\\/g;$statement="use $module (split(/,/, '$import', 0));"}elsif ($import_default){$statement="use $module;"}else {$statement="use $module ();"}push @{$this->{'MODULES'}},$statement}1; APP_RECORDSTREAM_EXECUTOR_GETOPT $fatpacked{"App/RecordStream/InputStream.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_INPUTSTREAM'; package App::RecordStream::InputStream;our$VERSION="4.0.25";use strict;use warnings;use IO::String;use JSON::MaybeXS;use App::RecordStream::Record;require App::RecordStream::Operation;my$json=JSON->new;my$ONE_OF=[qw(FH STRING FILE)];my$ARGUMENTS={FH=>0,STRING=>0,FILE=>0,NEXT=>0,};sub new_magic {my$class=shift;my$files=shift || \@ARGV;if (scalar @$files > 0){return$class->new_from_files($files)}return$class->new(FH=>\*STDIN)}sub new_from_files {my$class=shift;my$files=shift;my$last_stream;for my$file (reverse @$files){unless (-e $file && -r $file){die "File does not exist or is not readable: $file\n"}my$new_stream=$class->new(FILE=>$file,NEXT=>$last_stream);$last_stream=$new_stream}return$last_stream}sub new {my$class=shift;my%args=@_;my$this={};for my$key (keys %$ARGUMENTS){my$value=$args{$key};$this->{$key}=$value;if ($ARGUMENTS->{$key}){die "Did not supply required argument: $key" unless ($value)}}bless$this,$class;$this->_init();return$this}sub _init {my$this=shift;my$found={};for my$arg (@$ONE_OF){if ($this->{$arg}){$found->{$arg}=$this->{$arg}}}if (scalar keys %$found > 1){die "Must specify only one of " .join(' ',keys %$found)}unless (scalar keys %$found==1){die "Must specify one of " .join(' ',@$ONE_OF)}if ($this->get_string()){$this->{'FH'}=IO::String->new($this->get_string())}my$file=$this->get_file();if ($file){open(my$fh,'<',$file)or die "Cannot open $file: $!";$this->{'FH'}=$fh}}sub get_file {my$this=shift;return$this->{'FILE'}}sub get_string {my$this=shift;return$this->{'STRING'}}sub get_fh {return $_[0]->{'FH'}}sub get_record {my$this=shift;if ($this->is_done()){return$this->call_next_record()}my$fh=$this->get_fh();my$line=<$fh>;if (!$line){close$fh;$this->set_done();App::RecordStream::Operation::set_current_filename($this->get_filename());return$this->call_next_record()}my$record=$json->decode($line);bless$record,'App::RecordStream::Record';return$record}sub call_next_record {my$this=shift;my$next=$this->get_next();unless ($next){return undef}if ($next && $next->is_done()){$next=$next->get_next();$this->{'NEXT'}=$next}return$next->get_record()}sub get_filename {my$this=shift;if (!$this->is_done()){return$this->get_file()if ($this->get_file());return 'STRING_INPUT' if ($this->get_string());return 'STREAM_INPUT' if ($this->get_fh());return 'UNKNOWN'}elsif ($this->get_next()){return$this->get_next()->get_filename()}}sub get_next {my$this=shift;return$this->{'NEXT'}}sub is_done {return $_[0]->{'DONE'}}sub set_done {my$this=shift;$this->{'DONE'}=1}1; APP_RECORDSTREAM_INPUTSTREAM $fatpacked{"App/RecordStream/KeyGroups.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_KEYGROUPS'; package App::RecordStream::KeyGroups;our$VERSION="4.0.25";use strict;use warnings;sub new {my$class=shift;my@args=@_;my$this={KEY_GROUPS=>[],};bless$this,$class;$this->add_groups($_)foreach@args;return$this};sub has_any_group {my$this=shift;return (scalar @{$this->{'KEY_GROUPS'}})> 0}sub add_groups {my$this=shift;my$groups=shift;for my$group_spec (split(',',$groups)){my$group;if ($group_spec =~ m/^!/){$group=App::RecordStream::KeyGroups::Group->new($group_spec)}else {$group=App::RecordStream::KeyGroups::KeySpec->new($group_spec)}push @{$this->{'KEY_GROUPS'}},$group}}sub get_keyspecs_for_record {my$this=shift;my$record=shift;my@specs;for my$group (@{$this->{'KEY_GROUPS'}}){push@specs,@{$group->get_fields($record)}}return \@specs}sub get_keyspecs {my$this=shift;my$record=shift;if (!$this->{'KEY_SPECS'}){$this->{'KEY_SPECS'}=$this->get_keyspecs_for_record($record)}return$this->{'KEY_SPECS'}}sub usage {return <$spec,};return bless$this,$class}sub get_fields {my$this=shift;my$record=shift;if ($record->has_key_spec($this->{'SPEC'})){return [join('/',@{$record->get_key_list_for_spec($this->{'SPEC'})})]}return []}1;package App::RecordStream::KeyGroups::Group;my$VALID_OPTIONS={d=>'depth',depth=>'depth',s=>'sort','sort'=>'sort',f=>'full_match',full=>'full_match',rr=>'return_refs',returnrefs=>'return_refs' };sub new {my$class=shift;my$group_spec=shift;my$this={};bless$this,$class;$this->parse_group($group_spec);return$this}sub get_fields {my$this=shift;my$record=shift;my@specs;my$regex=$this->{'REGEX'};for my$spec (@{$this->get_specs($record)}){if ($spec =~ m/$regex/){push@specs,$spec}}if ($this->has_option('sort')){@specs=sort@specs}return \@specs}sub get_specs {my$this=shift;my$record=shift;my$min_depth=1;my$max_depth=1;if ($this->has_option('full_match')){$max_depth=-1}elsif ($this->has_option('depth')){my$depth=$this->option_value('depth');$min_depth=$depth;$max_depth=$depth}my$paths=[];$this->_get_paths({%$record},1,$min_depth,$max_depth,[],$paths);return [map {join('/',@$_)}@$paths]}sub _get_paths {my$this=shift;my$data=shift;my$current_depth=shift;my$min_depth=shift;my$max_depth=shift;my$current_keys=shift;my$found_paths=shift;if ($current_depth >= $min_depth){if (ref($data)eq '' || $this->has_option('return_refs')){push @$found_paths,[@$current_keys]}}if (ref($data)eq 'ARRAY'){my$index=-1;for my$value (@$data){$index++;if ($current_depth <= $max_depth || $max_depth==-1){$this->_get_paths($value,$current_depth+1,$min_depth,$max_depth,[@$current_keys,"\#index"],$found_paths)}}}if (ref($data)eq 'HASH'){for my$key (keys %$data){if ($current_depth <= $max_depth || $max_depth==-1){$this->_get_paths($data->{$key},$current_depth+1,$min_depth,$max_depth,[@$current_keys,$key],$found_paths)}}}}sub parse_group {my$this=shift;my$spec=shift;if ('!' ne substr($spec,0,1)){die "Malformed group spec: '$spec', does not start with '!'\n"}if (length($spec)< 2){die "Malformed group spec: '$spec', does not have enough length\n"}my$regex='';my$last_char='';my$found_end=0;my$start_option_index=1;for (my$index=1;$index < length($spec);$index++){$start_option_index++;my$current_char=substr($spec,$index,1);if ($current_char eq '!'){if ($last_char ne '\\'){$last_char=$current_char;$found_end=1;last}}$last_char=$current_char;$regex .= $current_char;next}die "Malformed group spec: Did not find terminating '!' in '$spec'\n" if (!$found_end);my$options_string=substr($spec,$start_option_index);my$options={};for my$option_group (split('!',$options_string)){my ($option,$value)=split('=',$option_group);if (my$normalized_option=$VALID_OPTIONS->{$option}){if (exists$options->{$normalized_option}){die "Already specified option '$option'. Bad option: '$option_group' in '$spec'\n"}else {$options->{$normalized_option}=$value}}else {die "Malformed group spec: Unrecognized option: '$option' in '$spec'\n"}}$this->{'REGEX'}=$regex;$this->{'OPTIONS'}=$options}sub has_option {my$this=shift;my$option=shift;return exists$this->{'OPTIONS'}->{$option}}sub option_value {my$this=shift;my$option=shift;return$this->{'OPTIONS'}->{$option}}1; KEY GROUPS __FORMAT_TEXT__ SYNTAX: !regex!opt1!opt2... Key groups are a way of specifying multiple fields to a recs command with a single argument or function. They are generally regexes, and have several options to control what fields they match. By default you give a regex, and it will be matched against all first level keys of a record to come up with the record list. For instance, in a record like this: __FORMAT_TEXT__ { 'zip': 1, 'zap': 2, 'foo': { 'bar': 3 } } __FORMAT_TEXT__ Key group: !z! would get the keys 'zip' and 'zap' You can have a literal '!' in your regex, just escape it with a \\. Normally, key groups will only match keys whose values are scalars. This can be changed with the 'returnrefs' or rr flag. With the above record !f! would match no fields, but !f!rr would match foo (which has a value of a hash ref) Options on KeyGroups: __FORMAT_TEXT__ returnrefs, rr - Return keys that have reference values (default:off) full, f - Regex should match against full keys (recurse fully) depth=NUM,d=NUM - Only match keys at NUM depth (regex will match against full keyspec) sort, s - sort keyspecs lexically HELP APP_RECORDSTREAM_KEYGROUPS $fatpacked{"App/RecordStream/KeySpec.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_KEYSPEC'; package App::RecordStream::KeySpec;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::KeySpec;use Data::Dumper;my$registry={};sub find_key {my ($data,$spec,$no_vivify,$throw_error)=@_;my$spec_obj=__PACKAGE__->new($spec);return$spec_obj->guess_key($data,$no_vivify,$throw_error)}sub new {my$class=shift;my$spec=shift;if (exists$registry->{$spec}){return$registry->{$spec}}my$this={SPEC=>$spec,};bless$this,$class;$this->init();$registry->{$spec}=$this;return$this}sub init {my$this=shift;$this->_parse_key_spec()}{my$guessed_keys={};sub _search_string_to_key {my$key_chain=shift;my$string=shift;return$guessed_keys->{join('-',@$key_chain)}->{$string}}sub _add_string_key_mapping {my$key_chain=shift;my$string=shift;my$key=shift;$guessed_keys->{join('-',@$key_chain)}->{$string}=$key}}sub _guess_key_name_raw {my ($this,$data,$key_chain,$search_string)=@_;my$fuzzy=$this->{'FUZZY'};if (UNIVERSAL::isa($data,'ARRAY')){if ($search_string =~ m/^#(\d+)$/){return $1}else {die "Cannot select non-numeric index: $search_string (did you forget to prefix with a '#'?) for array: " .Dumper($data)}}return$search_string if (!$fuzzy);my$found_key;if (my$key=_search_string_to_key($key_chain,$search_string)){return$key}if (defined$data->{$search_string}){$found_key=$search_string}else {for my$key (CORE::sort(CORE::keys %$data)){if ($key =~ m/^\Q$search_string\E/i){$found_key=$key}}}if (!$found_key){for my$key (CORE::sort(CORE::keys %$data)){if ($key =~ m/$search_string/i){$found_key=$key}}}if (!$found_key){$found_key=$search_string}_add_string_key_mapping($key_chain,$search_string,$found_key);return$found_key}sub has_key_spec {my ($this,$data)=@_;eval {$this->guess_key($data,0,1)};if ($@ =~ m/^NoSuchKey/){return 0}elsif ($@){die $@}return 1}sub get_key_list_for_spec {my ($this,$data)=@_;return$this->_guess_key_recurse($data,[],1,0,1,@{$this->{'PARSED_KEYS'}},)}sub _parse_key_spec {my ($this)=@_;my$spec=$this->{'SPEC'};my$fuzzy=0;my$spec_name=$spec;if (substr($spec,0,1)eq '@'){$fuzzy=1;$spec=substr($spec,1)}my$keys=[];my$current_key='';my$last_char='';for (my$index=0;$index < length($spec);$index++){my$current_char=substr($spec,$index,1);if ($current_char eq '/' && $last_char ne '\\'){push @$keys,$current_key;$current_key='';$last_char='';next}else {if ($current_char eq '/'){chop$current_key}$current_key .= $current_char;$last_char=$current_char;next}}if ($current_key ne ''){push @$keys,$current_key}$this->{'PARSED_KEYS'}=$keys;$this->{'FUZZY'}=$fuzzy}{my$keylookup_hash={};sub guess_key {my ($this,$data,$no_vivify,$throw_error)=@_;my@args=@{$this->{'PARSED_KEYS'}};$no_vivify ||= 0;$throw_error ||= 0;my$args_string=join('-',@args,$no_vivify,$throw_error);if (my$code=$keylookup_hash->{$args_string}){return$code->($data)}my$keys=$this->_guess_key_recurse($data,[],$no_vivify,$throw_error,1,@args,);my$code=$this->_generate_keylookup_sub($keys,$no_vivify);$keylookup_hash->{$args_string}=$code;return$code->($data)}}sub _generate_keylookup_sub {my$this=shift;my$keys=shift;my$no_vivify=shift;my$throw_error=shift;if (scalar @$keys==0){return eval 'sub { if ( \$throw_error ) { die "NoSuchKey"; } return ""; }'}my$code_string='sub { my $record = shift;';my$key_accessor='$record';my$action="return ''";$action="die 'NoSuchKey'" if ($throw_error);my$check_actions='';for my$key (@$keys){if ($key =~ m/^#(\d+)$/){my$index=$1;$key_accessor .= "->[$index]"}else {my@hex_bytes=unpack('C*',$key);my$hex_string='';for my$byte (@hex_bytes){$hex_string .= "\\x" .sprintf ("%lx",$byte)}$key_accessor .= "->{\"$hex_string\"}"}$check_actions .= "$action if ( ! exists $key_accessor );"}if ($no_vivify || $throw_error){$code_string .= $check_actions}$code_string .= "return \\($key_accessor)}";my$sub_ref=eval$code_string;if ($@){warn "Unexpected error in creating key lookup!\n";die $@}return$sub_ref}sub _guess_key_recurse {my ($this,$data,$key_chain,$no_vivify,$throw_error,$return_key_chain,$search_string,@next_strings)=@_;my$type=ref($data);if ($type eq 'SCALAR' || UNIVERSAL::isa(\$data,'SCALAR')){die "Cannot look for $search_string in scalar: " .Dumper($data)}my$key=$this->_guess_key_name_raw($data,$key_chain,$search_string);my$value;if ($type eq 'ARRAY'){$value=\($data->[$key]);$key="#$key"}else {if ($no_vivify && (!exists$data->{$key})){return$return_key_chain ? []: ''}$value=\($data->{$key})}if (scalar@next_strings > 0){if (!defined $$value){die "NoSuchKey" if ($throw_error);if ($no_vivify){return$return_key_chain ? []: ''}if (substr($next_strings[0],0,1)eq '#'){$$value=[]}else {$$value={}}}return$this->_guess_key_recurse($$value,[@$key_chain,$key],$no_vivify,$throw_error,$return_key_chain,@next_strings,)}return$return_key_chain ? [@$key_chain,$key]: $value}sub keyspec_help {return <{'hr'}={};$this->{'head'}=undef;$this->{'tail'}=undef;$this->{'ct'}=0;bless$this,$class;return$this}sub find {my ($this,$k)=@_;my$data=$this->{'hr'}->{$k};if($data){$this->_unlink($data->[0]);$this->_head($data->[0]);return$data->[1]}return undef}sub put {my ($this,$k,$v)=@_;my$data=$this->{'hr'}->{$k};if($data){$data->[1]=$v;$this->_unlink($data->[0]);$this->_head($data->[0]);return}my$node=[undef,undef,$k];$this->_head($node);$this->{'hr'}->{$k}=[$node,$v]}sub _unlink {my ($this,$node)=@_;if($node->[0]){$node->[0]->[1]=$node->[1]}else {$this->{'head'}=$node->[1]}if($node->[1]){$node->[1]->[0]=$node->[0]}else {$this->{'tail'}=$node->[0]}--$this->{'ct'}}sub _head {my ($this,$node)=@_;$node->[0]=undef;$node->[1]=$this->{'head'};$this->{'head'}=$node;if($node->[1]){$node->[1]->[0]=$node}else {$this->{'tail'}=$node}++$this->{'ct'}}sub purgenate {my ($this,$size)=@_;my@goners;while($this->{'ct'}> $size){my$node=$this->{'tail'};$this->_unlink($node);my$key=$node->[2];my$data=delete$this->{'hr'}->{$key};push@goners,$data->[1]}return@goners}1; APP_RECORDSTREAM_LRUSHERIFF $fatpacked{"App/RecordStream/Manual/Examples.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_MANUAL_EXAMPLES'; use strict; use warnings; package App::RecordStream::Manual::Examples; =head1 NAME App::RecordStream::Manual::Examples - A set of simple recs examples =head1 DESCRIPTION This file provides a couple of useful examples of recs chains and how each of them break down. This is meant as a learning tool for folks new recs that would like to see some cool things you can do with RecordStream. Another good resource is L, also viewable by running C, which is a humorous story meant to get the newest users used to recs. =head1 EXAMPLES =head2 How many processes is each user on my system running? recs fromps | recs collate --key uid -a count | recs sort --key count=n | recs totable Broken down this is: =over =item 1. recs fromps First get the records of all the prcesses currently running =item 2. recs collate --key uid -a count Grouping by the C field, count how many records fall into the group (stored in the C field by default) =item 3. recs sort --key count=n Sort the resulting records by the C field numerically (rather than lexically) =item 4. recs totable Print the output in a nicely formatted plain text table =back =head2 How many processes for each user at each priority level? recs fromps | recs collate --key uid,priority -a count | recs toptable --x priority --y uid --v count Broken down: =over =item 1. recs fromps First get the records of all the prcesses currently running =item 2. recs collate --key uid,priority -a count Grouping by the uid and the priority field, count how many records fall into the group =item 3. recs toptable --x priority --y uid -v count Create a 2 dimensional table (a I

ivot I), across the top put the priority values, down the side put the uid, in each cell put the value of the count field for that priority/uid. =back =head2 Prep a report on number of modules logging to Xorg.log What Xorg modules put information in my Xorg.log at startup, and what log level are they logged at? I need this in CSV format for importing into a spreadsheet program. recs frommultire --re 'type,module=\((\S*)\) ([^:]+):' /var/log/Xorg.0.log \ | recs collate --key type,module -a ct \ | recs sort --key ct=n \ | recs tocsv --header =over =item 1. recs frommultire --re 'type,module=\((\S*)\) ([^:]+):' /var/log/Xorg.0.log Parse out the type and module from the Xorg log file. That regex captures non-whitespace inside a literal C<()> pair, then captures text after a space up to the first C<:> (colon). =item 2. recs collate --key type,module -a ct Collate records into groups of type-modules, and count how many in each group across all records =item 3. recs sort --key ct=n Sort by the count, numerically =item 4. recs tocsv --header Output a table in spreadsheet format (no ASCII art), delimited by commas =back =head1 SEE ALSO =over =item * See L for an overview of the scripts and the system =item * Run C or see L for a humorous introduction to RecordStream =back =cut 1; APP_RECORDSTREAM_MANUAL_EXAMPLES $fatpacked{"App/RecordStream/Manual/Story.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_MANUAL_STORY'; use strict; use warnings; package App::RecordStream::Manual::Story; =head1 NAME App::RecordStream::Manual::Story - A humorous introduction to recs =head1 DESCRIPTION This was a humorous article written to give an introduction to the RecordStream tools, please read with tongue firmly planted in cheek. =head1 And So It Begins Every ninja knows the value of tools. How could one accomplish anything without a mouse quenched in the blood of a live and willing dragon or a monitor whose CRT is hewn from a single enormous diamond, formed in the fire of Hades? Indeed, most ninja scholars agree unanimously that the hundred-handed conference room reservation prana cannot be done with out a keyboard each key of which is greased with the blood of a different enemy of your ancestors'. Given that you have already collected the 12 ancient swords of the sun and the IBM Model M keyboard, you must now choose wisely which to use in each of your challenges. When one wrestles Leviathan one does not do it with chopsticks. Similarly, when a ninja faces a complex dataset they do not come at it with grep (well, many times you start with grep). The true ninja runs atop the cubicle dividers, slaughtering all until the dataset is rendered meaningless. code ninjas use recs. =head1 On the first day, we analyzed an access log Say you have only seconds to report URL statistics from an apache access log before the ancient sea wyrm of Atlantis raises from the Puget Sound and destroys Seattle entirely. Then you might type something like this: recs-frommultire --re 'latency=TIME: (\d*)' --re 'method,url="([^" ]*) ([^" ?]*)' access.log \ | recs-xform '$r->{url} =~ s/(get\.cgi)\/.*/$1/;' \ | recs-collate -k url --perfect -a 'avg,latency' -a count \ | recs-sort -k 'avg_latency=-n' \ | head -n 5 \ | recs-totable Scared yet? A proper tool should always inspire fear in the weak. With appropriate mastery, you too can learn to banish ancient horrors using recs. But one does not learn recs-jitsu all at once; one must learn it kata by kata. First one must understand the principles and overall form of this arcane art. Recs, or RecordStream is a collection of scripts that facilitates the parsing of files into JSON records and the transformation of those records. Many common UNIX programs like grep, sort, and uniq have recs analogs and several recs scripts allow transformations unheard of using typical UNIX tools. In general the tools fall into three categories: those that produce JSON records, those that operate on JSON records, and those that convert JSON records into output. A typical use of recs will consist of one of the first type, one, or more of the second type, and one of the third type. To begin using recs, you'll have to decide on how to get your data into JSON. There are several scripts available to do this, one of the most powerful of which is recs-frommultire. It allows you to write multiple regular expressions to capture fields. =head1 recs-frommultire - parsing data into JSON To understand how our invocation of recs-frommultire was written, you'll want to see our access log. Here are four sample lines: 192.168.151.55 - - [10/Sep/2007:01:01:55 -0700] "GET /view_image.cgi?uid=bernard&badge=1 HTTP/1.1" 200 3528 TIME: 0 192.168.153.89 - - [10/Sep/2007:01:02:28 -0700] "GET /x.gif HTTP/1.1" 304 - TIME: 0 192.168.153.105 - - [10/Sep/2007:01:02:32 -0700] "GET /dbfiles/get.cgi/data.xml HTTP/1.1" 200 7338 TIME: 1 192.168.151.66 - - [10/Sep/2007:01:02:41 -0700] "GET /helpdesk.html HTTP/1.1" 200 40 TIME: 1 For reference the invocation was: recs-frommultire --re 'latency=TIME: (\d*)' --re 'method,url="([^" ]*) ([^" ?]*)' access.log The first option specifies one field, named "latency" which is the only capture group of the first regular expression. The second option specifies two fields, named "method" and "url" which are the two capture groups of the second regular expression. The final argument is the file to parse. Each regular expression is run against each line. When a field would be duplicated, all matches so far are flushed as a record. The output from recs-frommultire looks like: {"url":"/view_image.cgi","method":"GET","latency":"0"} {"url":"/x.gif","method":"GET","latency":"0"} {"url":"/dbfiles/get.cgi/data.xml","method":"GET","latency":"1"} {"url":"/helpdesk.html","method":"GET","latency":"1"} =head1 recs-xform - arbitrary manipulation of records JSON is mostly human readable and as you can see each record has three fields, "url", "method", and "latency". Unfortunately "url" isn't quite as we want it. As it stands the key for get.cgi requests is included in the URL but that will mess up our statistics so we'd like to get rid of it which brings us to our next stage in the pipeline: recs-xform '$r->{url} =~ s/(get\.cgi)\/.*/$1/;' recs-xform is both simple and powerful: it executes arbitrary, inline perl on each record. The record is represented as a App::RecordStream::Record object in the scalar $r, but all the fields can be accessed as if it were no more than a hashref. In this case we are using a substitution command to strip the key off of get.cgi requests. At this point our data is ready to be aggregated and made into statistics. =head1 recs-collate - Generate aggregate statistics recs-collate -k url --perfect -a avg,latency -a count recs-collate is the crown jewel of recs analysis. It groups records from input together, computes aggregate information about them, and dumps this aggregate information as output records. "-k url" requests that records be grouped by their "url" field. "--perfect" indicates that they should be grouped together even if they are not adjacent in input (adjacent only is the default). "-a avg,latency" requests that the average aggregator be used on the latency field. "-a count" requests that the count aggregator be used. Aggregators are one of the most powerful features of recs. As of writing there are 21 distinct aggregators ready for use. Some of the most powerful are: average: averages provided field count: counts (non-unique) records distinctcount: count unique values from provided field maximum: maximum value for a field percentile: value of pXX for field sum: sums provided field You can find out what all of them are with `recs-collate --list-aggregators`. Here are a few sample records from after the collate step: {"count":11,"url":"/dbfiles/list.cgi","avg_latency":21.0909090909091} {"count":2,"url":"/linkGenerator/Host.cgi","avg_latency":0.5} {"count":3,"url":"/view_image.cgi","avg_latency":0.333333333333333} {"count":21,"url":"/dbfiles/check.cgi","avg_latency":0.476190476190476} =head1 recs-sort - ordering records in a stream Now that the collation has been done the records have the numbers we desire, but they are neither in a useful order, nor a pretty format. The first we rectify with recs-sort: recs-sort -k 'avg_latency=-n' We have specified that the records are to be sorted by their avg_latency field and they are to be sorted numerically, descending (negative n) =head1 recs-totable - pretty output of data Finally, we convert JSON back to something slightly more human readable: head -n 5 | recs-totable Since JSON records are one to a line, we can use good ol' UNIX head to take the 5 top offenders. And we use recs-totable to convert those top ten to a nicely formatted text table: avg_latency ct url ----------------- ---- ----------------------- 21.0909090909091 11 /dbfiles/list.cgi 1.36368901114811 6907 /view_image.cgi 1.02898550724638 345 /helpdesk.html 1 1 /dbfiles/ 0.727272727272727 11 /linkGenerator/Host.cgi =head1 And so it ends When faced with awesome prowess like this, what can a 346-foot, 26000-ton sea monster from beyond the stars do but slink back to its cave and bide its time beneath downtown Seattle? Should you find yourself locked in mortal combat with a unspeakable horror of your own you can always turn to --help. All recs scripts come equipped with detailed usage instructions triggered by the --help option. You can also turn to `man recs` (if the man file is installed correctly) =head1 SEE ALSO =over =item * See L for an overview of the scripts and the system =item * Run C or see L for a set of simple recs examples =back =cut 1; APP_RECORDSTREAM_MANUAL_STORY $fatpacked{"App/RecordStream/Operation.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION'; package App::RecordStream::Operation;our$VERSION="4.0.25";use strict;use warnings;use Carp;use FindBin qw($Script $RealScript);use Getopt::Long;use Text::Autoformat;use App::RecordStream::Clumper;use App::RecordStream::DomainLanguage;use App::RecordStream::Executor;use App::RecordStream::KeyGroups;use App::RecordStream::Site;use App::RecordStream::Stream::Base;use App::RecordStream::Stream::Printer;use base 'App::RecordStream::Stream::Base';sub usage {subclass_should_implement(shift)}sub new {my$class=shift;my$args=shift;my$next=shift;my$this={NEXT=>$next,};bless$this,$class;$this->init_help();$this->init($args);return$this}sub init_help {my$this=shift;$this->{'HELP_TYPES'}={all=>{USE=>0,SKIP_IN_ALL=>1,CODE=>\&all_help,DESCRIPTION=>'Output all help for this script',},snippet=>{USE=>0,SKIP_IN_ALL=>0,CODE=>\&snippet_help,DESCRIPTION=>'Help on code snippets',},keygroups=>{USE=>0,SKIP_IN_ALL=>0,CODE=>\&keygroups_help,DESCRIPTION=>'Help on keygroups, a way of specifying multiple keys',},keyspecs=>{USE=>0,SKIP_IN_ALL=>0,CODE=>\&keyspecs_help,DESCRIPTION=>'Help on keyspecs, a way to index deeply and with regexes',},basic=>{USE=>1,SKIP_IN_ALL=>0,CODE=>\&basic_help,OPTION_NAME=>'help',DESCRIPTION=>'This help screen',},'keys'=>{USE=>0,SKIP_IN_ALL=>1,CODE=>\&keys_help,DESCRIPTION=>'Help on keygroups and keyspecs',},domainlanguage=>{USE=>0,SKIP_IN_ALL=>0,CODE=>\&domainlanguage_help,DESCRIPTION=>'Help on the recs domain language, a [very complicated] way of specifying valuations (which act like keys) or aggregators',},clumping=>{USE=>0,SKIP_IN_ALL=>0,CODE=>\&clumping_help,DESCRIPTION=>'Help on clumping; mechanisms to group records across a stream' },};$this->add_help_types()}sub options_string {my ($this,$options)=@_;push @$options,['filename-key|fk ','Add a key with the source filename (if no filename is applicable will put NONE)']if$this->does_record_output;my$string=$this->_options_format($options);$string .= "\n Help Options:\n";my$help_options=[];for my$type (sort keys %{$this->{'HELP_TYPES'}}){my$info=$this->{'HELP_TYPES'}->{$type};next unless ($info->{'USE'});my$option_name=$info->{'OPTION_NAME'}|| "help-$type";my$description=$info->{'DESCRIPTION'};push @$help_options,[$option_name,$description]}$string .= $this->_options_format($help_options,2);while (chomp$string > 0){}return$string}sub _options_format {my ($this,$options,$indent_level)=@_;$indent_level=1 if (not defined$indent_level);my$max_length=0;for my$pair (@$options){my ($name)=@$pair;my$name_length=length($name);$max_length=$name_length if ($name_length > $max_length)}my$string='';my$description_indent_level=($indent_level * 3)+ $max_length + 4;for my$pair (@$options){my ($name,$description)=@$pair;my$formatted=$this->format_text($description,$description_indent_level);my$description_prefix=(' ' x ($indent_level * 3)).'--' .$name;$description_prefix .= ' ' x ($description_indent_level - length($description_prefix));my$prefix_size=length($description_prefix);$string .= $description_prefix;$string .= substr$formatted,$prefix_size}return$string}{my$size_initialized=0;my$size=80;sub get_terminal_size {if (!$size_initialized){$size_initialized=1;if (eval {require Term::ReadKey;1}){eval {$size=(Term::ReadKey::GetTerminalSize())[0]}}elsif ($ENV{COLUMNS}){$size=$ENV{COLUMNS}}}return$size}}sub format_text {my ($this,$text,$left_indent)=@_;$left_indent ||= 0;return autoformat$text,{left=>$left_indent + 1,right=>get_terminal_size(),all=>1,}}sub add_help_types {}sub use_help_type {my$this=shift;my$type=shift;$this->{'HELP_TYPES'}->{$type}->{'USE'}=1;$this->{'HELP_TYPES'}->{'all'}->{'USE'}=1}sub add_help_type {my$this=shift;my$type=shift;my$action=shift;my$description=shift;my$skip_in_all=shift;my$option_name=shift || 0;$this->{'HELP_TYPES'}->{$type}={USE=>1,SKIP_IN_ALL=>$skip_in_all,CODE=>$action,OPTION_NAME=>$option_name,DESCRIPTION=>$description,}}sub parse_options {my$this=shift;my$args=shift || [];my$options_spec=shift || {};my$configuration_options=shift || [];for my$help_type (keys %{$this->{'HELP_TYPES'}}){my$type_info=$this->{'HELP_TYPES'}->{$help_type};next unless ($type_info->{'USE'});my$help_option=$type_info->{'OPTION_NAME'}|| 'help-' .$help_type;$options_spec->{$help_option}||= sub {$type_info->{'CODE'}->($this);exit 1}}$options_spec->{'--filename-key|fk=s'}=\($this->{'FILENAME_KEY'})if$this->does_record_output;my$starting_config=Getopt::Long::Configure();Getopt::Long::Configure('no_ignore_case',@$configuration_options);local@ARGV=@$args;unless (GetOptions(%$options_spec)){$this->_set_wants_help(1)}Getopt::Long::Configure($starting_config);@$args=@ARGV}sub update_current_filename {my ($this,$filename)=@_;set_current_filename($filename)}sub _set_wants_help {my$this=shift;my$help=shift;$this->{'WANTS_HELP'}=$help}sub get_wants_help {my$this=shift;return$this->{'WANTS_HELP'}}sub _set_exit_value {my$this=shift;my$value=shift;$this->{'EXIT_VALUE'}=$value}sub get_exit_value {my$this=shift;return$this->{'EXIT_VALUE'}|| 0}sub print_usage {my$this=shift;my$message=shift;if ($message){chomp$message;warn "$message\n";if ($message =~ m/FATAL/){return}}my$usage=$this->usage();while (chomp$usage > 0){}my$formatted_usage=$this->format_usage($usage);while (chomp$formatted_usage > 0){}print$formatted_usage ."\n"}sub format_usage {my ($this,$usage)=@_;my$lines=[split("\n",$usage)];my$output='';my$capturing=0;my$accumulator=0;my$current_indent=0;while(@$lines){my$line=shift @$lines;chomp$line;if ($line =~ m/^\s*__FORMAT_TEXT__\s*$/){if ($capturing){$capturing=0;$output .= $this->format_text($accumulator,$current_indent)}else {$capturing=1;my$first_line=shift @$lines;chomp$first_line;my ($indention)=$first_line =~ m/^(\s*)/;$first_line =~ s/\s*//;$current_indent=length($indention);$accumulator=$first_line}}elsif ($capturing){if ($line =~ m/^\s*$/){$accumulator .= "\n\n"}else {$line =~ s/^\s*//;$accumulator .= " $line"}}else {$output .= $line ."\n"}}return$output}sub init {}sub wants_input {return 1}sub does_record_output {return 1}sub finish {my$this=shift;$this->stream_done();$this->{'NEXT'}->finish()}{my$filename;sub get_current_filename {return$filename || 'NONE'}sub set_current_filename {my$name=shift;$filename=$name}}sub subclass_should_implement {my$this=shift;croak "Subclass should implement: " .ref($this)}sub stream_done {}sub push_record {my ($this,$record)=@_;if ($this->{'FILENAME_KEY'}){${$record->guess_key_from_spec($this->{'FILENAME_KEY'})}=get_current_filename()}return$this->{'NEXT'}->accept_record($record)}sub push_line {my ($this,$line)=@_;$this->{'NEXT'}->accept_line($line)}sub load_operation {my$script=shift;my$operation=$script;die "Script not named recs-*: $script" unless ($script =~ s/^recs-//);my@modules=("App::RecordStream::Operation::$script");App::RecordStream::Site::bootstrap();my@sites=sort {$a->{'priority'}<=> $b->{'priority'}}App::RecordStream::Site::list_sites();for my$site (@sites){unshift@modules,$site->{'path'}."::Operation::$script"}my$module;my@errors;for my$try_module (@modules){eval "require $try_module";if($@){if ($@ =~ m/^Please install missing/){warn $@;exit 1}push@errors,"Could not load $try_module: $@"}else {$module=$try_module;last}}if(!$module){die "Could not find operation $script:\n" .join("",@errors)}return$module}sub is_recs_operation {my$script=shift;if ($script =~ m/^recs-/){eval {load_operation($script)};return 0 if ($@);return 1}return 0}sub create_operation {my$script=shift;my$args=shift;my$next=shift || App::RecordStream::Stream::Printer->new();my$module=load_operation($script);my$op;eval {$op=$module->new($args,$next)};if ($@ || $op->get_wants_help()){if (!$op){$op=bless {},$module;$op->init_help()}$op->print_usage($@);exit 1}return$op}sub basic_help {my$this=shift;$this->print_usage($@)}sub all_help {my$this=shift;for my$type (sort keys %{$this->{'HELP_TYPES'}}){my$info=$this->{'HELP_TYPES'}->{$type};next if ($info->{'SKIP_IN_ALL'});next if (!$info->{'USE'});print "Help from: --help-$type:\n";$info->{'CODE'}->($this);print "\n"}}sub keys_help {my$this=shift;$this->keyspecs_help();print "\n";$this->keygroups_help()}sub snippet_help {my$this=shift;print$this->format_usage(App::RecordStream::Executor::usage())}sub keyspecs_help {my$this=shift;print$this->format_usage(App::RecordStream::KeySpec::keyspec_help())}sub keygroups_help {my$this=shift;print$this->format_usage(App::RecordStream::KeyGroups::usage())}sub domainlanguage_help {my$this=shift;print$this->format_usage(App::RecordStream::DomainLanguage::usage())}sub clumping_help {my$this=shift;print$this->format_usage(App::RecordStream::Clumper::usage())}sub main {my$command=shift || $Script;$|=1;if ($command eq 'recs-operation'){print <wants_input()){@ARGV=@args;while(my$line=<>){chomp$line;App::RecordStream::Operation::set_current_filename($ARGV);if (!$op->accept_line($line)){last}}}$op->finish();exit$op->get_exit_value()}1; WARNING! recs-operation invoked directly! recs-operation is a wrapper for all other recs commands. You do not want to use this script. It uses the App::RecordStream::Operation::* modules to performation operations, like recs-grep. If you are looking for implementation of those scripts, look in those modules. Otherwise, use a different recs script like recs-grep or recs-collate directly. Terminating program. MESSAGE APP_RECORDSTREAM_OPERATION $fatpacked{"App/RecordStream/Operation/annotate.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_ANNOTATE'; package App::RecordStream::Operation::annotate;our$VERSION="4.0.25";use strict;use base qw(App::RecordStream::Operation);use App::RecordStream::Executor::Getopt;use App::RecordStream::Executor;use App::RecordStream::Record;sub init {my$this=shift;my$args=shift;my$key_groups=App::RecordStream::KeyGroups->new();my$executor_options=App::RecordStream::Executor::Getopt->new();my$spec={$executor_options->arguments(),'keys|k=s'=>sub {$key_groups->add_groups($_[1])},};$this->parse_options($args,$spec,['bundling']);my$expression=$executor_options->get_string($args);my$executor=App::RecordStream::Executor->new(<<" CODE");if (!$key_groups->has_any_group()){die "Must specify at least one --key, maybe you want recs-xform instead?\n"}$this->{'EXECUTOR'}=$executor;$this->{'KEYGROUP'}=$key_groups;$this->{'ANNOTATIONS'}={}}sub accept_record {my$this=shift;my$record=shift;my$specs=$this->{'KEYGROUP'}->get_keyspecs_for_record($record);my@values;for my$key (sort @$specs){my$value=${$record->guess_key_from_spec($key)};push@values,$value}my$synthetic_key=join(chr(30),@values);if (exists$this->{'ANNOTATIONS'}->{$synthetic_key}){$this->apply_annotation($synthetic_key,$record);$this->push_record($record);return 1}my$executor=$this->{'EXECUTOR'};my$store={};my$hash=create_recorder({$record->as_hash()},$store);my$new_record=App::RecordStream::Record->new($hash);my$returned_record=$executor->execute_code($new_record);$this->{'ANNOTATIONS'}->{$synthetic_key}=$store;$this->push_record($returned_record);return 1}sub apply_annotation {my$this=shift;my$annotation_key=shift;my$record=shift;my$stores=$this->{'ANNOTATIONS'}->{$annotation_key};for my$keyspec (keys %$stores){my$value=$stores->{$keyspec};${$record->guess_key_from_spec($keyspec)}=$value}}sub add_help_types {my$this=shift;$this->use_help_type('snippet');$this->use_help_type('keyspecs');$this->use_help_type('keygroups');$this->use_help_type('keys')}sub usage {my$this=shift;my$options=[App::RecordStream::Executor::options_help(),['keys','Keys to match records by, maybe specified multiple times, may be a keygroup or keyspec'],];my$args_string=$this->options_string($options);return <new($current_keyspec,$store);my$spec='';if (defined$current_keyspec){$spec=$current_keyspec .'/'}if (ref($data)eq 'HASH'){my%new_hash;for my$key (keys %$data){my$value=$data->{$key};my$new_value=$value;if (ref($value)eq 'HASH' || ref($value)eq 'ARRAY'){my$new_data=create_recorder($value,$store,$spec .$key);$new_value=$new_data}$new_hash{$key}=$new_value}my%hash;my$recorder=tie%hash,'RecordingHash',\%new_hash,$recorder;return \%hash}else {my@new_array;my$index=0;for my$value (@$data){my$new_value=$value;if (ref($value)eq 'HASH' || ref($value)eq 'ARRAY'){my$new_data=create_recorder($value,$store,$spec .'#' .$index);$new_value=$new_data}push@new_array,$new_value;$index++}my@array;my$recorder=tie@array,'RecordingArray',\@new_array,$recorder;return \@array}}1;package KeyspecRecorder;sub new {my$class=shift;my$current_keyspec=shift;my$store=shift;my$this=bless {KEYSPEC=>$current_keyspec,STORES=>$store,},$class;return$this}sub get_stores {my$this=shift;return$this->{'STORES'}}sub get_keyspec {my$this=shift;return$this->{'KEYSPEC'}}sub add_store {my$this=shift;my$sub_spec=shift;my$value=shift;my$spec=$sub_spec;if (defined$this->get_keyspec()){$spec=$this->get_keyspec().'/' .$sub_spec}$this->get_stores()->{$spec}=$value}1;package RecordingHash;use Tie::Hash;use base qw(Tie::ExtraHash);sub TIEHASH {my$class=shift;my$hash=shift;my$recorder=shift;my$this=bless [$hash,$recorder ],$class;return$this}sub STORE {my ($this,$key,$value)=@_;my ($hash,$data)=@$this;$hash->{$key}=$value;$this->get_recorder()->add_store($key,$value)}sub get_recorder {return $_[0]->[1]}package RecordingArray;use Tie::Array;use base qw(Tie::Array);sub TIEARRAY {my$class=shift;my$array=shift;my$recorder=shift;my$this=bless [$array,$recorder ],$class;return$this}sub STORE {my ($this,$index,$value)=@_;my ($array,$recorder)=@$this;$array->[$index]=$value;$this->get_recorder()->add_store('#' .$index,$value)}sub PUSH {my ($this,@new_items)=@_;my ($array,$recorder)=@$this;my$start_index=scalar @$array;push @$array,@new_items;my$num_to_push=scalar@new_items;my$item_index=0;for my$index ($start_index..($start_index+$num_to_push-1)){$recorder->add_store('#' .$index,$new_items[$item_index]);$item_index++}}sub FETCHSIZE {scalar @{$_[0]->[0]}}sub STORESIZE {$#{$_[0]->[0]}=$_[1]-1}sub FETCH {$_[0]->[0]->[$_[1]]}sub CLEAR {@{$_[0]->[0]}=()}sub POP {pop(@{$_[0]->[0]})}sub SHIFT {shift(@{$_[0]->[0]})}sub EXISTS {exists $_[0]->[0]->[$_[1]]}sub DELETE {delete $_[0]->[0]->[$_[1]]}sub UNSHIFT {die "UNSHIFT Unsupported in annotate, consider using xform"}sub SPLICE {die "SPLICE Unsupported in annotate, consider using xform"}sub get_recorder {return $_[0][1]}1; $expression ; # Safe from a trailing comment in \$expression \$r CODE Usage: recs-annotate [] __FORMAT_TEXT__ is evaluated as perl on each record of input (or records from ) with \$r set to a App::RecordStream::Record object and \$line set to the current line number (starting at 1). Records are analyzed for changes, those changes are applied to each successive record that matches --keys Only use this script if you have --keys fields that are repeated, otherwise recs-xform will be faster __FORMAT_TEXT__ IMPORTANT SNIPPET NOTE __FORMAT_TEXT__ Because of the way annotations are recorded, you cannot use UNSHIFT or SPLICE on array refs that already exist in the record you are modifiying. Additionally, deletes, removes, unshifts, and other 'removing' operations will not apply to later records. If you need this behavior, consider using recs-xform __FORMAT_TEXT__ $args_string Examples: # Annotate records with IPs with hostnames, only doing lookup once ... | recs-annotate --key ip '{{hostname}} = `host {{ip}}`' # Record md5sums of files ... | recs-annotate --key filename '{{md5}} = `md5sum {{filename}}`' # Add url contents to records ... | recs-annotate --key url '{{contents}} = `curl {{url}}`' USAGE APP_RECORDSTREAM_OPERATION_ANNOTATE $fatpacked{"App/RecordStream/Operation/assert.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_ASSERT'; package App::RecordStream::Operation::assert;our$VERSION="4.0.25";use strict;use warnings;use base qw(App::RecordStream::Operation);use App::RecordStream::Executor::Getopt;use App::RecordStream::Executor;use Data::Dumper ();sub init {my$this=shift;my$args=shift;$this->{DIAGNOSTIC}='';$this->{VERBOSE}=0;my$executor_options=App::RecordStream::Executor::Getopt->new();my$spec={'diagnostic|d=s'=>\$this->{DIAGNOSTIC},'verbose|v'=>\$this->{VERBOSE},$executor_options->arguments(),};$this->parse_options($args,$spec,['bundling']);my$expression=$executor_options->get_string($args);my$executor=App::RecordStream::Executor->new($expression);$this->{'ASSERTION'}=$expression;$this->{'EXECUTOR'}=$executor}sub accept_record {my$this=shift;my$record=shift;unless ($this->{'EXECUTOR'}->execute_code($record)){die "Assertion failed! $this->{DIAGNOSTIC}\n","Expression: « $this->{ASSERTION} »\n","Filename: ",$this->get_current_filename,"\n","Line: $.\n",($this->{VERBOSE}? ("Record: ",Data::Dumper->Dump([$record->as_hashref],['r']),"\n"): ())}$this->push_record($record);return 1}sub add_help_types {my$this=shift;$this->use_help_type('snippet')}sub usage {my$this=shift;my$options=[['diagnostic|-d '=>'Include the diagnostic string in any failed assertion errors'],['verbose|-v'=>'Verbose output for failed assertions; dumps the current record'],App::RecordStream::Executor::options_help(),];my$args_string=$this->options_string($options);my$usage=< [] __FORMAT_TEXT__ Asserts that every record in the stream must pass the given . is evaluated as Perl on each record of input (or records from ) with \$r set to a App::RecordStream::Record object and \$line set to the current line number (starting at 1). If does not evaluate to true, processing is immediately aborted and an error message printed. See --help-snippets for more information on code snippets. __FORMAT_TEXT__ $args_string Examples: Require each record to have a "date" field. recs-assert '\$r->{date}' USAGE APP_RECORDSTREAM_OPERATION_ASSERT $fatpacked{"App/RecordStream/Operation/chain.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_CHAIN'; package App::RecordStream::Operation::chain;our$VERSION="4.0.25";use strict;use warnings;use App::RecordStream::Operation;use App::RecordStream::Stream::Printer;use base qw(App::RecordStream::Operation);sub init {my$this=shift;my$args=shift;my ($show_chain,$dry_run);my$spec={'show-chain'=>\$show_chain,'n'=>sub {$show_chain=1;$dry_run=1},};$this->parse_options($args,$spec,['require_order']);return unless (@$args);unless (App::RecordStream::Operation::is_recs_operation($args->[0])){die "First chained command must be standard recs command not shell command!\n"}$this->{'SAVED_ARGS'}=[@$args]if ($show_chain);if ($dry_run){$this->{'DRY_RUN'}=$dry_run;return}my$operations=$this->create_operations($args);my ($first_operation,$last_operation,$continuation_pid)=$this->setup_operations($operations);$this->{'CHAIN_HEAD'}=$first_operation;$this->{'CHAIN_TAIL'}=$last_operation;$this->{'CONTINUATION_PID'}=$continuation_pid}sub print_chain {my$this=shift;my$args=shift;my@current_command;my$was_shell=0;$this->push_line("Chain Starts with:");my$indent=1;my$last;for my$arg (@$args){if ($arg eq '|'){$was_shell=$this->print_command(\@current_command,$last,\$indent);$last=[@current_command];@current_command=();next}push@current_command,$arg}$this->print_command(\@current_command,$last,\$indent)}sub print_command {my$this=shift;my$current_command=shift;my$last=shift;my$indent=shift;my$message='';if (defined$last){if (App::RecordStream::Operation::is_recs_operation($last->[0])&& App::RecordStream::Operation::is_recs_operation($current_command->[0])){$message .= "Passed in memory to "}else {$message .= "Passed through a pipe to ";$$indent++}}my$prefix=' ' x $$indent .$message;if (App::RecordStream::Operation::is_recs_operation($current_command->[0])){$this->push_line($prefix ."Recs command: " .join(' ',@$current_command));return 0}else {$this->push_line($prefix ."Shell command: " .join(' ',@$current_command));return 1}}sub setup_operations {my$this=shift;my$operations=shift;$operations=[@$operations];my ($first_operation,$last_operation,$continuation_pid);while (my$operation=shift @$operations){if ($operation->[0]eq 'SHELL'){my$in_continuation;($in_continuation,$continuation_pid)=$this->setup_fork($operation->[1]);if ($in_continuation){$first_operation=undef;$last_operation=undef;$continuation_pid=undef;next}else {last}}elsif ($operation->[0]eq 'RECS'){}else {die}$first_operation ||= $operation;$last_operation=$operation}return ($first_operation,$last_operation,$continuation_pid)}sub create_operations {my$this=shift;my$args=shift;my@single_command;my@operations;for my$arg (@$args){if ($arg eq '|'){$this->add_operation(\@single_command,\@operations);@single_command=();next}push@single_command,$arg}$this->add_operation(\@single_command,\@operations);return \@operations}sub add_operation {my$this=shift;my$single_command=shift;my$operations=shift;my$idx=@$operations;my$push_shim=App::RecordStream::Operation::chain::PushShim->new($operations,$idx);if (App::RecordStream::Operation::is_recs_operation($single_command->[0])){my ($sc1,@args)=@$single_command;my$operation=App::RecordStream::Operation::create_operation($sc1,\@args,$push_shim);push @$operations,['RECS',$operation,\@args]}else {push @$operations,['SHELL',[@$single_command]]}}sub setup_fork {my$this=shift;my$command_arguments=shift;my$continuation_pid=open(STDOUT,"|-");die "cannot fork: $!" unless defined$continuation_pid;if (!$continuation_pid){return 1}my$shell_pid=open(STDOUT,"|-");die "cannot fork: $!" unless defined$shell_pid;if (!$shell_pid){exec (@$command_arguments)}return (0,$continuation_pid)}sub wants_input {return 0}sub stream_done {my$this=shift;if (my$args=$this->{'SAVED_ARGS'}){$this->print_chain($args)}if ($this->{'DRY_RUN'}){return}my$head=$this->{'CHAIN_HEAD'};if ($head){my$head_operation=$head->[1];my$head_args=$head->[2];if ($head_operation->wants_input()){local@ARGV=@$head_args;while(my$line=<>){chomp$line;App::RecordStream::Operation::set_current_filename($ARGV);if (!$head_operation->accept_line($line)){last}}}$head_operation->finish()}else {while(<>){chomp;$this->push_line($_)}}close(STDOUT);my$continuation_pid=$this->{'CONTINUATION_PID'};if ($continuation_pid){waitpid$continuation_pid,0}else {}}sub get_exit_value {my$this=shift;if (my$tail=$this->{'CHAIN_TAIL'}){return$tail->[1]->get_exit_value()}return 0}sub add_help_types {my$this=shift;$this->use_help_type('keyspecs')}sub usage {my$this=shift;my$options=[['show-chain','Before running the commands, print out what will happen in the chain'],['n','Do not run commands, implies --show-chain'],];my$args_string=$this->options_string($options);return <$operations,IDX=>$idx,};bless$this,$class;return$this}sub accept_record {my$this=shift;my$record=shift;return$this->_get_delegate()->accept_record($record)}sub accept_line {my$this=shift;my$line=shift;return$this->_get_delegate()->accept_line($line)}sub finish {my$this=shift;$this->_get_delegate()->finish()}sub _get_delegate {my$this=shift;my$delegate=$this->{'DELEGATE'};if (!$delegate){my$operations=$this->{'OPERATIONS'};my$idx=$this->{'IDX'};if ($idx + 1 < @$operations){my$next_operation=$operations->[$idx + 1];if ($next_operation->[0]eq 'RECS'){$delegate=$next_operation->[1]}elsif ($next_operation->[0]eq 'SHELL'){$delegate=App::RecordStream::Stream::Printer->new()}else {die}}else {$delegate=App::RecordStream::Stream::Printer->new()}$this->{'DELEGATE'}=$delegate}return$delegate}1; Usage: recs-chain | | ... __FORMAT_TEXT__ Creates an in-memory chain of recs operations. This avoid serialization and deserialization of records at each step in a complex recs pipeline. For ease of use the chain of recs commands main contain non-recs command, anything that does not start with a recs- is interpreted as a shell command. That command is forked off to the shell. In this case, serialization and deserialization costs apply, but only to and from the shell command, everything else is done in memory. If you have many shell commands in a row, there is extra over head, you should instead consider splitting those into separate pipes. See the examples for more information on this. Arugments are specified in on the command line separated by pipes. For most shells, you will need to escape the pipe character to avoid having the shell interpret the pipe as a shell pipe. __FORMAT_TEXT__ $args_string Examples: Parse some fields, sort and collate, all in memory recs-chain recs-frommultire 'data,time=(\\S+) (\\S+)' \\| recs-sort --key time=n \\| recs-collate --a perc,90,data Use shell commands in your recs stream recs-chain recs-frommultire 'data,time=(\\S+) (\\S+)' \\| recs-sort --key time=n \\| grep foo \\| recs-collate --a perc,90,data Many shell commands should be split into real pipes recs-chain recs-frommultire 'data,time=(\\S+) (\\S+)' \\| recs-xform '\$r->{now} = time();' \ | grep foo | sort | uniq | recs-chain recs-collate --a perc,90,data \\| recs-totable USAGE APP_RECORDSTREAM_OPERATION_CHAIN $fatpacked{"App/RecordStream/Operation/collate.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_COLLATE'; package App::RecordStream::Operation::collate;our$VERSION="4.0.25";use strict;use warnings;use base qw(App::RecordStream::Operation);use App::RecordStream::Aggregator;use App::RecordStream::Clumper::Options;use App::RecordStream::DomainLanguage::Library;use App::RecordStream::DomainLanguage::Snippet;use App::RecordStream::DomainLanguage;use App::RecordStream::Operation::collate::BaseClumperCallback;use App::RecordStream::Operation;sub init {my$this=shift;my$args=shift;App::RecordStream::Aggregator->load_implementations();my$clumper_options=$this->{'CLUMPER_OPTIONS'}=App::RecordStream::Clumper::Options->new();my@aggregators;my%dlaggregators;my@mr_aggregators;my@ii_aggregators;my$incremental=0;my$bucket=1;my$list_aggregators=0;my$aggregator=0;my$spec={$clumper_options->main_options(),"aggregator|a=s"=>sub {push@aggregators,$_[1]},"dlaggregator|A=s"=>sub {build_dlaggregator(\%dlaggregators,$_[1])},"mr-agg=s{4}"=>\@mr_aggregators,"ii-agg=s{4}"=>\@ii_aggregators,"incremental|i"=>\$incremental,"bucket!"=>\$bucket,"list-aggregators"=>\$list_aggregators,"show-aggregator=s"=>\$aggregator,$clumper_options->help_options(),"list"=>\$list_aggregators,};$this->parse_options($args,$spec);if ($list_aggregators){die sub {print App::RecordStream::Aggregator->list_implementations()}}if ($aggregator){die sub {App::RecordStream::Aggregator->show_implementation($aggregator)}}my$aggregator_objects=App::RecordStream::Aggregator->make_aggregators(@aggregators);$aggregator_objects={%$aggregator_objects,%dlaggregators};for(my$i=0;$i < @mr_aggregators;1){my$name=$mr_aggregators[$i++];my$map_string=$mr_aggregators[$i++];my$reduce_string=$mr_aggregators[$i++];my$squish_string=$mr_aggregators[$i++];my$map_snippet=App::RecordStream::DomainLanguage::Snippet->new($map_string);my$reduce_snippet=App::RecordStream::DomainLanguage::Snippet->new($reduce_string);my$squish_snippet=App::RecordStream::DomainLanguage::Snippet->new($squish_string);$aggregator_objects->{$name}=App::RecordStream::DomainLanguage::Library::map_reduce_aggregator($map_snippet,$reduce_snippet,$squish_snippet)}for(my$i=0;$i < @ii_aggregators;1){my$name=$ii_aggregators[$i++];my$initial_string=$ii_aggregators[$i++];my$combine_string=$ii_aggregators[$i++];my$squish_string=$ii_aggregators[$i++];my$initial_snippet=App::RecordStream::DomainLanguage::Snippet->new($initial_string);my$combine_snippet=App::RecordStream::DomainLanguage::Snippet->new($combine_string);my$squish_snippet=App::RecordStream::DomainLanguage::Snippet->new($squish_string);$aggregator_objects->{$name}=App::RecordStream::DomainLanguage::Library::inject_into_aggregator($initial_snippet,$combine_snippet,$squish_snippet)}$clumper_options->check_options(App::RecordStream::Operation::collate::BaseClumperCallback->new($aggregator_objects,$incremental,$bucket,sub {$this->push_record($_[0])}))}sub build_dlaggregator {my$dlaggregators_ref=shift;my$string=shift;my$name;if($string =~ s/^([^=]*)=//){$name=$1}else {die "Bad domain language aggregator option (missing '=' to separate name and code): " .$string}$dlaggregators_ref->{$name}=App::RecordStream::DomainLanguage::Snippet->new($string)->evaluate_as('AGGREGATOR')}sub accept_record {my$this=shift;my$record=shift;$this->{'CLUMPER_OPTIONS'}->accept_record($record)}sub stream_done {my$this=shift;$this->{'CLUMPER_OPTIONS'}->stream_done()}sub print_usage {my$this=shift;my$message=shift;if ($message && UNIVERSAL::isa($message,'CODE')){$message->();exit 1}$this->SUPER::print_usage($message)}sub add_help_types {my$this=shift;$this->use_help_type('keyspecs');$this->use_help_type('keygroups');$this->use_help_type('keys');$this->use_help_type('domainlanguage');$this->use_help_type('clumping');$this->add_help_type('aggregators',sub {print App::RecordStream::Aggregator->list_implementations()},'List the aggregators');$this->add_help_type('more',sub {$this->more_help()},'Larger help documentation')}sub usage {my$this=shift;my$options=[['dlaggregator|-A ...','Specify a domain language aggregate. See "Domain Language Integration" below.'],['aggregator|-a ','Colon separated list of aggregate field specifiers. See "Aggregates" section below.'],['mr-agg ','Specify a map reduce aggregator via 3 snippets, similar to mr_agg() from the domain language.'],['ii-agg ','Specify an inject into aggregator via 3 snippets, similar to ii_agg() from the domain language.'],['incremental','Output a record every time an input record is added to a clump (instead of every time a clump is flushed).'],['[no]-bucket','With --bucket outputs one record per clump, with --no-bucket outputs one record for each record that went into the clump.'],$this->{'CLUMPER_OPTIONS'}->main_usage(),['list-aggregators|--list','Bail and output a list of aggregators' ],['show-aggregator ','Bail and output this aggregator\'s detailed usage.'],$this->{'CLUMPER_OPTIONS'}->help_usage(),];my$args_string=$this->options_string($options);return <usage().<format_usage($usage)}1; Usage: recs-collate [] __FORMAT_TEXT__ Take records, grouped togther by --keys, and compute statistics (like average, count, sum, concat, etc) within those groups. For starting with collate, try doing single --key collates with some number of aggregators (list available in --list-agrregators) __FORMAT_TEXT__ Arguments: $args_string Examples: Count clumps of adjacent lines with matching x fields. recs-collate --adjacent --key x --aggregator count Count number of each x field value in the entire file. recs-collate --key x --aggregator count Finds the maximum latency for each date, hour pair recs-collate --key date,hour --aggregator worst_latency=max,latency Find the median value of x+y in records recs-collate --dlaggregator "m=perc(50,snip(<<{{x}}+{{y}}>>))" USAGE Aggregates: __FORMAT_TEXT__ Aggregates are specified as [=][,]. The default field name is aggregator and arguments joined by underscores. See --list-aggregators for a list of available aggregators. Fieldname maybe a key spec. (i.e. foo/bar=sum,field). Additionally, all key name arguments to aggregators maybe be key specs (i.e. foo=max,latency/url), but not key groups __FORMAT_TEXT__ Cubing: __FORMAT_TEXT__ Instead of added one entry for each input record, we add 2 ** (number of key fields), with every possible combination of fields replaced with the default of "ALL". This is not meant to be used with --adjacent or --size. If our key fields were x and y then we'd get output records for {x = 1, y = 2}, {x = 1, y = ALL}, {x = ALL, y = 2} and {x = ALL, y = ALL}. __FORMAT_TEXT__ Domain Lanuage Integration: USAGE __FORMAT_TEXT__ Either aggregates or keys may be specified using the recs domain language. Both --dlkey and --dlaggregator require an options of the format '='. --dlkey requires the code evaluate as a valuation, --dlaggregator requires the code evaluate as an aggregator. See --help-domainlanguage for a more complete description of its workings and a list of available functions. See the examples below for a more gentle introduction. __FORMAT_TEXT__ Examples: Count clumps of adjacent lines with matching x fields. recs-collate --adjacent --key x --aggregator count Count number of each x field in the entire file. recs-collate --key x --aggregator count Count number of each x field in the entire file, including an "ALL" line. recs-collate --key x --aggregator count --cube Produce a cummulative sum of field profit up to each date recs-collate --key date --adjacent --incremental --aggregator profit_to_date=sum,profit Produce record count for each date, hour pair recs-collate --key date,hour --aggregator count Finds the maximum latency for each date, hour pair recs-collate --key date,hour --aggregator worst_latency=max,latency Produce a list of hosts in each datacenter. recs-collate --key dc --dlaggregator "hosts=uconcat(', ', 'host')" Sum all time fields recs-collate --key ... --dlaggregator 'times=for_field(qr/^t/, <>)' Find the median value of x+y in records recs-collate --dlaggregator "m=perc(50,snip(<<{{x}}+{{y}}>>))" Count people by first three letters of their name recs-collate --dlkey "tla=<>" -a ct USAGE APP_RECORDSTREAM_OPERATION_COLLATE $fatpacked{"App/RecordStream/Operation/collate/BaseClumperCallback.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_COLLATE_BASECLUMPERCALLBACK'; package App::RecordStream::Operation::collate::BaseClumperCallback;use strict;use warnings;use App::RecordStream::Aggregator;use App::RecordStream::Record;sub new {my$class=shift;my$aggregators=shift;my$incremental=shift;my$bucket=shift;my$record_cb=shift;my$this={'AGGREGATORS'=>$aggregators,'INCREMENTAL'=>$incremental,'BUCKET'=>$bucket,'RECORD_CB'=>$record_cb,};bless$this,$class;return$this}sub clumper_callback_begin {my$this=shift;my$bucket=shift;return [$bucket,$this->{'BUCKET'}? undef : [],App::RecordStream::Aggregator::map_initial($this->{'AGGREGATORS'})]}sub clumper_callback_push_record {my$this=shift;my$cookie=shift;my$record=shift;push @{$cookie->[1]},$record if(!$this->{'BUCKET'});$cookie->[2]=App::RecordStream::Aggregator::map_combine($this->{'AGGREGATORS'},$cookie->[2],$record);if($this->{'INCREMENTAL'}){$this->clumper_callback_end($cookie)}}sub clumper_callback_end {my$this=shift;my$cookie=shift;for my$proto_result ($this->{'BUCKET'}? ($cookie->[0]): @{$cookie->[1]}){my$result={%$proto_result,%{App::RecordStream::Aggregator::map_squish($this->{'AGGREGATORS'},$cookie->[2])},};my$record=App::RecordStream::Record->new();for my$key (keys(%$result)){my$value=$result->{$key};${$record->guess_key_from_spec($key)}=$value}$this->{'RECORD_CB'}->($record)}}1; APP_RECORDSTREAM_OPERATION_COLLATE_BASECLUMPERCALLBACK $fatpacked{"App/RecordStream/Operation/decollate.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_DECOLLATE'; package App::RecordStream::Operation::decollate;use strict;use warnings;use App::RecordStream::Deaggregator;use App::RecordStream::DomainLanguage::Library;use App::RecordStream::DomainLanguage;use App::RecordStream::Operation;use App::RecordStream::Record;use base 'App::RecordStream::Operation';sub init {my$this=shift;my$args=shift;App::RecordStream::Deaggregator->load_implementations();my@deaggregators;my@dldeaggregators;my$list_deaggregators=0;my$deaggregator=0;my$spec={"deaggregator|d=s"=>sub {push@deaggregators,split(/:/,$_[1])},"dldeaggregator=s"=>sub {push@dldeaggregators,build_dldeaggregator($_[1])},"list-deaggregators"=>\$list_deaggregators,"show-deaggregator=s"=>\$deaggregator,};$this->parse_options($args,$spec);if($list_deaggregators){die sub {print App::RecordStream::Deaggregator->list_implementations()}}if($deaggregator){die sub {App::RecordStream::Deaggregator->show_implementation($deaggregator)}}my@deaggregator_objects;for my$spec (@deaggregators){push@deaggregator_objects,App::RecordStream::Deaggregator->make_deaggregator($spec)}@deaggregator_objects=(@deaggregator_objects,@dldeaggregators);$this->{'DEAGGREGATORS'}=\@deaggregator_objects}sub build_dldeaggregator {my$string=shift;return App::RecordStream::DomainLanguage::Snippet->new($string)->evaluate_as('DEAGGREGATOR')}sub accept_record {my$this=shift;my$record=shift;$this->accept_record_aux(0,$record);return 1}sub accept_record_aux {my$this=shift;my$depth=shift;my$record=shift;if($depth < @{$this->{'DEAGGREGATORS'}}){my$deaggregator=$this->{'DEAGGREGATORS'}->[$depth];for my$deaggregated_record (@{$deaggregator->deaggregate($record)}){$this->accept_record_aux($depth + 1,App::RecordStream::Record->new({%$record,%$deaggregated_record}))}}else {$this->push_record($record)}}sub print_usage {my$this=shift;my$message=shift;if($message && UNIVERSAL::isa($message,'CODE')){$message->();exit(1)}$this->SUPER::print_usage($message)}sub add_help_types {my$this=shift;$this->use_help_type('domainlanguage');$this->add_help_type('deaggregators',sub {print App::RecordStream::Deaggregator->list_implementations()},'List the deaggregators')}sub usage {my$this=shift;my$options=[['dldeaggregator ...','Specify a domain language aggregate. See "Domain Language Integration" below.'],['deaggregator|-d ','Colon separated list of aggregate field specifiers. See "Deaggregates" section below.'],['list-deaggregators','Bail and output a list of deaggregators.'],['show-deaggregator ','Bail and output this deaggregator\'s detailed usage.'],];my$args_string=$this->options_string($options);return < [] __FORMAT_TEXT__ Decollate records of input (or records from ) into output records. __FORMAT_TEXT__ Arguments: $args_string Deaggregates: __FORMAT_TEXT__ Deaggregates are specified as [,]. See --list-deaggregators for a list of available deaggregators. In general, key name arguments to deaggregators may be key specs, but not key groups __FORMAT_TEXT__ Domain Lanuage Integration: USAGE __FORMAT_TEXT__ Deaggregates may be specified using the recs domain language. --dldeaggregator requires the code evaluate as a deaggregator. See --help-domainlanguage for a more complete description of its workings and a list of available functions. See the examples below for a more gentle introduction. __FORMAT_TEXT__ Examples: Split the "hosts" field into individual "host" fields recs-decollate --dldeaggregator '_split(hosts,qr/, */,host)' USAGE APP_RECORDSTREAM_OPERATION_DECOLLATE $fatpacked{"App/RecordStream/Operation/delta.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_DELTA'; package App::RecordStream::Operation::delta;our$VERSION="4.0.25";use strict;use base qw(App::RecordStream::Operation);sub init {my$this=shift;my$args=shift;my$key_groups=App::RecordStream::KeyGroups->new();my$spec={"key|k=s"=>sub {$key_groups->add_groups($_[1])},};$this->parse_options($args,$spec);die "Must specify --key\n" unless$key_groups->has_any_group();$this->{'KEY_GROUPS'}=$key_groups}sub accept_record {my$this=shift;my$record=shift;my$last_record=$this->{'LAST_RECORD'};if ($last_record){for my$key (@{$this->{'KEY_GROUPS'}->get_keyspecs($last_record)}){if (${$record->guess_key_from_spec($key)}and ${$last_record->guess_key_from_spec($key)}){${$last_record->guess_key_from_spec($key)}=${$record->guess_key_from_spec($key)}- ${$last_record->guess_key_from_spec($key)}}else {${$last_record->guess_key_from_spec($key)}=undef}}$this->push_record($last_record)}$this->{'LAST_RECORD'}=$record;return 1}sub add_help_types {my$this=shift;$this->use_help_type('keyspecs');$this->use_help_type('keygroups');$this->use_help_type('keys')}sub usage {my$this=shift;my$options=[['key|-k ','Comma separated list of the fields that should be transformed. Fields not in this list will be passed through unchanged, using the *first* record of each delta pair. This may be a keyspec or a keygroup, see "--help-keyspecs" for more information'],];my$args_string=$this->options_string($options);return < [] __FORMAT_TEXT__ Transforms absolute values into deltas between adjacent records. __FORMAT_TEXT__ Arguments: $args_string Examples: Transforms a cumulative counter of errors into a count of errors per record. recs-delta --key=errors USAGE APP_RECORDSTREAM_OPERATION_DELTA $fatpacked{"App/RecordStream/Operation/eval.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_EVAL'; package App::RecordStream::Operation::eval;our$VERSION="4.0.25";use strict;use warnings;use base qw(App::RecordStream::Operation);use App::RecordStream::Executor::Getopt;use App::RecordStream::Executor;sub init {my$this=shift;my$args=shift;my$chomp=0;my$executor_options=App::RecordStream::Executor::Getopt->new();my$spec={'chomp'=>\$chomp,$executor_options->arguments(),};$this->parse_options($args,$spec,['bundling']);my$expression=$executor_options->get_string($args);my$executor=App::RecordStream::Executor->new($expression);$this->{'EXECUTOR'}=$executor;$this->{'CHOMP'}=$chomp}sub accept_record {my$this=shift;my$record=shift;my$executor=$this->{'EXECUTOR'};my$value=$executor->execute_code($record);chomp$value if($this->{'CHOMP'});$this->push_line($value);return 1}sub add_help_types {my$this=shift;$this->use_help_type('snippet');$this->use_help_type('keyspecs')}sub usage {my$this=shift;my$options=[App::RecordStream::Executor::options_help(),['chomp','Chomp eval results (to avoid duplicate newlines when already newline-terminated)'],];my$args_string=$this->options_string($options);my$usage=< [] __FORMAT_TEXT__ is evaluated as perl on each record of input (or records from ) with \$r set to a App::RecordStream::Record object and \$line set to the current line number (starting at 1). The result of each evaluation is printed on a line by itself (this is not a recs stream). See App::RecordStream::Record for help on what the \$r object can do. See --help-snippets for more information on code snippets __FORMAT_TEXT__ $args_string Examples: Print the host field from each record. recs-eval '\$r->{host}' Prepare to gnuplot field y against field x. recs-eval '\$r->{x} . " " . \$r->{y}' Set up a script (this would be presumably piped to sh) recs-eval '"./myscript --value \$r->{foo}"' USAGE APP_RECORDSTREAM_OPERATION_EVAL $fatpacked{"App/RecordStream/Operation/examples.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_EXAMPLES'; use strict;use warnings;package App::RecordStream::Operation::examples;use base qw(App::RecordStream::Operation::help::FromManual);1; APP_RECORDSTREAM_OPERATION_EXAMPLES $fatpacked{"App/RecordStream/Operation/flatten.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_FLATTEN'; package App::RecordStream::Operation::flatten;our$VERSION="4.0.25";use strict;use base qw(App::RecordStream::Operation);my$INVALID_REF_TYPES=[qw(SCALAR ARRAY CODE REF GLOB LVALUE FORMAT IO VSTRING Regexp)];sub init {my$this=shift;my$args=shift;my@fields;my$default_depth=1;my$separator='-';my$add_field=sub {my ($depth,$field_names)=@_;my$key_groups=App::RecordStream::KeyGroups->new();$key_groups->add_groups($field_names);push@fields,[$depth,$key_groups]};my$spec={(map {($_ ."=s")=>$add_field}(1..9)),"depth=i"=>\$default_depth,"key|k|field|f=s"=>sub {$add_field->($default_depth,$_[1])},"deep=s"=>sub {$add_field->(-1,$_[1])},"separator=s"=>\$separator,};$this->parse_options($args,$spec);$this->{'FIELDS'}=\@fields;$this->{'SEPARATOR'}=$separator;$this->{'DEFAULT_DEPTH'}=$default_depth}sub accept_record {my$this=shift;my$record=shift;my$fields=$this->{'FIELDS'};my$separator=$this->{'SEPARATOR'};for my$pair (@$fields){my ($depth,$key_groups)=@$pair;for my$spec (@{$key_groups->get_keyspecs($record)}){eval {my$value=$this->remove_spec($record,$spec);$this->split_field($record,$spec,$depth,$value)};if ($@ =~ m/Cannot flatten into/){warn $@;undef $@;next}elsif ($@){die $@}}}$this->push_record($record);return 1}sub remove_spec {my ($this,$record,$spec)=@_;my$key_list=$record->get_key_list_for_spec($spec);my$last_key=pop @$key_list;my$new_spec=join('/',@$key_list);my$data=$record;if ($new_spec){$data=${$record->guess_key_from_spec($new_spec,1)}}my$ref_type=ref($data);if (!grep {$_ eq $ref_type}@$INVALID_REF_TYPES){return delete$data->{$last_key}}else {die "Cannot flatten into ref type: '$ref_type', must be a hash! skipping spec $spec!\n"}}sub split_field {my ($this,$record,$name,$depth,$value)=@_;my$separator=$this->{'SEPARATOR'};if($depth!=0 && ref($value)eq "ARRAY"){for(my$i=0;$i < @$value;++$i){$this->split_field($record,$name .$separator .$i,$depth - 1,$value->[$i])}return}if($depth!=0 && ref($value)eq "HASH"){for my$key (keys(%$value)){$this->split_field($record,$name .$separator .$key,$depth - 1,$value->{$key})}return}${$record->guess_key_from_spec($name)}=$value}sub add_help_types {my$this=shift;$this->use_help_type('keyspecs');$this->use_help_type('keygroups');$this->use_help_type('keys')}sub usage {my$this=shift;my$options=[[' ','For this comma-separated list of fields flatten to depth n (1-9).'],['depth ','Change the default depth, negative being arbitrary depth (defaults to 1).'],['key ','For this comma-separated list of fields flatten to the default depth (may NOT be a a key spec).'],['deep ','For this comma-separated list of fields flatten to arbitrary depth.'],['separator ','Use this string to separate joined field names (defaults to "-").'],];my$args_string=$this->options_string($options);return < [] __FORMAT_TEXT__ Flatten nested structures in records. NOTE: This script implements a strategy for dealing with nested structures that is almost always better handled by using keyspecs or keygroups. It should, in general, be as easy or easier to use those concepts with the data manipulations you actually want to accomplish. __FORMAT_TEXT__ Arguments: $args_string __FORMAT_TEXT__ All field values may be keyspecs or keygroups, value of keyspec must not be an array element __FORMAT_TEXT__ Examples: Under recs-flatten -1 field We see {"field" => "value"} becomes {"field" => "value"} {"field" => {"subfield" => "value"}} becomes {"field-subfield" => "value"} {"field" => ["value1", "value2"]} becomes {"field-0" => "value1", "field-1" => "value2"} {"field" => {"subfield" => [0, 1]}} becomes {"field-subfield" => [0, 1]}} Under recs-flatten --deep x We see {"x" => {"y" => [{"z" = "v"}]}} becomes {"x-y-0-z" => "v"} USAGE APP_RECORDSTREAM_OPERATION_FLATTEN $fatpacked{"App/RecordStream/Operation/fromapache.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_FROMAPACHE'; package App::RecordStream::Operation::fromapache;our$VERSION="4.0.25";use strict;use warnings;use base qw(App::RecordStream::Operation);use App::RecordStream::Record;use App::RecordStream::OptionalRequire qw(Apache::Log::Parser);App::RecordStream::OptionalRequire::require_done();sub init {my$this=shift;my$args=shift;my$fast;my$strict;my$verbose;my$woothee;my$spec={"fast:s"=>\$fast,"strict:s"=>\$strict,"verbose"=>\$verbose,"woothee"=>\$woothee,};$this->parse_options($args,$spec);my%opts;if (defined$fast){if ($fast eq ''){$opts{fast}=1}else {$opts{fast}=eval$fast;die "eval of option fast failed. $@" if $@}}if (defined$strict){if ($strict eq ''){$opts{strict}=1}else {$opts{strict}=eval$strict;die "eval of option strict failed. $@" if $@}}unless ($opts{fast}or $opts{strict}){$opts{fast}=1}if ($verbose){$opts{verbose}=1}if ($woothee){App::RecordStream::OptionalRequire::optional_use("Woothee");App::RecordStream::OptionalRequire::require_done();$this->{'WOOTHEE'}=1}$this->{'PARSER'}=Apache::Log::Parser->new(%opts)}sub accept_line {my$this=shift;my$line=shift;my$parser=$this->{'PARSER'};if (my$hash=$parser->parse($line)){my$record=App::RecordStream::Record->new($hash);$record->{woothee}=Woothee->parse($record->{agent})if$this->{'WOOTHEE'};$this->push_record($record)}return 1}sub usage {my$this=shift;my$options=[['fast',q{'fast' parser works relatively fast. It can process only 'common', 'combined' and custom styles with compatibility with 'common', and cannot work with backslash-quoted double-quotes in fields. (This is the default)} ],['strict',q{'strict' parser works relatively slow. It can process any style format logs, with specification about separator, and checker for perfection. It can also process backslash-quoted double-quotes properly.} ],['verbose',q{Verbose output.} ],['woothee',q{Each agent field of records is parse by Woothee to produce woothee field.} ],];my$args_string=$this->options_string($options);return < __FORMAT_TEXT__ Each line of input (or lines of ) is parse by Apache::Log::Parser to produce an output record. __FORMAT_TEXT__ Arguments: $args_string Examples: Get records from typical apache log recs-fromapache < /var/log/httpd-access.log A more detailed how to use (See perldoc Apache::Log::Parser) recs-fromapache --strict '[qw(combined common vhost_common)]' < /var/log/httpd-access.log Get records except access of crawler recs-fromapache --woothee < /var/log/httpd-access.log | recs-grep '\$r->{woothee}{category} ne "crawler"' USAGE APP_RECORDSTREAM_OPERATION_FROMAPACHE $fatpacked{"App/RecordStream/Operation/fromatomfeed.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_FROMATOMFEED'; package App::RecordStream::Operation::fromatomfeed;our$VERSION="4.0.25";use strict;use warnings;use base qw(App::RecordStream::Operation);use App::RecordStream::Record;use App::RecordStream::OptionalRequire 'LWP::UserAgent';use App::RecordStream::OptionalRequire 'XML::Twig';BEGIN {App::RecordStream::OptionalRequire::require_done()}sub init {my$this=shift;my$args=shift;my$follow=1;my$max=undef;my%options=("follow!"=>\$follow,'max=s'=>\$max,);$this->parse_options($args,\%options);$this->{'COUNT'}=0;$this->{'FOLLOW'}=$follow;$this->{'MAX'}=$max;$this->{'URLS'}=$args}sub wants_input {return 0}sub stream_done {my ($this)=@_;my$ua=$this->make_user_agent();my$request=HTTP::Request->new();$request->method('GET');my$twig_roots={'/*/entry'=>sub {$this->handle_entry_elem(@_)}};if ($this->{'FOLLOW'}){$twig_roots->{'/*/link[ @rel="next" and @href ]' }=sub {$this->handle_link_elem(@_)}}my$twig=XML::Twig->new(twig_roots=>$twig_roots);while (my$url=shift @{$this->{'URLS'}}){$this->update_current_filename($url);$request->uri($url);my$response=$ua->request($request);if (!$response->is_success){warn "# $0 GET $url failed: " .$response->message;$this->_set_exit_value(1);next}$twig->parse($response->content)}}sub handle_entry_elem {my ($this,$twig,$entry_elem)=@_;$this->{'COUNT'}++;my$record=App::RecordStream::Record->new($entry_elem->simplify);$this->push_record($record);if (defined$this->{'MAX'}&& $this->{'COUNT'}>= $this->{'MAX'}){$this->{'URLS'}=[];$twig->finish_now}$twig->purge}sub handle_link_elem {my ($this,$twig,$link_elem)=@_;unshift @{$this->{'URLS'}},$link_elem->att('href');$twig->purge}sub make_user_agent {return LWP::UserAgent->new()}sub usage {my$this=shift;my$options=[['[no]follow','Follow atom feed next links (or not). Defaults on.'],['max=','Print at most entries and then exit.'],];my$args_string=$this->options_string($options);return < [] __FORMAT_TEXT__ Produce records from atom feed entries. Recs from atom feed will get entries from paginated atom feeds and create a record stream from the results. The keys of the record will be the fields in the atom field entry. Recs from atom feed will follow the 'next' link in a feed to retrieve all entries. __FORMAT_TEXT__ Arguments: $args_string Examples: Dump an entire feed recs-fromatomfeed "http://my.xml.com" Dumps just the first page of entries recs-fromatomfeed --nofollow "http://my.xml.com" Dumps just the first 10 entries recs-fromatomfeed --max 10 "http://my.xml.com" USAGE APP_RECORDSTREAM_OPERATION_FROMATOMFEED $fatpacked{"App/RecordStream/Operation/fromcsv.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_FROMCSV'; package App::RecordStream::Operation::fromcsv;our$VERSION="4.0.25";use strict;use base qw(App::RecordStream::Operation);use Text::CSV;sub init {my$this=shift;my$args=shift;my@fields;my$header_line=undef;my$strict=0;my$delim=',';my$escape='"';my$quote='"';my$spec={"keys|k|field|f=s"=>sub {push@fields,split(/,/,$_[1])},"header"=>\$header_line,"strict"=>\$strict,"delim|d=s"=>\$delim,"escape=s"=>\$escape,"quote=s"=>\$quote,};$this->parse_options($args,$spec);die "Delimiter must be a single character\n\n" unless length$delim==1;my$csv_args={binary=>1,eol=>$/,sep_char=>$delim,escape_char=>$escape,quote_char=>($quote eq '' ? undef : $quote),};if (!$strict){$csv_args->{'allow_whitespace'}=1;$csv_args->{'allow_loose_quotes'}=1;$csv_args->{'allow_loose_escapes'}=1}$this->{'FIELDS'}=\@fields;$this->{'HEADER_LINE'}=$header_line;$this->{'PARSER'}=new Text::CSV($csv_args);$this->{'EXTRA_ARGS'}=$args}sub wants_input {return 0}sub stream_done {my$this=shift;my$files=$this->{'EXTRA_ARGS'};if (scalar @$files > 0){for my$file (@$files){$this->update_current_filename($file);open(my$fh,'<',$file)or die "Could not open file: $!\n";$this->get_records_from_handle($fh);close$fh}}else {$this->get_records_from_handle(\*STDIN)}}sub get_records_from_handle {my ($this,$handle)=@_;my$parser=$this->{'PARSER'};my$do_headers=$this->{'HEADER_LINE'};my@fields=@{$this->{'FIELDS'}};while(my$row=$parser->getline($handle)){if ($do_headers){push@fields,@$row;$do_headers=0;next}my@values=@$row;my$record=App::RecordStream::Record->new();for(my$i=0;$i < @values;++$i){my$key=$fields[$i]|| $i;${$record->guess_key_from_spec($key)}=$values[$i]}$this->push_record($record)}my ($code,$msg,$pos)=$parser->error_diag;unless ($parser->eof and ($code==0 or $code==2012)){my ($line,$file)=($.,$this->get_current_filename);die "fromcsv: parse error: $msg ($code)",", roughly at position $pos, line $line, file $file\n"}}sub add_help_types {my$this=shift;$this->use_help_type('keyspecs')}sub usage {my$this=shift;my$options=[['key|k ','Comma separated list of field names. May be specified multiple times, may be key specs' ],['header','Take field names from the first line of input' ],['strict','Do not trim whitespaces, allow loose quoting (quotes inside quotes), or allow the use of escape characters when not strictly needed. (not recommended, for most cases, though may help with parsing quoted fields containing newlines)' ],['delim|-d ',"Field delimiter to use when reading input lines (default ',')."],['escape ',"Escape character used in quoted fields (default '\x22')."],['quote ',"Quote character used in quoted fields (default '\x22'). Use the empty string to indicate no quoted fields."],];my$args_string=$this->options_string($options);return < [] __FORMAT_TEXT__ Each line of input (or lines of ) is split on commas to produce an output record. Fields are named numerically (0, 1, etc.), or as given by --field, or as read by --header. Lines may be split on delimiters other than commas by providing --delim. __FORMAT_TEXT__ Arguments: $args_string Examples: Parse csv separated fields x and y. recs-fromcsv --field x,y Parse data with a header line specifying fields recs-fromcsv --header Parse tsv data (using bash syntax for a literal tab) recs-fromcsv --delim \$'\\t' USAGE APP_RECORDSTREAM_OPERATION_FROMCSV $fatpacked{"App/RecordStream/Operation/fromdb.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_FROMDB'; package App::RecordStream::Operation::fromdb;our$VERSION="4.0.25";use strict;use warnings;use base qw(App::RecordStream::Operation);use App::RecordStream::OptionalRequire 'DBI';BEGIN {App::RecordStream::OptionalRequire::require_done()}use App::RecordStream::DBHandle;use App::RecordStream::Record;sub init {my$this=shift;my$args=shift;my ($table_name,$sql);my$spec={'table=s'=>\$table_name,'sql=s'=>\$sql,};$this->parse_options($args,$spec,['pass_through']);$this->{'TABLE_NAME'}=$table_name;my$dbh=App::RecordStream::DBHandle::get_dbh($args);$this->{'DBH'}=$dbh;die("Must define --table or --sql\n")unless ($table_name || $sql);unless ($sql){$sql="SELECT * FROM $table_name"}$this->{'SQL'}=$sql}sub wants_input {return 0}sub stream_done {my$this=shift;my$sth=$this->{'DBH'}->prepare($this->{'SQL'});$sth->execute();while (my$row=$sth->fetchrow_hashref()){my$record=App::RecordStream::Record->new(%$row);$this->push_record($record)}}sub usage {my$this=shift;my$options=[['table','Name of the table to dump, this is a shortcut for --sql \'SELECT * from tableName\''],['sql','SQL select statement to run'],];my$args_string=$this->options_string($options);my$usage=< 9' EXAMPLES APP_RECORDSTREAM_OPERATION_FROMDB $fatpacked{"App/RecordStream/Operation/fromjsonarray.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_FROMJSONARRAY'; package App::RecordStream::Operation::fromjsonarray;our$VERSION="4.0.25";use strict;use warnings;use base qw(App::RecordStream::Operation);use App::RecordStream::Record;use JSON::MaybeXS;sub init {my ($this,$args)=@_;my@fields;my$preserve_empty=undef;my$spec={'key|k=s'=>sub {push@fields,split(/,/,$_[1])},};$this->parse_options($args,$spec);$this->{'EXTRA_ARGS'}=$args;$this->{'FIELDS'}=\@fields;$this->{'JSON'}=JSON->new()}sub wants_input {return 0}sub stream_done {my ($this)=@_;my$files=$this->{'EXTRA_ARGS'};if (scalar @$files > 0){for my$file (@$files){$this->update_current_filename($file);open(my$fh,'<',$file)or die "Could not open file: $!\n";$this->get_records_from_handle($fh);close$fh}}else {$this->get_records_from_handle(\*STDIN)}}sub get_records_from_handle {my ($this,$fh)=@_;my$json=$this->{'JSON'};my$fields=$this->{'FIELDS'};my$has_fields=scalar @$fields;my$contents=do {local $/;<$fh>};my@arrays=$json->incr_parse($contents);for my$item (map {@$_}@arrays){$item=App::RecordStream::Record->new($item);my$record=$item;if ($has_fields){$record=App::RecordStream::Record->new();for my$field (@$fields){$record->set($field,${$item->guess_key_from_spec($field)})}}$this->push_record($record)}}sub usage {my ($this)=@_;my$options=[['key|k ','Optional Comma separated list of field names. If none specified, use all keys. May be specified multiple times, may be key specs' ],];my$args_string=$this->options_string($options);return < [] __FORMAT_TEXT__ Import JSON objects from within a JSON array. __FORMAT_TEXT__ Arguments: $args_string USAGE APP_RECORDSTREAM_OPERATION_FROMJSONARRAY $fatpacked{"App/RecordStream/Operation/fromkv.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_FROMKV'; package App::RecordStream::Operation::fromkv;our$VERSION="4.0.25";use strict;use base qw(App::RecordStream::Operation);use strict;use warnings;sub init {my$this=shift;my$args=shift;my$kv_delim=" ";my$entry_delim="\n";my$record_delim="END\n";my$spec={"kv-delim|f=s"=>\$kv_delim,"entry-delim|e=s"=>\$entry_delim,"record-delim|r=s"=>\$record_delim,};$this->parse_options($args,$spec);$this->{'KV_DELIM'}=$kv_delim;$this->{'ENTRY_DELIM'}=$entry_delim;$this->{'RECORD_DELIM'}=$record_delim;$this->{'ACC'}=undef}sub wants_input {return 1}sub stream_done {my$this=shift;my$acc=$this->{'ACC'};if(defined($acc)){$this->process_record($acc)}}sub accept_line {my$this=shift;my$line=shift;if(!defined($this->{'ACC'})){$this->{'ACC'}=''}$this->{'ACC'}.= "$line\n";my$record_delim=$this->{'RECORD_DELIM'};if($this->{'ACC'}=~ s/^(.*?)\Q$record_delim\E//s){$this->process_record($1)}return 1}sub process_record {my$this=shift;my$line=shift;my$kv_delim=$this->{'KV_DELIM'};my$entry_delim=$this->{'ENTRY_DELIM'};$line =~ s/^\s+|\s+$//g;my@entries=split(/\Q$entry_delim\E/,$line);if (scalar(@entries)> 0){my$current_record={};for my$entry (@entries){my@pair=split($kv_delim,$entry);$current_record->{$pair[0]}=$pair[1]if scalar(@pair)==2}$this->push_record(App::RecordStream::Record->new($current_record))}}sub usage {my$this=shift;my$options=[['record-delim|r ','Delimiter to for separating records (defaults to "END\\n").'],['entry-delim|e ','Delimiter to for separating entries within records (defaults to "\\n").'],['kv-delim|f ','Delimiter to for separating key/value pairs within an entry (defaults to " ").'],];my$args_string=$this->options_string($options);return < [] __FORMAT_TEXT__ Records are generated from character input with the form "...". Records have the form "...". Entries are pairs of the form "". __FORMAT_TEXT__ Arguments: $args_string Examples: Parse memcached stat metrics into records echo -ne 'stats\\r\\n' | nc -i1 localhost 11211 | tr -d "\\r" | awk '{if (! /END/) {print \$2" "\$3} else {print \$0}}' | recs-fromkv Parse records separated by "E\\n" with entries separated by '\|' and pairs separated by '=' recs-fromkv --kv-delim '=' --entry-delim '\|' --record-delim \$(echo -ne "E\\n") Parse records separated by "%\\n" with entries separated by "\\n" and pairs separated by '=' recs-fromkv --kv-delim '=' --record-delim \$(echo -ne "%\\n") Parse records separated by '%' with entries separated by '\|' and pairs separated by '=' recs-fromkv --kv-delim '=' --entry-delim '\|' --record-delim '%' USAGE APP_RECORDSTREAM_OPERATION_FROMKV $fatpacked{"App/RecordStream/Operation/frommongo.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_FROMMONGO'; package App::RecordStream::Operation::frommongo;our$VERSION="4.0.25";use strict;use warnings;use base qw(App::RecordStream::Operation);use MongoDB;use Data::Dumper;use JSON::PP;sub init {my$this=shift;my$args=shift;my ($host,$user,$pass,$db_name,$query,$collection);my$spec={"host=s"=>\$host,"user=s"=>\$user,"password|pass=s"=>\$pass,"name|dbname=s"=>\$db_name,"query=s"=>\$query,"collection=s"=>\$collection,};$this->parse_options($args,$spec);unless(defined$host && defined$db_name && defined$query && defined$collection){die "Must specify all of --host, --name, --collection, and --query\n"}my$params={host=>$host,db_name=>$db_name,};$params->{'username'}=$user if defined$user;$params->{'password'}=$pass if defined$pass;my$json=JSON::PP->new()->allow_barekey()->allow_singlequote()->relaxed(1);$this->{'CONNECT_PARAMS'}=$params;$this->{'DB_NAME'}=$db_name;$this->{'COLLECTION'}=$collection;$this->{'QUERY'}=$json->decode($query)}sub wants_input {return 0}sub stream_done {my$this=shift;my$cursor=$this->get_query();while (my$object=$cursor->next()){$this->push_record($object)}}sub get_query {my$this=shift;if ($this->{'CURSOR'}){return$this->{'CURSOR'}}my$client=MongoDB::MongoClient->new(%{$this->{'CONNECT_PARAMS'}},);my$db=$client->get_database($this->{'DB_NAME'});my$collection=$db->get_collection($this->{'COLLECTION'});return$this->{'CURSOR'}=$collection->find($this->{'QUERY'})}sub usage {my$this=shift;my$options=[['host ','URI for your mongo instance, may include user:pass@URI'],['user ','User to authenticate as.'],['password ','Password for --user'],['name ','Name of database to connect to'],['collection ','Name of collection to query against'],['query ','JSON query string to run against the --collection'],];my$args_string=$this->options_string($options);return <sub {$this->add_regex($_[1],0,0)},"pre-flush-regex|pre=s"=>sub {$this->add_regex($_[1],1,0)},"post-flush-regex|post=s"=>sub {$this->add_regex($_[1],0,1)},"double-flush-regex|double=s"=>sub {$this->add_regex($_[1],1,1)},"clobber"=>sub {$this->_set_clobber(1)},"keep-all"=>sub {$this->_set_keep_all(1)},"keep=s"=>sub {$this->add_keep(split(/,/,$_[1]))},);$this->parse_options($args,\%options);$this->{'RECORD'}=App::RecordStream::Record->new()}sub add_regex {my ($this,$string,$pre_flush,$post_flush)=@_;$this->{'REGEXES'}||= [];my$fields=[];if($string =~ /^([^=]*)=(.*)$/){$fields=[split(/,/,$1)];$string=$2}push @{$this->{'REGEXES'}},[$string,$fields,$pre_flush,$post_flush]}sub _get_regexes {my ($this)=@_;return$this->{'REGEXES'}|| []}sub _set_clobber {my ($this,$value)=@_;$this->{'CLOBBER'}=$value}sub get_clobber {my ($this)=@_;return$this->{'CLOBBER'}|| 0}sub _set_keep_all {my ($this,$value)=@_;$this->{'KEEP_ALL'}=$value}sub get_keep_all {my ($this)=@_;return$this->{'KEEP_ALL'}|| 0}sub add_keep {my$this=shift;$this->{'KEEP'}||= {};for my$field (@_){$this->{'KEEP'}->{$field}=1}}sub check_keep {my ($this,$field)=@_;$this->{'KEEP'}||= {};return$this->get_keep_all()|| exists($this->{'KEEP'}->{$field})}sub accept_line {my$this=shift;my$line=shift;my$regex_index=0;for my$regex (@{$this->_get_regexes()}){my ($string,$fields,$pre_flush,$post_flush)=@$regex;my$field_prefix="$regex_index-";if(my@groups=($line =~ $string)){my$pairs=$this->get_field_value_pairs(\@groups,$fields,$field_prefix);if(!$this->get_clobber()){for my$pair (@$pairs){my ($name,$value)=@$pair;if(defined ${$this->{'RECORD'}->guess_key_from_spec($name)}){$pre_flush=1}}}if($pre_flush){$this->flush_record()}for my$pair (@$pairs){my ($name,$value)=@$pair;${$this->{'RECORD'}->guess_key_from_spec($name)}=$value}if($post_flush){$this->flush_record()}}++$regex_index}return 1}sub stream_done {my$this=shift;my$record=$this->{'RECORD'};if(!$this->get_clobber()&& scalar($record->keys())){$this->flush_record()}}sub get_field_value_pairs {my ($this,$groups,$fields,$prefix)=@_;my@field_names;my%groups_used;for(my$i=0;$i < @$fields;++$i){my$field=$fields->[$i];my$field_name;if($field =~ /^\$(\d+)$/){my$n=$1 - 1;$field_name=$groups->[$n];$groups_used{$n}=1}else {$field_name=$field}push@field_names,$field_name}my@pairs;my$pair_index=0;for(my$i=0;$i < @$groups;++$i){if($groups_used{$i}){next}my$field_name=($pair_index < @field_names)? $field_names[$pair_index]: ($prefix .$pair_index);push@pairs,[$field_name,$groups->[$i]];$pair_index++}return \@pairs}sub flush_record {my$this=shift;my$record=$this->{'RECORD'};my$record2=App::RecordStream::Record->new();for my$field ($record->keys()){if($this->check_keep($field)){$record2->set($field,$record->get($field))}}$this->push_record($record);$this->{'RECORD'}=$record2}sub add_help_types {my$this=shift;$this->use_help_type('keyspecs')}sub usage {my$this=shift;my$options=[['no-flush-regex|--regex|--re ','Add a normal regex.'],['pre-flush-regex|--pre ','Add a regex that flushes before interpretting fields when matched.'],['post-flush-regex|--post ','Add a regex that flushes after interpretting fields when matched.'],['double-flush-regex|--double ','Add a regex that flushes both before and after interprettying fields when matched.'],['clobber','Do not flush records when a field from a match would clobber an already existing field and do not flush at EOF.'],['keep-all','Do not clear any fields on a flush.'],['keep ','Do not clear this comma separated list of fields on a flush.'],];my$args_string=$this->options_string($options);return < [] __FORMAT_TEXT__ Match multiple regexes against each line of input (or lines of ). Various parameters control when the accumulated fields are flushed to output as a record and which, if any, fields are cleared when the record is flushed. By default regexes do not necessarily flush on either side, would-be field collisions cause a flush, EOF causes a flush if any fields are set, and all fields are cleared on a flush. __FORMAT_TEXT__ Arguments: $args_string __FORMAT_TEXT__ - Syntax is: ',=REGEX'. KEY field names are optional. The key names may be key specs, see '--help-keyspecs' for more. Field names may not be keygroups. If field matches \$NUM, then that match number in the regex will be used as the field name __FORMAT_TEXT__ Examples: Typical use case one: parse several fields on separate lines recs-frommultire --re 'fname,lname=^Name: (.*) (.*)\$' --re 'addr=^Address: (.*)\$' Typical use case two: some fields apply to multiple records ("department" here) recs-frommultire --post 'fname,lname=^Name: (.*) (.*)\$' --re 'department=^Department: (.*)\$' --clobber --keep team USAGE APP_RECORDSTREAM_OPERATION_FROMMULTIRE $fatpacked{"App/RecordStream/Operation/fromps.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_FROMPS'; package App::RecordStream::Operation::fromps;our$VERSION="4.0.25";use strict;use warnings;use base qw(App::RecordStream::Operation);use App::RecordStream::OptionalRequire qw(Proc::ProcessTable);App::RecordStream::OptionalRequire::require_done();sub init {my$this=shift;my$args=shift;my$process_table=$this->get_process_table();my@fields;my$spec={'key|k|field|f=s'=>sub {push@fields,split(',',$_[1])},};$this->parse_options($args,$spec);if (scalar@fields < 1){@fields=grep {defined($_)}$process_table->fields()}$this->{'FIELDS'}=\@fields}sub get_process_table {my$this=shift;$this->{'PROCESS_TABLE'}||= Proc::ProcessTable->new();return$this->{'PROCESS_TABLE'}}sub set_process_table {my$this=shift;my$table=shift;$this->{'PROCESS_TABLE'}=$table}sub set_converter {my$this=shift;my$func=shift;$this->{'CONVERTER'}=$func}sub get_converter {my$this=shift;$this->{'CONVERTER'}||= sub {return (getpwuid($_[0]))[0]};return$this->{'CONVERTER'}}sub wants_input {return 0}sub stream_done {my$this=shift;my$table=$this->get_process_table();my$fields=$this->{'FIELDS'};for my$proc (@{$table->table()}){my$record=App::RecordStream::Record->new();for my$field (@$fields){my$value=$proc->{$field};if ($field eq 'uid'){$value=$this->get_converter()->($value)}$record->{$field}=$value if (defined$value)}$this->push_record($record)}}sub usage {my$this=shift;my@fields=Proc::ProcessTable->new()->fields();my$all_fields=join (', ',grep {defined}@fields);my$options=[['keys ','Fields to output. May be specified multiple times, may be comma separated. Default to all fields These are Proc::ProcessTable keys, and thus may not be keyspecs or groups'],];my$args_string=$this->options_string($options);my$default_fields=$ENV{GENERATING_STATIC_DOC}? < __FORMAT_TEXT__ Prints out JSON records converted from the process table. __FORMAT_TEXT__ $args_string $default_fields Examples: Get records for the process table recs-fromps Only get uid and pid recs-fromps --keys uid,pid USAGE APP_RECORDSTREAM_OPERATION_FROMPS $fatpacked{"App/RecordStream/Operation/fromre.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_FROMRE'; package App::RecordStream::Operation::fromre;our$VERSION="4.0.25";use strict;use base qw(App::RecordStream::Operation);sub init {my$this=shift;my$args=shift;my%options=("key|k|field|f=s"=>sub {$this->add_field(split(/,/,$_[1]))},);$this->parse_options($args,\%options);if(!@$args){die "Missing expression\n"}$this->_set_pattern(shift @$args)}sub _set_pattern {my ($this,$value)=@_;$this->{'PATTERN'}=$value}sub get_pattern {my ($this)=@_;return$this->{'PATTERN'}|| 0}sub add_field {my$this=shift;$this->{'FIELDS'}||= [];push @{$this->{'FIELDS'}},@_}sub get_field {my ($this,$index)=@_;if($this->{'FIELDS'}&& $index < @{$this->{'FIELDS'}}){return$this->{'FIELDS'}->[$index]}else {return$index}}sub accept_line {my$this=shift;my$line=shift;if(my@groups=($line =~ $this->get_pattern())){my$record=App::RecordStream::Record->new();my$index=0;for my$value (@groups){${$record->guess_key_from_spec($this->get_field($index))}=$value;++$index}$this->push_record($record)}return 1}sub add_help_types {my$this=shift;$this->use_help_type('keyspecs')}sub usage {my$this=shift;my$options=[['key|-k ','Comma separated list of key names. May be specified multiple times. may be a key spec, see \'man recs\' for more'],];my$args_string=$this->options_string($options);return < [] __FORMAT_TEXT__ is matched against each line of input (or lines of ). Each successfully match results in one output record whose field values are the capture groups from the match. Lines that do not match are ignored. Keys are named numerically (0, 1, etc.) or as given by --key. For spliting on a delimeter, see recs-fromsplit. __FORMAT_TEXT__ Arguments: $args_string Examples: Parse greetings recs-fromre --key name,age '^Hello, my name is (.*) and I am (\\d*) years? old\$' Parse a single key named time from a group of digits at the beginning of the line recs-fromre --key time '^(\\d+)' Map three sets of <>s to a record with keys named 0, 1, and 2 recs-fromre '<(.*)>\\s*<(.*)>\\s*<(.*)>' USAGE APP_RECORDSTREAM_OPERATION_FROMRE $fatpacked{"App/RecordStream/Operation/fromsplit.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_FROMSPLIT'; package App::RecordStream::Operation::fromsplit;our$VERSION="4.0.25";use strict;use base qw(App::RecordStream::Operation);sub init {my$this=shift;my$args=shift;my$headers=0;my$strict=0;my%options=("delim|d=s"=>sub {$this->_set_delimiter($_[1])},"key|k|field|f=s"=>sub {$this->add_field(split(/,/,$_[1]))},"header"=>\$headers,"strict"=>\$strict,);$this->parse_options($args,\%options);$this->{'HEADER'}=$headers;$this->{'STRICT'}=$strict}sub _set_delimiter {my ($this,$value)=@_;$this->{'DELIMITER'}=$value}sub get_delimiter {my ($this)=@_;if(!defined($this->{'DELIMITER'})){return ','}return$this->{'DELIMITER'}}sub add_field {my$this=shift;$this->{'FIELDS'}||= [];push @{$this->{'FIELDS'}},@_}sub get_field {my ($this,$index)=@_;if($this->{'FIELDS'}&& $index < @{$this->{'FIELDS'}}){return$this->{'FIELDS'}->[$index]}else {return$index}}sub accept_line {my$this=shift;my$line=shift;if ($this->{'HEADER'}){$this->add_field($_)for @{$this->get_values_for_line($line)};delete$this->{'HEADER'}}else {my$record=App::RecordStream::Record->new();my$index=0;for my$value (@{$this->get_values_for_line($line)}){${$record->guess_key_from_spec($this->get_field($index))}=$value;++$index}$this->push_record($record)}return 1}sub get_values_for_line {my$this=shift;my$line=shift;my@values;my$delim=$this->get_delimiter();if ($this->{'STRICT'}){@values=split(/\Q$delim\E/,$line,-1)}else {@values=split(/$delim/,$line,-1)}return \@values}sub add_help_types {my$this=shift;$this->use_help_type('keyspecs')}sub usage {my$this=shift;my$options=[['delim|-d ',"Delimiter to use for splitting input lines (default ',')."],['key|-k ','Comma separated list of key names. May be specified multiple times, may be key specs'],['header','Take key names from the first line of input.'],['strict','Delimiter is not treated as a regex'],];my$args_string=$this->options_string($options);return < [] __FORMAT_TEXT__ Each line of input (or lines of ) is split on provided delimiter to produce an output record. Keys are named numerically (0, 1, etc.) or as given by --key. __FORMAT_TEXT__ Arguments: $args_string Examples: Parse space separated keys x and y. recs-fromsplit --key x,y --delim ' ' Parse comma separated keys a, b, and c. recs-fromsplit --key a,b,c USAGE APP_RECORDSTREAM_OPERATION_FROMSPLIT $fatpacked{"App/RecordStream/Operation/fromtcpdump.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_FROMTCPDUMP'; package App::RecordStream::Operation::fromtcpdump;our$VERSION="4.0.25";use strict;use warnings;use base qw(App::RecordStream::Operation);use App::RecordStream::OptionalRequire qw(NetPacket::Ethernet :ALL);use App::RecordStream::OptionalRequire qw(NetPacket::IP :ALL);use App::RecordStream::OptionalRequire qw(NetPacket::TCP :ALL);use App::RecordStream::OptionalRequire qw(NetPacket::UDP :ALL);use App::RecordStream::OptionalRequire qw(NetPacket::ARP :ALL);use App::RecordStream::OptionalRequire qw(Net::Pcap pcap_open_offline pcap_loop pcap_next_ex);use App::RecordStream::OptionalRequire qw(Net::DNS::Packet);App::RecordStream::OptionalRequire::require_done();use Data::Dumper;my$IP_FLAGS={'more_fragments'=>IP_FLAG_MOREFRAGS,'dont_fragment'=>IP_FLAG_DONTFRAG,'congestion'=>IP_FLAG_CONGESTION,};my$TCP_FLAGS={FIN=>FIN,SYN=>SYN,RST=>RST,PSH=>PSH,ACK=>ACK,URG=>URG,ECE=>ECE,CWR=>CWR,};my$ARP_OPCODES={+ARP_OPCODE_REQUEST,'ARP_REQUEST',+ARP_OPCODE_REPLY,'ARP_REPLY',+RARP_OPCODE_REQUEST,'RARP_REQUEST',+RARP_OPCODE_REPLY,'RARP_REPLY',};my$DEFAULT_SUPPRESSED_FIELDS=[qw(data _frame _parent type)];sub init {my$this=shift;my$args=shift;my$data=0;my$spec={'data'=>\$data,};$this->parse_options($args,$spec);if (!@$args){die "Missing capture file\n"}$this->{'FILES'}=$args;$this->{'DATA'}=$data}sub wants_input {return 0}sub stream_done {my$this=shift;for my$filename (@{$this->{'FILES'}}){$this->update_current_filename($filename);$this->dump_packets($filename)}}sub dump_packets {my$this=shift;my$file=shift;my$error;my$pcap=pcap_open_offline($file,\$error);die$error if ($error);my ($raw_packet,%header);while(pcap_next_ex($pcap,\%header,\$raw_packet)==1){my$record={'length'=>$header{'len'},'caplen'=>$header{'caplen'},'file'=>$file,};if ($header{'tv_sec'}){$record->{'timestamp'}=join('.',$header{'tv_sec'},$header{'tv_usec'})}$this->push_record($this->create_packet_record($raw_packet,$record))}}sub create_packet_record {my$this=shift;my$packet=shift;my$record=shift;my ($eth_obj)=NetPacket::Ethernet->decode($packet);$this->propagate_fields('ethernet',$eth_obj,$record);my$type='ethernet';my$data=$eth_obj->{'data'};if ($eth_obj->{type}==ETH_TYPE_IP){my$ip_obj=NetPacket::IP->decode($eth_obj->{data});$this->propagate_fields('ip',$ip_obj,$record,[qw(flags)]);$type='ip';$data=$ip_obj->{'data'};$record->{'ip'}->{'flags'}=$this->get_flag_list($ip_obj->{'flags'},$IP_FLAGS,);if($ip_obj->{proto}==IP_PROTO_TCP){my$ip_data_len=$ip_obj->{len}- $ip_obj->{hlen}* 4;if ($ip_data_len < length($ip_obj->{data})){my$truncated_data=substr($ip_obj->{'data'},0,$ip_data_len);$ip_obj->{'data'}=$truncated_data}my$tcp_obj=NetPacket::TCP->decode($ip_obj->{data});$this->propagate_fields('tcp',$tcp_obj,$record);$type='tcp';$data=$tcp_obj->{'data'};$record->{'tcp'}->{'flags'}=$this->get_flag_list($tcp_obj->{'flags'},$TCP_FLAGS,);$this->attach_dns_info($record,$tcp_obj)}elsif ($ip_obj->{'proto'}==IP_PROTO_UDP){my$udp_obj=NetPacket::UDP->decode ($ip_obj->{data});$this->propagate_fields('udp',$udp_obj,$record);$type='udp';$data=$udp_obj->{'data'};$this->attach_dns_info($record,$udp_obj)}}elsif ($eth_obj->{'type'}==ETH_TYPE_ARP){$type='arp';my$arp_obj=NetPacket::ARP->decode($eth_obj->{data});$this->propagate_fields('arp',$arp_obj,$record,[qw(opcode)]);my$opcode=$arp_obj->{'opcode'};$record->{'arp'}->{'opcode'}=$ARP_OPCODES->{$opcode}}$record->{'type'}=$type;$record->{'data'}=$data if ($this->{'DATA'});return App::RecordStream::Record->new($record)}sub attach_dns_info {my$this=shift;my$record=shift;my$packet=shift;unless ($packet->{'dest_port'}==53 || $packet->{'src_port'}==53){return}my$data=$packet->{'data'};my$dns_packet=Net::DNS::Packet->new(\$data);my@answers=$dns_packet->answer();if (!$this->{'DATA'}){$dns_packet->{'buffer'}='';for my$answer (@answers){$answer->{'rdata'}=''}}$record->{'dns'}=$dns_packet;$record->{'dns'}->{'answer'}=\@answers}sub get_flag_list {my$this=shift;my$flags=shift;my$flags_hash=shift;my$to_return={};for my$name (keys %$flags_hash){if ($flags & $flags_hash->{$name}){$to_return->{$name}=1}}return$to_return}sub propagate_fields {my$this=shift;my$dest_key=shift;my$src=shift;my$dest=shift;my$extra_suppressed=shift;my$suppressed={map {$_=>1}@$DEFAULT_SUPPRESSED_FIELDS,@$extra_suppressed };for my$key (keys %$src){next if ($suppressed->{$key});$dest->{$dest_key}->{$key}=$src->{$key}}}sub usage {my$this=shift;my$options=[['data','Include raw data bytes of deepest packet level'],];my$args_string=$this->options_string($options);my$ip_flag_names=join(', ',sort keys %$IP_FLAGS);my$tcp_flag_names=join(', ',sort keys %$TCP_FLAGS);my$arp_opcodes=join(', ',sort values %$ARP_OPCODES);return < ... __FORMAT_TEXT__ Runs tcpdump and puts out records, one for each packet. Expects pcap files. Will put the name of the originating capture file in the 'file' field. Will parse packet types: ethernet, ip, udp, arp, tcp The type key will indicate the highest level parsed. DNS information will be parsed for TCP or UDP packets that are from or to port 53. The parsed representation of the packet for each valid level will be placed in the corresponding key. For instance, for a tcp packet, there will be information in the keys 'ethernet', 'ip', and 'tcp' By default, data output is suppressed due to poor interaction with terminal programs. __FORMAT_TEXT__ Flags will be parsed into hash of strings Possible IP flags: $ip_flag_names Poassible TCP flags: $tcp_flag_names ARP opcodes will be matched Possible opcodes: $arp_opcodes Creating a pcap file: __FORMAT_TEXT__ Run a tcpdump command with -w FILE to produce a pcap file. For instance: sudo tcpdump -w /var/tmp/capture.pcap Optionally, include all the data and timing information: sudo tcpdump -w capture.pcap -s4096 -S -tt See 'man tcpdump' for more information. __FORMAT_TEXT__ Arguments $args_string Examples Get records for all packets recs-fromtcpdump capture.pcap USAGE APP_RECORDSTREAM_OPERATION_FROMTCPDUMP $fatpacked{"App/RecordStream/Operation/fromxferlog.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_FROMXFERLOG'; package App::RecordStream::Operation::fromxferlog;our$VERSION="4.0.25";use strict;use warnings;use base qw(App::RecordStream::Operation);use App::RecordStream::Record;use App::RecordStream::OptionalRequire qw(Net::FTPServer::XferLog);App::RecordStream::OptionalRequire::require_done();sub init {my$this=shift;my$args=shift;my$spec={};$this->parse_options($args,$spec)}sub accept_line {my$this=shift;my$line=shift;if (my$hash=Net::FTPServer::XferLog->parse_line($line)){my$record=App::RecordStream::Record->new($hash);$this->push_record($record)}return 1}sub usage {my$this=shift;return < __FORMAT_TEXT__ Each line of input (or lines of ) is parse by Net::FTPServer::XferLog to produce an output record. __FORMAT_TEXT__ Examples: Get records from typical xferlog recs-fromxferlog < /var/log/xferlog USAGE APP_RECORDSTREAM_OPERATION_FROMXFERLOG $fatpacked{"App/RecordStream/Operation/fromxml.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_FROMXML'; package App::RecordStream::Operation::fromxml;our$VERSION="4.0.25";use strict;use warnings;use base qw(App::RecordStream::Operation);use App::RecordStream::Record;use App::RecordStream::OptionalRequire 'HTTP::Request';use App::RecordStream::OptionalRequire 'LWP::UserAgent';use App::RecordStream::OptionalRequire 'List::MoreUtils',qw(uniq);use App::RecordStream::OptionalRequire 'XML::Twig';BEGIN {App::RecordStream::OptionalRequire::require_done()}sub init {my$this=shift;my$args=shift;my@elements;my$nested=0;my$spec={'element=s'=>sub {push@elements,split(/,/,$_[1])},'nested'=>\$nested,};$this->parse_options($args,$spec);$this->{'ELEMENTS'}=[uniq@elements ];$this->{'NESTED'}=$nested;my$has_files=scalar @$args;$this->{'HAS_URIS'}=$has_files;$this->{'EXTRA_ARGS'}=$args;$this->{'OPEN_TAGS'}=0}sub wants_input {return 0}sub stream_done {my$this=shift;my$elements=$this->{'ELEMENTS'};my$elem_prefix='/*/';my$attr_prefix='/';if ($this->{'NESTED'}){$elem_prefix .= '/';$attr_prefix .= '/'}my%start_tag_handlers;my%twig_roots;for my$element (@$elements){my$elem_expr=$elem_prefix .$element;my$attr_expr=$attr_prefix .'[@' .$element .']';my$default_hash={};if (@$elements > 1){$default_hash->{'element'}=$element}$start_tag_handlers{$elem_expr}=sub {$this->{'OPEN_TAGS'}++};$twig_roots{$elem_expr}=sub {$this->handle_element($default_hash,@_)};$twig_roots{$attr_expr}=sub {$this->handle_attribute($element,$default_hash,@_)}}my$twig=XML::Twig->new(start_tag_handlers=>\%start_tag_handlers,twig_roots=>\%twig_roots);while (my$xml=$this->get_xml_string()){$twig->parse($xml)}}sub handle_element {my ($this,$default_hash,$twig,$elem)=@_;$this->{'OPEN_TAGS'}--;if ($this->{'OPEN_TAGS'}==0){my$s=$elem->simplify('forcearray'=>1,'keyattr'=>[]);$this->push_value($s,$default_hash);$twig->purge}return 0}sub handle_attribute {my ($this,$name,$default_hash,$twig,$elem)=@_;if ($this->{'OPEN_TAGS'}==0){$this->push_value($elem->att($name),$default_hash)}}sub push_value {my$this=shift;my$value=shift;my$default_hash=shift;if (UNIVERSAL::isa($value,'HASH')){my$record=App::RecordStream::Record->new($value);for my$key (keys %$default_hash){$record->{$key}=$default_hash->{$key}}$this->push_record($record)}elsif (UNIVERSAL::isa($value,'ARRAY')){for my$item (@$value){$this->push_value($item,$default_hash)}}else {my$record=App::RecordStream::Record->new(%$default_hash);$record->{'value'}=$value;$this->push_record($record)}}sub get_xml_string {my$this=shift;my$uris=$this->{'EXTRA_ARGS'};my$contents;if ($this->{'HAS_URIS'}){return undef unless (@$uris);my$uri=shift @$uris;$this->update_current_filename($uri);my$ua=$this->make_user_agent();my$response=$ua->request($this->get_request($uri));if (!$response->is_success()){warn "GET uri: '$uri' failed, skipping!\n";return$this->get_xml_string()}$contents=$response->content()}else {local $/;$contents=}return$contents}sub get_request {my$this=shift;my$uri=shift;my$request=HTTP::Request->new();$request->method('GET');$request->uri($uri);return$request}sub make_user_agent {return LWP::UserAgent->new()}sub usage {my$this=shift;my$options=[['element ','May be comma separated, may be specified multiple times. Sets the elements/attributes to print records for'],['nested','search for elements at all levels of the xml document'],];my$args_string=$this->options_string($options);return < [] __FORMAT_TEXT__ Reads either from STDIN or from the specified URIs. Parses the xml documents, and creates records for the specified elements. If multiple element types are specified, will add a {'element' => element name} field to the output record. __FORMAT_TEXT__ $args_string Examples: Create records for the bar element at the top level of myXMLDoc recs-fromxml --element bar file:myXMLDoc Create records for all foo and bar elements from the URL recs-fromxml --element foo,bar --nested http://google.com USAGE APP_RECORDSTREAM_OPERATION_FROMXML $fatpacked{"App/RecordStream/Operation/generate.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_GENERATE'; package App::RecordStream::Operation::generate;our$VERSION="4.0.25";use strict;use base qw(App::RecordStream::Operation);use App::RecordStream::Executor::Getopt;use App::RecordStream::Executor;use Data::Dumper;use App::RecordStream::InputStream;use App::RecordStream::Record;use App::RecordStream::Executor;sub init {my$this=shift;my$args=shift;my$keychain='_chain';my$passthrough=0;my$executor_options=App::RecordStream::Executor::Getopt->new();my$spec={'keychain=s'=>\$keychain,'passthrough'=>\$passthrough,$executor_options->arguments(),};$this->parse_options($args,$spec,['bundling']);my$expression=$executor_options->get_string($args);my$executor=App::RecordStream::Executor->new($expression);$this->{'KEYCHAIN'}=$keychain;$this->{'PASSTHROUGH'}=$passthrough;$this->{'EXECUTOR'}=$executor}sub accept_record {my$this=shift;my$record=shift;$this->push_record($record)if$this->{'PASSTHROUGH'};my$interpolated_command=$this->{'EXECUTOR'}->execute_code($record);if ($@){chomp $@;warn "# $0 interpolating command threw: " .$@ ."\n";return 1}my$pid=open(my$pipe,"-|",$interpolated_command);if (!$pid){warn "# $0 open(..., \"$interpolated_command |\") failed: $!\n";return 1}my$generator_stream=App::RecordStream::InputStream->new(FH=>$pipe);while(my$generated_record=$generator_stream->get_record()){${$generated_record->guess_key_from_spec($this->{'KEYCHAIN'})}=$record->as_hashref();$this->push_record($generated_record)}return 1}sub add_help_types {my$this=shift;$this->use_help_type('keyspecs')}sub usage {my$this=shift;my$options=[App::RecordStream::Executor::options_help(),['passthrough','Emit input record in addition to generated records' ],['keychain ','Use \'name\' as the chain key (default is \'_chain\') may be a key spec, see \'--help-keyspecs\' for more info' ],];my$args_string=$this->options_string($options);my$usage=< [] __FORMAT_TEXT__ Executes for each record to generate a record stream, which is then output with a chain link back to the original record. is executed opened as a command for each record of input (or records from ) with \$r set to a App::RecordStream::Record object. The output lines of each command execution are interpreted as a serialized Recs records, one per line. Each such line is reconstituted as a App::RecordStream::Record, and the '_chain' key is added to the record before it is printed. The value of the '_chain' key is the record that was originally passed to the eval expression. __FORMAT_TEXT__ For instance. If you did: recs-generate "recs-fromatomfeed http://...?key=\$r->{title}..." with input that looked like: { 'title' : 'foo' } { 'title' : 'bar' } then recs-generate would end up executing: recs-fromatomfeed http://...?key=foo... __FORMAT_TEXT__ and interpreting the result as a series of new line separated records. If the result from recs-fromatomfeed was something like: __FORMAT_TEXT__ { 'title' : 'zip' } { 'title' : 'zap' } __FORMAT_TEXT__ then recs-generate would add the chain link so the output would look like: __FORMAT_TEXT__ { 'title' : 'zip', 'chain' : { 'title' : 'foo' } } { 'title' : 'zap', 'chain' : { 'title' : 'foo' } } Arguments: $args_string Examples: Chain recs from a feed to recs from a second feed and the print the titles. recs-fromatomfeed "http://..." | recs-generate "recs-fromatomfeed http://...?key=\$r->{title}" | recs-eval 'join("\t", \$r->{title}, \$r->{chain}->{title})' USAGE APP_RECORDSTREAM_OPERATION_GENERATE $fatpacked{"App/RecordStream/Operation/grep.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_GREP'; package App::RecordStream::Operation::grep;our$VERSION="4.0.25";use strict;use base qw(App::RecordStream::Operation);use App::RecordStream::Executor::Getopt;use App::RecordStream::Executor;sub init {my$this=shift;my$args=shift;my$anti_match;my$context=0;my$after=0;my$before=0;my$executor_options=App::RecordStream::Executor::Getopt->new();my$spec={"-v"=>\$anti_match,"C=s"=>\$context,"A=s"=>\$after,"B=s"=>\$before,$executor_options->arguments(),};$this->parse_options($args,$spec,['bundling']);my$expression=$executor_options->get_string($args);my$executor=App::RecordStream::Executor->new($expression);$this->{'ANTI_MATCH'}=$anti_match;if ($context){$after=$before=$context}$this->{'AFTER'}=$after;$this->{'BEFORE'}=$before;$this->{'ACCUMULATOR'}=[];$this->{'EXECUTOR'}=$executor}sub accept_record {my$this=shift;my$record=shift;my$executor=$this->{'EXECUTOR'};my$result=$executor->execute_code($record);$result=not $result if ($this->{'ANTI_MATCH'});my$pushed_record=0;if ($result){if ($this->{'BEFORE'}){while(my$record=shift @{$this->{'ACCUMULATOR'}}){$this->push_record($record)}}$this->push_record($record);$pushed_record=1;$this->{'SEEN_RECORD'}=1;if ($this->{AFTER}> 0){$this->{'FORCED_OUTPUT'}=$this->{'AFTER'}}}elsif ($this->{'BEFORE'}> 0){push @{$this->{'ACCUMULATOR'}},$record;if ((scalar @{$this->{'ACCUMULATOR'}})> $this->{'BEFORE'}){shift @{$this->{'ACCUMULATOR'}}}}if ($this->{'FORCED_OUTPUT'}&& (!$pushed_record)){$this->push_record($record);$this->{'FORCED_OUTPUT'}--}return 1}sub stream_done {my$this=shift;$this->_set_exit_value(1)unless ($this->{'SEEN_RECORD'})}sub add_help_types {my$this=shift;$this->use_help_type('keyspecs');$this->use_help_type('snippet')}sub usage {my$this=shift;my$options=[App::RecordStream::Executor::options_help(),['v','Anti-match. Records NOT matching will be returned'],['C NUM','Provide NUM records of context around matches, equivalent to -A NUM and -B NUM'],['A NUM','Print out NUM following records after a match'],['B NUM','Print out the previous NUM records on a match'],];my$args_string=$this->options_string($options);return < [] __FORMAT_TEXT__ is evaluated as perl on each record of input (or records from ) with \$r set to a App::RecordStream::Record object and \$line set to the current line number (starting at 1). Records for which the evaluation is a perl true are printed back out. __FORMAT_TEXT__ Arguments: $args_string Examples: Filter to records with field 'name' equal to 'John' recs-grep '\$r->{name} eq "John"' Find fields without ppid = 3456 recs-grep -v '{{ppid}} == 3456' Filter to records with all methods equal to 'PUT' recs-grep -MList::MoreUtils=all 'all { \$_ eq 'PUT' } \@{\$r->{methods}}' USAGE APP_RECORDSTREAM_OPERATION_GREP $fatpacked{"App/RecordStream/Operation/help.pm"} = '#line '.(1+__LINE__).' "'.__FILE__."\"\n".<<'APP_RECORDSTREAM_OPERATION_HELP'; use strict;use warnings;package App::RecordStream::Operation::help;use base qw(App::RecordStream::Operation);use Carp qw;sub usage {my$this=shift;my$args_string=$this->options_string([]);return <SUPER::init_help(@_);for my$type (keys %{$this->{'HELP_TYPES'}}){$this->use_help_type($type);$this->{'HELP_TYPES'}{$type}{OPTION_NAME}||= $type}}sub init {my$this=shift;my$args=shift;$this->parse_options($args,{});my$op=shift @$args;if ($op){local@ARGV=("--help");App::RecordStream::Operation::main("recs-$op")}else {$this->_set_wants_help(1)}}sub does_record_output {0}sub wants_input {0}sub accept_line {croak "This operation does not accept input."}1; Usage: recs help recs help