flink解析kafka json数据

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

flink解析kafka json数据

Dream-底限
hi
我这面在使用sql api解析kafka
json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储

json.ignore-parse-errors
son.fail-on-missing-field
Reply | Threaded
Open this post in threaded view
|

Re: flink解析kafka json数据

Leonard Xu
Hi,
我理解应该做不到,因为这两个format参数在format里就做的。
json.ignore-parse-errors 是在 format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field 是标记如果字段少时是否失败还是继续(缺少的字段用null补上)
这两个不能同时为ture,语义上就是互斥的。

Best
Leonard Xu
> 在 2020年7月21日,16:08,Dream-底限 <[hidden email]> 写道:
>
> json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储

Reply | Threaded
Open this post in threaded view
|

Re: flink解析kafka json数据

Dream-底限
hi
 json.ignore-parse-errors那只配置这个就好了, 其实我想把解析失败的数据存储到外部系统,而不是直接丢弃

Leonard Xu <[hidden email]> 于2020年7月21日周二 下午4:18写道:

> Hi,
> 我理解应该做不到,因为这两个format参数在format里就做的。
> json.ignore-parse-errors 是在
> format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field
> 是标记如果字段少时是否失败还是继续(缺少的字段用null补上)
> 这两个不能同时为ture,语义上就是互斥的。
>
> Best
> Leonard Xu
> > 在 2020年7月21日,16:08,Dream-底限 <[hidden email]> 写道:
> >
> >
> json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储
>
>
Reply | Threaded
Open this post in threaded view
|

Re: flink解析kafka json数据

Jark
Administrator
目前是不支持的。这个需求有点太业务特定了。flink 不可能为了一个错误日志去抽象、对接各种存储系统。
一种方案是社区可以考虑支持下打印到日志里,然后用户可以通过自定义插件 log appender 写入外部存储。

Best,
Jark

On Tue, 21 Jul 2020 at 18:53, Dream-底限 <[hidden email]> wrote:

> hi
>  json.ignore-parse-errors那只配置这个就好了, 其实我想把解析失败的数据存储到外部系统,而不是直接丢弃
>
> Leonard Xu <[hidden email]> 于2020年7月21日周二 下午4:18写道:
>
> > Hi,
> > 我理解应该做不到,因为这两个format参数在format里就做的。
> > json.ignore-parse-errors 是在
> > format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field
> > 是标记如果字段少时是否失败还是继续(缺少的字段用null补上)
> > 这两个不能同时为ture,语义上就是互斥的。
> >
> > Best
> > Leonard Xu
> > > 在 2020年7月21日,16:08,Dream-底限 <[hidden email]> 写道:
> > >
> > >
> >
> json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: flink解析kafka json数据

Dream-底限
hi jark wu、
将解析错误数据直接打到日志里确实是比较通用的解决方案;
我现在使用flink sql对接kafka
json数据的时候,发现对json数据的解析有一些局限性,即比如我有一条数据是jsonobject,但是我在定义flink sql
connector数据类型的时候如果直接定义为string,会导致数据解析失败(当然,这个失败是正常的)
但是这会有一个局限性就是我没办法以一个string方式获取一个jsonobject数据(由于一些比较尴尬的原因就想以string方式获取jsonobject数据),查看代码发现这是jackson导致的获取失败,这个社区考虑兼容一下吗?

