From b578eee8065b58b067e561b21a75d128326ad7ff Mon Sep 17 00:00:00 2001 From: Ravi Suhag Date: Sat, 18 Apr 2026 17:57:26 -0500 Subject: [PATCH] feat: add Notion metadata extractor Extract page and database metadata from Notion workspaces via the Notion API. Emits document entities for both pages and databases with child_of, belongs_to, owned_by, and documented_by edges. Reads page block content to scan for URN references linking documentation to data assets. Closes #503 (Notion portion) --- plugins/extractors/notion/README.md | 73 +++++ plugins/extractors/notion/client.go | 288 ++++++++++++++++++ plugins/extractors/notion/notion.go | 331 +++++++++++++++++++++ plugins/extractors/notion/notion_test.go | 363 +++++++++++++++++++++++ plugins/extractors/populate.go | 1 + test/e2e/notion_file/notion_file_test.go | 288 ++++++++++++++++++ 6 files changed, 1344 insertions(+) create mode 100644 plugins/extractors/notion/README.md create mode 100644 plugins/extractors/notion/client.go create mode 100644 plugins/extractors/notion/notion.go create mode 100644 plugins/extractors/notion/notion_test.go create mode 100644 test/e2e/notion_file/notion_file_test.go diff --git a/plugins/extractors/notion/README.md b/plugins/extractors/notion/README.md new file mode 100644 index 00000000..6f615370 --- /dev/null +++ b/plugins/extractors/notion/README.md @@ -0,0 +1,73 @@ +# Notion + +Extract page and database metadata from a Notion workspace using the Notion API. + +## Usage + +```yaml +source: + name: notion + scope: my-workspace + config: + token: ntn_your_integration_token + extract: + - pages + - databases +``` + +## Configuration + +| Key | Type | Required | Description | +| :-- | :--- | :------- | :---------- | +| `token` | `string` | Yes | Notion internal integration token. | +| `base_url` | `string` | No | Override Notion API base URL. Defaults to `https://api.notion.com`. | +| `extract` | `[]string` | No | Entity types to extract. Defaults to all: `pages`, `databases`. 
| + +## Entities + +The extractor emits `document` entities for both pages and databases. + +### Entity: `document` (page) + +| Field | Sample Value | +| :---- | :----------- | +| `urn` | `urn:notion:my-workspace:document:abc123-def456` | +| `name` | `Data Pipeline Architecture` | +| `properties.page_id` | `abc123-def456` | +| `properties.created_at` | `2024-01-15T10:30:00Z` | +| `properties.updated_at` | `2024-03-20T14:15:00Z` | +| `properties.created_by` | `Alice` | +| `properties.last_edited_by` | `Bob` | +| `properties.web_url` | `https://www.notion.so/Data-Pipeline-abc123` | +| `properties.archived` | `false` | + +### Entity: `document` (database) + +| Field | Sample Value | +| :---- | :----------- | +| `urn` | `urn:notion:my-workspace:document:db-789` | +| `name` | `Project Tracker` | +| `description` | `Track all engineering projects` | +| `properties.database_id` | `db-789` | +| `properties.created_at` | `2024-01-10T09:00:00Z` | +| `properties.updated_at` | `2024-03-18T16:00:00Z` | +| `properties.created_by` | `Alice` | +| `properties.columns` | `["Name", "Status", "Priority"]` | +| `properties.web_url` | `https://www.notion.so/db-789` | + +### Edges + +| Type | Source | Target | Description | +| :--- | :----- | :----- | :---------- | +| `child_of` | `document` | `document` | Page is a child of another page | +| `belongs_to` | `document` | `document` | Page belongs to a database | +| `owned_by` | `document` | `user` | Page/database is owned by its creator | +| `documented_by` | `document` | any | Page references a data asset via URN in its content | + +### URN Reference Detection + +The extractor reads page block content and scans for URN patterns (`urn:service:scope:type:id`), emitting `documented_by` edges to link documentation to data assets. + +## Contributing + +Refer to the [contribution guidelines](../../../docs/docs/contribute/guide.md#adding-a-new-extractor) for information on contributing to this module. 
diff --git a/plugins/extractors/notion/client.go b/plugins/extractors/notion/client.go new file mode 100644 index 00000000..478460d7 --- /dev/null +++ b/plugins/extractors/notion/client.go @@ -0,0 +1,288 @@ +package notion + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +const ( + defaultBaseURL = "https://api.notion.com" + notionAPIVersion = "2022-06-28" + defaultPageSize = 100 +) + +// Page represents a Notion page. +type Page struct { + ID string `json:"id"` + CreatedTime time.Time `json:"created_time"` + LastEditedTime time.Time `json:"last_edited_time"` + CreatedBy User `json:"created_by"` + LastEditedBy User `json:"last_edited_by"` + Archived bool `json:"archived"` + URL string `json:"url"` + Parent Parent `json:"parent"` + Properties map[string]any `json:"properties"` +} + +// Database represents a Notion database. +type Database struct { + ID string `json:"id"` + CreatedTime time.Time `json:"created_time"` + LastEditedTime time.Time `json:"last_edited_time"` + CreatedBy User `json:"created_by"` + LastEditedBy User `json:"last_edited_by"` + Title []RichText `json:"title"` + Description []RichText `json:"description"` + Archived bool `json:"archived"` + URL string `json:"url"` + Parent Parent `json:"parent"` + Properties map[string]any `json:"properties"` +} + +// User represents a Notion user. +type User struct { + ID string `json:"id"` + Name string `json:"name"` +} + +// Parent represents the parent of a page or database. +type Parent struct { + Type string `json:"type"` + PageID string `json:"page_id,omitempty"` + DatabaseID string `json:"database_id,omitempty"` + WorkspaceID string `json:"workspace,omitempty"` +} + +// RichText represents a Notion rich text object. +type RichText struct { + PlainText string `json:"plain_text"` +} + +// searchResponse is the response from the Notion search API. 
type searchResponse struct {
	// Results is raw JSON because /v1/search returns a mix of page and
	// database objects; callers unmarshal into the type they asked for.
	Results    []json.RawMessage `json:"results"`
	HasMore    bool              `json:"has_more"`
	NextCursor string            `json:"next_cursor"`
}

