如何用kafka connect做分布式日志收集?

日志文件轮转存放到本地,需要监听日志文件的变化,如log -> log-2018-12-20,但是log这个软连接会变动,下一天会变成log -> log-2018-12-21

  1. 能监测到文件的变化,同时能读到软连接变化后的新的文件
  2. 能过滤需要的行

用kafka connector能做到吗?






发表于: 2月前   最后更新时间: 2月前   游览量:331
上一条: 到头了!
下一条: 已经是最后了!

  • 加个logstash做过滤吧。
    • 之前的方案是用flume来做过滤,但是现在的考虑是去掉多余的技术栈,想用kafka直接实现,有scala/java的例子吗?
      ```shell

      import scala.sys.process._

      def someProcessing(line: String): Unit = {
        // do whatever you want with that line
        print("[just read this line] ")
        println(line)
      }

      // the file to read
      val file = "mylogfile.txt"

      // the process to start
      val tail = Seq("tail", "-f", file)

      // continuously read lines from tail -f
      tail.lineStream.foreach(someProcessing) 
      // careful: this never returns (unless tail is externally killed)
      ````

      这是我在stackoverflow上看到的,但是这个不能被打断恢复。