if (simpleTypeInfo == Types.STRING) {
   return Optional.of((mapper, jsonNode) -> jsonNode.asText());//
这里会返回空,改成下面样子兼容一下

if (simpleTypeInfo == Types.STRING) {
   return Optional.of((mapper, jsonNode) ->
jsonNode.isTextual()?jsonNode.asText():jsonNode.toString());


Jark Wu <[hidden email]> 于2020年7月21日周二 下午10:30写道:

> 目前是不支持的。这个需求有点太业务特定了。flink 不可能为了一个错误日志去抽象、对接各种存储系统。
> 一种方案是社区可以考虑支持下打印到日志里,然后用户可以通过自定义插件 log appender 写入外部存储。
>
> Best,
> Jark
>
> On Tue, 21 Jul 2020 at 18:53, Dream-底限 <[hidden email]> wrote:
>
> > hi
> >  json.ignore-parse-errors那只配置这个就好了, 其实我想把解析失败的数据存储到外部系统,而不是直接丢弃
> >
> > Leonard Xu <[hidden email]> 于2020年7月21日周二 下午4:18写道:
> >
> > > Hi,
> > > 我理解应该做不到,因为这两个format参数在format里就做的。
> > > json.ignore-parse-errors 是在
> > > format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field
> > > 是标记如果字段少时是否失败还是继续(缺少的字段用null补上)
> > > 这两个不能同时为ture,语义上就是互斥的。
> > >
> > > Best
> > > Leonard Xu
> > > > 在 2020年7月21日,16:08,Dream-底限 <[hidden email]> 写道:
> > > >
> > > >
> > >
> >
> json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储
> > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: flink解析kafka json数据

Jark
Administrator
哈哈,看来是一个很通用的需求啊。 本超同学已经在1.12 中支持了这个功能了, see
https://issues.apache.org/jira/browse/FLINK-18002

Best,
Jark

On Fri, 24 Jul 2020 at 11:36, Dream-底限 <[hidden email]> wrote:

> hi jark wu、
> 将解析错误数据直接打到日志里确实是比较通用的解决方案;
> 我现在使用flink sql对接kafka
> json数据的时候,发现对json数据的解析有一些局限性,即比如我有一条数据是jsonobject,但是我在定义flink sql
> connector数据类型的时候如果直接定义为string,会导致数据解析失败(当然,这个失败是正常的)
>
> 但是这会有一个局限性就是我没办法以一个string方式获取一个jsonobject数据(由于一些比较尴尬的原因就想以string方式获取jsonobject数据),查看代码发现这是jackson导致的获取失败,这个社区考虑兼容一下吗?
>
> if (simpleTypeInfo == Types.STRING) {
>    return Optional.of((mapper, jsonNode) -> jsonNode.asText());//
> 这里会返回空,改成下面样子兼容一下
>
> if (simpleTypeInfo == Types.STRING) {
>    return Optional.of((mapper, jsonNode) ->
> jsonNode.isTextual()?jsonNode.asText():jsonNode.toString());
>
>
> Jark Wu <[hidden email]> 于2020年7月21日周二 下午10:30写道:
>
> > 目前是不支持的。这个需求有点太业务特定了。flink 不可能为了一个错误日志去抽象、对接各种存储系统。
> > 一种方案是社区可以考虑支持下打印到日志里,然后用户可以通过自定义插件 log appender 写入外部存储。
> >
> > Best,
> > Jark
> >
> > On Tue, 21 Jul 2020 at 18:53, Dream-底限 <[hidden email]> wrote:
> >
> > > hi
> > >  json.ignore-parse-errors那只配置这个就好了, 其实我想把解析失败的数据存储到外部系统,而不是直接丢弃
> > >
> > > Leonard Xu <[hidden email]> 于2020年7月21日周二 下午4:18写道:
> > >
> > > > Hi,
> > > > 我理解应该做不到,因为这两个format参数在format里就做的。
> > > > json.ignore-parse-errors 是在
> > > > format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field
> > > > 是标记如果字段少时是否失败还是继续(缺少的字段用null补上)
> > > > 这两个不能同时为ture,语义上就是互斥的。
> > > >
> > > > Best
> > > > Leonard Xu
> > > > > 在 2020年7月21日,16:08,Dream-底限 <[hidden email]> 写道:
> > > > >
> > > > >
> > > >
> > >
> >
> json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储
> > > >
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: flink解析kafka json数据

wxpcc
改用csv用一个很不常用的分隔符去获取是可以的,比如/u0005

Get Outlook for Android<https://aka.ms/ghei36>