Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/seqproxyapi/v1/seq_proxy_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ service SeqProxyApi {
body: "*"
};
}

rpc OnePhaseSearch(SearchRequest) returns (ComplexSearchResponse) {
option (google.api.http) = {
post: "/one-phase-search"
body: "*"
};
}
}

// Custom error code, returned by seq-db proxy.
Expand Down
64 changes: 62 additions & 2 deletions api/storeapi/store_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ service StoreApi {
rpc Fetch(FetchRequest) returns (stream BinaryData) {}

rpc Status(StatusRequest) returns (StatusResponse) {}

rpc OnePhaseSearch(OnePhaseSearchRequest) returns (stream OnePhaseSearchResponse) {}
}

message BulkRequest {
Expand Down Expand Up @@ -244,12 +246,13 @@ message IdWithHint {
string hint = 2;
}

message FetchRequest {
message FieldsFilter {
message FieldsFilter {
repeated string fields = 1;
// see seqproxyapi.FetchRequest.FieldsFilter.allow_list for details.
bool allow_list = 2;
}

message FetchRequest {
repeated string ids = 1;
bool explain = 3;
repeated IdWithHint ids_with_hints = 4;
Expand All @@ -261,3 +264,60 @@ message StatusRequest {}
message StatusResponse {
google.protobuf.Timestamp oldest_time = 1;
}

// OnePhaseSearchRequest asks a store to run a search and stream matching
// documents back in a single pass (no separate fetch phase).
message OnePhaseSearchRequest {
  // SeqQL query to execute.
  string query = 1;
  // Search time range. NOTE(review): boundary inclusivity is not visible
  // here — confirm against the search implementation.
  google.protobuf.Timestamp from = 2;
  google.protobuf.Timestamp to = 3;
  // Maximum number of records to return.
  int64 size = 4;
  // Number of leading matches to skip.
  int64 offset = 5;
  // When true, include an execution trace (see Metadata.explain).
  bool explain = 6;
  // When true, compute the total match count. NOTE(review): assumed to
  // populate Metadata.total — confirm.
  bool with_total = 7;
  // Sort order of returned records.
  Order order = 8;
  // NOTE(review): presumably a cursor id to resume pagination after, as
  // in the existing two-phase search — confirm semantics.
  string offset_id = 9;
  // Restricts which document fields are returned; see FieldsFilter.
  FieldsFilter fields_filter = 10;
}

// OnePhaseSearchResponse is one element of the OnePhaseSearch response
// stream. NOTE(review): presumably the stream carries a single header
// followed by record batches — confirm and document the ordering
// guarantee before release.
// NOTE(review): the proto style guide favors snake_case oneof names
// (response_type); renaming before the API ships changes only generated
// code, not the wire format.
message OnePhaseSearchResponse {
  oneof ResponseType {
    Header header = 1;
    RecordsBatch batch = 2;
  }
}

// Header carries result metadata and typing information for the records
// that follow in a one-phase search response stream.
message Header {
  Metadata metadata = 1;
  // Per-field value typing for Record.raw_data. NOTE(review): positional
  // correspondence with raw_data entries is assumed — confirm.
  repeated Typing typing = 2;
}

// Metadata summarizes the outcome of a one-phase search.
message Metadata {
  // Total number of matching documents. NOTE(review): presumably only
  // meaningful when OnePhaseSearchRequest.with_total was set — confirm.
  uint64 total = 1;
  // Outcome code of the search; see SearchErrorCode.
  SearchErrorCode code = 2;
  // Error messages accumulated during the search. NOTE(review):
  // granularity (per shard? per fraction?) is not visible here — confirm.
  repeated string errors = 3;
  // Execution trace; present only when explain was requested.
  optional ExplainEntry explain = 4;
}

// DataType enumerates the encodings of values in Record.raw_data.
// NOTE(review): the proto style guide recommends prefixed values
// (DATA_TYPE_BYTES, ...) and a *_UNSPECIFIED zero value; here BYTES
// doubles as the default. Renaming is cheap now and breaking after the
// API ships — confirm this is intentional.
enum DataType {
  BYTES = 0;
  RAW_DOCUMENT = 1;
  STRING = 2;
  UINT32 = 3;
  UINT64 = 4;
  INT32 = 5;
  INT64 = 6;
  FLOAT64 = 7;
  // TODO: array data types: StringArray, Uin64Array, Float64Array etc.
}

// Typing binds a field name to the data type of its values.
message Typing {
  // Field name. NOTE(review): "title" vs "field"/"name" — confirm this
  // matches the naming used by the proxy-level API.
  string title = 1;
  DataType type = 2;
}

// RecordsBatch is a chunk of result records streamed after the Header.
message RecordsBatch {
  repeated Record records = 1;
}

// Record is a single search hit. Each raw_data element is one encoded
// field value. NOTE(review): assumed to correspond positionally to
// Header.typing — confirm and document the contract.
message Record {
  repeated bytes raw_data = 1;
}
161 changes: 160 additions & 1 deletion parser/seqql_pipes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ func parsePipes(lex *lexer) ([]Pipe, error) {
}
pipes = append(pipes, p)
fieldFilters++
case lex.IsKeyword("stats"):
p, err := parsePipeStats(lex)
if err != nil {
return nil, fmt.Errorf("parsing 'stats' pipe: %s", err)
}
pipes = append(pipes, p)
default:
return nil, fmt.Errorf("unknown pipe: %s", lex.Token)
}
Expand Down Expand Up @@ -62,6 +68,50 @@ func (f *PipeFields) DumpSeqQL(o *strings.Builder) {
}
}

// StatsAgg describes a single aggregation of a 'stats' pipe, e.g.
// quantile(response_time, 0.5, 0.95) by (service) interval(1m).
type StatsAgg struct {
	// Func is the lower-cased aggregation function name: count, sum,
	// min, max, avg, quantile, unique, or unique_count.
	Func string
	// Field is the aggregated field; empty when the function was written
	// without parentheses (e.g. a bare "count").
	Field string
	// GroupBy is the field of the optional "by (...)" clause; empty if absent.
	GroupBy string
	// Interval is the raw token of the optional "interval(...)" clause,
	// e.g. "1m"; empty if absent. It is not parsed or validated here.
	Interval string
	// Quantiles holds the numeric arguments that followed Field,
	// used by the 'quantile' function.
	Quantiles []float64
}

// PipeStats is the parsed form of a 'stats' pipe: one or more
// comma-separated aggregations.
type PipeStats struct {
	Aggs []StatsAgg
}

// Name returns the keyword identifying this pipe.
func (p *PipeStats) Name() string {
	const pipeName = "stats"
	return pipeName
}

// DumpSeqQL writes the canonical SeqQL form of the 'stats' pipe into o,
// e.g. `stats count by (service), avg(latency) by (service) interval(1m)`.
func (p *PipeStats) DumpSeqQL(o *strings.Builder) {
	o.WriteString("stats ")
	sep := ""
	for _, agg := range p.Aggs {
		o.WriteString(sep)
		sep = ", "
		dumpStatsAgg(o, agg)
	}
}

// dumpStatsAgg writes a single aggregation expression into o.
func dumpStatsAgg(o *strings.Builder, agg StatsAgg) {
	o.WriteString(agg.Func)
	// The argument list is emitted only when a field was parsed;
	// quantile levels, if any, follow the field inside the parentheses.
	if agg.Field != "" {
		o.WriteString("(")
		o.WriteString(quoteTokenIfNeeded(agg.Field))
		for _, q := range agg.Quantiles {
			fmt.Fprintf(o, ", %v", q)
		}
		o.WriteString(")")
	}
	if agg.GroupBy != "" {
		o.WriteString(" by (")
		o.WriteString(quoteTokenIfNeeded(agg.GroupBy))
		o.WriteString(")")
	}
	if agg.Interval != "" {
		o.WriteString(" interval(")
		o.WriteString(agg.Interval)
		o.WriteString(")")
	}
}

func parsePipeFields(lex *lexer) (*PipeFields, error) {
if !lex.IsKeyword("fields") {
return nil, fmt.Errorf("missing 'fields' keyword")
Expand All @@ -85,6 +135,115 @@ func parsePipeFields(lex *lexer) (*PipeFields, error) {
}, nil
}

// parsePipeStats parses a 'stats' pipe: the 'stats' keyword followed by
// one or more comma-separated aggregations.
func parsePipeStats(lex *lexer) (*PipeStats, error) {
	if !lex.IsKeyword("stats") {
		return nil, fmt.Errorf("missing 'stats' keyword")
	}
	lex.Next()

	var aggs []StatsAgg
	for more := true; more; {
		agg, err := parseStatsAgg(lex)
		if err != nil {
			return nil, err
		}
		aggs = append(aggs, agg)

		// A comma continues the aggregation list; anything else ends it.
		if more = lex.IsKeyword(","); more {
			lex.Next()
		}
	}

	if len(aggs) == 0 {
		return nil, fmt.Errorf("at least one aggregation is required")
	}

	return &PipeStats{Aggs: aggs}, nil
}

// parseStatsAgg parses one aggregation of a 'stats' pipe. The grammar,
// as accepted here, is:
//
//	func [ "(" field { "," number } ")" ] [ "by" "(" field ")" ] [ "interval" "(" value ")" ]
//
// e.g. "count by (service)" or "quantile(latency, 0.5, 0.95) by (svc)".
// Lexer calls are order-sensitive: each successful match consumes the
// current token before the next clause is checked.
func parseStatsAgg(lex *lexer) (StatsAgg, error) {
	var agg StatsAgg

	// The aggregation function name is mandatory; it is stored lower-cased.
	if !lex.IsKeywords("count", "sum", "min", "max", "avg", "quantile", "unique", "unique_count") {
		return agg, fmt.Errorf("expected aggregation function (count, sum, min, max, avg, quantile, unique, unique_count), got %s", lex.Token)
	}
	agg.Func = strings.ToLower(lex.Token)
	lex.Next()

	// Optional argument list: "(field[, number...])". The trailing
	// numbers are quantile levels. NOTE(review): numeric args are
	// accepted for every function, not just 'quantile', and levels are
	// not range-checked here — presumably validated downstream; confirm.
	if lex.IsKeyword("(") {
		lex.Next()
		field, err := parseCompositeTokenReplaceWildcards(lex)
		if err != nil {
			return agg, err
		}
		agg.Field = field

		for lex.IsKeyword(",") {
			lex.Next()
			q, err := parseNumber(lex)
			if err != nil {
				return agg, fmt.Errorf("failed to parse quantile: %w", err)
			}
			agg.Quantiles = append(agg.Quantiles, q)
		}

		if !lex.IsKeyword(")") {
			return agg, fmt.Errorf("expected ')' after field, got %s", lex.Token)
		}
		lex.Next()
	}

	// Optional grouping clause: "by (field)".
	if lex.IsKeyword("by") {
		lex.Next()
		if !lex.IsKeyword("(") {
			return agg, fmt.Errorf("expected '(' after 'by', got %s", lex.Token)
		}
		lex.Next()
		groupBy, err := parseCompositeTokenReplaceWildcards(lex)
		if err != nil {
			return agg, err
		}
		agg.GroupBy = groupBy
		if !lex.IsKeyword(")") {
			return agg, fmt.Errorf("expected ')' after groupBy, got %s", lex.Token)
		}
		lex.Next()
	}

	// Optional time bucketing: "interval(value)". The value is taken as
	// a single raw token and stored verbatim; nothing here parses it as
	// a duration. NOTE(review): assumes the lexer yields e.g. "1m" as
	// one token — the round-trip tests for interval(1m) rely on this.
	if lex.IsKeyword("interval") {
		lex.Next()
		if !lex.IsKeyword("(") {
			return agg, fmt.Errorf("expected '(' after 'interval', got %s", lex.Token)
		}
		lex.Next()
		interval := lex.Token
		if interval == "" {
			return agg, fmt.Errorf("expected interval value, got %s", lex.Token)
		}
		agg.Interval = interval
		lex.Next()
		if !lex.IsKeyword(")") {
			return agg, fmt.Errorf("expected ')' after interval, got %s", lex.Token)
		}
		lex.Next()
	}

	return agg, nil
}

// parseNumber consumes the current token as a float64 and advances the
// lexer on success.
func parseNumber(lex *lexer) (float64, error) {
	tok := lex.Token
	if tok == "" {
		return 0, fmt.Errorf("expected number, got empty token")
	}
	value, err := strconv.ParseFloat(tok, 64)
	if err != nil {
		return 0, fmt.Errorf("failed to parse number %s: %w", tok, err)
	}
	lex.Next()
	return value, nil
}

func parseFieldList(lex *lexer) ([]string, error) {
var fields []string
trailingComma := false
Expand Down Expand Up @@ -149,7 +308,7 @@ var reservedKeywords = uniqueTokens([]string{
"|",

// Pipe specific keywords.
"fields", "except",
"fields", "except", "stats", "by", "interval", "unique_count",
})

func needQuoteToken(s string) bool {
Expand Down
41 changes: 41 additions & 0 deletions parser/seqql_pipes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,44 @@ func TestParsePipeFieldsExcept(t *testing.T) {
test(`* | fields except "_\\message*"`, `* | fields except "_\\message\*"`)
test(`* | fields except k8s_namespace`, `* | fields except k8s_namespace`)
}

// TestParsePipeStats checks that 'stats' pipes round-trip through the
// parser unchanged for every supported aggregation function.
func TestParsePipeStats(t *testing.T) {
	queries := []string{
		"service:my-service | stats count by (service)",
		"service:my-service | stats sum(level) by (service)",
		"service:my-service | stats count by (service) interval(1m)",
		"service:my-service | stats min(response_time) by (service)",
		"service:my-service | stats max(response_time) by (service)",
		"service:my-service | stats avg(response_time) by (service)",
		"service:my-service | stats unique by (service)",
		"service:my-service | stats unique_count by (service)",
	}
	for _, q := range queries {
		query, err := ParseSeqQL(q, nil)
		require.NoError(t, err)
		require.Equal(t, q, query.SeqQLString())
	}
}

// TestParsePipeStatsMultiple checks that a 'stats' pipe with several
// comma-separated aggregations round-trips unchanged.
func TestParsePipeStatsMultiple(t *testing.T) {
	queries := []string{
		"service:my-service | stats count by (service), sum(level) by (service)",
		"service:my-service | stats count by (service) interval(1m), sum(level) by (service) interval(1m)",
	}
	for _, q := range queries {
		query, err := ParseSeqQL(q, nil)
		require.NoError(t, err)
		require.Equal(t, q, query.SeqQLString())
	}
}

// TestParsePipeStatsQuantile checks that a quantile aggregation with
// multiple levels round-trips through the parser unchanged.
func TestParsePipeStatsQuantile(t *testing.T) {
	const q = "service:my-service | stats quantile(response_time, 0.5, 0.95) by (service)"
	query, err := ParseSeqQL(q, nil)
	require.NoError(t, err)
	require.Equal(t, q, query.SeqQLString())
}
Loading
Loading