@@ -20,6 +20,7 @@ import (
2020 "github.com/robfig/cron"
2121 elasticV3 "gopkg.in/olivere/elastic.v3"
2222 elasticV5 "gopkg.in/olivere/elastic.v5"
23+ elasticV7 "gopkg.in/olivere/elastic.v7"
2324
2425 "github.com/qiniu/log"
2526
@@ -89,6 +90,7 @@ type Reader struct {
8990 elasticV3Client * elasticV3.Client
9091 elasticV5Client * elasticV5.Client
9192 elasticV6Client * elasticV6.Client
93+ elasticV7Client * elasticV7.Client
9294}
9395
9496func init () {
@@ -132,7 +134,22 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
132134 var elasticV3Client * elasticV3.Client
133135 var elasticV5Client * elasticV5.Client
134136 var elasticV6Client * elasticV6.Client
137+ var elasticV7Client * elasticV7.Client
135138 switch esVersion {
139+ case ElasticVersion7 :
140+ optFns := []elasticV7.ClientOptionFunc {
141+ elasticV7 .SetHealthcheck (false ),
142+ elasticV7 .SetURL (eshost ),
143+ }
144+
145+ if len (authUsername ) > 0 && len (authPassword ) > 0 {
146+ optFns = append (optFns , elasticV7 .SetBasicAuth (authUsername , authPassword ))
147+ }
148+
149+ elasticV7Client , err = elasticV7 .NewClient (optFns ... )
150+ if err != nil {
151+ return nil , err
152+ }
136153 case ElasticVersion6 :
137154 optFns := []elasticV6.ClientOptionFunc {
138155 elasticV6 .SetHealthcheck (false ),
@@ -205,6 +222,7 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
205222 elasticV3Client : elasticV3Client ,
206223 elasticV5Client : elasticV5Client ,
207224 elasticV6Client : elasticV6Client ,
225+ elasticV7Client : elasticV7Client ,
208226 }
209227 if len (cronSched ) > 0 {
210228 cronSched = strings .ToLower (cronSched )
@@ -325,12 +343,42 @@ func (r *Reader) getIndexShift() string {
325343
326344// 循环读取默认间隔时间3s,只支持全量读取,不支持offset字段
327345func (r * Reader ) execWithLoop () error {
346+ defer func () {
347+ if rec := recover (); rec != nil {
348+ log .Errorf ("Runner[%v] recover when exec with loop\n panic: %v\n stack: %s" , r .meta .RunnerName , rec , debug .Stack ())
349+ }
350+ }()
328351 var index = r .esindex
329352 if r .dateShift {
330353 index = r .getIndexShift ()
331354 }
332355 // Create a client
333356 switch r .esVersion {
357+ case ElasticVersion7 :
358+ scroll := r .elasticV7Client .Scroll (index ).Size (r .readBatch ).KeepAlive (r .keepAlive )
359+ if r .estype != "" {
360+ scroll = scroll .Type (r .estype )
361+ }
362+ for {
363+ results , err := scroll .ScrollId (r .offset ).Do (context .Background ())
364+ if err == io .EOF {
365+ return nil // all results retrieved
366+ }
367+ if err != nil {
368+ return err // something went wrong
369+ }
370+
371+ // Send the hits to the hits channel
372+ for _ , hit := range results .Hits .Hits {
373+ r .readChan <- Record {
374+ data : hit .Source ,
375+ }
376+ }
377+ r .offset = results .ScrollId
378+ if r .isStopping () || r .hasStopped () {
379+ return nil
380+ }
381+ }
334382 case ElasticVersion6 :
335383 scroll := r .elasticV6Client .Scroll (index ).Size (r .readBatch ).KeepAlive (r .keepAlive )
336384 if r .estype != "" {
@@ -414,12 +462,51 @@ func (r *Reader) execWithLoop() error {
414462
415463// 定时读取,支持增量读取,需要指定具有自增属性的offset字段
416464func (r * Reader ) execWithCron () error {
465+ defer func () {
466+ if rec := recover (); rec != nil {
467+ log .Errorf ("Runner[%v] recover when exec with cron\n panic: %v\n stack: %s" , r .meta .RunnerName , rec , debug .Stack ())
468+ }
469+ }()
417470 var index = r .esindex
418471 if r .dateShift {
419472 index = r .getIndexShift ()
420473 }
421474 // Create a client
422475 switch r .esVersion {
476+ case ElasticVersion7 :
477+ var rangeQuery * elasticV7.RangeQuery
478+ if r .cronOffsetValueIsValid {
479+ rangeQuery = elasticV7 .NewRangeQuery (r .cronOffsetKey ).Gte (r .cronOffsetValue )
480+ } else {
481+ rangeQuery = elasticV7 .NewRangeQuery (r .cronOffsetKey )
482+ }
483+ scroll := r .elasticV7Client .Scroll (index ).Query (rangeQuery ).Size (r .readBatch ).KeepAlive (r .keepAlive )
484+ if r .estype != "" {
485+ scroll = scroll .Type (r .estype )
486+ }
487+ for {
488+ results , err := scroll .ScrollId (r .offset ).Do (context .Background ())
489+ if err == io .EOF {
490+ return nil // all results retrieved
491+ }
492+ if err != nil {
493+ return err // something went wrong
494+ }
495+
496+ // Send the hits to the hits channel
497+ for _ , hit := range results .Hits .Hits {
498+ m := make (map [string ]interface {})
499+ jsoniter .Unmarshal (hit .Source , & m )
500+ r .readChan <- Record {
501+ data : hit .Source ,
502+ cronOffset : m [r .cronOffsetKey ],
503+ }
504+ }
505+ r .offset = results .ScrollId
506+ if r .isStopping () || r .hasStopped () {
507+ return nil
508+ }
509+ }
423510 case ElasticVersion6 :
424511 var rangeQuery * elasticV6.RangeQuery
425512 if r .cronOffsetValueIsValid {
@@ -637,6 +724,9 @@ func (r *Reader) Close() error {
637724 if r .elasticV6Client != nil {
638725 r .elasticV6Client .Stop ()
639726 }
727+ if r .elasticV7Client != nil {
728+ r .elasticV7Client .Stop ()
729+ }
640730
641731 // 如果此时没有 routine 正在运行,则在此处关闭数据管道,否则由 routine 在退出时负责关闭
642732 if atomic .CompareAndSwapInt32 (& r .routineStatus , StatusInit , StatusStopping ) {
0 commit comments