# File lib/fluent/plugin/filter_viaq_data_model.rb, line 299
#
# Set the Elasticsearch index name (or index prefix) field on +record+,
# based on the first entry of @elasticsearch_index_names whose matcher
# accepts +tag+.
#
# * +:operations_full+ / +:project_full+ write "<prefix>.YYYY.MM.DD" to the
#   field named by @elasticsearch_index_name_field; the date is parsed from
#   record[@dest_time_name].
# * +:operations_prefix+ / +:project_prefix+ write only the prefix to the
#   field named by @elasticsearch_index_prefix_field.
#
# The operations prefix is ".operations"; the project prefix is
# "project.<kubernetes.namespace_name>.<kubernetes.namespace_id>".
#
# When a required field is missing (kubernetes metadata, or a missing or
# unparseable timestamp for the *_full types) an error is logged and the
# record is left without an index field instead of raising.
#
# tag::    event tag, matched against each configured matcher
# time::   event time (not used here; kept for a uniform signature)
# record:: the event record Hash, modified in place
#
# Returns nil; callers use the side effect on +record+.
def add_elasticsearch_index_name_field(tag, time, record)
  # Debug output goes through log.error and can be silenced for one tag.
  debug = ENV['CDM_DEBUG'] && tag != ENV['CDM_DEBUG_IGNORE_TAG']

  # Only the first matching configuration is considered.
  ein = @elasticsearch_index_names.find { |candidate| candidate.matcher.match(tag) }
  if ein.nil?
    log.error("no match for tag #{tag}") if debug
    return
  end
  return unless ein.enabled

  need_time = ein.name_type == :operations_full || ein.name_type == :project_full
  field_name = need_time ? @elasticsearch_index_name_field : @elasticsearch_index_prefix_field

  prefix = nil
  case ein.name_type
  when :operations_full, :operations_prefix
    prefix = ".operations"
  when :project_full, :project_prefix
    k8s = record['kubernetes']
    missing =
      if k8s.nil?
        'kubernetes'
      elsif k8s['namespace_name'].nil?
        'kubernetes.namespace_name'
      elsif k8s['namespace_id'].nil?
        'kubernetes.namespace_id'
      end
    if missing
      log.error("record cannot use elasticsearch index name type #{ein.name_type}: record is missing #{missing} field: #{record}")
      return
    end
    prefix = "project.#{k8s['namespace_name']}.#{k8s['namespace_id']}"
  end

  log.error("prefix #{prefix} need_time #{need_time} time #{record[@dest_time_name]}") if debug

  if need_time
    begin
      ts = DateTime.parse(record[@dest_time_name])
    rescue ArgumentError, TypeError
      # TypeError: the field is nil / not a String; ArgumentError: not a date.
      log.error("record cannot use elasticsearch index name type #{ein.name_type}: record has a missing or invalid #{@dest_time_name} field: #{record}")
      return
    end
    record[field_name] = "#{prefix}.#{ts.strftime('%Y.%m.%d')}"
  else
    record[field_name] = prefix
  end

  log.error("record[#{field_name}] = #{record[field_name]}") if debug
  nil
end
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 288
#
# Record this pipeline stage under record['pipeline_metadata'].
#
# The 'pipeline_metadata' Hash is created if absent, and the entry keyed by
# @pipeline_type (as a String) is replaced with the collector's addresses,
# a fixed input name, the collector name, the current UTC time
# (RFC 3339 with microseconds) and @pipeline_version.
#
# tag::    event tag (unused)
# time::   event time (unused; 'received_at' is the wall clock time of this call)
# record:: the event record Hash, modified in place
#
# Returns the metadata Hash that was stored.
def add_pipeline_metadata(tag, time, record)
  pipeline_metadata = (record['pipeline_metadata'] ||= {})
  pipeline_metadata[@pipeline_type.to_s] = {
    "ipaddr4" => @ipaddr4,
    "ipaddr6" => @ipaddr6,
    "inputname" => "fluent-plugin-systemd",
    "name" => "fluentd",
    "received_at" => Time.now.utc.to_datetime.rfc3339(6),
    "version" => @pipeline_version
  }
