Friday, May 01, 2015

Logstash and Oracle Database

    For anyone that doesn't know what Logstash is head over to and watch the video on the home page.  Robin Moffatt has done a great blog post on the full ELK stack with OBIEE.

    This blog post is a first cut at an input filter for Logstash to gather metrics, logs, or anything that can be expressed in sql.  A huge caution that this is a 0.01 attempt and will get better as I need it to.  This is a basic config of doing a select sessions every 5 seconds and report that in this case to just STDOUT.  There are many choices of where to send this information like to elastic search for visualization into Kibana as Mark showed in the OBIEE example.

    If anyone uses this, changes it, likes it, doesn't like it,  let me know.

input { 
  orasql { sql      => "SELECT count(1) sessions from v$session "
           dbuser   => "klrice"
           dbpasswd => "klrice"
           dburl    => "localhost/orcl"
           interval =>  5
output {
 stdout { codec => rubydebug }

The running of this is just needs to add in the plugin path for be able to find the orasql input filter.

     The filter itself is where everything oracle-wise is happening.  Since Logstash is written in JRuby, I'm able to use the normal Oracle JDBC Driver to access the database.  For this case I have a ./lib folder where I have a local copy.  This could just as easily be $ORACLE_HOME/jdbc/lib/ojdbc6.jar to ensure using the same as the database version when on the same server.

     Since the connect and establishing a session is expensive when repeatedly issuing sql, I have kept a connection open.  The connection could be closed for any number of reasons so there's an auto-reconnect in here also by calling the getConnection before any sql exec.   Before someone asks: no, sqldev will not get an auto-reconnect like this.

# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "socket" # for Socket.gethostname
require 'java'
$CLASSPATH << "lib/ojdbc6.jar"

java_import 'oracle.jdbc.OracleDriver'
java_import 'java.sql.DriverManager'

# Run sql line tools and capture the whole output as an event.
# Notes:
# * The '@source' of this event will be the sql run.
# * The '@message' of this event will be the entire stdout of the sql
#   as one event.
class LogStash::Inputs::OraSQL < LogStash::Inputs::Base

  config_name "orasql"
  milestone 2
  $currConnection = nil

  default :codec, "plain"

  # Set this to true to enable debugging on an input.
  config :debug, :validate => :boolean, :default => false, :deprecated => "This setting was never used by this plugin. It will be removed soon."

  # SQL to run. For example, "select * from emp"
  config :sql, :validate => :string, :required => true

  # dbuser to run. For example, "select * from emp"
  config :dbuser, :validate => :string, :required => true ,  :default => "/"

  # dbpass to run. For example, "select * from emp"
  config :dbpasswd, :validate => :string, :required => false , :default => ""

  # dburl to run. For example, "select * from emp"
  config :dburl, :validate => :string, :required => true , :default => "//localhost/orcl"

  # Interval to run the sql. Value is in seconds.
  config :interval, :validate => :number, :required => true , :default => 120

  def register"Registering SQL Input", :type => @type,
                 :sql => @sql, :interval => @interval)
  end # def register
  def getConnection
      if $currConnection == nil  or  ! $currConnection.isValid(100)  
           oradriver =
           DriverManager.registerDriver oradriver

           con_props =
           con_props.setProperty("user", @dbuser)
           con_props.setProperty("password", @dbpasswd )
           conn ='jdbc:oracle:thin:@' + @dburl, con_props)

           conn.auto_commit = false

           $currConnection = conn
      return $currConnection  
  end # end getConnection

  def run(queue)
    hostname = Socket.gethostname
    loop do
      start = &&"Running SQL", :sql => @sql)

      conn = getConnection

      stmt = conn.prepare_statement @sql
      rset = stmt.execute_query
      while ( )
         event =  event =
         cols = rset.getMetaData.getColumnCount
         msg = ""
         while ( i <= cols ) 
             val = rset.getString(i)
             if ( val != nil ) 
                if ( r > 0 )
                   msg = msg + ","
                event[ rset.getMetaData.getColumnName(i).downcase ] =  val
                msg = msg +  "\"" +rset.getMetaData.getColumnName(i).downcase +  "\" : \"" + val + "\"" 
             i = i + 1
         event['message'] = "{" + msg + "}"
        queue << event

      duration = - start &&"Command completed", :sql => @sql,
                                    :duration => duration)

      # Sleep for the remainder of the interval, or 0 if the duration ran
      # longer than the interval.
      sleeptime = [0, @interval - duration].max
      if sleeptime == 0
        @logger.warn("Execution ran longer than the interval. Skipping sleep.",
                     :sql => @sql, :duration => duration,
                     :interval => @interval)
    end # loop
  end # def run
end # class LogStash::Inputs::OraSQL