Fluentd v1.2.0 has been released

Hi users!

We have released v1.2.0. ChangeLog is here. This release includes new features and improvements.

output: Backup for broken chunks

Fluentd receives various events from various data sources. This sometimes have a problem in Output plugins.

For example, if one application generates invalid events for data destination, e.g. schema mismatch, buffer flush always failed. Other case is generated events are invalid for output configuration, e.g. required field is missing.

Fluentd has retry feature for temporal failures but there errors are never succeeded. So Fluentd should not retry unexpected "broken chunks".

Since v1.2.0, fluentd routes broken chunks to backup directory. By default, backup root directory is /tmp/fluent. If you set root_dir in <system>, root_dir is used. File path consists of several parameters for unique path and ${root_dir}/backup/worker${worker_id}/${plugin_id}/{chunk_id}.log is path template.

If you have following configuration and unrecoverable error happens inside @type sql plugin,

<system>
  root_dir /var/log/fluentd
</system>
# ...
<match app.**>
  @type sql
  @id out_sql
  <buffer>
    # ...
  </buffer>
</match>

chunk is routed to /var/log/fluentd/backup/worker0/out_sql/56644156d63a108eda2f487c58140736.log.

Currently, fluentd routes chunks to backup directory when Output plugin raises following errors during buffer flush.

  • Fluent::UnrecoverableError: Output plugin can raise this error to avoid retry for plugin specific case.
  • TypeError: This error sometimes happens when an event has unexpected type in target field.
  • ArgumentError: This error sometimes happens when library usage is wrong in plugins.
  • NoMethodError: This error sometimes happens when events and configuration are mismatched.

Fluentd continues to retry buffer flush when other error happens.

New API: Counter API

This is for plugin developers. Counter API consists of server and client. Server stores counter values and client calls APIs to store values to server.

Here is example configuration:

<system>
  <counter_server>
    scope test
    bind 127.0.0.1
    port 25000
    backup_path /tmp/counter_backup.json
  </counter_server>
  <counter_client>
    host 127.0.0.1
    port 25000    
  </counter_client>
</system>

And filter implementation example:

require 'fluent/plugin/filter'

module Fluent
  module Plugin
    class CounterFilter < Filter
      Plugin.register_filter('counter', self)

      helpers :counter

      def start
        super

        @client = counter_client_create(scope: 'test') # scope value is same with `<counter_server>`
        @client.establish('counter').get
        @client.init(:name => 'num', type: 'numeric', reset_interval: 10).get
      end

      def filter(tag, time, record)
        @client.inc(:name => 'num', :value => 5)
        p @client.get('num').data.first["current"] # Show current value
        record
      end
    end
  end
end

This API is useful for storing metrics under multiprocess environment. We will add API documents.

filter_grep: Support for <and> and <or> sections

grep filter now supports and / or condition for <regexp> / <exclude> Before v1.2.0, grep filter supports only and <regexp> and or <exclude> patterns.

<filter pattern>
  # These <regexp>s are "and"
  <regexp>
    key level
    pattern ^ERROR|WARN$
  </regexp>
  <regexp>
    key method
    pattern ^GET|POST$
  </regexp>
  # These <exclude>s are "or"
  <exclude>
    key level
    pattern ^WARN$
  </exclude>
  <exclude>
    key method
    pattern ^GET$
  </exclude>
</filter>

v1.2.0 adds <and> and <or> sections to support more patterns.

Here is configuration example:

<filter pattern>
  <or>
    <regexp>
      key level
      pattern ^ERROR|WARN$
    </regexp>
    <regexp>
      key method
      pattern ^GET|POST$
    </regexp>
  </or>
  <and>
    <exclude>
      key level
      pattern ^WARN$
    </exclude>
    <exclude>
      key method
      pattern ^GET$
    </exclude>
  </and>
</filter>

If you pass this data streams:

{"time" : "2013/01/13T07:02:11.124202", "level" : "INFO", "method" : "GET", "path" : "/ping"}
{"time" : "2013/01/13T07:02:13.232645", "level" : "WARN", "method" : "POST", "path" : "/auth"}
{"time" : "2013/01/13T07:02:21.542145", "level" : "WARN", "method" : "GET", "path" : "/favicon.ico"}
{"time" : "2013/01/13T07:02:43.632145", "level" : "WARN", "method" : "POST", "path" : "/login"}
{"time" : "2013/01/13T07:02:44.959307", "level" : "ERROR", "method" : "POST", "path" : "/login"}
{"time" : "2013/01/13T07:02:45.444992", "level" : "ERROR", "method" : "GET", "path" : "/ping"}
{"time" : "2013/01/13T07:02:51.247941", "level" : "WARN", "method" : "GET", "path" : "/info"}
{"time" : "2013/01/13T07:02:53.108366", "level" : "WARN", "method" : "POST", "path" : "/ban"}

filtered result is below:

{"time" : "2013/01/13T07:02:11.124202", "level" : "INFO", "method" : "GET", "path" : "/ping"}
{"time" : "2013/01/13T07:02:13.232645", "level" : "WARN", "method" : "POST", "path" : "/auth"}
{"time" : "2013/01/13T07:02:43.632145", "level" : "WARN", "method" : "POST", "path" : "/login"}
{"time" : "2013/01/13T07:02:44.959307", "level" : "ERROR", "method" : "POST", "path" : "/login"}
{"time" : "2013/01/13T07:02:45.444992", "level" : "ERROR", "method" : "GET", "path" : "/ping"}
{"time" : "2013/01/13T07:02:53.108366", "level" : "WARN", "method" : "POST", "path" : "/ban"}

Major bug fixes

  • server helper: Close invalid socket when ssl error happens on reading
  • log: Fix unexpected implementation bug when log rotation setting is applied

Thanks for submitting bug reports and patches :)

Enjoy logging!

Subscribed to the RSS feed here.

Written by Masahiro Nakagawa

Masahiro (@repeatedly) is the main maintainer of Fluentd. He works on Fluentd development and support full-time. He is also a committer of the D programming language.