end
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 264
#
# Apply the formatter configured for +tag+ to +record+, if there is one.
#
# The formatter for a tag is the first entry of @formatters whose matcher
# accepts the tag, provided that entry is enabled. The outcome of the lookup
# is cached per tag: hits in @formatter_cache, misses (no match, or the
# first match is disabled) in @formatter_cache_nomatch.
#
# After the formatter function has run, a 'time' field is synthesized from
# the event time when the record has neither record[@dest_time_name] nor
# 'time', and the formatter's fmtr_remove_keys are deleted from the record.
#
# tag::    event tag
# time::   event time, passed to Time.at when a 'time' field must be created
# record:: the event record Hash, modified in place
def check_for_match_and_format(tag, time, record)
  return unless @formatters
  return if @formatter_cache_nomatch[tag]

  fmtr = @formatter_cache[tag]
  unless fmtr
    candidate = @formatters.find { |formatter| formatter.matcher.match(tag) }
    if candidate && candidate.enabled
      fmtr = @formatter_cache[tag] = candidate
    else
      @formatter_cache_nomatch[tag] = true
      return
    end
  end

  fmtr.fmtr_func.call(tag, time, record, fmtr.fmtr_type)

  if record[@dest_time_name].nil? && record['time'].nil?
    record['time'] = Time.at(time).utc.to_datetime.rfc3339(6)
  end

  fmtr.fmtr_remove_keys.each { |key| record.delete(key) } if fmtr.fmtr_remove_keys
end
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 144
#
# Validate the plugin configuration and precompute the lookup structures
# used while filtering:
#
# * @keep_fields / @keep_empty_fields_hash - Hash-based sets built from
#   default_keep_fields, extra_keep_fields and keep_empty_fields
# * per-formatter :matcher, :fmtr_type, :fmtr_remove_keys and :fmtr_func
#   entries, plus the per-tag formatter caches
# * @docker_hostname, @ipaddr4, @ipaddr6 and @pipeline_version, read from
#   /etc/docker-hostname and the environment
# * a tag matcher for every elasticsearch index name entry
#
# Raises Fluent::ConfigError when the undefined-field container is listed as
# a field to keep, or when time renaming is requested together with
# use_undefined but the source time field is not listed as a field to keep.
def configure(conf)
  super

  @keep_fields = {}
  @default_keep_fields.each { |field| @keep_fields[field] = true }
  @extra_keep_fields.each { |field| @keep_fields[field] = true }
  @keep_empty_fields_hash = {}
  @keep_empty_fields.each do |field|
    @keep_empty_fields_hash[field] = true
    @keep_fields[field] = true
  end

  if @use_undefined && @keep_fields.key?(@undefined_name)
    raise Fluent::ConfigError, "Do not put [#{@undefined_name}] in default_keep_fields or extra_keep_fields"
  end
  # filter renames the time field when either @rename_time or
  # @rename_time_if_missing is set, so both modes need the source field kept.
  if (@rename_time || @rename_time_if_missing) && @use_undefined && !@keep_fields.key?(@src_time_name)
    raise Fluent::ConfigError, "Field [#{@src_time_name}] must be listed in default_keep_fields or extra_keep_fields"
  end

  if @formatters
    @formatters.each do |fmtr|
      matcher = ViaqMatchClass.new(fmtr.tag, nil)
      fmtr_type = fmtr.type
      remove_keys = fmtr.remove_keys ? fmtr.remove_keys.split(',') : nil
      fmtr_func =
        case fmtr_type
        when :sys_journal, :k8s_journal
          method(:process_journal_fields)
        when :sys_var_log
          method(:process_sys_var_log_fields)
        when :k8s_json_file
          method(:process_k8s_json_file_fields)
        end
      # The derived values are stored in the section's own @params hash so
      # they can be read back later as fmtr.matcher, fmtr.fmtr_func, etc.
      fmtr.instance_eval do
        @params[:matcher] = matcher
        @params[:fmtr_type] = fmtr_type
        @params[:fmtr_remove_keys] = remove_keys
        @params[:fmtr_func] = fmtr_func
      end
    end
    @formatter_cache = {}
    @formatter_cache_nomatch = {}
  end

  # Best effort: the file is optional, any read error leaves the name unset.
  begin
    @docker_hostname = File.open('/etc/docker-hostname') { |f| f.readline }.rstrip
  rescue StandardError
    @docker_hostname = nil
  end

  @ipaddr4 = ENV['IPADDR4'] || '127.0.0.1'
  @ipaddr6 = ENV['IPADDR6'] || '::1'
  @pipeline_version = (ENV['FLUENTD_VERSION'] || 'unknown fluentd version') + ' ' + (ENV['DATA_VERSION'] || 'unknown data version')

  # create the elasticsearch index name tag matchers
  @elasticsearch_index_names.each do |ein|
    matcher = ViaqMatchClass.new(ein.tag, nil)
    ein.instance_eval { @params[:matcher] = matcher }
  end