// blockChildrenResponse is the response from the block children API.
type blockChildrenResponse struct {
	Results    []Block `json:"results"`
	HasMore    bool    `json:"has_more"`
	NextCursor string  `json:"next_cursor"`
}

// Block represents a Notion block (used for reading page content).
type Block struct {
	ID   string `json:"id"`
	Type string `json:"type"`
	// Each text-bearing block type carries its rich text under its own
	// JSON key; for a given block at most one of these pointers is set.
	// Non-text block types (images, dividers, ...) leave all of them nil.
	Paragraph    *blockContent `json:"paragraph,omitempty"`
	Heading1     *blockContent `json:"heading_1,omitempty"`
	Heading2     *blockContent `json:"heading_2,omitempty"`
	Heading3     *blockContent `json:"heading_3,omitempty"`
	BulletedList *blockContent `json:"bulleted_list_item,omitempty"`
	NumberedList *blockContent `json:"numbered_list_item,omitempty"`
	Quote        *blockContent `json:"quote,omitempty"`
	Callout      *blockContent `json:"callout,omitempty"`
	Code         *blockContent `json:"code,omitempty"`
}

// blockContent is the shared shape of every text-bearing block payload.
type blockContent struct {
	RichText []RichText `json:"rich_text"`
}

// PlainText extracts the plain text of a block: it concatenates the
// rich-text runs of the first populated content field (in practice only
// one field is ever set per block). Blocks with no text return "".
func (b *Block) PlainText() string {
	for _, content := range []*blockContent{
		b.Paragraph, b.Heading1, b.Heading2, b.Heading3,
		b.BulletedList, b.NumberedList, b.Quote, b.Callout, b.Code,
	} {
		if content == nil {
			continue
		}
		var text string
		for _, rt := range content.RichText {
			text += rt.PlainText
		}
		if text != "" {
			return text
		}
	}
	return ""
}

// Client wraps the Notion API.
type Client struct {
	baseURL    string
	httpClient *http.Client
	token      string
}

// NewClient creates a new Notion API client authenticating with the given
// integration token. The underlying HTTP client has a 30s timeout.
func NewClient(token string) *Client {
	return &Client{
		baseURL:    defaultBaseURL,
		httpClient: &http.Client{Timeout: 30 * time.Second},
		token:      token,
	}
}

// SetBaseURL overrides the API base URL (used for testing).
func (c *Client) SetBaseURL(url string) {
	c.baseURL = url
}

// SearchPages returns every page the integration token can access,
// following pagination until the API reports no more results.
// (It does not support query filtering; the filter sent is only the
// object-type filter selecting pages.)
func (c *Client) SearchPages(ctx context.Context) ([]Page, error) {
	var all []Page
	cursor := ""
	for {
		body := map[string]any{
			"filter":    map[string]any{"value": "page", "property": "object"},
			"page_size": defaultPageSize,
		}
		if cursor != "" {
			body["start_cursor"] = cursor
		}

		var resp searchResponse
		if err := c.post(ctx, "/v1/search", body, &resp); err != nil {
			return nil, fmt.Errorf("search pages: %w", err)
		}

		for _, raw := range resp.Results {
			var page Page
			if err := json.Unmarshal(raw, &page); err != nil {
				return nil, fmt.Errorf("unmarshal page: %w", err)
			}
			all = append(all, page)
		}

		// Stop when the API signals the last page; the extra NextCursor
		// check guards against a has_more=true response with no cursor.
		if !resp.HasMore || resp.NextCursor == "" {
			break
		}
		cursor = resp.NextCursor
	}
	return all, nil
}

// SearchDatabases returns every database the integration token can access,
// following pagination until the API reports no more results.
func (c *Client) SearchDatabases(ctx context.Context) ([]Database, error) {
	var all []Database
	cursor := ""
	for {
		body := map[string]any{
			"filter":    map[string]any{"value": "database", "property": "object"},
			"page_size": defaultPageSize,
		}
		if cursor != "" {
			body["start_cursor"] = cursor
		}

		var resp searchResponse
		if err := c.post(ctx, "/v1/search", body, &resp); err != nil {
			return nil, fmt.Errorf("search databases: %w", err)
		}

		for _, raw := range resp.Results {
			var db Database
			if err := json.Unmarshal(raw, &db); err != nil {
				return nil, fmt.Errorf("unmarshal database: %w", err)
			}
			all = append(all, db)
		}

		if !resp.HasMore || resp.NextCursor == "" {
			break
		}
		cursor = resp.NextCursor
	}
	return all, nil
}

