diff --git a/README.md b/README.md index e8f1563..6904eff 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,11 @@ docker compose -f docker-compose.dev.yml up --build # Run database migrations docker exec -i postgres-db psql -U admin -d taskmaster < db/schema.sql + +#connect to postgres cli +docker exec -it postgres-db psql -U admin -d taskmaster + + # Run tests go test ./... -v ``` @@ -18,6 +23,7 @@ go test ./... -v ## 📋 Implementation Progress ### 1. Core Infrastructure ✅ + - [x] Project structure and module setup - [x] PostgreSQL database integration - [x] Redis for state management @@ -30,6 +36,7 @@ go test ./... -v ### 2. Job Processing System 🚧 #### Basic Features ✅ + - [x] REST API endpoints (Fiber) - [x] Job creation and storage - [x] Kafka producer implementation @@ -38,6 +45,7 @@ go test ./... -v - [x] Simple retry mechanism #### Core Processing Features 🚧 + - [ ] Job type registry system - [ ] Payload validation - [ ] Configurable retry policies @@ -50,6 +58,7 @@ go test ./... -v - [ ] Result storage #### Advanced Processing Features 🚧 + - [ ] Distributed locking - [ ] Job batching - [ ] Workflow engine @@ -62,6 +71,7 @@ go test ./... -v ### 3. Developer Experience 🚧 #### Documentation & Tools + - [ ] Job type documentation - [ ] Debugging tools - [ ] Testing/simulation tools @@ -70,6 +80,7 @@ go test ./... -v - [ ] Hooks/middleware system #### Web Dashboard + - [ ] React + Tailwind UI - [ ] Real-time updates (WebSocket) - [ ] Job filtering and search @@ -81,6 +92,7 @@ go test ./... -v ### 4. Operations & Monitoring 🚧 #### Observability + - [x] Structured logging (slog) - [ ] Prometheus metrics - [ ] Grafana dashboards @@ -89,6 +101,7 @@ go test ./... -v - [ ] Resource monitoring #### Operational Tools + - [ ] Job archival - [ ] Cleanup policies - [ ] Audit logging @@ -100,6 +113,7 @@ go test ./... -v ### 5. Production Deployment 🚧 #### Infrastructure + - [ ] Kubernetes setup - [ ] Helm charts - [ ] Worker auto-scaling @@ -108,6 +122,7 @@ go test ./... -v - [ ] Blue-green deployments #### Cloud Integration + - [ ] Terraform configurations - [ ] AWS/GCP deployment - [ ] Cost optimization @@ -116,17 +131,21 @@ go test ./... -v ## 🔄 System Architecture ### Components + 1. **API Service** + - REST/gRPC endpoints - Request validation - Job creation & queuing 2. **Message Queue** + - Kafka-based processing - Job distribution - Order guarantee 3. **Worker Service** + - Job execution - Status management - Error handling @@ -137,6 +156,7 @@ go test ./... -v - Kafka: Message queue ### Basic Job Flow + 1. Submit job via API 2. Store in PostgreSQL 3. Queue in Kafka @@ -147,11 +167,13 @@ go test ./... -v ## 🛠 Development ### Prerequisites + - Go 1.23+ - Docker & Docker Compose - Make (optional) ### Environment Setup + ```bash DATABASE_URL=postgres://admin:admin@postgres-db:5432/taskmaster?sslmode=disable KAFKA_BROKER=kafka:9092 @@ -160,6 +182,7 @@ JWT_SECRET=supersecretkey ``` ### API Examples + ```bash # Authentication POST /api/login @@ -184,6 +207,7 @@ GET /api/jobs ``` ## 📝 Contributing + 1. Fork repository 2. Create feature branch 3. Commit changes @@ -191,4 +215,5 @@ GET /api/jobs 5. 
Open Pull Request ## 📄 License -MIT License \ No newline at end of file + +MIT License diff --git a/db/schema.sql b/db/schema.sql index 3e5ac26..b53245e 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -4,5 +4,6 @@ CREATE TABLE IF NOT EXISTS jobs ( status TEXT CHECK (status IN ('pending', 'processing', 'completed', 'failed')) DEFAULT 'pending', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, type TEXT NOT NULL, - payload JSON + payload JSON, + response JSON ); \ No newline at end of file diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 80904e0..d95a711 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -1,4 +1,4 @@ -version: '3.9' +version: "3.9" services: postgres: @@ -73,6 +73,8 @@ services: REDIS_ADDR: ${REDIS_ADDR} GO_ENV: ${GO_ENV} JWT_SECRET: ${JWT_SECRET} + GEMINI_API_KEY: ${GEMINI_API_KEY} + LLAMA_API_KEY: ${LLAMA_API_KEY} networks: - app-network-dev depends_on: @@ -91,6 +93,8 @@ services: KAFKA_BROKER: ${KAFKA_BROKER} REDIS_ADDR: ${REDIS_ADDR} GO_ENV: ${GO_ENV} + GEMINI_API_KEY: ${GEMINI_API_KEY} + LLAMA_API_KEY: ${LLAMA_API_KEY} restart: on-failure networks: - app-network-dev @@ -103,4 +107,4 @@ networks: volumes: pgdata-dev: - go-modules: \ No newline at end of file + go-modules: diff --git a/go.mod b/go.mod index 6bfebe5..8409863 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,12 @@ require ( ) require ( + cloud.google.com/go v0.115.0 // indirect + cloud.google.com/go/ai v0.8.0 // indirect + cloud.google.com/go/auth v0.6.0 // indirect + cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect + cloud.google.com/go/compute/metadata v0.3.0 // indirect + cloud.google.com/go/longrunning v0.5.7 // indirect github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect github.com/andybalholm/brotli v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -26,9 +32,18 @@ require ( github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/generative-ai-go v0.19.0 // indirect + github.com/google/s2a-go v0.1.7 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect + github.com/googleapis/gax-go/v2 v2.12.5 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect @@ -58,9 +73,24 @@ require ( github.com/valyala/fasthttp v1.51.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect + go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect + go.opentelemetry.io/otel v1.26.0 // indirect + go.opentelemetry.io/otel/metric v1.26.0 // indirect + go.opentelemetry.io/otel/trace v1.26.0 // indirect golang.org/x/crypto v0.32.0 // indirect golang.org/x/net v0.34.0 // indirect + golang.org/x/oauth2 v0.21.0 // indirect + golang.org/x/sync v0.10.0 // indirect 
golang.org/x/sys v0.30.0 // indirect + golang.org/x/text v0.21.0 // indirect + golang.org/x/time v0.5.0 // indirect + google.golang.org/api v0.186.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 // indirect + google.golang.org/grpc v1.64.1 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 5353dbf..3f0b136 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,19 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.115.0 h1:CnFSK6Xo3lDYRoBKEcAtia6VSC837/ZkJuRduSFnr14= +cloud.google.com/go v0.115.0/go.mod h1:8jIM5vVgoAEoiVxQ/O4BFTfHqulPZgs/ufEzMcFMdWU= +cloud.google.com/go/ai v0.8.0 h1:rXUEz8Wp2OlrM8r1bfmpF2+VKqc1VJpafE3HgzRnD/w= +cloud.google.com/go/ai v0.8.0/go.mod h1:t3Dfk4cM61sytiggo2UyGsDVW3RF1qGZaUKDrZFyqkE= +cloud.google.com/go/auth v0.6.0 h1:5x+d6b5zdezZ7gmLWD1m/xNjnaQ2YDhmIz/HH3doy1g= +cloud.google.com/go/auth v0.6.0/go.mod h1:b4acV+jLQDyjwm4OXHYjNvRi4jvGBzHWJRtJcy+2P4g= +cloud.google.com/go/auth/oauth2adapt v0.2.2 h1:+TTV8aXpjeChS9M+aTtN/TjdQnzJvmzKFt//oWu7HX4= +cloud.google.com/go/auth/oauth2adapt v0.2.2/go.mod h1:wcYjgpZI9+Yu7LyYBg4pqSiaRkfEK3GQcpb7C/uyF1Q= +cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= +cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= +cloud.google.com/go/longrunning v0.5.7 h1:WLbHekDbjK1fVFD3ibpFFVoyizlLRl73I7YKuAKilhU= +cloud.google.com/go/longrunning v0.5.7/go.mod h1:8GClkudohy1Fxm3owmBGid8W0pSgodEMwEAztp38Xng= filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/IBM/sarama v1.45.0 h1:IzeBevTn809IJ/dhNKhP5mpxEXTmELuezO2tgHD9G5E= @@ -15,8 +29,11 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -29,8 +46,19 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4A github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod 
h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/gofiber/fiber/v2 v2.45.0/go.mod h1:DNl0/c37WLe0g92U6lx1VMQuxGUQY5V7EIaVoEsUffc= @@ -42,11 +70,42 @@ github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOW github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.4 
h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/generative-ai-go v0.19.0 h1:R71szggh8wHMCUlEMsW2A/3T+5LdEIkiaHSYgSpUgdg= +github.com/google/generative-ai-go v0.19.0/go.mod h1:JYolL13VG7j79kM5BtHz4qwONHkeJQzOCkKXnpqtS/E= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= +github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= +github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= +github.com/googleapis/gax-go/v2 v2.12.5 h1:8gw9KZK8TiVKB6q3zHY3SBzLnrGp6HQjyfYBYGmXdxA= +github.com/googleapis/gax-go/v2 v2.12.5/go.mod h1:BUDKcWo+RaKq5SC9vVYL0wLADa3VcfswbOMMRmB9H3E= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= @@ -103,6 +162,7 @@ github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFu github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= @@ -148,6 +208,18 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= 
+go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0 h1:A3SayB3rNyt+1S6qpI9mHPkeHTZbD7XILEqWnYZb2l0= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0/go.mod h1:27iA5uvhuRNmalO+iEUdVn5ZMj2qy10Mm+XRIpRmyuU= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 h1:Xs2Ncz0gNihqu9iosIZ5SkBbWo5T8JhhLJFMQL1qmLI= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0/go.mod h1:vy+2G/6NvVMpwGX/NyLqcC41fxepnuKHk16E6IZUcJc= +go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs= +go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4= +go.opentelemetry.io/otel/metric v1.26.0 h1:7S39CLuY5Jgg9CrnA9HHiEjGMF/X2VHvoXGgSllRz30= +go.opentelemetry.io/otel/metric v1.26.0/go.mod h1:SY+rHOI4cEawI9a7N1A4nIg/nTQXe1ccCNWYOJUrpX4= +go.opentelemetry.io/otel/trace v1.26.0 h1:1ieeAUb4y0TE26jUFrCIXKpTuVK7uJGN9/Z/2LP5sQA= +go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -156,14 +228,23 @@ golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58 golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod 
h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= @@ -172,12 +253,18 @@ golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= +golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -203,7 +290,15 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= @@ 
-211,7 +306,37 @@ golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/api v0.186.0 h1:n2OPp+PPXX0Axh4GuSsL5QL8xQCTb2oDwyzPnQvqUug= +google.golang.org/api v0.186.0/go.mod h1:hvRbBmgoje49RV3xqVXrmP6w93n6ehGgIVPYrGtBFFc= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 h1:MuYw1wJzT+ZkybKfaOXKp5hJiZDn2iHaXRw0mRYdHSc= +google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4/go.mod h1:px9SlOOZBg1wM1zdnr8jEL4CNGUBZ+ZKYtNPApNQc4c= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 h1:Di6ANFilr+S60a4S61ZM00vLdw0IrQOSMS2/6mrnOU0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= +google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA= +google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod 
h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
@@ -219,3 +344,5 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/internal/api/server.go b/internal/api/server.go
index 21d1df1..9214311 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -173,7 +173,7 @@ func (s *Server) handleGetJob(c *fiber.Ctx) error {
 	}
 
 	var job models.Job
-	query := "SELECT id, name, status, type FROM jobs WHERE id = $1"
+	query := "SELECT id, name, status, type, response FROM jobs WHERE id = $1"
 	err = s.db.DB.Get(&job, query, jobID)
 	if err != nil {
 		return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
diff --git a/internal/jobs/jobs.go b/internal/jobs/jobs.go
index 0b38e6d..0876c8b 100644
--- a/internal/jobs/jobs.go
+++ b/internal/jobs/jobs.go
@@ -2,8 +2,34 @@ package jobs
 
 import (
 	"context"
+
+	"github.com/illegalcall/task-master/pkg/database"
 )
 
+// DB holds the database clients for the jobs package
+type DB struct {
+	Clients *database.Clients
+}
+
+// Global instance of the database clients
+var db *DB
+
+// InitDB initializes the database clients for the jobs package
+func InitDB(clients *database.Clients) {
+	db = &DB{
+		Clients: clients,
+	}
+}
+
+// GetDB returns the global database instance
+func GetDB() *DB {
+	return db
+}
+
 // Result represents the outcome of a job execution
 type Result struct {
 	// Data contains the job result data
diff --git a/internal/jobs/parsedocument.go b/internal/jobs/parsedocument.go
index 32b475a..875016e 100644
--- a/internal/jobs/parsedocument.go
+++ b/internal/jobs/parsedocument.go
@@ -14,7 +14,7 @@ type ParseDocumentPayload struct {
 	// DocumentType indicates the source type ("path", "url", or "base64")
 	DocumentType string `json:"documentType"`
 	// OutputSchema defines the expected JSON structure for the parsed result
-	OutputSchema map[string]interface{} `json:"outputSchema"`
+	OutputSchema string `json:"expected_schema"`
 	// Description provides additional context to guide the LLM during parsing
 	Description string `json:"description"`
 	// Options contains optional parsing parameters
@@ -51,7 +51,7 @@ func (p *ParseDocumentPayload) Validate() error {
 	}
 
 	// Validate output schema
-	if p.OutputSchema == nil || len(p.OutputSchema) == 0 {
+	if p.OutputSchema == "" {
 		return errors.New("outputSchema is required")
 	}
 
@@ -123,121 +123,4 @@ func ValidateWithGJSON(payload []byte) error {
 	}
 
 	return nil
-}
-
-// CreateSamplePayloads creates example payloads for testing
-func CreateSamplePayloads() map[string]ParseDocumentPayload {
-	samples := make(map[string]ParseDocumentPayload)
-
-	// Sample 1: Invoice PDF
-	samples["invoice"] = ParseDocumentPayload{
-		Document:     "/path/to/invoice.pdf",
-
DocumentType: "path", - OutputSchema: map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "invoiceNumber": map[string]interface{}{"type": "string"}, - "date": map[string]interface{}{"type": "string"}, - "vendor": map[string]interface{}{"type": "string"}, - "total": map[string]interface{}{"type": "number"}, - "items": map[string]interface{}{ - "type": "array", - "items": map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "description": map[string]interface{}{"type": "string"}, - "quantity": map[string]interface{}{"type": "number"}, - "unitPrice": map[string]interface{}{"type": "number"}, - "amount": map[string]interface{}{"type": "number"}, - }, - }, - }, - }, - }, - Description: "Extract invoice details including invoice number, date, vendor name, total amount, and line items.", - Options: ParseOptions{ - Language: "en", - OCREnabled: true, - ConfidenceThreshold: 0.7, - }, - } - - // Sample 2: Resume PDF - samples["resume"] = ParseDocumentPayload{ - Document: "https://example.com/resume.pdf", - DocumentType: "url", - OutputSchema: map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "name": map[string]interface{}{"type": "string"}, - "email": map[string]interface{}{"type": "string"}, - "phone": map[string]interface{}{"type": "string"}, - "summary": map[string]interface{}{"type": "string"}, - "experience": map[string]interface{}{ - "type": "array", - "items": map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "company": map[string]interface{}{"type": "string"}, - "position": map[string]interface{}{"type": "string"}, - "startDate": map[string]interface{}{"type": "string"}, - "endDate": map[string]interface{}{"type": "string"}, - "description": map[string]interface{}{"type": "string"}, - }, - }, - }, - "education": map[string]interface{}{ - "type": "array", - "items": map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "institution": map[string]interface{}{"type": "string"}, - "degree": map[string]interface{}{"type": "string"}, - "year": map[string]interface{}{"type": "string"}, - }, - }, - }, - "skills": map[string]interface{}{ - "type": "array", - "items": map[string]interface{}{"type": "string"}, - }, - }, - }, - Description: "Extract candidate information from resume including personal details, work experience, education, and skills.", - Options: ParseOptions{ - Language: "en", - OCREnabled: false, - }, - } - - // Sample 3: Contract PDF - samples["contract"] = ParseDocumentPayload{ - Document: "base64encodedpdfcontent...", - DocumentType: "base64", - OutputSchema: map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "contractTitle": map[string]interface{}{"type": "string"}, - "parties": map[string]interface{}{"type": "array", "items": map[string]interface{}{"type": "string"}}, - "effectiveDate": map[string]interface{}{"type": "string"}, - "terminationDate": map[string]interface{}{"type": "string"}, - "paymentTerms": map[string]interface{}{"type": "string"}, - "deliverables": map[string]interface{}{"type": "array", "items": map[string]interface{}{"type": "string"}}, - "specialClauses": map[string]interface{}{"type": "array", "items": map[string]interface{}{"type": "string"}}, - "confidentiality": map[string]interface{}{"type": "string"}, - "disputeHandling": map[string]interface{}{"type": "string"}, - "governingLaw": map[string]interface{}{"type": "string"}, - "signatoryDetails": 
map[string]interface{}{"type": "array", "items": map[string]interface{}{"type": "string"}}, - }, - }, - Description: "Extract key contract terms and conditions including parties, effective dates, payment terms, deliverables, and special clauses.", - Options: ParseOptions{ - Language: "en", - OCREnabled: true, - ConfidenceThreshold: 0.8, - MaxPages: 10, - }, - } - - return samples -} \ No newline at end of file +} \ No newline at end of file diff --git a/internal/jobs/parsedocument_handler.go b/internal/jobs/parsedocument_handler.go index f0f5d8c..fb6be8e 100644 --- a/internal/jobs/parsedocument_handler.go +++ b/internal/jobs/parsedocument_handler.go @@ -9,12 +9,20 @@ import ( "fmt" "io" "io/ioutil" + "log/slog" + "mime/multipart" "net/http" "os" "strings" "time" + + "github.com/google/generative-ai-go/genai" + "google.golang.org/api/option" ) + +var llamaCloudAPIKey = os.Getenv("LLAMA_API_KEY") + // Make these functions variables so they can be mocked in tests var ( ExtractPDFText = extractPDFTextImpl @@ -23,14 +31,15 @@ var ( // GeminiClient is an interface for the Gemini LLM service type GeminiClient interface { - GenerateContent(ctx context.Context, text string, schema map[string]interface{}, description string) ([]byte, error) + GenerateContent(ctx context.Context, text string, schema string, description string) ([]byte, error) } -// HTTPGeminiClient implements the GeminiClient interface using HTTP requests +// HTTPGeminiClient implements the GeminiClient interface using the official genai package type HTTPGeminiClient struct { - apiKey string + client *genai.Client + model *genai.GenerativeModel // Optional function for testing/mocking - generateContentFunc func(ctx context.Context, text string, schema map[string]interface{}, description string) ([]byte, error) + generateContentFunc func(ctx context.Context, text string, schema string, description string) ([]byte, error) } // GeminiRequest represents a request to the Gemini API @@ -63,18 +72,29 @@ type GeminiCandidate struct { // newGeminiClientImpl creates a new Gemini client using the API key from environment variables func newGeminiClientImpl(ctx context.Context) (*HTTPGeminiClient, error) { apiKey := os.Getenv("GEMINI_API_KEY") + slog.Info("GEMINI_API_KEY", "apiKey", apiKey) if apiKey == "" { return nil, errors.New("GEMINI_API_KEY environment variable is not set") } + client, err := genai.NewClient(ctx, option.WithAPIKey(apiKey)) + if err != nil { + return nil, fmt.Errorf("failed to create genai client: %w", err) + } + + model := client.GenerativeModel("gemini-2.0-flash") + return &HTTPGeminiClient{ - apiKey: apiKey, + client: client, + model: model, }, nil } // GenerateContent sends a request to Gemini to convert extracted text into structured JSON -func (c *HTTPGeminiClient) GenerateContent(ctx context.Context, text string, schema map[string]interface{}, description string) ([]byte, error) { +func (c *HTTPGeminiClient) GenerateContent(ctx context.Context, text string, schema string, description string) ([]byte, error) { // If there's a test override function, use it instead + slog.Info("Generating content with genai package", "text length", len(text), "schema", schema, "description", description) + if c.generateContentFunc != nil { return c.generateContentFunc(ctx, text, schema, description) } @@ -82,9 +102,11 @@ func (c *HTTPGeminiClient) GenerateContent(ctx context.Context, text string, sch // Convert schema to a readable string format schemaBytes, err := json.MarshalIndent(schema, "", " ") if err != nil { + slog.Info("Failed 
to marshal schema to string", "error", err) return nil, fmt.Errorf("failed to marshal schema: %w", err) } schemaStr := string(schemaBytes) + slog.Info("Schema formatted for prompt", "schemaLength", len(schemaStr)) // Build the prompt for the model prompt := fmt.Sprintf(` @@ -102,91 +124,295 @@ DOCUMENT TEXT: Respond with ONLY a valid JSON object matching the schema. Do not include any explanations or markdown formatting. `, description, schemaStr, text) + slog.Info("Prompt built for Gemini", "promptLength", len(prompt)) - // Create the request to Gemini API - reqBody := GeminiRequest{ - Contents: []GeminiContent{ - { - Parts: []GeminiPart{ - { - Text: prompt, - }, - }, - }, - }, + // Use the genai client to generate content + resp, err := c.model.GenerateContent(ctx, genai.Text(prompt)) + if err != nil { + slog.Info("Gemini API request failed", "error", err, "modelName") + return nil, fmt.Errorf("failed to generate content: %w", err) } + slog.Info("Received response from Gemini API", "candidatesCount", len(resp.Candidates)) + + if len(resp.Candidates) == 0 { + slog.Info("Gemini returned empty candidates list") + return nil, errors.New("no response generated") + } + + if len(resp.Candidates[0].Content.Parts) == 0 { + slog.Info("Gemini returned candidate with empty parts list") + return nil, errors.New("no response generated") + } + + // Extract the response text + responsePart := resp.Candidates[0].Content.Parts[0] + slog.Info("Extracted first response part", "partType", fmt.Sprintf("%T", responsePart)) + + responseText, ok := responsePart.(genai.Text) + if !ok { + slog.Info("Unexpected response part type", "type", fmt.Sprintf("%T", responsePart)) + return nil, fmt.Errorf("unexpected response type: %T", responsePart) + } + slog.Info("Response text extracted", "textLength", len(string(responseText))) + + // Clean up response - remove any markdown code block formatting + cleanResponse := strings.TrimSpace(string(responseText)) + slog.Info("Trimmed response space", "beforeLength", len(string(responseText)), "afterLength", len(cleanResponse)) + + cleanResponse = strings.TrimPrefix(cleanResponse, "```json") + cleanResponse = strings.TrimPrefix(cleanResponse, "```") + cleanResponse = strings.TrimSuffix(cleanResponse, "```") + cleanResponse = strings.TrimSpace(cleanResponse) + slog.Info("Cleaned response from markdown formatting", "finalLength", len(cleanResponse)) + + // Validate the response is valid JSON + var jsonResponse interface{} + if err := json.Unmarshal([]byte(cleanResponse), &jsonResponse); err != nil { + slog.Info("Invalid JSON response from LLM", "error", err, "response", cleanResponse) + return nil, fmt.Errorf("invalid JSON response from LLM: %w", err) + } + slog.Info("Validated JSON response", "type", fmt.Sprintf("%T", jsonResponse)) + + return []byte(cleanResponse), nil +} - reqBytes, err := json.Marshal(reqBody) +// SimplePDFExtractor extracts text from a PDF file + +// SimplePDFExtractor uploads a PDF file to the LlamaParse API and retrieves the parsed result once the job is completed. 
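+// A minimal usage sketch for the new string-schema signature (illustrative
+// only; the schema and description below are hypothetical):
+//
+//	out, err := client.GenerateContent(ctx, extractedText,
+//		`{"type":"object","properties":{"total":{"type":"number"}}}`,
+//		"Extract the invoice total.")
+//	if err != nil {
+//		// handle error
+//	}
+//	_ = out // out holds the validated JSON bytes
+
+// SimplePDFExtractor uploads a PDF file to the LlamaParse API and retrieves the parsed result once the job is completed.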
+func SimplePDFExtractor(filePath string) (string, error) {
+	// Log the start of the file upload process
+	slog.Info("Starting PDF file upload", "filePath", filePath)
+
+	// Step 1: Upload the file to the LlamaParse API
+	jobID, err := uploadFile(filePath)
+	if err != nil {
+		slog.Error("Failed to upload file", "filePath", filePath, "error", err)
+		return "", fmt.Errorf("failed to upload file: %w", err)
+	}
+	slog.Info("File uploaded successfully", "filePath", filePath, "jobID", jobID)
+
+	// Step 2: Poll the parsing job until it leaves the "PENDING" state
+	var status string
+	for {
+		slog.Info("Checking parsing job status", "jobID", jobID)
+
+		// Assign to the outer variables; using := here would shadow status
+		// and leave the final status log below empty
+		status, err = checkJobStatus(jobID)
+		if err != nil {
+			slog.Error("Failed to check job status", "jobID", jobID, "error", err)
+			return "", fmt.Errorf("failed to check job status: %w", err)
+		}
+		slog.Info("Parsing job status retrieved", "jobID", jobID, "status", status)
+
+		// Any non-pending status (completed or failed) ends the polling loop
+		if status != "PENDING" {
+			slog.Info("Parsing job is no longer pending", "jobID", jobID, "status", status)
+			break
+		}
+
+		// Still pending: wait before checking again
+		slog.Warn("Parsing job not completed yet", "jobID", jobID, "status", status)
+		time.Sleep(5 * time.Second) // Retry every 5 seconds
+	}
+	slog.Info("Final parsing job status", "jobID", jobID, "status", status)
+
+	// Step 3: Retrieve the result once the job is completed
+	slog.Info("Retrieving parsing result", "jobID", jobID)
+
+	result, err := getParsingResult(jobID)
+	if err != nil {
+		slog.Error("Failed to retrieve parsing result", "jobID", jobID, "error", err)
+		return "", fmt.Errorf("failed to retrieve parsing result: %w", err)
+	}
+	slog.Info("Parsing result successfully retrieved", "jobID", jobID)
+
+	// Return the parsing result (Markdown format)
+	return result, nil
+}
+
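+// End-to-end, the upload → poll → fetch flow above can be exercised like
+// this (sketch only; the path is hypothetical and LLAMA_API_KEY must be set):
+//
+//	markdown, err := SimplePDFExtractor("/tmp/example.pdf")
+//	if err != nil {
+//		// handle upload/poll/fetch failure
+//	}
+//	_ = markdown // parsed document text in Markdown form
+
+// uploadFile uploads a PDF file to the LlamaParse API and returns the job ID.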
+// Struct to match the JSON response from the API +type UploadResponse struct { + ID string `json:"id"` + Status string `json:"status"` +} + +func uploadFile(filePath string) (string, error) { + // Open the file + file, err := os.Open(filePath) + if err != nil { + slog.Error("Failed to open file", "filePath", filePath, "error", err) + return "", fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + // Prepare the multipart form data + var b bytes.Buffer + writer := multipart.NewWriter(&b) + part, err := writer.CreateFormFile("file", file.Name()) if err != nil { - return nil, fmt.Errorf("failed to marshal request: %w", err) + slog.Error("Failed to create form file", "filePath", filePath, "error", err) + return "", fmt.Errorf("failed to create form file: %w", err) } - // Gemini API endpoint - url := fmt.Sprintf("https://generativelanguage.googleapis.com/v1beta/models/gemini-pro:generateContent?key=%s", c.apiKey) + // Copy the file content into the form data + _, err = io.Copy(part, file) + if err != nil { + slog.Error("Failed to copy file content", "filePath", filePath, "error", err) + return "", fmt.Errorf("failed to read file content: %w", err) + } - req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(reqBytes)) + // Close the writer to finalize the multipart form + err = writer.Close() if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) + slog.Error("Failed to close multipart writer", "filePath", filePath, "error", err) + return "", fmt.Errorf("failed to close writer: %w", err) } - req.Header.Set("Content-Type", "application/json") + + // Make the POST request to upload the file + url := "https://api.cloud.llamaindex.ai/api/v1/parsing/upload" + req, err := http.NewRequest("POST", url, &b) + if err != nil { + slog.Error("Failed to create request", "url", url, "error", err) + return "", fmt.Errorf("failed to create request: %w", err) + } + + // Set headers + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", llamaCloudAPIKey)) + req.Header.Set("Accept", "application/json") + req.Header.Set("Content-Type", writer.FormDataContentType()) // Send the request client := &http.Client{} resp, err := client.Do(req) if err != nil { - return nil, fmt.Errorf("failed to send request: %w", err) + slog.Error("Failed to send request", "url", url, "error", err) + return "", fmt.Errorf("failed to send request: %w", err) } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - body, _ := ioutil.ReadAll(resp.Body) - return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) + // Log the response status code for debugging purposes + slog.Info("Received response from LlamaParse API", "statusCode", resp.StatusCode) + + // Read the response body + body, err := io.ReadAll(resp.Body) + if err != nil { + slog.Error("Failed to read response body", "error", err) + return "", fmt.Errorf("failed to read response body: %w", err) + } + + // Log the raw response body (useful for debugging) + slog.Debug("Response body", "body", string(body)) + + // Unmarshal the JSON response into the UploadResponse struct + var response UploadResponse + err = json.Unmarshal(body, &response) + if err != nil { + slog.Error("Failed to parse JSON response", "error", err) + return "", fmt.Errorf("failed to parse JSON response: %w", err) } - // Parse the response - var geminiResp GeminiResponse - if err := json.NewDecoder(resp.Body).Decode(&geminiResp); err != nil { - return nil, fmt.Errorf("failed to decode response: %w", err) + 
// Log the parsed jobID and status
+	slog.Info("File uploaded successfully", "jobID", response.ID, "status", response.Status)
+
+	// Return the job ID from the parsed response
+	return response.ID, nil
+}
+
+// checkJobStatus checks the status of the parsing job using the job ID.
+func checkJobStatus(jobID string) (string, error) {
+	// Construct the URL with the job ID in the endpoint
+	slog.Info("Checking job status", "jobID", jobID)
+	url := fmt.Sprintf("https://api.cloud.llamaindex.ai/api/v1/parsing/job/%s/details", jobID)
+	method := "GET"
+
+	// Prepare the HTTP request
+	req, err := http.NewRequest(method, url, nil)
+	if err != nil {
+		slog.Error("Failed to create request", "url", url, "error", err)
+		return "", fmt.Errorf("failed to create request: %w", err)
 	}
 
-	if len(geminiResp.Candidates) == 0 || len(geminiResp.Candidates[0].Content.Parts) == 0 {
-		return nil, errors.New("no response generated")
+	// Set necessary headers
+	req.Header.Add("Accept", "application/json")
+	req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", llamaCloudAPIKey))
+
+	// Send the request
+	client := &http.Client{}
+	resp, err := client.Do(req)
+	if err != nil {
+		slog.Error("Failed to send request", "url", url, "error", err)
+		return "", fmt.Errorf("failed to send request: %w", err)
 	}
+	defer resp.Body.Close()
 
-	// Extract the response text
-	responseText := geminiResp.Candidates[0].Content.Parts[0].Text
-
-	// Clean up response - remove any markdown code block formatting
-	cleanResponse := strings.TrimSpace(responseText)
-	cleanResponse = strings.TrimPrefix(cleanResponse, "```json")
-	cleanResponse = strings.TrimPrefix(cleanResponse, "```")
-	cleanResponse = strings.TrimSuffix(cleanResponse, "```")
-	cleanResponse = strings.TrimSpace(cleanResponse)
+	// Log the response status code for debugging purposes
+	slog.Info("Received response from LlamaParse API", "statusCode", resp.StatusCode)
 
-	// Validate the response is valid JSON
-	var jsonResponse interface{}
-	if err := json.Unmarshal([]byte(cleanResponse), &jsonResponse); err != nil {
-		return nil, fmt.Errorf("invalid JSON response from LLM: %w", err)
+	// Read the response body
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		slog.Error("Failed to read response body", "error", err)
+		return "", fmt.Errorf("failed to read response body: %w", err)
 	}
 
-	return []byte(cleanResponse), nil
+	// Log the response body (consider truncating very large responses)
+	slog.Info("Job status response", "body", string(body))
+
+	// Extract the status field so the caller can compare it against values
+	// like "PENDING"; the details endpoint is assumed to report status the
+	// same way the upload response does
+	var details UploadResponse
+	if err := json.Unmarshal(body, &details); err != nil {
+		slog.Error("Failed to parse job status response", "error", err)
+		return "", fmt.Errorf("failed to parse job status response: %w", err)
+	}
+
+	// Return the job status
+	return details.Status, nil
 }
 
-// SimplePDFExtractor extracts text from a PDF file
-// Note: In a real implementation, you would use a proper PDF parsing library
-func SimplePDFExtractor(filePath string) (string, error) {
-	// In a real implementation, you would use a PDF library here
-	// For now, we'll just return a placeholder or read the file as text
-	// to avoid external dependencies
-
-	content, err := ioutil.ReadFile(filePath)
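+// For reference, the details endpoint is assumed to answer with JSON along
+// these lines (hypothetical shape; only "status" is relied on above):
+//
+//	{"id": "<job-id>", "status": "SUCCESS"}
+
+// getParsingResult retrieves the text result of the parsing job using the provided job ID.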
+func getParsingResult(jobID string) (string, error) { + // Construct the URL to fetch the text result + url := fmt.Sprintf("https://api.cloud.llamaindex.ai/api/v1/parsing/job/%s/result/text", jobID) + method := "GET" + + // Prepare the HTTP request + req, err := http.NewRequest(method, url, nil) if err != nil { - return "", fmt.Errorf("failed to read file: %w", err) + slog.Error("Failed to create request", "url", url, "error", err) + return "", fmt.Errorf("failed to create request: %w", err) } - - // For simplicity, we'll just return the first part of the file as text - // In a real implementation, you would parse the PDF properly - return fmt.Sprintf("PDF Content (simulated): %s", string(content[:min(len(content), 1000)])), nil + + // Set necessary headers + req.Header.Add("Accept", "application/json") + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", llamaCloudAPIKey)) + + // Send the request + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + slog.Error("Failed to send request", "url", url, "error", err) + return "", fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + + // Log the response status code for debugging purposes + slog.Info("Received response from LlamaParse API", "statusCode", resp.StatusCode) + + // Read the response body + body, err := io.ReadAll(resp.Body) + if err != nil { + slog.Error("Failed to read response body", "error", err) + return "", fmt.Errorf("failed to read response body: %w", err) + } + + // Log the response body (you may want to change this to better handle large responses) + slog.Info("Job result response", "body", string(body)) + + // Return the text content from the response + return string(body), nil } // min returns the minimum of two integers @@ -199,61 +425,97 @@ func min(a, b int) int { // extractPDFTextImpl extracts text content from a PDF document func extractPDFTextImpl(documentSource string, documentType string, maxPages int) (string, error) { + slog.Info("Starting PDF text extraction", "documentType", documentType, "documentSource", documentSource) switch documentType { case "path": - // For simplicity, we'll just use our simple extractor - return SimplePDFExtractor(documentSource) + slog.Info("Using simple extractor for local file path", "documentSource", documentSource) + text, err := SimplePDFExtractor(documentSource) + if err != nil { + slog.Info("SimplePDFExtractor failed for local file", "documentSource", documentSource, "error", err) + } else { + slog.Info("SimplePDFExtractor succeeded for local file", "documentSource", documentSource) + } + return text, err case "url": - // Download the file to a temporary location + slog.Info("Downloading PDF from URL", "documentSource", documentSource) resp, err := http.Get(documentSource) if err != nil { + slog.Info("Failed to download file", "documentSource", documentSource, "error", err) return "", fmt.Errorf("failed to download file: %w", err) } defer resp.Body.Close() + slog.Info("File downloaded successfully", "documentSource", documentSource) tempFile, err := ioutil.TempFile("", "pdf-*.pdf") if err != nil { + slog.Info("Failed to create temporary file for URL download", "error", err) return "", fmt.Errorf("failed to create temp file: %w", err) } + slog.Info("Temporary file created", "tempFile", tempFile.Name()) defer os.Remove(tempFile.Name()) defer tempFile.Close() _, err = io.Copy(tempFile, resp.Body) if err != nil { + slog.Info("Failed to write downloaded content to temporary file", "tempFile", tempFile.Name(), "error", err) 
return "", fmt.Errorf("failed to write downloaded content: %w", err) } + slog.Info("Downloaded content written to temporary file", "tempFile", tempFile.Name()) + tempFile.Close() // Close to flush writes + slog.Info("tempFile-nakul", "tempFile", tempFile.Name()) + slog.Info("Temporary file closed", "tempFile", tempFile.Name()) - return SimplePDFExtractor(tempFile.Name()) + text, err := SimplePDFExtractor(tempFile.Name()) + if err != nil { + slog.Info("SimplePDFExtractor failed for file downloaded from URL", "tempFile", tempFile.Name(), "error", err) + } else { + slog.Info("SimplePDFExtractor succeeded for file downloaded from URL", "tempFile", tempFile.Name()) + } + return text, err case "base64": - // Decode base64 content + slog.Info("Decoding base64 PDF content") decoded, err := base64.StdEncoding.DecodeString(documentSource) if err != nil { + slog.Info("Failed to decode base64 content", "error", err) return "", fmt.Errorf("failed to decode base64: %w", err) } + slog.Info("Base64 content decoded successfully") - // Save to temporary file tempFile, err := ioutil.TempFile("", "pdf-*.pdf") if err != nil { + slog.Info("Failed to create temporary file for base64 content", "error", err) return "", fmt.Errorf("failed to create temp file: %w", err) } + slog.Info("Temporary file created for base64 content", "tempFile", tempFile.Name()) defer os.Remove(tempFile.Name()) defer tempFile.Close() if _, err := tempFile.Write(decoded); err != nil { + slog.Info("Failed to write decoded base64 content to temporary file", "tempFile", tempFile.Name(), "error", err) return "", fmt.Errorf("failed to write to temp file: %w", err) } + slog.Info("Decoded base64 content written to temporary file", "tempFile", tempFile.Name()) tempFile.Close() // Close to flush writes + slog.Info("Temporary file closed", "tempFile", tempFile.Name()) - return SimplePDFExtractor(tempFile.Name()) + text, err := SimplePDFExtractor(tempFile.Name()) + if err != nil { + slog.Info("SimplePDFExtractor failed for file created from base64", "tempFile", tempFile.Name(), "error", err) + } else { + slog.Info("SimplePDFExtractor succeeded for file created from base64", "tempFile", tempFile.Name()) + } + return text, err default: + slog.Info("Unsupported document type encountered", "documentType", documentType) return "", fmt.Errorf("unsupported document type: %s", documentType) } } + // Global tracker instance var globalTracker *ParsingTracker @@ -271,31 +533,45 @@ func GetParsingTracker() *ParsingTracker { } // ParseDocumentWithTracking handles document parsing jobs with status tracking and retries -func ParseDocumentWithTracking(ctx context.Context, payload []byte) (Result, error) { +func ParseDocumentWithTracking(ctx context.Context, payload []byte, jobID int) (Result, error) { + slog.Info("ParseDocumentWithTracking started", "payload", string(payload)) // Parse the payload to get the document ID var parsedPayload struct { DocumentID string `json:"documentID"` + DocumentType string `json:"documentType"` + DocumentSource string `json:"documentSource"` + ExpectedSchema string `json:"expected_schema"` + Description string `json:"description"` } if err := json.Unmarshal(payload, &parsedPayload); err != nil { + slog.Info("Failed to extract documentID from payload", "error", err) return Result{}, fmt.Errorf("failed to extract document ID: %w", err) } documentID := parsedPayload.DocumentID + documentType := parsedPayload.DocumentType + documentSource := parsedPayload.DocumentSource + expectedSchema := parsedPayload.ExpectedSchema + description := 
+	description := parsedPayload.Description
 	if documentID == "" {
-		return Result{}, errors.New("documentID is required")
+		slog.Error("documentID is missing in payload")
+		return Result{}, fmt.Errorf("documentID is required")
 	}
+	slog.Info("DocumentID extracted", "documentID", documentID)
 
 	tracker := GetParsingTracker()
+	slog.Info("Parsing tracker obtained", "documentID", documentID)
 
 	// Update status to uploaded if this is the first time
 	tracker.UpdateStatus(documentID, StatusUploaded, nil)
+	slog.Info("Tracker status updated to 'uploaded'", "documentID", documentID)
 
-	// Function to handle any panics by updating status
+	// Handle any panics by updating status before re-panicking
 	defer func() {
 		if r := recover(); r != nil {
 			err := fmt.Errorf("panic recovered: %v", r)
 			tracker.UpdateStatus(documentID, StatusFailed, err)
-			// Re-panic so it can be handled by higher-level recovery mechanisms
+			slog.Error("Panic recovered, tracker status updated to 'failed'", "documentID", documentID, "error", err)
 			panic(r)
 		}
 	}()
@@ -305,46 +581,60 @@ func ParseDocumentWithTracking(ctx context.Context, payload []byte) (Result, err
 
 	// Retry loop
 	maxAttempts := tracker.config.MaxRetries + 1 // +1 for the initial attempt
+	slog.Info("Starting retry loop", "maxAttempts", maxAttempts, "documentID", documentID)
 	for attempt := 1; attempt <= maxAttempts; attempt++ {
 		start := time.Now()
+		slog.Info("Retry attempt started", "attempt", attempt, "documentID", documentID)
 
-		// If this is a retry attempt, update status to retrying
+		// If this is a retry attempt, update status to retrying and delay briefly
 		if attempt > 1 {
 			tracker.UpdateStatus(documentID, StatusRetrying, nil)
-			// Add a small delay before retrying to prevent hammering the system
+			slog.Info("Tracker status updated to 'retrying'", "documentID", documentID, "attempt", attempt)
 			time.Sleep(time.Millisecond * 100)
 		}
 
 		// Update status to parsing
 		tracker.UpdateStatus(documentID, StatusParsing, nil)
+		slog.Info("Tracker status updated to 'parsing'", "documentID", documentID, "attempt", attempt)
 
 		// Extract and validate the document
 		var parsedPayload ParseDocumentPayload
 		if err := json.Unmarshal(payload, &parsedPayload); err != nil {
 			finalErr = fmt.Errorf("failed to unmarshal payload: %w", err)
 			tracker.UpdateStatus(documentID, StatusFailed, finalErr)
+			slog.Error("Failed to unmarshal payload during parsing", "documentID", documentID, "attempt", attempt, "error", finalErr)
 			continue // Try again if retries are available
 		}
+		slog.Info("Payload unmarshalled successfully for parsing", "parsedPayload", parsedPayload)
 
 		// Extract text from the PDF
 		tracker.UpdateStatus(documentID, StatusParsing, nil)
+		slog.Info("Tracker status updated to 'parsing' for text extraction", "documentID", documentID, "attempt", attempt)
 		maxPages := parsedPayload.Options.MaxPages
-		text, err := ExtractPDFText(parsedPayload.Document, parsedPayload.DocumentType, maxPages)
+		text, err := ExtractPDFText(documentSource, documentType, maxPages)
 		if err != nil {
 			finalErr = fmt.Errorf("text extraction error: %w", err)
 			tracker.UpdateStatus(documentID, StatusFailed, finalErr)
+			slog.Error("Text extraction failed", "documentID", documentID, "attempt", attempt, "error", finalErr)
 			continue // Try again if retries are available
 		}
+		slog.Info("Text extraction succeeded", "documentID", documentID, "attempt", attempt, "extractedTextLen", len(text))
 
 		// Process with LLM
 		tracker.UpdateStatus(documentID, StatusConverting, nil)
+		slog.Info("Tracker status updated to 'converting'", "documentID", documentID, "attempt", attempt)
 		geminiClient, err := NewGeminiClient(ctx)
 		if err != nil {
 			finalErr = fmt.Errorf("failed to initialize Gemini client: %w", err)
 			tracker.UpdateStatus(documentID, StatusFailed, finalErr)
+			slog.Error("Failed to initialize Gemini client", "documentID", documentID, "attempt", attempt, "error", finalErr)
 			continue // Try again if retries are available
 		}
-
+		slog.Info("Gemini client initialized", "documentID", documentID, "attempt", attempt)
+		// Log the text, output schema, and description passed to the LLM
+		slog.Info("Text", "text", text)
+		slog.Info("OutputSchema", "outputSchema", expectedSchema)
+		slog.Info("Description", "description", description)
 		structuredData, err := geminiClient.GenerateContent(
 			ctx,
 			text,
@@ -354,21 +644,40 @@ func ParseDocumentWithTracking(ctx context.Context, payload []byte) (Result, err
 		if err != nil {
 			finalErr = fmt.Errorf("LLM processing error: %w", err)
 			tracker.UpdateStatus(documentID, StatusFailed, finalErr)
+			slog.Error("LLM processing failed", "documentID", documentID, "attempt", attempt, "error", finalErr)
+			continue // Try again if retries are available
+		}
+		slog.Info("LLM processing succeeded", "documentID", documentID, "attempt", attempt)
+
+		slog.Info("Structured data generated", "structuredData", string(structuredData))
+
+		// Persist the structured data on the job row
+		updateQuery := "UPDATE jobs SET response = $1 WHERE id = $2"
+		// jobID identifies the row in the jobs table to update
+		_, err = db.Clients.DB.Exec(updateQuery, string(structuredData), jobID)
+		if err != nil {
+			finalErr = fmt.Errorf("failed to update job response: %w", err)
+			tracker.UpdateStatus(documentID, StatusFailed, finalErr)
+			slog.Error("Failed to update job response", "documentID", documentID, "attempt", attempt, "error", finalErr)
 			continue // Try again if retries are available
 		}
 
+		// Parse the structured data into our response format
 		var parsedContent interface{}
 		if err := json.Unmarshal(structuredData, &parsedContent); err != nil {
 			finalErr = fmt.Errorf("failed to parse LLM response: %w", err)
 			tracker.UpdateStatus(documentID, StatusFailed, finalErr)
+			slog.Error("Failed to parse LLM response", "documentID", documentID, "attempt", attempt, "error", finalErr)
 			continue // Try again if retries are available
 		}
+		slog.Info("LLM response parsed successfully", "documentID", documentID, "attempt", attempt)
 
 		// Calculate processing time
 		elapsedTime := time.Since(start)
+		slog.Info("Processing time calculated", "documentID", documentID, "attempt", attempt, "elapsedTimeMs", elapsedTime.Milliseconds())
 
-		// Update metrics
+		// Update metrics and construct the parsed document
 		parsedDocument := ParsedDocument{
 			Content: parsedContent,
 			MetaInfo: map[string]interface{}{
@@ -378,6 +687,7 @@ func ParseDocumentWithTracking(ctx context.Context, payload []byte) (Result, err
 				"attempts": attempt,
 			},
 		}
+		slog.Info("Parsed document metrics collected", "documentID", documentID, "attempt", attempt, "metaInfo", parsedDocument.MetaInfo)
 
 		result = Result{
 			Data: parsedDocument,
@@ -387,9 +697,11 @@ func ParseDocumentWithTracking(ctx context.Context, payload []byte) (Result, err
 				"documentID": documentID,
 			},
 		}
+		slog.Info("Result constructed successfully", "documentID", documentID, "attempt", attempt)
 
 		// Update status to complete
 		tracker.UpdateStatus(documentID, StatusComplete, nil)
+		slog.Info("Tracker status updated to 'complete'", "documentID", documentID, "attempt", attempt)
 
 		// Success, exit the retry loop
 		finalErr = nil
@@ -397,48 +709,72 @@ func ParseDocumentWithTracking(ctx context.Context, payload []byte) (Result, err
 	}
 
 	if finalErr != nil {
slog.Info("Final error after retries", "documentID", documentID, "error", finalErr) return Result{}, finalErr } + slog.Info("ParseDocumentWithTracking completed successfully", "documentID", documentID) return result, nil } + // ParseDocumentHandler handles document parsing jobs -func ParseDocumentHandler(ctx context.Context, payload []byte) (Result, error) { - // Extract document ID or generate one if not present +func ParseDocumentHandler(ctx context.Context, payload []byte, jobID int) (Result, error) { + slog.Info("ParseDocumentHandler invoked", "payload", string(payload)) + + // Attempt to extract document ID from payload var docIDContainer struct { DocumentID string `json:"documentID"` } - if err := json.Unmarshal(payload, &docIDContainer); err != nil { - // If we can't extract a document ID, we'll just use the regular parsing flow - // without tracking + slog.Error("Failed to unmarshal payload for documentID extraction", "error", err) + slog.Info("Falling back to simpleParseDocument due to unmarshalling error") return simpleParseDocument(ctx, payload) } + slog.Info("Extracted documentID container", "documentID", docIDContainer.DocumentID) documentID := docIDContainer.DocumentID if documentID == "" { - // If no document ID is provided, generate a random one for tracking + slog.Info("No documentID found in payload, generating a new one") documentID = fmt.Sprintf("doc-%s", time.Now().Format("20060102-150405-999999")) + slog.Info("Generated documentID", "documentID", documentID) - // Add the document ID to the payload + // Add the generated documentID to the payload var parsedPayload map[string]interface{} if err := json.Unmarshal(payload, &parsedPayload); err != nil { + slog.Error("Failed to unmarshal payload into map for documentID insertion", "error", err) + slog.Info("Falling back to simpleParseDocument due to unmarshalling error on map") return simpleParseDocument(ctx, payload) } + slog.Info("Garvit rand 696969",parsedPayload) parsedPayload["documentID"] = documentID + parsedPayload["documentType"] = "url" + parsedPayload["documentSource"]=parsedPayload["pdf_source"] + parsedPayload["description"]=parsedPayload["description"] + slog.Info("Inserted documentID into payload map", "documentID", documentID, "payloadMap", parsedPayload,"documentType", parsedPayload["documentType"],"documentSource", parsedPayload["documentSource"]) - // Reconstruct the payload with the document ID + // Reconstruct the payload with the documentID included updatedPayload, err := json.Marshal(parsedPayload) if err != nil { + slog.Error("Failed to marshal updated payload with documentID", "error", err) + slog.Info("Falling back to simpleParseDocument due to marshalling error") return simpleParseDocument(ctx, payload) } - + slog.Info("Reconstructed payload with documentID", "updatedPayload", string(updatedPayload)) payload = updatedPayload + } else { + slog.Info("DocumentID found in payload", "documentID", documentID) } - - // Use the tracking system for parsing - return ParseDocumentWithTracking(ctx, payload) + + // Proceed with tracking system for parsing + slog.Info("Invoking ParseDocumentWithTracking", "documentID", documentID) + result, err := ParseDocumentWithTracking(ctx, payload, jobID) + if err != nil { + slog.Error("ParseDocumentWithTracking failed", "documentID", documentID, "error", err) + } else { + slog.Info("ParseDocumentWithTracking succeeded", "documentID", documentID) + } + return result, err } // simpleParseDocument implements the original document parsing logic without tracking diff --git 
diff --git a/internal/models/job.go b/internal/models/job.go
index c5e1a14..e64571f 100644
--- a/internal/models/job.go
+++ b/internal/models/job.go
@@ -11,6 +11,7 @@ type Job struct {
 	Status    string    `json:"status" db:"status"`
 	Type      string    `json:"type" db:"type"`
 	CreatedAt time.Time `json:"created_at" db:"created_at"`
+	Response  string    `json:"response" db:"response"`
 }
 
 // pdf parsing job
@@ -49,6 +50,7 @@ type NewParseDocumentPayload struct {
 	PDFSource      string `json:"pdf_source" validate:"required"`      // URL or base64-encoded PDF data
 	ExpectedSchema string `json:"expected_schema" validate:"required"` // JSON schema for desired output
 	Name           string `json:"name" validate:"required"`
+	Description    string `json:"description" validate:"required"`
 }
 
 func (n NewParseDocumentPayload) JSON() any {
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index 31d0d1f..1edb2e6 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -13,9 +13,9 @@ import (
 
 	"github.com/IBM/sarama"
 	"github.com/illegalcall/task-master/internal/config"
+	"github.com/illegalcall/task-master/internal/jobs"
 	"github.com/illegalcall/task-master/internal/models"
 	"github.com/illegalcall/task-master/pkg/database"
-	"github.com/illegalcall/task-master/internal/jobs"
 )
 
 type Worker struct {
@@ -26,6 +26,7 @@ type Worker struct {
 }
 
 func NewWorker(cfg *config.Config, db *database.Clients, consumer sarama.ConsumerGroup) *Worker {
+	slog.Info("Initializing new Worker")
 	return &Worker{
 		cfg:      cfg,
 		db:       db,
@@ -36,158 +37,201 @@ func NewWorker(cfg *config.Config, db *database.Clients, consumer sarama.Consume
 
 func (w *Worker) Start(ctx context.Context) error {
 	topics := []string{w.cfg.Kafka.Topic}
+	slog.Info("Starting worker", "topics", topics)
+
+	// Initialize the jobs database
+	jobs.InitDB(w.db)
+	slog.Info("Jobs database initialized")
 
 	// Setup signal handling for graceful shutdown
 	sigChan := make(chan os.Signal, 1)
 	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+	slog.Info("Signal handler setup complete")
 
 	// Start error logging for consumer errors
 	go func() {
 		for err := range w.consumer.Errors() {
-			slog.Error("Kafka consumer error", "error", err)
+			slog.Error("Kafka consumer error received", "error", err)
 		}
 	}()
 
 	// Start consuming messages
 	go func() {
+		slog.Info("Consumer goroutine started")
 		for {
+			slog.Info("Calling consumer.Consume")
 			if err := w.consumer.Consume(ctx, topics, w); err != nil {
-				slog.Error("Error from consumer", "error", err)
+				slog.Error("Error from consumer.Consume", "error", err)
 			}
 			if ctx.Err() != nil {
+				slog.Info("Context error detected, exiting consumer loop", "error", ctx.Err())
 				return
 			}
+			// Reset the ready channel after a new session is created
 			w.ready = make(chan bool)
 		}
 	}()
 
 	<-w.ready // Wait till the consumer has been set up
-	slog.Info("Worker started successfully", "topics", topics)
+	slog.Info("Worker setup complete; consumer ready")
 
 	// Wait for shutdown signal
 	select {
 	case sig := <-sigChan:
 		slog.Info("Received shutdown signal", "signal", sig)
 	case <-ctx.Done():
-		slog.Info("Context cancelled")
+		slog.Info("Context cancelled; shutting down worker")
 	}
 
+	slog.Info("Worker shutting down gracefully")
 	return nil
 }
 
 // Setup is run at the beginning of a new session, before ConsumeClaim.
 func (w *Worker) Setup(sarama.ConsumerGroupSession) error {
+	slog.Info("Consumer group session setup complete")
 	close(w.ready)
 	return nil
 }
 
 // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
 func (w *Worker) Cleanup(sarama.ConsumerGroupSession) error {
+	slog.Info("Consumer group session cleanup complete")
 	return nil
 }
 
 // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
 func (w *Worker) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	slog.Info("Starting ConsumeClaim loop")
 	for message := range claim.Messages() {
+		slog.Info("Message received from Kafka", "offset", message.Offset, "partition", message.Partition)
 		if err := w.processJob(message); err != nil {
 			slog.Error("Failed to process job", "error", err)
+		} else {
+			slog.Info("Job processed successfully", "offset", message.Offset)
 		}
 		session.MarkMessage(message, "")
+		slog.Info("Message marked as processed", "offset", message.Offset)
 	}
 	return nil
 }
 
 func (w *Worker) processJob(msg *sarama.ConsumerMessage) error {
 	var job struct {
-		ID   int    `json:"id"`
-		Name string `json:"name"`
-		Type string `json:"type"`
+		ID        int       `json:"id"`
+		Name      string    `json:"name"`
+		Status    string    `json:"status"`
+		Type      string    `json:"type"`
+		CreatedAt time.Time `json:"created_at"`
 	}
+	slog.Info("Received Kafka message", "msg", string(msg.Value))
 
 	// Parse JSON message
 	if err := json.Unmarshal(msg.Value, &job); err != nil {
+		slog.Error("JSON unmarshalling failed", "error", err, "raw", string(msg.Value))
 		return fmt.Errorf("failed to parse job: %w", err)
 	}
-
-	slog.Info("Processing job", "jobID", job.ID, "jobName", job.Name)
+	slog.Info("Job parsed successfully", "jobID", job.ID, "jobName", job.Name)
 
 	// Process job with retries
 	var err error
 	for attempt := 1; attempt <= w.cfg.Kafka.RetryMax; attempt++ {
+		slog.Info("Attempting job processing", "jobID", job.ID, "attempt", attempt)
 		err = w.processJobLogic(job)
 		if err == nil {
+			slog.Info("Job logic processed successfully", "jobID", job.ID, "attempt", attempt)
 			break
 		}
-		slog.Error("Job processing failed, retrying", "jobID", job.ID, "attempt", attempt, "error", err)
+		slog.Error("Job processing logic failed", "jobID", job.ID, "attempt", attempt, "error", err)
 		time.Sleep(w.cfg.Kafka.RetryBackoff)
 	}
 
 	// Update job status based on processing result
 	ctx := context.Background()
 	redisKey := fmt.Sprintf("job:%d", job.ID)
-
 	if err != nil { // Job failed after all retries
-		slog.Error("Job processing failed after retries", "jobID", job.ID, "error", err)
+		slog.Error("Job processing ultimately failed", "jobID", job.ID, "error", err)
 		if _, dbErr := w.db.DB.Exec("UPDATE jobs SET status = $1 WHERE id = $2", models.StatusFailed, job.ID); dbErr != nil {
 			slog.Error("Failed to update job status to failed in DB", "jobID", job.ID, "error", dbErr)
+		} else {
+			slog.Info("Job status updated to failed in DB", "jobID", job.ID)
 		}
 		if err := w.db.Redis.Set(ctx, redisKey, models.StatusFailed, 0).Err(); err != nil {
 			slog.Error("Failed to update Redis status to failed", "jobID", job.ID, "error", err)
+		} else {
+			slog.Info("Redis job status set to failed", "jobID", job.ID)
 		}
 		return err
 	}
 
 	// Job completed successfully
+	slog.Info("Job processing completed without errors", "jobID", job.ID)
 	if _, err := w.db.DB.Exec("UPDATE jobs SET status = $1 WHERE id = $2", models.StatusCompleted, job.ID); err != nil {
-		slog.Error("Failed to update job status in DB", "jobID", job.ID, "error", err)
+		slog.Error("Failed to update job status to completed in DB", "jobID", job.ID, "error", err)
 		return err
 	}
+	slog.Info("Job status updated to completed in DB", "jobID", job.ID)
+
 	if err := w.db.Redis.Set(ctx, redisKey, models.StatusCompleted, 0).Err(); err != nil {
update Redis status", "jobID", job.ID, "error", err) + slog.Error("Failed to update Redis status to completed", "jobID", job.ID, "error", err) + } else { + slog.Info("Redis job status set to completed", "jobID", job.ID) } - slog.Info("Job completed successfully", "jobID", job.ID) return nil } func (w *Worker) processJobLogic(job struct { - ID int `json:"id"` - Name string `json:"name"` - Type string `json:"type"` + ID int `json:"id"` + Name string `json:"name"` + Status string `json:"status"` + Type string `json:"type"` + CreatedAt time.Time `json:"created_at"` }) error { ctx := context.Background() - // Get job payload from Redis + // Retrieve job payload from Redis redisKey := fmt.Sprintf("job:%d:payload", job.ID) + slog.Info("Fetching full job payload from Redis", "redisKey", redisKey) payloadBytes, err := w.db.Redis.Get(ctx, redisKey).Bytes() if err != nil { + slog.Error("Failed to get job payload from Redis", "jobID", job.ID, "error", err) return fmt.Errorf("failed to get job payload: %w", err) } + slog.Info("Job payload retrieved from Redis", "jobID", job.ID) switch job.Type { case models.JobTypePDFParse: + slog.Info("Processing PDF parsing job", "jobID", job.ID) // Process PDF parsing job - result, err := jobs.ParseDocumentHandler(ctx, payloadBytes) + result, err := jobs.ParseDocumentHandler(ctx, payloadBytes, job.ID) if err != nil { + slog.Error("PDF parsing failed", "jobID", job.ID, "error", err) return fmt.Errorf("failed to process PDF: %w", err) } + slog.Info("PDF parsed successfully", "jobID", job.ID) // Store result in Redis resultKey := fmt.Sprintf("job:%d:result", job.ID) resultBytes, _ := json.Marshal(result) + slog.Info("Storing job result in Redis", "resultKey", resultKey) if err := w.db.Redis.Set(ctx, resultKey, resultBytes, w.cfg.Storage.TTL).Err(); err != nil { + slog.Error("Failed to store job result in Redis", "jobID", job.ID, "error", err) return fmt.Errorf("failed to store result: %w", err) } - + slog.Info("Job result stored successfully in Redis", "jobID", job.ID) return nil default: // For other job types, use default processing + slog.Info("Default job processing for non-PDF job", "jobID", job.ID) time.Sleep(w.cfg.Kafka.ProcessingTime) if job.ID%5 == 0 { + slog.Error("Simulated error triggered for job", "jobID", job.ID) return fmt.Errorf("simulated error for job %d", job.ID) } + slog.Info("Default job processing completed", "jobID", job.ID) return nil } } diff --git a/tmp/main b/tmp/main deleted file mode 100755 index fd989a4..0000000 Binary files a/tmp/main and /dev/null differ