class Fluent::ViaqDataModelFilter

Public Instance Methods

add_elasticsearch_index_name_field(tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 299
# Set the Elasticsearch index name, or index prefix, field on +record+.
#
# The first entry of @elasticsearch_index_names whose matcher accepts +tag+
# decides what happens; later entries are never consulted:
# * a disabled entry leaves the record untouched;
# * +:operations_full+ / +:operations_prefix+ use the prefix ".operations";
# * +:project_full+ / +:project_prefix+ use
#   "project.<namespace_name>.<namespace_id>" read from record['kubernetes'];
#   when any of those values is missing an error is logged and the record is
#   left untouched;
# * the +*_full+ types write "<prefix>.YYYY.MM.DD" (date parsed from
#   record[@dest_time_name]) to the field named by
#   @elasticsearch_index_name_field, while the +*_prefix+ types write the bare
#   prefix to the field named by @elasticsearch_index_prefix_field.
#
# When the CDM_DEBUG environment variable is set, and +tag+ differs from
# CDM_DEBUG_IGNORE_TAG, progress is traced through log.error.
#
# tag    - tag tested against each entry's matcher
# time   - event time (not used by this method)
# record - Hash describing the event; modified in place
def add_elasticsearch_index_name_field(tag, time, record)
  tracing = ENV['CDM_DEBUG'] && tag != ENV['CDM_DEBUG_IGNORE_TAG']

  index_conf = @elasticsearch_index_names.find { |candidate| candidate.matcher.match(tag) }
  unless index_conf
    return unless tracing
    return log.error("no match for tag #{tag}")
  end
  return unless index_conf.enabled

  name_type = index_conf.name_type
  # Only the *_full types carry a date suffix and go to the index name field.
  need_time = name_type == :operations_full || name_type == :project_full
  target_field = need_time ? @elasticsearch_index_name_field : @elasticsearch_index_prefix_field

  case name_type
  when :operations_full, :operations_prefix
    prefix = ".operations"
  when :project_full, :project_prefix
    k8s = record['kubernetes']
    if k8s.nil?
      log.error("record cannot use elasticsearch index name type #{name_type}: record is missing kubernetes field: #{record}")
      return
    end
    namespace = k8s['namespace_name']
    if namespace.nil?
      log.error("record cannot use elasticsearch index name type #{name_type}: record is missing kubernetes.namespace_name field: #{record}")
      return
    end
    namespace_id = k8s['namespace_id']
    if namespace_id.nil?
      log.error("record cannot use elasticsearch index name type #{name_type}: record is missing kubernetes.namespace_id field: #{record}")
      return
    end
    prefix = "project." + namespace + "." + namespace_id
  end

  log.error("prefix #{prefix} need_time #{need_time} time #{record[@dest_time_name]}") if tracing

  record[target_field] =
    if need_time
      prefix + "." + DateTime.parse(record[@dest_time_name]).strftime("%Y.%m.%d")
    else
      prefix
    end

  log.error("record[#{target_field}] = #{record[target_field]}") if tracing
  nil
end
add_pipeline_metadata(tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 288
# Record which pipeline component handled this event.
#
# Stores a description of this fluentd instance under
# record['pipeline_metadata'][<@pipeline_type as a String>], creating the
# 'pipeline_metadata' hash when the record does not have one yet. Any entry
# already stored for the same pipeline type is replaced.
#
# tag    - event tag (not used by this method)
# time   - event time (not used by this method; "received_at" is the current
#          wall-clock time in UTC with microsecond precision)
# record - Hash describing the event; modified in place
def add_pipeline_metadata(tag, time, record)
  received_at = Time.now.utc.to_datetime.rfc3339(6)
  pipeline_metadata = (record['pipeline_metadata'] ||= {})
  pipeline_metadata[@pipeline_type.to_s] = {
    "ipaddr4" => @ipaddr4,
    "ipaddr6" => @ipaddr6,
    "inputname" => "fluent-plugin-systemd",
    "name" => "fluentd",
    "received_at" => received_at,
    "version" => @pipeline_version
  }
end
check_for_match_and_format(tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 264
# Run the formatter configured for +tag+, if there is one, against +record+.
#
# The formatter is the first entry of @formatters whose matcher accepts the
# tag. The outcome of that lookup is remembered per tag: usable formatters in
# @formatter_cache, and tags with no formatter (or whose first matching
# formatter is disabled) in @formatter_cache_nomatch.
#
# After the formatter function has run, record['time'] is filled in from the
# event +time+ (UTC, microsecond precision) when the record has neither a
# @dest_time_name field nor a 'time' field, and every key listed in the
# formatter's fmtr_remove_keys is deleted from the record.
#
# tag    - event tag
# time   - event time, as accepted by Time.at
# record - Hash describing the event; modified in place
def check_for_match_and_format(tag, time, record)
  return unless @formatters
  return if @formatter_cache_nomatch[tag]

  formatter = @formatter_cache[tag]
  unless formatter
    formatter = @formatters.find { |candidate| candidate.matcher.match(tag) }
    unless formatter && formatter.enabled
      @formatter_cache_nomatch[tag] = true
      return
    end
    @formatter_cache[tag] = formatter
  end

  formatter.fmtr_func.call(tag, time, record, formatter.fmtr_type)

  record['time'] = Time.at(time).utc.to_datetime.rfc3339(6) if record[@dest_time_name].nil? && record['time'].nil?

  doomed_keys = formatter.fmtr_remove_keys
  doomed_keys.each { |key| record.delete(key) } if doomed_keys
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 144
# Validate the plugin configuration and precompute the state used while
# filtering.
#
# After delegating to the superclass this method:
# * builds @keep_fields (every name in default_keep_fields, extra_keep_fields
#   and keep_empty_fields) and @keep_empty_fields_hash as lookup hashes;
# * rejects configurations where the undefined-field container is itself a
#   kept field, or where the time field is to be renamed but would first be
#   moved into the undefined container because it is not a kept field;
# * attaches a tag matcher, the formatter type, the list of keys to remove
#   and the formatting method to every formatter section, and resets the
#   per-tag formatter caches;
# * reads the docker hostname from /etc/docker-hostname (nil if unreadable);
# * derives @ipaddr4, @ipaddr6 and @pipeline_version from the environment,
#   falling back to fixed defaults;
# * attaches a tag matcher to every elasticsearch index name section.
#
# conf - configuration object handed to the superclass
#
# Raises Fluent::ConfigError when one of the validations above fails.
def configure(conf)
  super

  @keep_fields = {}
  @default_keep_fields.each { |name| @keep_fields[name] = true }
  @extra_keep_fields.each { |name| @keep_fields[name] = true }
  @keep_empty_fields_hash = {}
  @keep_empty_fields.each do |name|
    @keep_empty_fields_hash[name] = true
    @keep_fields[name] = true
  end

  if @use_undefined && @keep_fields.key?(@undefined_name)
    raise Fluent::ConfigError, "Do not put [#{@undefined_name}] in default_keep_fields or extra_keep_fields"
  end
  # filter renames the time field when either @rename_time or
  # @rename_time_if_missing is set, so both flags must be checked here. The
  # previous code read @rename_time_if_not_exist, which nothing assigns, so
  # the check was skipped when only rename-if-missing was enabled.
  if (@rename_time || @rename_time_if_missing) && @use_undefined && !@keep_fields.key?(@src_time_name)
    raise Fluent::ConfigError, "Field [#{@src_time_name}] must be listed in default_keep_fields or extra_keep_fields"
  end

  if @formatters
    @formatters.each do |fmtr|
      matcher = ViaqMatchClass.new(fmtr.tag, nil)
      fmtr_type = fmtr.type
      remove_keys = fmtr.remove_keys ? fmtr.remove_keys.split(',') : nil
      # Types not listed here leave the function nil, as before.
      fmtr_func =
        case fmtr_type
        when :sys_journal, :k8s_journal
          method(:process_journal_fields)
        when :sys_var_log
          method(:process_sys_var_log_fields)
        when :k8s_json_file
          method(:process_k8s_json_file_fields)
        end
      # The derived values are stored in the section's @params hash; presumably
      # that is what makes them readable as fmtr.matcher, fmtr.fmtr_func, etc.
      # (as check_for_match_and_format does) - confirm against the section class.
      fmtr.instance_eval do
        @params[:matcher] = matcher
        @params[:fmtr_type] = fmtr_type
        @params[:fmtr_remove_keys] = remove_keys
        @params[:fmtr_func] = fmtr_func
      end
    end
    @formatter_cache = {}
    @formatter_cache_nomatch = {}
  end

  # Best effort: a missing or unreadable file simply means no docker hostname.
  begin
    @docker_hostname = File.open('/etc/docker-hostname') { |f| f.readline }.rstrip
  rescue
    @docker_hostname = nil
  end

  @ipaddr4 = ENV['IPADDR4'] || '127.0.0.1'
  @ipaddr6 = ENV['IPADDR6'] || '::1'
  @pipeline_version = (ENV['FLUENTD_VERSION'] || 'unknown fluentd version') + ' ' + (ENV['DATA_VERSION'] || 'unknown data version')

  # create the elasticsearch index name tag matchers
  @elasticsearch_index_names.each do |ein|
    matcher = ViaqMatchClass.new(ein.tag, nil)
    ein.instance_eval { @params[:matcher] = matcher }
  end
end
delempty(thing) click to toggle source

Recursively delete empty fields and empty lists/hashes from thing.

# File lib/fluent/plugin/filter_viaq_data_model.rb, line 215
# Recursively strip nil and empty values out of +thing+, in place.
#
# Anything that responds to delete_if is treated as a container: a Hash is
# pruned by value, any other container element by element. A nested container
# that becomes empty once its own contents have been pruned is removed as
# well. Objects that do not respond to delete_if are left alone.
#
# Returns +thing+ itself (the same object that was passed in).
def delempty(thing)
  return thing unless thing.respond_to?(:delete_if)

  # Pruning the value first means a container holding only empties counts as
  # empty itself; delempty hands back the very object it was given.
  disposable = lambda { |value| value.nil? || isempty(delempty(value)) }

  if thing.is_a?(Hash)
    thing.delete_if { |_key, value| disposable.call(value) }
  else # assume single element iterable
    thing.delete_if { |element| disposable.call(element) }
  end
  thing
end
filter(tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 361
# Normalize one event record and return it.
#
# Steps, in order:
# 1. apply the formatter configured for +tag+ (check_for_match_and_format);
# 2. add this component's pipeline metadata (add_pipeline_metadata);
# 3. when @use_undefined is set, move every top-level field that is not in
#    @keep_fields into a single hash stored under @undefined_name;
# 4. drop top-level fields that are nil or empty (after recursively pruning
#    them), unless they are listed in @keep_empty_fields_hash;
# 5. rename @src_time_name to @dest_time_name when @rename_time or
#    @rename_time_if_missing is set; with @rename_time_if_missing an existing
#    destination field wins and the source field is just removed;
# 6. add the elasticsearch index name/prefix field when index names are
#    configured.
#
# When the CDM_DEBUG environment variable is set, and +tag+ differs from
# CDM_DEBUG_IGNORE_TAG, input, output and decisions are traced via log.error.
#
# tag    - event tag
# time   - event time
# record - Hash describing the event; modified in place
#
# Returns the (modified) record.
def filter(tag, time, record)
  tracing = ENV['CDM_DEBUG'] && tag != ENV['CDM_DEBUG_IGNORE_TAG']
  log.error("input #{time} #{tag} #{record}") if tracing

  check_for_match_and_format(tag, time, record)
  add_pipeline_metadata(tag, time, record)

  if @use_undefined
    # undefined contains all of the fields not in keep_fields
    undefined = record.reject { |k, _v| @keep_fields.key?(k) }
    # only set the undefined field if there are undefined fields
    unless undefined.empty?
      # Remove the undefined fields from the top level BEFORE storing the
      # container. An incoming field that is itself named @undefined_name is
      # part of `undefined`; deleting after the assignment would remove the
      # freshly stored container and lose every undefined field.
      record.delete_if { |k, _v| undefined.key?(k) }
      record[@undefined_name] = undefined
    end
  end

  # remove the field from record if it is not in the list of fields to keep and
  # it is empty
  record.delete_if { |k, v| !@keep_empty_fields_hash.key?(k) && (v.nil? || isempty(delempty(v)) || isempty(v)) }
  # probably shouldn't remove everything . . .
  log.warn("Empty record! tag [#{tag}] time [#{time}]") if record.empty?

  # rename the time field
  if (@rename_time || @rename_time_if_missing) && record.key?(@src_time_name)
    val = record.delete(@src_time_name)
    record[@dest_time_name] = val unless @rename_time_if_missing && record.key?(@dest_time_name)
  end

  if !@elasticsearch_index_names.empty?
    add_elasticsearch_index_name_field(tag, time, record)
  elsif tracing
    log.error("not adding elasticsearch index name or prefix")
  end

  log.error("output #{time} #{tag} #{record}") if tracing
  record
end
isempty(thing) click to toggle source

If thing doesn't respond to empty?, assume it isn't empty; e.g. 0.respond_to?(:empty?) == false, so the Fixnum 0 is not empty.

# File lib/fluent/plugin/filter_viaq_data_model.rb, line 210
# Tell whether +thing+ is empty.
#
# Objects that do not respond to empty? are never considered empty; for
# everything else the answer is whatever thing.empty? returns.
def isempty(thing)
  return false unless thing.respond_to?(:empty?)

  thing.empty?
end
process_k8s_json_file_fields(tag, time, record, fmtr_type = nil) click to toggle source
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 243
# Formatter for records of type k8s_json_file.
#
# * 'message' keeps its value when it has one, otherwise takes the 'log' value;
# * 'level' is 'info' when 'stream' is 'stdout' and 'err' for anything else;
# * 'hostname' is the 'host' entry of record['kubernetes'] when present,
#   otherwise @docker_hostname when that is set, otherwise it is not touched;
# * unless the record already has a @dest_time_name field, 'time' is rewritten
#   as a UTC timestamp with microsecond precision, parsed from the existing
#   'time' string or, without one, taken from the event +time+.
#
# tag       - event tag (not used by this method)
# time      - event time, as accepted by Time.at
# record    - Hash describing the event; modified in place
# fmtr_type - formatter type (not used by this method)
def process_k8s_json_file_fields(tag, time, record, fmtr_type = nil)
  record['message'] ||= record['log']

  record['level'] =
    if record['stream'] == 'stdout'
      'info'
    else
      'err'
    end

  node_name = nil
  if record.key?('kubernetes')
    k8s = record['kubernetes']
    node_name = k8s.fetch('host', nil) if k8s.respond_to?(:fetch)
  end
  node_name ||= @docker_hostname
  record['hostname'] = node_name if node_name

  if record[@dest_time_name].nil? # e.g. already has @timestamp
    # a 'time' string is parsed (wide variety of formats); otherwise +time+ is
    # converted from time_t
    stamp = record['time'].nil? ? Time.at(time) : Time.parse(record['time'])
    record['time'] = stamp.utc.to_datetime.rfc3339(6)
  end
end
process_sys_var_log_fields(tag, time, record, fmtr_type = nil) click to toggle source
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 226
# Formatter for records of type sys_var_log.
#
# * 'systemd' is set to a hash carrying the record's 'pid' (as t.PID) and
#   'ident' (as u.SYSLOG_IDENTIFIER);
# * unless the record already has a @dest_time_name field, 'time' is set from
#   the event +time+ as a UTC timestamp with microsecond precision; an event
#   time that lies in the future is assumed to belong to the previous year;
# * 'hostname' is @docker_hostname when 'host' is 'localhost' and a docker
#   hostname is known, otherwise the record's 'host' value.
#
# tag       - event tag (not used by this method)
# time      - event time, as accepted by Time.at
# record    - Hash describing the event; modified in place
# fmtr_type - formatter type (not used by this method)
def process_sys_var_log_fields(tag, time, record, fmtr_type = nil)
  record['systemd'] = {
    "t" => { "PID" => record['pid'] },
    "u" => { "SYSLOG_IDENTIFIER" => record['ident'] }
  }

  if record[@dest_time_name].nil? # e.g. already has @timestamp
    stamp = Time.at(time)
    # handle the case where the time reported in /var/log/messages is for a previous year
    if stamp > Time.now
      stamp = Time.new(stamp.year - 1, stamp.month, stamp.day, stamp.hour, stamp.min, stamp.sec, stamp.utc_offset)
    end
    record['time'] = stamp.utc.to_datetime.rfc3339(6)
  end

  use_docker_name = record['host'].eql?('localhost') && @docker_hostname
  record['hostname'] = use_docker_name ? @docker_hostname : record['host']
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 204
# Shutdown hook. Adds no behavior of its own; it only invokes the superclass
# implementation.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_viaq_data_model.rb, line 200
# Start hook. Adds no behavior of its own; it only invokes the superclass
# implementation.
def start
  super
end