// GetBlockChildren returns the top-level blocks of a page or block.
+func (c *Client) GetBlockChildren(ctx context.Context, blockID string) ([]Block, error) { + var all []Block + cursor := "" + for { + path := fmt.Sprintf("/v1/blocks/%s/children?page_size=%d", blockID, defaultPageSize) + if cursor != "" { + path += "&start_cursor=" + cursor + } + + var resp blockChildrenResponse + if err := c.get(ctx, path, &resp); err != nil { + return nil, fmt.Errorf("get block children for %s: %w", blockID, err) + } + all = append(all, resp.Results...) + + if !resp.HasMore || resp.NextCursor == "" { + break + } + cursor = resp.NextCursor + } + return all, nil +} + +func (c *Client) post(ctx context.Context, path string, body any, out any) error { + jsonBody, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("marshal request: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, bytes.NewReader(jsonBody)) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + c.setHeaders(req) + req.Header.Set("Content-Type", "application/json") + + return c.do(req, out) +} + +func (c *Client) get(ctx context.Context, path string, out any) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + c.setHeaders(req) + + return c.do(req, out) +} + +func (c *Client) setHeaders(req *http.Request) { + req.Header.Set("Authorization", "Bearer "+c.token) + req.Header.Set("Notion-Version", notionAPIVersion) + req.Header.Set("Accept", "application/json") +} + +func (c *Client) do(req *http.Request, out any) error { + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("execute request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("unexpected status %d: %s", resp.StatusCode, truncate(string(body), 200)) + } + + if err := json.NewDecoder(resp.Body).Decode(out); err != nil { + return 
fmt.Errorf("decode response: %w", err) + } + return nil +} + +func truncate(s string, n int) string { + if len(s) <= n { + return s + } + return s[:n] + "..." +} diff --git a/plugins/extractors/notion/notion.go b/plugins/extractors/notion/notion.go new file mode 100644 index 00000000..ab7ced6f --- /dev/null +++ b/plugins/extractors/notion/notion.go @@ -0,0 +1,331 @@ +package notion + +import ( + "context" + _ "embed" + "fmt" + "regexp" + "strings" + "time" + + "github.com/raystack/meteor/models" + meteorv1beta1 "github.com/raystack/meteor/models/raystack/meteor/v1beta1" + "github.com/raystack/meteor/plugins" + "github.com/raystack/meteor/registry" + log "github.com/raystack/salt/observability/logger" +) + +//go:embed README.md +var summary string + +type Config struct { + Token string `json:"token" yaml:"token" mapstructure:"token" validate:"required"` + BaseURL string `json:"base_url" yaml:"base_url" mapstructure:"base_url"` + Extract []string `json:"extract" yaml:"extract" mapstructure:"extract"` +} + +var sampleConfig = ` +# Notion integration token (required) +token: ntn_your_integration_token +# Entity types to extract (optional, defaults to all: ["pages", "databases"]) +extract: + - pages + - databases` + +var info = plugins.Info{ + Description: "Extract page and database metadata from a Notion workspace.", + SampleConfig: sampleConfig, + Summary: summary, + Tags: []string{"notion", "extractor"}, +} + +type Extractor struct { + plugins.BaseExtractor + logger log.Logger + config Config + client *Client + extract map[string]bool +} + +func New(logger log.Logger) *Extractor { + e := &Extractor{logger: logger} + e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config) + return e +} + +func (e *Extractor) Init(ctx context.Context, config plugins.Config) error { + if err := e.BaseExtractor.Init(ctx, config); err != nil { + return err + } + + e.client = NewClient(e.config.Token) + if e.config.BaseURL != "" { + e.client.SetBaseURL(e.config.BaseURL) + } + + 
e.extract = map[string]bool{ + "pages": true, + "databases": true, + } + if len(e.config.Extract) > 0 { + e.extract = make(map[string]bool, len(e.config.Extract)) + for _, v := range e.config.Extract { + e.extract[v] = true + } + } + + return nil +} + +func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error { + if e.extract["pages"] { + if err := e.extractPages(ctx, emit); err != nil { + return fmt.Errorf("extract pages: %w", err) + } + } + if e.extract["databases"] { + if err := e.extractDatabases(ctx, emit); err != nil { + return fmt.Errorf("extract databases: %w", err) + } + } + return nil +} + +func (e *Extractor) extractPages(ctx context.Context, emit plugins.Emit) error { + pages, err := e.client.SearchPages(ctx) + if err != nil { + return err + } + + for _, page := range pages { + if page.Archived { + continue + } + + var bodyText string + blocks, err := e.client.GetBlockChildren(ctx, page.ID) + if err != nil { + e.logger.Warn("failed to get page content, skipping URN scan", + "page_id", page.ID, "error", err) + } else { + bodyText = blocksToText(blocks) + } + + emit(e.buildPageRecord(page, bodyText)) + } + + return nil +} + +func (e *Extractor) extractDatabases(ctx context.Context, emit plugins.Emit) error { + databases, err := e.client.SearchDatabases(ctx) + if err != nil { + return err + } + + for _, db := range databases { + if db.Archived { + continue + } + emit(e.buildDatabaseRecord(db)) + } + + return nil +} + +func (e *Extractor) buildPageRecord(page Page, bodyText string) models.Record { + urn := models.NewURN("notion", e.UrnScope, "document", page.ID) + + title := extractPageTitle(page.Properties) + props := map[string]any{ + "page_id": page.ID, + "created_at": page.CreatedTime.UTC().Format(time.RFC3339), + "updated_at": page.LastEditedTime.UTC().Format(time.RFC3339), + "archived": page.Archived, + } + if page.URL != "" { + props["web_url"] = page.URL + } + if page.CreatedBy.Name != "" { + props["created_by"] = page.CreatedBy.Name 
+ } + if page.LastEditedBy.Name != "" { + props["last_edited_by"] = page.LastEditedBy.Name + } + + entity := models.NewEntity(urn, "document", title, "notion", props) + + var edges []*meteorv1beta1.Edge + + // Parent relationship. + if parentURN, parentType := resolveParent(page.Parent, e.UrnScope); parentURN != "" { + edges = append(edges, &meteorv1beta1.Edge{ + SourceUrn: urn, + TargetUrn: parentURN, + Type: parentType, + Source: "notion", + }) + } + + // Owner: page creator. + if page.CreatedBy.ID != "" { + ownerURN := models.NewURN("notion", e.UrnScope, "user", page.CreatedBy.ID) + edges = append(edges, models.OwnerEdge(urn, ownerURN, "notion")) + } + + // Scan content for URN references. + if bodyText != "" { + for _, ref := range extractURNReferences(bodyText) { + edges = append(edges, &meteorv1beta1.Edge{ + SourceUrn: urn, + TargetUrn: ref, + Type: "documented_by", + Source: "notion", + }) + } + } + + return models.NewRecord(entity, edges...) +} + +func (e *Extractor) buildDatabaseRecord(db Database) models.Record { + urn := models.NewURN("notion", e.UrnScope, "document", db.ID) + + title := richTextToPlain(db.Title) + description := richTextToPlain(db.Description) + + // Extract property schema names (column names). + var columns []string + for name := range db.Properties { + columns = append(columns, name) + } + + props := map[string]any{ + "database_id": db.ID, + "created_at": db.CreatedTime.UTC().Format(time.RFC3339), + "updated_at": db.LastEditedTime.UTC().Format(time.RFC3339), + "archived": db.Archived, + } + if db.URL != "" { + props["web_url"] = db.URL + } + if db.CreatedBy.Name != "" { + props["created_by"] = db.CreatedBy.Name + } + if len(columns) > 0 { + props["columns"] = columns + } + + entity := models.NewEntity(urn, "document", title, "notion", props) + if description != "" { + entity.Description = description + } + + var edges []*meteorv1beta1.Edge + + // Parent relationship. 
+ if parentURN, parentType := resolveParent(db.Parent, e.UrnScope); parentURN != "" { + edges = append(edges, &meteorv1beta1.Edge{ + SourceUrn: urn, + TargetUrn: parentURN, + Type: parentType, + Source: "notion", + }) + } + + // Owner: database creator. + if db.CreatedBy.ID != "" { + ownerURN := models.NewURN("notion", e.UrnScope, "user", db.CreatedBy.ID) + edges = append(edges, models.OwnerEdge(urn, ownerURN, "notion")) + } + + return models.NewRecord(entity, edges...) +} + +// resolveParent returns (parentURN, edgeType) for the given parent. +func resolveParent(parent Parent, scope string) (string, string) { + switch parent.Type { + case "page_id": + return models.NewURN("notion", scope, "document", parent.PageID), "child_of" + case "database_id": + return models.NewURN("notion", scope, "document", parent.DatabaseID), "belongs_to" + default: + return "", "" + } +} + +// extractPageTitle extracts the title from page properties. +// Notion pages have a "title" type property (often named "Name" or "title"). +func extractPageTitle(properties map[string]any) string { + for _, v := range properties { + prop, ok := v.(map[string]any) + if !ok { + continue + } + if prop["type"] != "title" { + continue + } + titleArr, ok := prop["title"].([]any) + if !ok || len(titleArr) == 0 { + continue + } + for _, item := range titleArr { + rt, ok := item.(map[string]any) + if !ok { + continue + } + if text, ok := rt["plain_text"].(string); ok { + return text + } + } + } + return "Untitled" +} + +// richTextToPlain concatenates plain text from a rich text array. +func richTextToPlain(texts []RichText) string { + var parts []string + for _, t := range texts { + if t.PlainText != "" { + parts = append(parts, t.PlainText) + } + } + return strings.Join(parts, "") +} + +// blocksToText concatenates plain text from all blocks. 
+func blocksToText(blocks []Block) string { + var parts []string + for _, b := range blocks { + if text := b.PlainText(); text != "" { + parts = append(parts, text) + } + } + return strings.Join(parts, "\n") +} + +// urnPattern matches URN references embedded in page content. +var urnPattern = regexp.MustCompile(`urn:[a-zA-Z0-9_-]+:[a-zA-Z0-9_.-]+:[a-zA-Z0-9_-]+:[a-zA-Z0-9_./-]+`) + +// extractURNReferences finds URN strings in text content. +func extractURNReferences(body string) []string { + matches := urnPattern.FindAllString(body, -1) + seen := make(map[string]bool, len(matches)) + var unique []string + for _, m := range matches { + cleaned := strings.TrimRight(m, ".,;:!?\"')") + if !seen[cleaned] { + seen[cleaned] = true + unique = append(unique, cleaned) + } + } + return unique +} + +func init() { + if err := registry.Extractors.Register("notion", func() plugins.Extractor { + return New(plugins.GetLog()) + }); err != nil { + panic(err) + } +} diff --git a/plugins/extractors/notion/notion_test.go b/plugins/extractors/notion/notion_test.go new file mode 100644 index 00000000..8c535459 --- /dev/null +++ b/plugins/extractors/notion/notion_test.go @@ -0,0 +1,363 @@ +//go:build plugins +// +build plugins + +package notion_test + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/raystack/meteor/models" + meteorv1beta1 "github.com/raystack/meteor/models/raystack/meteor/v1beta1" + "github.com/raystack/meteor/plugins" + extractor "github.com/raystack/meteor/plugins/extractors/notion" + "github.com/raystack/meteor/test/mocks" + testutils "github.com/raystack/meteor/test/utils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const urnScope = "test-notion" + +func TestInit(t *testing.T) { + t.Run("should return error when token is missing", func(t *testing.T) { + err := extractor.New(testutils.Logger).Init(context.TODO(), plugins.Config{ + URNScope: urnScope, + RawConfig: 
map[string]any{}, + }) + assert.ErrorAs(t, err, &plugins.InvalidConfigError{}) + }) + + t.Run("should succeed with valid config", func(t *testing.T) { + err := extractor.New(testutils.Logger).Init(context.TODO(), plugins.Config{ + URNScope: urnScope, + RawConfig: map[string]any{ + "token": "ntn_test_token", + }, + }) + assert.NoError(t, err) + }) +} + +func TestExtract(t *testing.T) { + t.Run("should extract pages with edges", func(t *testing.T) { + server := newMockServer(t) + defer server.Close() + + extr := initExtractor(t, server.URL, map[string]any{ + "token": "test-token", + "extract": []any{"pages"}, + }) + + emitter := mocks.NewEmitter() + err := extr.Extract(context.Background(), emitter.Push) + require.NoError(t, err) + + records := emitter.Get() + require.Len(t, records, 2) + + // Verify parent page. + parentPage := findByURNSuffix(records, "page-1") + require.NotNil(t, parentPage) + assert.Equal(t, "Architecture Docs", parentPage.Entity().GetName()) + assert.Equal(t, "document", parentPage.Entity().GetType()) + assert.Equal(t, "notion", parentPage.Entity().GetSource()) + + parentEdges := parentPage.Edges() + assert.Len(t, parentEdges, 1) // owned_by only (workspace parent is skipped) + assert.NotNil(t, findEdge(parentEdges, "owned_by")) + + // Verify child page with child_of edge. 
+ childPage := findByURNSuffix(records, "page-2") + require.NotNil(t, childPage) + assert.Equal(t, "Database Design", childPage.Entity().GetName()) + + childEdges := childPage.Edges() + require.Len(t, childEdges, 2) // child_of + owned_by + childOfEdge := findEdge(childEdges, "child_of") + require.NotNil(t, childOfEdge) + assert.Contains(t, childOfEdge.GetTargetUrn(), "page-1") + }) + + t.Run("should extract databases with columns and description", func(t *testing.T) { + server := newMockServer(t) + defer server.Close() + + extr := initExtractor(t, server.URL, map[string]any{ + "token": "test-token", + "extract": []any{"databases"}, + }) + + emitter := mocks.NewEmitter() + err := extr.Extract(context.Background(), emitter.Push) + require.NoError(t, err) + + records := emitter.Get() + require.Len(t, records, 1) + + dbRecord := records[0] + assert.Equal(t, "Project Tracker", dbRecord.Entity().GetName()) + assert.Equal(t, "Track engineering projects", dbRecord.Entity().GetDescription()) + + props := dbRecord.Entity().GetProperties().AsMap() + assert.NotNil(t, props["columns"]) + assert.Equal(t, "db-1", props["database_id"]) + + // Should have owned_by edge. 
+ edges := dbRecord.Edges() + assert.NotNil(t, findEdge(edges, "owned_by")) + }) + + t.Run("should detect URN references in page content", func(t *testing.T) { + server := newMockServerWithURNs(t) + defer server.Close() + + extr := initExtractor(t, server.URL, map[string]any{ + "token": "test-token", + "extract": []any{"pages"}, + }) + + emitter := mocks.NewEmitter() + err := extr.Extract(context.Background(), emitter.Push) + require.NoError(t, err) + + records := emitter.Get() + require.Len(t, records, 1) + + edges := records[0].Edges() + docEdge := findEdge(edges, "documented_by") + require.NotNil(t, docEdge) + assert.Equal(t, "urn:bigquery:prod:table:project.dataset.orders", docEdge.GetTargetUrn()) + }) + + t.Run("should skip archived pages", func(t *testing.T) { + server := newMockServerWithArchived(t) + defer server.Close() + + extr := initExtractor(t, server.URL, map[string]any{ + "token": "test-token", + "extract": []any{"pages"}, + }) + + emitter := mocks.NewEmitter() + err := extr.Extract(context.Background(), emitter.Push) + require.NoError(t, err) + + records := emitter.Get() + require.Len(t, records, 1) + assert.Equal(t, "Active Page", records[0].Entity().GetName()) + }) +} + +// --- test helpers --- + +func initExtractor(t *testing.T, serverURL string, rawConfig map[string]any) *extractor.Extractor { + t.Helper() + rawConfig["base_url"] = serverURL + extr := extractor.New(testutils.Logger) + err := extr.Init(context.Background(), plugins.Config{ + URNScope: urnScope, + RawConfig: rawConfig, + }) + require.NoError(t, err) + return extr +} + +func findByURNSuffix(records []models.Record, suffix string) *models.Record { + for i, r := range records { + urn := r.Entity().GetUrn() + if len(urn) >= len(suffix) && urn[len(urn)-len(suffix):] == suffix { + return &records[i] + } + } + return nil +} + +func findEdge(edges []*meteorv1beta1.Edge, typ string) *meteorv1beta1.Edge { + for _, e := range edges { + if e.GetType() == typ { + return e + } + } + return nil 
+} + +// --- mock servers --- + +func newMockServer(t *testing.T) *httptest.Server { + t.Helper() + mux := http.NewServeMux() + + mux.HandleFunc("/v1/search", func(w http.ResponseWriter, r *http.Request) { + var body map[string]any + json.NewDecoder(r.Body).Decode(&body) + + filter := body["filter"].(map[string]any) + value := filter["value"].(string) + + switch value { + case "page": + writeJSON(w, map[string]any{ + "results": []map[string]any{ + makePage("page-1", "Architecture Docs", "", "workspace", "user-a", "Alice", false), + makePage("page-2", "Database Design", "page-1", "page_id", "user-b", "Bob", false), + }, + "has_more": false, + "next_cursor": "", + }) + case "database": + writeJSON(w, map[string]any{ + "results": []map[string]any{ + makeDatabase("db-1", "Project Tracker", "Track engineering projects", "user-a", "Alice", + map[string]any{ + "Name": map[string]any{"type": "title"}, + "Status": map[string]any{"type": "select"}, + "Priority": map[string]any{"type": "number"}, + }), + }, + "has_more": false, + "next_cursor": "", + }) + } + }) + + mux.HandleFunc("/v1/blocks/page-1/children", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{ + makeBlock("paragraph", "Overview of architecture."), + }, + "has_more": false, + "next_cursor": "", + }) + }) + + mux.HandleFunc("/v1/blocks/page-2/children", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{ + makeBlock("paragraph", "Database schema details."), + }, + "has_more": false, + "next_cursor": "", + }) + }) + + return httptest.NewServer(mux) +} + +func newMockServerWithURNs(t *testing.T) *httptest.Server { + t.Helper() + mux := http.NewServeMux() + + mux.HandleFunc("/v1/search", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{ + makePage("page-urn", "Pipeline Docs", "", "workspace", "user-a", "Alice", false), + }, + "has_more": 
false, + "next_cursor": "", + }) + }) + + mux.HandleFunc("/v1/blocks/page-urn/children", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{ + makeBlock("paragraph", "This reads from urn:bigquery:prod:table:project.dataset.orders daily."), + }, + "has_more": false, + "next_cursor": "", + }) + }) + + return httptest.NewServer(mux) +} + +func newMockServerWithArchived(t *testing.T) *httptest.Server { + t.Helper() + mux := http.NewServeMux() + + mux.HandleFunc("/v1/search", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{ + makePage("page-active", "Active Page", "", "workspace", "user-a", "Alice", false), + makePage("page-archived", "Archived Page", "", "workspace", "user-b", "Bob", true), + }, + "has_more": false, + "next_cursor": "", + }) + }) + + mux.HandleFunc("/v1/blocks/page-active/children", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{}, + "has_more": false, + "next_cursor": "", + }) + }) + + return httptest.NewServer(mux) +} + +// --- mock data builders --- + +func makePage(id, title, parentID, parentType, userID, userName string, archived bool) map[string]any { + parent := map[string]any{"type": parentType} + if parentType == "page_id" { + parent["page_id"] = parentID + } + + return map[string]any{ + "object": "page", + "id": id, + "created_time": "2024-01-15T10:30:00.000Z", + "last_edited_time": "2024-03-20T14:15:00.000Z", + "created_by": map[string]any{"id": userID, "name": userName}, + "last_edited_by": map[string]any{"id": userID, "name": userName}, + "archived": archived, + "url": "https://www.notion.so/" + id, + "parent": parent, + "properties": map[string]any{ + "Name": map[string]any{ + "type": "title", + "title": []map[string]any{ + {"plain_text": title}, + }, + }, + }, + } +} + +func makeDatabase(id, title, description, userID, userName string, props map[string]any) 
map[string]any { + return map[string]any{ + "object": "database", + "id": id, + "created_time": "2024-01-10T09:00:00.000Z", + "last_edited_time": "2024-03-18T16:00:00.000Z", + "created_by": map[string]any{"id": userID, "name": userName}, + "last_edited_by": map[string]any{"id": userID, "name": userName}, + "title": []map[string]any{{"plain_text": title}}, + "description": []map[string]any{{"plain_text": description}}, + "archived": false, + "url": "https://www.notion.so/" + id, + "parent": map[string]any{"type": "workspace"}, + "properties": props, + } +} + +func makeBlock(blockType, text string) map[string]any { + return map[string]any{ + "type": blockType, + blockType: map[string]any{ + "rich_text": []map[string]any{ + {"plain_text": text}, + }, + }, + } +} + +func writeJSON(w http.ResponseWriter, v any) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(v) //nolint:errcheck +} diff --git a/plugins/extractors/populate.go b/plugins/extractors/populate.go index 606922de..27a11e57 100644 --- a/plugins/extractors/populate.go +++ b/plugins/extractors/populate.go @@ -22,6 +22,7 @@ import ( _ "github.com/raystack/meteor/plugins/extractors/mongodb" _ "github.com/raystack/meteor/plugins/extractors/mssql" _ "github.com/raystack/meteor/plugins/extractors/mysql" + _ "github.com/raystack/meteor/plugins/extractors/notion" _ "github.com/raystack/meteor/plugins/extractors/optimus" _ "github.com/raystack/meteor/plugins/extractors/oracle" _ "github.com/raystack/meteor/plugins/extractors/postgres" diff --git a/test/e2e/notion_file/notion_file_test.go b/test/e2e/notion_file/notion_file_test.go new file mode 100644 index 00000000..0fc5c920 --- /dev/null +++ b/test/e2e/notion_file/notion_file_test.go @@ -0,0 +1,288 @@ +//go:build integration +// +build integration + +package notion_file_test + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/raystack/meteor/cmd" + _ 
"github.com/raystack/meteor/plugins/extractors" + _ "github.com/raystack/meteor/plugins/processors" + _ "github.com/raystack/meteor/plugins/sinks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNotionToFile(t *testing.T) { + mock := newMockNotionServer() + server := httptest.NewServer(mock) + defer server.Close() + + outFile, err := os.CreateTemp("", "notion-e2e-*.ndjson") + require.NoError(t, err) + outFile.Close() + defer os.Remove(outFile.Name()) + + recipeContent := fmt.Sprintf(` +name: notion-to-file-e2e +version: v1beta1 +source: + name: notion + scope: e2e-test + config: + token: test-token + base_url: %s +sinks: + - name: file + config: + path: %s + format: ndjson + overwrite: true +`, server.URL, outFile.Name()) + + recipeFile, err := os.CreateTemp("", "recipe-*.yml") + require.NoError(t, err) + defer os.Remove(recipeFile.Name()) + + _, err = recipeFile.WriteString(recipeContent) + require.NoError(t, err) + require.NoError(t, recipeFile.Close()) + + command := cmd.RunCmd() + command.SetArgs([]string{recipeFile.Name()}) + err = command.Execute() + require.NoError(t, err) + + data, err := os.ReadFile(outFile.Name()) + require.NoError(t, err) + + records := parseNDJSON(t, data) + + // Expect: 2 pages + 1 database = 3 records. + require.Len(t, records, 3, "expected 2 pages + 1 database") + + // Verify page records. + docRecords := findByEntityType(records, "document") + require.Len(t, docRecords, 3) + + // Find the parent page. + archPage := findByName(docRecords, "Architecture Docs") + require.NotNil(t, archPage) + archEntity := archPage["entity"].(map[string]any) + assert.Contains(t, archEntity["urn"], "document:page-1") + + // Find child page and verify child_of edge. 
+ childPage := findByName(docRecords, "Database Design") + require.NotNil(t, childPage) + childEdges := toEdges(childPage) + assertHasEdgeType(t, childEdges, "child_of") + assertHasEdgeType(t, childEdges, "owned_by") + + // Find database and verify columns. + dbPage := findByName(docRecords, "Project Tracker") + require.NotNil(t, dbPage) + dbEntity := dbPage["entity"].(map[string]any) + assert.Equal(t, "notion", dbEntity["source"]) + + dbProps := dbEntity["properties"].(map[string]any) + assert.NotNil(t, dbProps["columns"]) + + // Print summary. + fmt.Printf("\n=== Notion E2E Test Summary ===\n") + fmt.Printf("Total records: %d\n", len(records)) + for _, r := range records { + e := r["entity"].(map[string]any) + fmt.Printf(" - %s (type=%s, urn=%s)\n", e["name"], e["type"], e["urn"]) + } +} + +// --- Mock Notion Server --- + +func newMockNotionServer() http.Handler { + mux := http.NewServeMux() + + mux.HandleFunc("/v1/search", func(w http.ResponseWriter, r *http.Request) { + var body map[string]any + json.NewDecoder(r.Body).Decode(&body) + + filter := body["filter"].(map[string]any) + value := filter["value"].(string) + + switch value { + case "page": + writeJSON(w, map[string]any{ + "results": []map[string]any{ + makePage("page-1", "Architecture Docs", "", "workspace", "user-a", "Alice"), + makePage("page-2", "Database Design", "page-1", "page_id", "user-b", "Bob"), + }, + "has_more": false, + }) + case "database": + writeJSON(w, map[string]any{ + "results": []map[string]any{ + makeDatabase("db-1", "Project Tracker", "Engineering projects", "user-a", "Alice", + map[string]any{ + "Name": map[string]any{"type": "title"}, + "Status": map[string]any{"type": "select"}, + }), + }, + "has_more": false, + }) + } + }) + + mux.HandleFunc("/v1/blocks/page-1/children", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{makeBlock("paragraph", "Architecture overview content.")}, + "has_more": false, + }) + }) + + 
mux.HandleFunc("/v1/blocks/page-2/children", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{makeBlock("paragraph", "Database schema docs.")}, + "has_more": false, + }) + }) + + return mux +} + +func makePage(id, title, parentID, parentType, userID, userName string) map[string]any { + parent := map[string]any{"type": parentType} + if parentType == "page_id" { + parent["page_id"] = parentID + } + return map[string]any{ + "object": "page", + "id": id, + "created_time": "2024-01-15T10:30:00.000Z", + "last_edited_time": "2024-03-20T14:15:00.000Z", + "created_by": map[string]any{"id": userID, "name": userName}, + "last_edited_by": map[string]any{"id": userID, "name": userName}, + "archived": false, + "url": "https://www.notion.so/" + id, + "parent": parent, + "properties": map[string]any{ + "Name": map[string]any{ + "type": "title", + "title": []map[string]any{{"plain_text": title}}, + }, + }, + } +} + +func makeDatabase(id, title, description, userID, userName string, props map[string]any) map[string]any { + return map[string]any{ + "object": "database", + "id": id, + "created_time": "2024-01-10T09:00:00.000Z", + "last_edited_time": "2024-03-18T16:00:00.000Z", + "created_by": map[string]any{"id": userID, "name": userName}, + "last_edited_by": map[string]any{"id": userID, "name": userName}, + "title": []map[string]any{{"plain_text": title}}, + "description": []map[string]any{{"plain_text": description}}, + "archived": false, + "url": "https://www.notion.so/" + id, + "parent": map[string]any{"type": "workspace"}, + "properties": props, + } +} + +func makeBlock(blockType, text string) map[string]any { + return map[string]any{ + "type": blockType, + blockType: map[string]any{ + "rich_text": []map[string]any{{"plain_text": text}}, + }, + } +} + +func writeJSON(w http.ResponseWriter, v any) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(v) //nolint:errcheck +} + +// --- helpers 
// parseNDJSON decodes newline-delimited JSON into one map per non-empty
// line. It fails the test immediately on the first malformed line, naming
// the line that failed.
func parseNDJSON(t *testing.T, data []byte) []map[string]any {
	t.Helper()
	var records []map[string]any
	for i, line := range splitLines(data) {
		if len(line) == 0 {
			continue
		}
		var rec map[string]any
		if err := json.Unmarshal(line, &rec); err != nil {
			t.Fatalf("line %d is not valid JSON: %v\n%s", i+1, err, line)
		}
		records = append(records, rec)
	}
	return records
}

