Skip to content

Commit 4904a18

Browse files
committed
refactor(minio): adopt x/minio pacakge
1 parent c940fa0 commit 4904a18

35 files changed

+555
-842
lines changed

cmd/main/main.go

Lines changed: 20 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,13 @@ import (
3030

3131
"github.com/instill-ai/pipeline-backend/config"
3232
"github.com/instill-ai/pipeline-backend/pkg/acl"
33-
"github.com/instill-ai/pipeline-backend/pkg/external"
3433
"github.com/instill-ai/pipeline-backend/pkg/handler"
3534
"github.com/instill-ai/pipeline-backend/pkg/memory"
3635
"github.com/instill-ai/pipeline-backend/pkg/middleware"
3736
"github.com/instill-ai/pipeline-backend/pkg/pubsub"
3837
"github.com/instill-ai/pipeline-backend/pkg/repository"
3938
"github.com/instill-ai/pipeline-backend/pkg/service"
4039
"github.com/instill-ai/pipeline-backend/pkg/usage"
41-
"github.com/instill-ai/x/temporal"
4240

4341
componentstore "github.com/instill-ai/pipeline-backend/pkg/component/store"
4442
database "github.com/instill-ai/pipeline-backend/pkg/db"
@@ -53,6 +51,7 @@ import (
5351
otelx "github.com/instill-ai/x/otel"
5452
servergrpcx "github.com/instill-ai/x/server/grpc"
5553
gatewayx "github.com/instill-ai/x/server/grpc/gateway"
54+
temporalx "github.com/instill-ai/x/temporal"
5655
)
5756

5857
const gracefulShutdownWaitPeriod = 15 * time.Second
@@ -120,16 +119,28 @@ func main() {
120119
// Initialize all clients
121120
pipelinePublicServiceClient, mgmtPublicServiceClient, mgmtPrivateServiceClient,
122121
artifactPublicServiceClient, artifactPrivateServiceClient, redisClient, db,
123-
minIOClient, minIOFileGetter, aclClient, temporalClient, closeClients := newClients(ctx, logger)
122+
aclClient, temporalClient, closeClients := newClients(ctx, logger)
124123
defer closeClients()
125124

126-
// Keep NewArtifactBinaryFetcher as requested
127-
binaryFetcher := external.NewArtifactBinaryFetcher(artifactPrivateServiceClient, minIOFileGetter)
125+
// Initialize MinIO client
126+
minIOParams := miniox.ClientParams{
127+
Config: config.Config.Minio,
128+
Logger: logger,
129+
ExpiryRules: service.NewRetentionHandler().ListExpiryRules(),
130+
AppInfo: miniox.AppInfo{
131+
Name: serviceName,
132+
Version: serviceVersion,
133+
},
134+
}
135+
136+
minIOClient, err := miniox.NewMinIOClientAndInitBucket(ctx, minIOParams)
137+
if err != nil {
138+
logger.Fatal("failed to create MinIO client", zap.Error(err))
139+
}
128140

129141
compStore := componentstore.Init(componentstore.InitParams{
130142
Logger: logger,
131143
Secrets: config.Config.Component.Secrets,
132-
BinaryFetcher: binaryFetcher,
133144
TemporalClient: temporalClient,
134145
})
135146

@@ -156,7 +167,7 @@ func main() {
156167
compStore,
157168
ms,
158169
service.NewRetentionHandler(),
159-
binaryFetcher,
170+
compStore.GetBinaryFetcher(),
160171
artifactPublicServiceClient,
161172
artifactPrivateServiceClient,
162173
)
@@ -368,8 +379,6 @@ func newClients(ctx context.Context, logger *zap.Logger) (
368379
artifactpb.ArtifactPrivateServiceClient,
369380
*redis.Client,
370381
*gorm.DB,
371-
miniox.Client,
372-
*miniox.FileGetter,
373382
acl.ACLClient,
374383
temporalclient.Client,
375384
func(),
@@ -437,7 +446,7 @@ func newClients(ctx context.Context, logger *zap.Logger) (
437446
}
438447

439448
// Initialize Temporal client
440-
temporalClientOptions, err := temporal.ClientOptions(config.Config.Temporal, logger)
449+
temporalClientOptions, err := temporalx.ClientOptions(config.Config.Temporal, logger)
441450
if err != nil {
442451
logger.Fatal("Unable to build Temporal client options", zap.Error(err))
443452
}
@@ -484,27 +493,6 @@ func newClients(ctx context.Context, logger *zap.Logger) (
484493

485494
aclClient := acl.NewACLClient(fgaClient, fgaReplicaClient, redisClient)
486495

487-
// Initialize MinIO client
488-
minIOParams := miniox.ClientParams{
489-
Config: config.Config.Minio,
490-
Logger: logger,
491-
AppInfo: miniox.AppInfo{
492-
Name: serviceName,
493-
Version: serviceVersion,
494-
},
495-
}
496-
minIOFileGetter, err := miniox.NewFileGetter(minIOParams)
497-
if err != nil {
498-
logger.Fatal("Failed to create MinIO file getter", zap.Error(err))
499-
}
500-
501-
retentionHandler := service.NewRetentionHandler()
502-
minIOParams.ExpiryRules = retentionHandler.ListExpiryRules()
503-
minIOClient, err := miniox.NewMinIOClientAndInitBucket(ctx, minIOParams)
504-
if err != nil {
505-
logger.Fatal("failed to create MinIO client", zap.Error(err))
506-
}
507-
508496
closer := func() {
509497
for conn, fn := range closeFuncs {
510498
if err := fn(); err != nil {
@@ -515,5 +503,5 @@ func newClients(ctx context.Context, logger *zap.Logger) (
515503

516504
return pipelinePublicServiceClient, mgmtPublicServiceClient, mgmtPrivateServiceClient,
517505
artifactPublicServiceClient, artifactPrivateServiceClient, redisClient, db,
518-
minIOClient, minIOFileGetter, aclClient, temporalClient, closer
506+
aclClient, temporalClient, closer
519507
}

cmd/worker/main.go

Lines changed: 20 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,11 @@ import (
2121
temporalclient "go.temporal.io/sdk/client"
2222

2323
"github.com/instill-ai/pipeline-backend/config"
24-
"github.com/instill-ai/pipeline-backend/pkg/external"
2524
"github.com/instill-ai/pipeline-backend/pkg/memory"
2625
"github.com/instill-ai/pipeline-backend/pkg/pubsub"
2726
"github.com/instill-ai/pipeline-backend/pkg/repository"
2827
"github.com/instill-ai/pipeline-backend/pkg/service"
2928
"github.com/instill-ai/x/client"
30-
"github.com/instill-ai/x/minio"
31-
"github.com/instill-ai/x/temporal"
3229

3330
componentstore "github.com/instill-ai/pipeline-backend/pkg/component/store"
3431
database "github.com/instill-ai/pipeline-backend/pkg/db"
@@ -37,7 +34,9 @@ import (
3734
pipelinepb "github.com/instill-ai/protogen-go/pipeline/pipeline/v1beta"
3835
clientgrpcx "github.com/instill-ai/x/client/grpc"
3936
logx "github.com/instill-ai/x/log"
37+
miniox "github.com/instill-ai/x/minio"
4038
otelx "github.com/instill-ai/x/otel"
39+
temporalx "github.com/instill-ai/x/temporal"
4140
)
4241

4342
const gracefulShutdownWaitPeriod = 15 * time.Second
@@ -83,16 +82,27 @@ func main() {
8382

8483
// Initialize all clients
8584
pipelinePublicServiceClient, artifactPublicServiceClient, artifactPrivateServiceClient,
86-
redisClient, db, minIOClient, minIOFileGetter, temporalClient, timeseries, closeClients := newClients(ctx, logger)
85+
redisClient, db, temporalClient, timeseries, closeClients := newClients(ctx, logger)
8786
defer closeClients()
8887

89-
// Keep NewArtifactBinaryFetcher as requested
90-
binaryFetcher := external.NewArtifactBinaryFetcher(artifactPrivateServiceClient, minIOFileGetter)
88+
minIOParams := miniox.ClientParams{
89+
Config: config.Config.Minio,
90+
Logger: logger,
91+
ExpiryRules: service.NewRetentionHandler().ListExpiryRules(),
92+
AppInfo: miniox.AppInfo{
93+
Name: serviceName,
94+
Version: serviceVersion,
95+
},
96+
}
97+
98+
minIOClient, err := miniox.NewMinIOClientAndInitBucket(ctx, minIOParams)
99+
if err != nil {
100+
logger.Fatal("failed to create MinIO client", zap.Error(err))
101+
}
91102

92103
compStore := componentstore.Init(componentstore.InitParams{
93104
Logger: logger,
94105
Secrets: config.Config.Component.Secrets,
95-
BinaryFetcher: binaryFetcher,
96106
TemporalClient: temporalClient,
97107
})
98108

@@ -111,7 +121,7 @@ func main() {
111121
MemoryStore: ms,
112122
ArtifactPublicServiceClient: artifactPublicServiceClient,
113123
ArtifactPrivateServiceClient: artifactPrivateServiceClient,
114-
BinaryFetcher: binaryFetcher,
124+
BinaryFetcher: compStore.GetBinaryFetcher(),
115125
PipelinePublicServiceClient: pipelinePublicServiceClient,
116126
},
117127
)
@@ -195,8 +205,6 @@ func newClients(ctx context.Context, logger *zap.Logger) (
195205
artifactpb.ArtifactPrivateServiceClient,
196206
*redis.Client,
197207
*gorm.DB,
198-
minio.Client,
199-
*minio.FileGetter,
200208
temporalclient.Client,
201209
*repository.InfluxDB,
202210
func(),
@@ -257,7 +265,7 @@ func newClients(ctx context.Context, logger *zap.Logger) (
257265
}
258266

259267
// Initialize Temporal client
260-
temporalClientOptions, err := temporal.ClientOptions(config.Config.Temporal, logger)
268+
temporalClientOptions, err := temporalx.ClientOptions(config.Config.Temporal, logger)
261269
if err != nil {
262270
logger.Fatal("Unable to build Temporal client options", zap.Error(err))
263271
}
@@ -283,27 +291,6 @@ func newClients(ctx context.Context, logger *zap.Logger) (
283291
return nil
284292
}
285293

286-
// Initialize MinIO client
287-
minIOParams := minio.ClientParams{
288-
Config: config.Config.Minio,
289-
Logger: logger,
290-
AppInfo: minio.AppInfo{
291-
Name: serviceName,
292-
Version: serviceVersion,
293-
},
294-
}
295-
minIOFileGetter, err := minio.NewFileGetter(minIOParams)
296-
if err != nil {
297-
logger.Fatal("Failed to create MinIO file getter", zap.Error(err))
298-
}
299-
300-
retentionHandler := service.NewRetentionHandler()
301-
minIOParams.ExpiryRules = retentionHandler.ListExpiryRules()
302-
minIOClient, err := minio.NewMinIOClientAndInitBucket(ctx, minIOParams)
303-
if err != nil {
304-
logger.Fatal("failed to create MinIO client", zap.Error(err))
305-
}
306-
307294
closer := func() {
308295
for conn, fn := range closeFuncs {
309296
if err := fn(); err != nil {
@@ -313,5 +300,5 @@ func newClients(ctx context.Context, logger *zap.Logger) (
313300
}
314301

315302
return pipelinePublicServiceClient, artifactPublicServiceClient, artifactPrivateServiceClient,
316-
redisClient, db, minIOClient, minIOFileGetter, temporalClient, timeseries, closer
303+
redisClient, db, temporalClient, timeseries, closer
317304
}

config/config.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ import (
1414
"github.com/knadh/koanf/providers/file"
1515
"github.com/redis/go-redis/v9"
1616

17-
"github.com/instill-ai/x/client"
18-
"github.com/instill-ai/x/minio"
19-
"github.com/instill-ai/x/temporal"
17+
clientx "github.com/instill-ai/x/client"
18+
miniox "github.com/instill-ai/x/minio"
19+
temporalx "github.com/instill-ai/x/temporal"
2020
)
2121

2222
const (
@@ -31,20 +31,20 @@ var Config AppConfig
3131

3232
// AppConfig defines
3333
type AppConfig struct {
34-
Server ServerConfig `koanf:"server"`
35-
Component ComponentConfig `koanf:"component"`
36-
Database DatabaseConfig `koanf:"database"`
37-
InfluxDB InfluxDBConfig `koanf:"influxdb"`
38-
Temporal temporal.ClientConfig `koanf:"temporal"`
39-
Cache CacheConfig `koanf:"cache"`
40-
OTELCollector OTELCollectorConfig `koanf:"otelcollector"`
41-
MgmtBackend client.ServiceConfig `koanf:"mgmtbackend"`
42-
ModelBackend client.ServiceConfig `koanf:"modelbackend"`
43-
OpenFGA OpenFGAConfig `koanf:"openfga"`
44-
ArtifactBackend client.ServiceConfig `koanf:"artifactbackend"`
45-
Minio minio.Config `koanf:"minio"`
46-
AgentBackend client.ServiceConfig `koanf:"agentbackend"`
47-
APIGateway APIGatewayConfig `koanf:"apigateway"`
34+
Server ServerConfig `koanf:"server"`
35+
Component ComponentConfig `koanf:"component"`
36+
Database DatabaseConfig `koanf:"database"`
37+
InfluxDB InfluxDBConfig `koanf:"influxdb"`
38+
Temporal temporalx.ClientConfig `koanf:"temporal"`
39+
Cache CacheConfig `koanf:"cache"`
40+
OTELCollector OTELCollectorConfig `koanf:"otelcollector"`
41+
MgmtBackend clientx.ServiceConfig `koanf:"mgmtbackend"`
42+
ModelBackend clientx.ServiceConfig `koanf:"modelbackend"`
43+
OpenFGA OpenFGAConfig `koanf:"openfga"`
44+
ArtifactBackend clientx.ServiceConfig `koanf:"artifactbackend"`
45+
Minio miniox.Config `koanf:"minio"`
46+
AgentBackend clientx.ServiceConfig `koanf:"agentbackend"`
47+
APIGateway APIGatewayConfig `koanf:"apigateway"`
4848
}
4949

5050
// APIGatewayConfig related to API gateway

config/config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ minio:
8181
port: 9000
8282
user: minioadmin
8383
password: minioadmin
84-
bucketname: instill-ai-vdp
84+
bucketname: core-pipeline
8585
secure: false
8686
agentbackend:
8787
host: agent-backend

pkg/component/base/component.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ import (
2020
temporalclient "go.temporal.io/sdk/client"
2121

2222
"github.com/instill-ai/pipeline-backend/pkg/component/internal/jsonref"
23+
"github.com/instill-ai/pipeline-backend/pkg/data/binary"
2324
"github.com/instill-ai/pipeline-backend/pkg/data/format"
24-
"github.com/instill-ai/pipeline-backend/pkg/external"
2525

2626
pipelinepb "github.com/instill-ai/protogen-go/pipeline/pipeline/v1beta"
2727
)
@@ -161,14 +161,15 @@ type IdentifierResult struct {
161161
}
162162

163163
// Component implements the common component methods.
164+
164165
type Component struct {
165166
Logger *zap.Logger
166167
NewUsageHandler UsageHandlerCreator
167168

168169
definition *pipelinepb.ComponentDefinition
169170
secretFields []string
170171

171-
BinaryFetcher external.BinaryFetcher
172+
BinaryFetcher binary.Fetcher
172173
TemporalClient temporalclient.Client
173174
}
174175

pkg/component/operator/audio/v0/task_detect_activity_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
"github.com/instill-ai/pipeline-backend/pkg/component/base"
2020
"github.com/instill-ai/pipeline-backend/pkg/component/internal/mock"
2121
"github.com/instill-ai/pipeline-backend/pkg/data"
22-
"github.com/instill-ai/pipeline-backend/pkg/external"
22+
"github.com/instill-ai/pipeline-backend/pkg/data/binary"
2323
)
2424

2525
func TestDetectActivity(t *testing.T) {
@@ -125,7 +125,7 @@ func TestDetectActivity(t *testing.T) {
125125
jsonValue, err := data.NewJSONValue(segmentsMap)
126126
c.Assert(err, qt.IsNil)
127127

128-
binaryFetcher := external.NewBinaryFetcher()
128+
binaryFetcher := binary.NewFetcher()
129129
unmarshaler := data.NewUnmarshaler(binaryFetcher)
130130
c.Assert(unmarshaler.Unmarshal(context.Background(), jsonValue, &expectedSegmentsStruct), qt.IsNil)
131131
expectedSegments := expectedSegmentsStruct.Segments

pkg/component/operator/audio/v0/task_segment_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/instill-ai/pipeline-backend/pkg/component/base"
1313
"github.com/instill-ai/pipeline-backend/pkg/component/internal/mock"
1414
"github.com/instill-ai/pipeline-backend/pkg/data"
15-
"github.com/instill-ai/pipeline-backend/pkg/external"
15+
"github.com/instill-ai/pipeline-backend/pkg/data/binary"
1616
)
1717

1818
func TestSegment(t *testing.T) {
@@ -68,7 +68,7 @@ func TestSegment(t *testing.T) {
6868
jsonValue, err := data.NewJSONValue(segmentsMap)
6969
c.Assert(err, qt.IsNil)
7070

71-
binaryFetcher := external.NewBinaryFetcher()
71+
binaryFetcher := binary.NewFetcher()
7272
unmarshaler := data.NewUnmarshaler(binaryFetcher)
7373
c.Assert(unmarshaler.Unmarshal(context.Background(), jsonValue, &segmentsStruct), qt.IsNil)
7474
segments := segmentsStruct.Segments

0 commit comments

Comments
 (0)