end
Recursively delete nil values, empty fields, and empty lists/hashes from thing. The deletion is done in place, and thing itself is returned.
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 215
#
# Recursively remove nil and empty members from +thing+, in place.
#
# Anything that responds to +delete_if+ is treated as a container: a Hash is
# pruned by value, any other container element by element. A member is
# removed when it is nil or when it is empty (per +isempty+) after having
# been pruned itself. Objects that do not respond to +delete_if+ are
# returned untouched.
#
# Returns +thing+ (the same object that was passed in).
def delempty(thing)
  return thing unless thing.respond_to?(:delete_if)

  if thing.kind_of?(Hash)
    # delempty returns the very object it pruned, so a single emptiness
    # check on its result covers both "was empty" and "became empty".
    thing.delete_if { |_key, value| value.nil? || isempty(delempty(value)) }
  else # assume single element iterable
    thing.delete_if { |elem| elem.nil? || isempty(delempty(elem)) }
  end
  thing
end
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 361
#
# Normalize one event record. In order:
#
# 1. run the tag-specific formatter (check_for_match_and_format)
# 2. add pipeline metadata (add_pipeline_metadata)
# 3. if @use_undefined, move every top-level field that is not in
#    @keep_fields into a nested Hash stored under @undefined_name
# 4. drop nil/empty top-level fields (after recursively pruning them),
#    except those listed in @keep_empty_fields_hash
# 5. rename the @src_time_name field to @dest_time_name; with
#    @rename_time_if_missing an existing destination value is kept
# 6. add the elasticsearch index name/prefix field when index names are
#    configured
#
# When ENV['CDM_DEBUG'] is set, the input and output are logged (at error
# level) for every tag except ENV['CDM_DEBUG_IGNORE_TAG'].
#
# tag::    event tag
# time::   event time
# record:: the event record Hash, modified in place
#
# Returns +record+.
def filter(tag, time, record)
  debug = ENV['CDM_DEBUG'] && tag != ENV['CDM_DEBUG_IGNORE_TAG']
  log.error("input #{time} #{tag} #{record}") if debug

  check_for_match_and_format(tag, time, record)
  add_pipeline_metadata(tag, time, record)

  if @use_undefined
    # undefined contains all of the fields not in keep_fields
    undefined = record.reject { |key, _value| @keep_fields.key?(key) }
    # only set the undefined field if there are undefined fields
    unless undefined.empty?
      # Remove the undefined fields from the top level first and only then
      # store the container: if the incoming record already has a field named
      # @undefined_name, storing first would make the delete below throw the
      # whole container away.
      record.delete_if { |key, _value| undefined.key?(key) }
      record[@undefined_name] = undefined
    end
  end

  # remove the field from record if it is not in the list of fields to keep
  # and it is empty
  record.delete_if do |key, value|
    !@keep_empty_fields_hash.key?(key) && (value.nil? || isempty(delempty(value)))
  end
  # probably shouldn't remove everything . . .
  log.warn("Empty record! tag [#{tag}] time [#{time}]") if record.empty?

  # rename the time field
  if (@rename_time || @rename_time_if_missing) && record.key?(@src_time_name)
    val = record.delete(@src_time_name)
    if !@rename_time_if_missing || !record.key?(@dest_time_name)
      record[@dest_time_name] = val
    end
  end

  if !@elasticsearch_index_names.empty?
    add_elasticsearch_index_name_field(tag, time, record)
  elsif debug
    log.error("not adding elasticsearch index name or prefix")
  end

  log.error("output #{time} #{tag} #{record}") if debug
  record