// splitLines splits data on '\n'. The separators are dropped; a trailing
// segment without a final newline is kept.
func splitLines(data []byte) [][]byte {
	var lines [][]byte
	start := 0
	for i, b := range data {
		if b == '\n' {
			lines = append(lines, data[start:i])
			start = i + 1
		}
	}
	if start < len(data) {
		lines = append(lines, data[start:])
	}
	return lines
}

// findByEntityType returns the records whose entity.type equals typ.
// Records without an "entity" object are skipped.
func findByEntityType(records []map[string]any, typ string) []map[string]any {
	var out []map[string]any
	for _, r := range records {
		e, ok := r["entity"].(map[string]any)
		if !ok {
			continue
		}
		if e["type"] == typ {
			out = append(out, r)
		}
	}
	return out
}

// findByName returns the first record whose entity.name equals name, or nil
// when no record matches.
func findByName(records []map[string]any, name string) map[string]any {
	for _, r := range records {
		e, ok := r["entity"].(map[string]any)
		if !ok {
			continue
		}
		if e["name"] == name {
			return r
		}
	}
	return nil
}

// toEdges extracts a record's edges as a slice of maps. It returns nil when
// the record has no "edges" key or the value has an unexpected shape.
func toEdges(record map[string]any) []map[string]any {
	edgesRaw, ok := record["edges"].([]any)
	if !ok {
		return nil
	}
	var edges []map[string]any
	for _, e := range edgesRaw {
		if m, ok := e.(map[string]any); ok {
			edges = append(edges, m)
		}
	}
	return edges
}

// assertHasEdgeType fails (non-fatally) unless edges contains an edge whose
// "type" equals typ. The failure message lists the edge types actually seen
// to make mismatches easy to diagnose.
func assertHasEdgeType(t *testing.T, edges []map[string]any, typ string) {
	t.Helper()
	seen := make([]any, 0, len(edges))
	for _, e := range edges {
		if e["type"] == typ {
			return
		}
		seen = append(seen, e["type"])
	}
	t.Errorf("expected edge of type %q, found none; edge types present: %v", typ, seen)
}