end
If thing doesn't respond to empty?, assume it isn't empty. For example, 0.respond_to?(:empty?) == false, so the Integer (formerly Fixnum) 0 is not empty.
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 210
#
# Tell whether +thing+ is empty.
#
# Objects that do not respond to +empty?+ (numbers, nil, booleans, ...) are
# never considered empty; for everything else the object's own +empty?+
# decides.
def isempty(thing)
  return false unless thing.respond_to?(:empty?)

  thing.empty?
end
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 243
#
# Formatter for records read from Kubernetes/docker json-file logs.
#
# * 'message' falls back to the 'log' field when not already set
# * 'level' is 'info' for the stdout stream and 'err' for anything else
# * 'hostname' is kubernetes.host when present, otherwise @docker_hostname
#   when that is known
# * unless the record already has record[@dest_time_name], 'time' is
#   rewritten as a UTC RFC 3339 string with microseconds, taken from the
#   record's own 'time' string if there is one, else from the event time
#
# tag::       event tag (unused)
# time::      event time, used only when the record has no 'time' field
# record::    the event record Hash, modified in place
# fmtr_type:: formatter type (unused)
def process_k8s_json_file_fields(tag, time, record, fmtr_type = nil)
  record['message'] ||= record['log']
  record['level'] = record['stream'] == 'stdout' ? 'info' : 'err'

  k8s = record['kubernetes']
  k8shost = k8s.fetch('host', nil) if k8s.respond_to?(:fetch)
  if k8shost
    record['hostname'] = k8shost
  elsif @docker_hostname
    record['hostname'] = @docker_hostname
  end

  if record[@dest_time_name].nil? # e.g. already has @timestamp
    rectime =
      if record['time'].nil?
        # convert from time_t
        Time.at(time)
      else
        # convert from string - parses a wide variety of formats
        Time.parse(record['time'])
      end
    record['time'] = rectime.utc.to_datetime.rfc3339(6)
  end
end
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 226
#
# Formatter for records read from system log files (/var/log/messages).
#
# * 'systemd' is set to a Hash carrying the record's 'pid' (as t.PID) and
#   'ident' (as u.SYSLOG_IDENTIFIER)
# * unless the record already has record[@dest_time_name], 'time' is set to
#   the event time as a UTC RFC 3339 string with microseconds; an event time
#   that lies in the future is moved back by one year
# * 'hostname' is @docker_hostname when 'host' is 'localhost' and the docker
#   hostname is known, otherwise it is the record's 'host' value
#
# tag::       event tag (unused)
# time::      event time, passed to Time.at
# record::    the event record Hash, modified in place
# fmtr_type:: formatter type (unused)
def process_sys_var_log_fields(tag, time, record, fmtr_type = nil)
  record['systemd'] = {
    "t" => { "PID" => record['pid'] },
    "u" => { "SYSLOG_IDENTIFIER" => record['ident'] }
  }

  if record[@dest_time_name].nil? # e.g. already has @timestamp
    # handle the case where the time reported in /var/log/messages is for a
    # previous year
    timeobj = Time.at(time)
    if timeobj > Time.now
      # Carry the fractional seconds over as well; the output below is
      # formatted with microsecond precision.
      timeobj = Time.new(timeobj.year - 1, timeobj.month, timeobj.day,
                         timeobj.hour, timeobj.min, timeobj.sec + timeobj.subsec,
                         timeobj.utc_offset)
    end
    record['time'] = timeobj.utc.to_datetime.rfc3339(6)
  end

  if record['host'] == 'localhost' && @docker_hostname
    record['hostname'] = @docker_hostname
  else
    record['hostname'] = record['host']
  end
end
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 204
#
# Lifecycle hook. This plugin has nothing of its own to release, so the call
# is passed straight on to the parent class.
def shutdown
  super
end
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 200
#
# Lifecycle hook. This plugin has nothing of its own to set up here, so the
# call is passed straight on to the parent class.
def start
  super
end