diff --git a/chain_capabilities/stellar/.mockery.yaml b/chain_capabilities/stellar/.mockery.yaml new file mode 100644 index 000000000..86e862182 --- /dev/null +++ b/chain_capabilities/stellar/.mockery.yaml @@ -0,0 +1,17 @@ +dir: "{{ .InterfaceDir }}" +mockname: "{{ .InterfaceName }}_mock" +outpkg: actions +inpackage: true +filename: "test_{{ .InterfaceName | snakecase }}_mock.go" +fail-on-missing: true +resolve-type-alias: false +packages: + github.com/smartcontractkit/chainlink-common/pkg/types: + config: + dir: actions + mockname: "StellarService_mock" + outpkg: actions + inpackage: true + filename: "test_stellar_service_mock.go" + interfaces: + StellarService: diff --git a/chain_capabilities/stellar/.tool-versions b/chain_capabilities/stellar/.tool-versions new file mode 100644 index 000000000..010d49ff5 --- /dev/null +++ b/chain_capabilities/stellar/.tool-versions @@ -0,0 +1 @@ +golang 1.25.3 diff --git a/chain_capabilities/stellar/actions/actions.go b/chain_capabilities/stellar/actions/actions.go index 84c99c51a..82bcab648 100644 --- a/chain_capabilities/stellar/actions/actions.go +++ b/chain_capabilities/stellar/actions/actions.go @@ -11,46 +11,87 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" caperrors "github.com/smartcontractkit/chainlink-common/pkg/capabilities/errors" stellarcap "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/stellar" + commoncfg "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink-common/pkg/types" + stellartypes "github.com/smartcontractkit/chainlink-common/pkg/types/chains/stellar" "github.com/smartcontractkit/chainlink-framework/multinode" capcommon "github.com/smartcontractkit/capabilities/chain_capabilities/common" commonmon "github.com/smartcontractkit/capabilities/chain_capabilities/common/monitoring" - "github.com/smartcontractkit/capabilities/libs/chainconsensus" - ctypes "github.com/smartcontractkit/capabilities/libs/chainconsensus/types" - + ts "github.com/smartcontractkit/capabilities/chain_capabilities/common/transmission_schedule" "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/metering" "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/monitoring" + "github.com/smartcontractkit/capabilities/libs/chainconsensus" + ctypes "github.com/smartcontractkit/capabilities/libs/chainconsensus/types" ) -// Stellar implements the CRE capability actions for the Stellar chain +// Stellar implements the CRE capability actions for the Stellar chain. type Stellar struct { types.StellarService - handler chainconsensus.RequestHandler - lggr logger.SugaredLogger - messageBuilder *monitoring.MessageBuilder - beholderProcessor beholder.ProtoProcessor - chainSelector uint64 + handler chainconsensus.RequestHandler + lggr logger.SugaredLogger + messageBuilder *monitoring.MessageBuilder + beholderProcessor beholder.ProtoProcessor + chainSelector uint64 + forwarderClient CREForwarderClient + forwarderLookbackLedgers int64 + reportSizeLimit limits.BoundLimiter[commoncfg.Size] + transmissionScheduler ts.TransmissionScheduler } -// NewStellar builds the Stellar capability actions. func NewStellar( service types.StellarService, + forwarderAddress string, + forwarderLookbackLedgers int64, lggr logger.Logger, + limitsFactory limits.Factory, + transmissionScheduler ts.TransmissionScheduler, chainSelector uint64, handler chainconsensus.RequestHandler, messageBuilder *monitoring.MessageBuilder, beholderProcessor beholder.ProtoProcessor, ) (*Stellar, error) { - return &Stellar{ - StellarService: service, - handler: handler, - lggr: logger.Sugared(lggr), - messageBuilder: messageBuilder, - beholderProcessor: beholderProcessor, - chainSelector: chainSelector, - }, nil + if service == nil { + return nil, fmt.Errorf("stellar service is required") + } + + st := &Stellar{ + StellarService: service, + handler: handler, + lggr: logger.Sugared(lggr), + messageBuilder: messageBuilder, + beholderProcessor: beholderProcessor, + chainSelector: chainSelector, + forwarderClient: newForwarderClient(service, lggr, forwarderAddress, forwarderLookbackLedgers), + forwarderLookbackLedgers: forwarderLookbackLedgers, + transmissionScheduler: transmissionScheduler, + } + return st, st.initLimiters(limitsFactory) +} + +func (s *Stellar) initLimiters(limitsFactory limits.Factory) (err error) { + s.reportSizeLimit, err = limits.MakeUpperBoundLimiter(limitsFactory, cresettings.Default.PerWorkflow.ChainWrite.ReportSizeLimit) + return err +} + +func (s *Stellar) Close() error { + return services.CloseAll(s.reportSizeLimit) +} + +func (s *Stellar) GetLatestLedger(ctx context.Context, _ capabilities.RequestMetadata, _ *stellarcap.GetLatestLedgerRequest) (*capabilities.ResponseAndMetadata[*stellarcap.GetLatestLedgerResponse], caperrors.Error) { + resp, err := s.StellarService.GetLatestLedger(ctx) + if err != nil { + return nil, GetError(err, false) + } + protoResp, err := stellarcap.ConvertGetLatestLedgerResponseToProto(resp) + if err != nil { + return nil, GetError(err, false) + } + return &capabilities.ResponseAndMetadata[*stellarcap.GetLatestLedgerResponse]{Response: protoResp}, nil } // ReadContract performs a consensus read of a read-only Soroban contract call. @@ -59,7 +100,7 @@ func (s *Stellar) ReadContract( metadata capabilities.RequestMetadata, input *stellarcap.ReadContractRequest, ) (*capabilities.ResponseAndMetadata[*stellarcap.ReadContractResponse], caperrors.Error) { - request, err := stellarcap.ConvertReadContractRequestFromProto(input) + request, err := convertReadContractRequestFromProto(input) if err != nil { return nil, caperrors.NewPublicUserError(fmt.Errorf("invalid request: %w", err), caperrors.InvalidArgument) } @@ -81,13 +122,13 @@ func (s *Stellar) ReadContract( metadata.ReferenceID, metering.GetResponseMetadata(metering.ReadContract), func(ctx context.Context) (*stellarcap.ReadContractResponse, uint64, error) { - response, err := s.StellarService.ReadContract(ctx, request) + response, err := s.SimulateTransaction(ctx, request) if err != nil { return nil, 0, err } return &stellarcap.ReadContractResponse{ - Result: response.Result, + Result: response.ReturnValueXDR, LedgerSequence: response.LedgerSequence, Error: response.Error, }, uint64(response.LedgerSequence), nil @@ -109,20 +150,40 @@ func (s *Stellar) ReadContract( return responseAndMetadata, nil } -func (s *Stellar) GetLatestLedger( - _ context.Context, - _ capabilities.RequestMetadata, - _ *stellarcap.GetLatestLedgerRequest, -) (*capabilities.ResponseAndMetadata[*stellarcap.GetLatestLedgerResponse], caperrors.Error) { - return nil, caperrors.NewPublicSystemError(errors.New("unimplemented"), caperrors.Unknown) +func convertReadContractRequestFromProto(p *stellarcap.ReadContractRequest) (stellartypes.SimulateTransactionRequest, error) { + if p == nil { + return stellartypes.SimulateTransactionRequest{}, fmt.Errorf("readContractRequest is nil") + } + if p.GetContractId() == "" { + return stellartypes.SimulateTransactionRequest{}, fmt.Errorf("contractID is required") + } + if p.GetFunction() == "" { + return stellartypes.SimulateTransactionRequest{}, fmt.Errorf("function is required") + } + + pArgs := p.GetArgs() + args := make([]stellartypes.ScVal, len(pArgs)) + for i, psv := range pArgs { + sv, err := stellarcap.ProtoToScVal(psv) + if err != nil { + return stellartypes.SimulateTransactionRequest{}, fmt.Errorf("args[%d]: %w", i, err) + } + args[i] = sv + } + return stellartypes.SimulateTransactionRequest{ + ContractID: p.GetContractId(), + Function: p.GetFunction(), + Args: args, + SourceAccount: p.GetSourceAccount(), + }, nil +} + +func (s *Stellar) isUserErrorWriteReport(err error) bool { + return strings.HasPrefix(err.Error(), capcommon.UserError) } -func (s *Stellar) WriteReport( - _ context.Context, - _ capabilities.RequestMetadata, - _ *stellarcap.WriteReportRequest, -) (*capabilities.ResponseAndMetadata[*stellarcap.WriteReportReply], caperrors.Error) { - return nil, caperrors.NewPublicSystemError(errors.New("unimplemented"), caperrors.Unknown) +func (s *Stellar) Info() (capabilities.CapabilityInfo, error) { + return capabilities.CapabilityInfo{}, nil } func isUserError(err error) bool { @@ -130,8 +191,11 @@ func isUserError(err error) bool { } // isStellarNodeInfraError reports whether err is a node-availability failure. It checks both -// error identity and the message substring because errors reach this function through LOOP gRPC , -// which preserve only the gRPC status code and message — Go error identity (errors.Is) does not survive that round trip. +// error identity and the message substring because errors reach this function through LOOP gRPC, +// which preserves only the gRPC status code and message. func isStellarNodeInfraError(err error) bool { return errors.Is(err, multinode.ErrNodeError) || strings.Contains(err.Error(), multinode.ErrNodeError.Error()) } + +var GetError = capcommon.GetError +var NewUserError = caperrors.NewPublicUserError diff --git a/chain_capabilities/stellar/actions/actions_test.go b/chain_capabilities/stellar/actions/actions_test.go index f61ca7b95..3dd673f91 100644 --- a/chain_capabilities/stellar/actions/actions_test.go +++ b/chain_capabilities/stellar/actions/actions_test.go @@ -15,6 +15,7 @@ import ( caperrors "github.com/smartcontractkit/chainlink-common/pkg/capabilities/errors" stellarcap "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/stellar" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink-common/pkg/types" stellartypes "github.com/smartcontractkit/chainlink-common/pkg/types/chains/stellar" "github.com/smartcontractkit/chainlink-common/pkg/types/mocks" @@ -24,6 +25,7 @@ import ( ctypes "github.com/smartcontractkit/capabilities/libs/chainconsensus/types" + ts "github.com/smartcontractkit/capabilities/chain_capabilities/common/transmission_schedule" "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/monitoring" ) @@ -65,6 +67,92 @@ func validReadContractRequest() *stellarcap.ReadContractRequest { } } +func TestNewStellar(t *testing.T) { + t.Parallel() + + t.Run("nil service", func(t *testing.T) { + t.Parallel() + lggr := logger.Test(t) + _, err := NewStellar( + nil, + "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC", + 100, + lggr, + limits.Factory{Logger: lggr}, + ts.TransmissionScheduler{}, + 1, + testConsensusHandler{handle: runVolatileHashableHandle}, + monitoring.NewMessageBuilder(types.ChainInfo{}, capabilities.CapabilityInfo{}, ""), + nopBeholderProcessor{}, + ) + require.Error(t, err) + require.Contains(t, err.Error(), "stellar service is required") + }) + + t.Run("success", func(t *testing.T) { + t.Parallel() + lggr := logger.Test(t) + svc := mocks.NewStellarService(t) + st, err := NewStellar( + svc, + "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC", + 100, + lggr, + limits.Factory{Logger: lggr}, + ts.TransmissionScheduler{}, + 1, + testConsensusHandler{handle: runVolatileHashableHandle}, + monitoring.NewMessageBuilder(types.ChainInfo{}, capabilities.CapabilityInfo{}, ""), + nopBeholderProcessor{}, + ) + require.NoError(t, err) + require.NotNil(t, st) + require.NoError(t, st.Close()) + }) +} + +func TestGetLatestLedger(t *testing.T) { + t.Parallel() + + t.Run("happy path", func(t *testing.T) { + t.Parallel() + helper := newMockedStellar(t) + helper.stellarService.EXPECT(). + GetLatestLedger(mock.Anything). + Return(stellartypes.GetLatestLedgerResponse{ + Sequence: 123, + LedgerCloseTime: 1_700_000_000, + }, nil). + Once() + + resp, err := helper.stellar.GetLatestLedger(t.Context(), capabilities.RequestMetadata{}, &stellarcap.GetLatestLedgerRequest{}) + require.NoError(t, err) + require.NotNil(t, resp) + require.Equal(t, uint32(123), resp.Response.GetSequence()) + require.Equal(t, int64(1_700_000_000), resp.Response.GetLedgerCloseTime()) + }) + + t.Run("service error", func(t *testing.T) { + t.Parallel() + helper := newMockedStellar(t) + helper.stellarService.EXPECT(). + GetLatestLedger(mock.Anything). + Return(stellartypes.GetLatestLedgerResponse{}, errors.New("node unavailable")). + Once() + + _, err := helper.stellar.GetLatestLedger(t.Context(), capabilities.RequestMetadata{}, &stellarcap.GetLatestLedgerRequest{}) + require.Error(t, err) + }) +} + +func TestStellar_Info(t *testing.T) { + t.Parallel() + helper := newMockedStellar(t) + info, err := helper.stellar.Info() + require.NoError(t, err) + require.Equal(t, capabilities.CapabilityInfo{}, info) +} + func TestReadContract(t *testing.T) { t.Parallel() @@ -90,9 +178,9 @@ func TestReadContract(t *testing.T) { const ledgerSeq uint32 = 52_000 const result = "AAAAAwAAAAA=" // base64 XDR helper.stellarService.EXPECT(). - ReadContract(mock.Anything, mock.Anything). - Return(stellartypes.ReadContractResponse{ - Result: result, + SimulateTransaction(mock.Anything, mock.Anything). + Return(stellartypes.SimulateTransactionResponse{ + ReturnValueXDR: result, LedgerSequence: ledgerSeq, }, nil). Once() @@ -114,8 +202,8 @@ func TestReadContract(t *testing.T) { // Plain errors (e.g. invalid input surfaced by the relayer) default to user errors. expectedErr := errors.New("failed to decode contract id") helper.stellarService.EXPECT(). - ReadContract(mock.Anything, mock.Anything). - Return(stellartypes.ReadContractResponse{}, expectedErr). + SimulateTransaction(mock.Anything, mock.Anything). + Return(stellartypes.SimulateTransactionResponse{}, expectedErr). Once() _, err := helper.stellar.ReadContract(t.Context(), capabilities.RequestMetadata{ @@ -135,10 +223,10 @@ func TestReadContract(t *testing.T) { // Errors tagged by the relayer with multinode.ErrNodeError must survive the // observation-error serialization round trip and stay classified as infra/system. - expectedErr := fmt.Errorf("failed to read contract: %w", multinode.ErrNodeError) + expectedErr := fmt.Errorf("failed to simulate transaction: %w", multinode.ErrNodeError) helper.stellarService.EXPECT(). - ReadContract(mock.Anything, mock.Anything). - Return(stellartypes.ReadContractResponse{}, expectedErr). + SimulateTransaction(mock.Anything, mock.Anything). + Return(stellartypes.SimulateTransactionResponse{}, expectedErr). Once() _, err := helper.stellar.ReadContract(t.Context(), capabilities.RequestMetadata{ diff --git a/chain_capabilities/stellar/actions/forwarder_client.go b/chain_capabilities/stellar/actions/forwarder_client.go new file mode 100644 index 000000000..898e6365b --- /dev/null +++ b/chain_capabilities/stellar/actions/forwarder_client.go @@ -0,0 +1,414 @@ +package actions + +import ( + "context" + "errors" + "fmt" + "strings" + + "github.com/stellar/go-stellar-sdk/strkey" + "github.com/stellar/go-stellar-sdk/xdr" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/types" + stellartypes "github.com/smartcontractkit/chainlink-common/pkg/types/chains/stellar" + "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" + + capcommon "github.com/smartcontractkit/capabilities/chain_capabilities/common" +) + +const ( + forwarderReportFunction = "report" + forwarderGetTransmissionInfoFunction = "get_transmission_info" + defaultLedgerBoundsOffset = uint32(20) +) + +type TransmissionState uint32 + +const ( + TransmissionStateNotAttempted TransmissionState = iota + TransmissionStateSucceeded + TransmissionStateInvalidReceiver + TransmissionStateFailed +) + +type TransmissionInfo struct { + State TransmissionState + Transmitter string + LedgerSequence uint32 + Success bool + InvalidReceiver bool +} + +// CREForwarderClient abstracts interaction with the Stellar CRE forwarder contract. +type CREForwarderClient interface { + // InvokeOnReport resolves the relayer signing account, builds forwarder args, and submits via TXM. + InvokeOnReport(ctx context.Context, receiver string, report *sdk.ReportResponse) (*stellartypes.SubmitTransactionResponse, error) + // GetTransmissionInfo queries the forwarder for transmission state. + GetTransmissionInfo(ctx context.Context, receiver string, workflowExecutionID [32]byte, reportID [2]byte) (TransmissionInfo, error) + GetReportProcessedEvents(ctx context.Context, receiver string, workflowExecutionID [32]byte, reportID [2]byte) ([]ReportProcessedEvent, error) + ForwarderAddress() string +} + +type forwarderClient struct { + types.StellarService + lggr logger.Logger + forwarderAddress string + forwarderLookbackLedgers int64 +} + +type ReportProcessedEvent struct { + TxHash string + Ledger uint32 + Success bool +} + +func newForwarderClient(service types.StellarService, lggr logger.Logger, forwarderAddress string, forwarderLookbackLedgers int64) CREForwarderClient { + if forwarderLookbackLedgers <= 0 { + forwarderLookbackLedgers = defaultForwarderLookbackLedgers + } + return &forwarderClient{ + StellarService: service, + lggr: logger.Named(lggr, "ForwarderClient"), + forwarderAddress: forwarderAddress, + forwarderLookbackLedgers: forwarderLookbackLedgers, + } +} + +func (fc *forwarderClient) ForwarderAddress() string { + return fc.forwarderAddress +} + +func (fc *forwarderClient) InvokeOnReport( + ctx context.Context, + receiver string, + report *sdk.ReportResponse, +) (*stellartypes.SubmitTransactionResponse, error) { + transmitter, err := fc.resolveSigningAccount(ctx) + if err != nil { + return nil, fmt.Errorf("failed to resolve signing account: %w", err) + } + + args, err := buildForwarderReportArgs(transmitter, receiver, report) + if err != nil { + return nil, err + } + + submitResp, err := fc.SubmitTransaction(ctx, stellartypes.SubmitTransactionRequest{ + ContractID: fc.forwarderAddress, + Function: forwarderReportFunction, + Args: args, + LedgerBoundsOffset: defaultLedgerBoundsOffset, + }) + if err != nil { + return nil, fmt.Errorf("failed to submit forwarder report transaction: %w", err) + } + return submitResp, nil +} + +func (fc *forwarderClient) GetTransmissionInfo( + ctx context.Context, + receiver string, + workflowExecutionID [32]byte, + reportID [2]byte, +) (TransmissionInfo, error) { + args, err := buildTransmissionInfoArgs(receiver, workflowExecutionID, reportID) + if err != nil { + return TransmissionInfo{}, err + } + + resp, err := fc.SimulateTransaction(ctx, stellartypes.SimulateTransactionRequest{ + ContractID: fc.forwarderAddress, + Function: forwarderGetTransmissionInfoFunction, + Args: args, + }) + if err != nil { + return TransmissionInfo{}, err + } + if resp.Error != "" { + return TransmissionInfo{}, fmt.Errorf("forwarder simulation failed: %s", resp.Error) + } + if resp.ReturnValueXDR == "" { + return TransmissionInfo{State: TransmissionStateNotAttempted}, nil + } + + return parseTransmissionInfo(resp.ReturnValueXDR, resp.LedgerSequence) +} + +func (fc *forwarderClient) GetReportProcessedEvents( + ctx context.Context, + receiver string, + workflowExecutionID [32]byte, + reportID [2]byte, +) ([]ReportProcessedEvent, error) { + startLedger, err := fc.startLedger(ctx) + if err != nil { + return nil, err + } + topicFilter, err := buildReportProcessedTopicFilter(receiver, workflowExecutionID, reportID) + if err != nil { + return nil, err + } + + resp, err := fc.GetEvents(ctx, stellartypes.GetEventsRequest{ + StartLedger: startLedger, + Filters: []stellartypes.EventFilter{ + { + EventTypes: []stellartypes.EventType{stellartypes.EventTypeContract}, + ContractIDs: []string{fc.forwarderAddress}, + Topics: []stellartypes.TopicFilter{topicFilter}, + }, + }, + }) + if err != nil { + return nil, err + } + + events := make([]ReportProcessedEvent, 0, len(resp.Events)) + for i, e := range resp.Events { + if e.TransactionHash == "" { + return nil, fmt.Errorf("empty tx hash at event index %d", i) + } + if e.Value.Type != stellartypes.ScValTypeBool || e.Value.Bool == nil { + return nil, fmt.Errorf("event %s value is not a bool", e.TransactionHash) + } + events = append(events, ReportProcessedEvent{ + TxHash: e.TransactionHash, + Ledger: e.Ledger, + Success: *e.Value.Bool, + }) + } + return events, nil +} + +func (fc *forwarderClient) startLedger(ctx context.Context) (uint32, error) { + latest, err := fc.GetLatestLedger(ctx) + if err != nil { + return 0, err + } + if int64(latest.Sequence) <= fc.forwarderLookbackLedgers { + return 1, nil + } + start := int64(latest.Sequence) - fc.forwarderLookbackLedgers + return uint32(start), nil //nolint:gosec // G115: start is positive and at most latest.Sequence (uint32) +} + +func (fc *forwarderClient) resolveSigningAccount(ctx context.Context) (string, error) { + resp, err := fc.GetSigningAccount(ctx) + if err != nil { + return "", err + } + if resp.AccountAddress == "" { + return "", errors.New("relayer returned empty signing account") + } + return resp.AccountAddress, nil +} + +func parseTransmissionInfo(result string, ledgerSequence uint32) (TransmissionInfo, error) { + var sv xdr.ScVal + if err := xdr.SafeUnmarshalBase64(result, &sv); err != nil { + return TransmissionInfo{}, fmt.Errorf("decode transmission info result XDR: %w", err) + } + + info := TransmissionInfo{State: TransmissionStateNotAttempted, LedgerSequence: ledgerSequence} + parseFieldsIntoTransmissionInfo(&info, sv) + info.Success = info.State == TransmissionStateSucceeded + info.InvalidReceiver = info.State == TransmissionStateInvalidReceiver + return info, nil +} + +func parseFieldsIntoTransmissionInfo(info *TransmissionInfo, sv xdr.ScVal) { + switch sv.Type { + case xdr.ScValTypeScvU32: + if sv.U32 != nil { + info.State = TransmissionState(*sv.U32) + } + case xdr.ScValTypeScvI32: + if sv.I32 != nil && *sv.I32 >= 0 { + info.State = TransmissionState(*sv.I32) + } + case xdr.ScValTypeScvVec: + if sv.Vec == nil || *sv.Vec == nil { + return + } + vec := **sv.Vec + if len(vec) > 0 { + parseFieldsIntoTransmissionInfo(info, vec[0]) + } + if len(vec) > 1 { + if txr, ok := extractAddressString(vec[1]); ok { + info.Transmitter = txr + } + } + case xdr.ScValTypeScvMap: + if sv.Map == nil || *sv.Map == nil { + return + } + for _, entry := range **sv.Map { + key, ok := extractStringish(entry.Key) + if !ok { + continue + } + switch strings.ToLower(key) { + case "state": + parseFieldsIntoTransmissionInfo(info, entry.Val) + case "transmitter": + if txr, ok := extractAddressString(entry.Val); ok { + info.Transmitter = txr + } + case "success": + if b := extractBool(entry.Val); b != nil { + info.Success = *b + } + case "invalid_receiver", "invalidreceiver": + if b := extractBool(entry.Val); b != nil { + info.InvalidReceiver = *b + } + } + } + default: + } +} + +// buildForwarderReportArgs constructs the domain ScVal argument list for the forwarder report() function. +// +// Arg order matches the on-chain Rust signature: +// +// report(transmitter: Address, receiver: Address, raw_report: Bytes, report_context: Bytes, signatures: Vec>) +func buildForwarderReportArgs(transmitter, receiver string, report *sdk.ReportResponse) ([]stellartypes.ScVal, error) { + transmitterVal, err := accountAddressToScVal(transmitter) + if err != nil { + return nil, fmt.Errorf("transmitter: %w", err) + } + + receiverVal, err := contractAddressToScVal(receiver) + if err != nil { + return nil, err + } + + rawReportVal := stellartypes.ScVal{Type: stellartypes.ScValTypeBytes, Bytes: report.GetRawReport()} + reportContextVal := stellartypes.ScVal{Type: stellartypes.ScValTypeBytes, Bytes: report.GetReportContext()} + + sigVals := make([]*stellartypes.ScVal, len(report.Sigs)) + for i, sig := range report.Sigs { + s := stellartypes.ScVal{Type: stellartypes.ScValTypeBytes, Bytes: sig.GetSignature()} + sigVals[i] = &s + } + sigsVal := stellartypes.ScVal{ + Type: stellartypes.ScValTypeVec, + Vec: &stellartypes.ScVec{Values: sigVals}, + } + + return []stellartypes.ScVal{transmitterVal, receiverVal, rawReportVal, reportContextVal, sigsVal}, nil +} + +func buildTransmissionInfoArgs(receiver string, workflowExecutionID [32]byte, reportID [2]byte) ([]stellartypes.ScVal, error) { + receiverVal, err := contractAddressToScVal(receiver) + if err != nil { + return nil, err + } + return []stellartypes.ScVal{ + receiverVal, + {Type: stellartypes.ScValTypeBytes, Bytes: workflowExecutionID[:]}, + {Type: stellartypes.ScValTypeBytes, Bytes: reportID[:]}, + }, nil +} + +func buildReportProcessedTopicFilter(receiver string, workflowExecutionID [32]byte, reportID [2]byte) (stellartypes.TopicFilter, error) { + eventName := reportProcessedTopicPrefix + receiverVal, err := contractAddressToScVal(receiver) + if err != nil { + return stellartypes.TopicFilter{}, err + } + return stellartypes.TopicFilter{ + Segments: []stellartypes.TopicSegment{ + {Value: &stellartypes.ScVal{Type: stellartypes.ScValTypeSymbol, Symbol: &eventName}}, + {Value: &receiverVal}, + {Value: &stellartypes.ScVal{Type: stellartypes.ScValTypeBytes, Bytes: workflowExecutionID[:]}}, + {Value: &stellartypes.ScVal{Type: stellartypes.ScValTypeBytes, Bytes: reportID[:]}}, + }, + }, nil +} + +func contractAddressToScVal(contractID string) (stellartypes.ScVal, error) { + contractBytes, err := strkey.Decode(strkey.VersionByteContract, contractID) + if err != nil { + return stellartypes.ScVal{}, fmt.Errorf("%s invalid contract address %q: %w", capcommon.UserError, contractID, err) + } + if len(contractBytes) != 32 { + return stellartypes.ScVal{}, fmt.Errorf("%s contract address must decode to 32 bytes, got %d", capcommon.UserError, len(contractBytes)) + } + return stellartypes.ScVal{ + Type: stellartypes.ScValTypeAddress, + Address: &stellartypes.ScAddress{ + Type: stellartypes.ScAddressTypeContractID, + ContractID: contractBytes, + }, + }, nil +} + +func accountAddressToScVal(accountID string) (stellartypes.ScVal, error) { + accountBytes, err := strkey.Decode(strkey.VersionByteAccountID, accountID) + if err != nil { + return stellartypes.ScVal{}, fmt.Errorf("invalid account address %q: %w", accountID, err) + } + if len(accountBytes) != 32 { + return stellartypes.ScVal{}, fmt.Errorf("account address must decode to 32 bytes, got %d", len(accountBytes)) + } + return stellartypes.ScVal{ + Type: stellartypes.ScValTypeAddress, + Address: &stellartypes.ScAddress{ + Type: stellartypes.ScAddressTypeAccountID, + AccountID: accountBytes, + }, + }, nil +} + +func extractStringish(sv xdr.ScVal) (string, bool) { + switch sv.Type { + case xdr.ScValTypeScvSymbol: + if sv.Sym == nil { + return "", false + } + return string(*sv.Sym), true + case xdr.ScValTypeScvString: + if sv.Str == nil { + return "", false + } + return string(*sv.Str), true + default: + return "", false + } +} + +func extractAddressString(sv xdr.ScVal) (string, bool) { + if sv.Type != xdr.ScValTypeScvAddress || sv.Address == nil { + return "", false + } + switch sv.Address.Type { + case xdr.ScAddressTypeScAddressTypeAccount: + if sv.Address.AccountId == nil { + return "", false + } + raw := sv.Address.AccountId.Ed25519 + out, err := strkey.Encode(strkey.VersionByteAccountID, raw[:]) + return out, err == nil + case xdr.ScAddressTypeScAddressTypeContract: + if sv.Address.ContractId == nil { + return "", false + } + raw := *sv.Address.ContractId + out, err := strkey.Encode(strkey.VersionByteContract, raw[:]) + return out, err == nil + default: + return "", false + } +} + +func extractBool(sv xdr.ScVal) *bool { + if sv.Type != xdr.ScValTypeScvBool || sv.B == nil { + return nil + } + b := *sv.B + return &b +} diff --git a/chain_capabilities/stellar/actions/tx_hash_retriever.go b/chain_capabilities/stellar/actions/tx_hash_retriever.go new file mode 100644 index 000000000..8c2a00a7c --- /dev/null +++ b/chain_capabilities/stellar/actions/tx_hash_retriever.go @@ -0,0 +1,149 @@ +package actions + +import ( + "context" + "errors" + "fmt" + "strings" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + + capcommon "github.com/smartcontractkit/capabilities/chain_capabilities/common" +) + +const ( + reportProcessedTopicPrefix = "forwarder_ReportProcessed" + defaultForwarderLookbackLedgers = int64(100) + failedToRetrieveTxHashErrorMsg = "failed to retrieve tx hash for report" +) + +var ErrUnexpectedSuccessfulTransmission = errors.New("unexpected successful transmission") + +type TxHashRetriever struct { + forwarderClient CREForwarderClient + receiver string + workflowExecutionID [32]byte + reportID [2]byte + lggr logger.SugaredLogger +} + +func NewTxHashRetriever( + forwarderClient CREForwarderClient, + lggr logger.SugaredLogger, + receiver string, + workflowExecutionID [32]byte, + reportID [2]byte, +) TxHashRetriever { + return TxHashRetriever{ + forwarderClient: forwarderClient, + receiver: receiver, + workflowExecutionID: workflowExecutionID, + reportID: reportID, + lggr: lggr, + } +} + +type eventDetails struct { + txHash string + ledger uint32 + isSuccess bool +} + +func (d eventDetails) String() string { + resultStr := "success" + if !d.isSuccess { + resultStr = "failed" + } + return fmt.Sprintf("hash=%s ledger=%d result=%s", d.txHash, d.ledger, resultStr) +} + +type eventDetailsList []eventDetails + +func (l eventDetailsList) String() string { + if len(l) == 0 { + return "[]" + } + parts := make([]string, len(l)) + for i, d := range l { + parts[i] = d.String() + } + return "[" + strings.Join(parts, ", ") + "]" +} + +func (r *TxHashRetriever) GetSuccessfulTransmissionHash(ctx context.Context) (string, error) { + details, err := r.fetchAndParseEvents(ctx) + if err != nil { + return "", err + } + for _, d := range details { + if d.isSuccess { + return d.txHash, nil + } + } + r.lggr.Errorw("No successful transmission found", "txCount", len(details), "transactions", details.String()) + return "", fmt.Errorf("no successful transmission found. Found %d transactions (all failed): %s", + len(details), details) +} + +func (r *TxHashRetriever) GetFailedTransmissionHash(ctx context.Context) (string, error) { + hash, _, err := r.GetFailedTransmissionHashWithCount(ctx) + return hash, err +} + +func (r *TxHashRetriever) GetFailedTransmissionHashWithCount(ctx context.Context) (string, int, error) { + details, err := r.fetchAndParseEvents(ctx) + if err != nil { + return "", 0, err + } + for _, d := range details { + if d.isSuccess { + return "", len(details), fmt.Errorf("%w, successful tx hash: %s", + ErrUnexpectedSuccessfulTransmission, d.txHash) + } + } + + earliestIdx := 0 + for i, d := range details { + if d.ledger < details[earliestIdx].ledger { + earliestIdx = i + } + } + + r.lggr.Debugw("Returning earliest failed transmission", + "txCount", len(details), + "selectedTxHash", details[earliestIdx].txHash, + "receiver", r.receiver, + ) + + return details[earliestIdx].txHash, len(details), nil +} + +func (r *TxHashRetriever) fetchAndParseEvents(ctx context.Context) (eventDetailsList, error) { + events, err := capcommon.WithPollingRetry(ctx, r.lggr, func(ctx context.Context) ([]ReportProcessedEvent, error) { + events, fetchErr := r.forwarderClient.GetReportProcessedEvents(ctx, r.receiver, r.workflowExecutionID, r.reportID) + if fetchErr != nil { + return nil, fetchErr + } + if len(events) == 0 { + return nil, errors.New("no matching events found yet, retrying") + } + return events, nil + }) + if err != nil { + return nil, fmt.Errorf("%s: %w", failedToRetrieveTxHashErrorMsg, err) + } + + return buildEventDetails(events), nil +} + +func buildEventDetails(events []ReportProcessedEvent) eventDetailsList { + details := make(eventDetailsList, len(events)) + for i, e := range events { + details[i] = eventDetails{ + txHash: e.TxHash, + ledger: e.Ledger, + isSuccess: e.Success, + } + } + return details +} diff --git a/chain_capabilities/stellar/actions/write_report.go b/chain_capabilities/stellar/actions/write_report.go new file mode 100644 index 000000000..451b395fc --- /dev/null +++ b/chain_capabilities/stellar/actions/write_report.go @@ -0,0 +1,436 @@ +package actions + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "strings" + "time" + + "github.com/stellar/go-stellar-sdk/strkey" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + caperrors "github.com/smartcontractkit/chainlink-common/pkg/capabilities/errors" + stellarcap "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/stellar" + commoncfg "github.com/smartcontractkit/chainlink-common/pkg/config" + "github.com/smartcontractkit/chainlink-common/pkg/contexts" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" + types "github.com/smartcontractkit/chainlink-common/pkg/types" + stellartypes "github.com/smartcontractkit/chainlink-common/pkg/types/chains/stellar" + + capcommon "github.com/smartcontractkit/capabilities/chain_capabilities/common" + ts "github.com/smartcontractkit/capabilities/chain_capabilities/common/transmission_schedule" + "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/metering" +) + +const ocrSignatureLen = 65 + +type writeReport struct { + service types.StellarService + forwarderClient CREForwarderClient + lggr logger.SugaredLogger + forwarderLookbackLedgers int64 + chainSelector uint64 + reportSizeLimit limits.BoundLimiter[commoncfg.Size] + transmissionScheduler ts.TransmissionScheduler +} + +type writeReportExecuteResult struct { + response *stellarcap.WriteReportReply + billNode bool +} + +func (s *Stellar) WriteReport( + ctx context.Context, + metadata capabilities.RequestMetadata, + input *stellarcap.WriteReportRequest, +) (*capabilities.ResponseAndMetadata[*stellarcap.WriteReportReply], caperrors.Error) { + ctx = metadata.ContextWithCRE(ctx) + + if err := s.validateWriteReportInputs(metadata, input); err != nil { + return nil, NewUserError(err, caperrors.InvalidArgument) + } + + wr := &writeReport{ + service: s.StellarService, + forwarderClient: s.forwarderClient, + lggr: s.lggr, + forwarderLookbackLedgers: s.forwarderLookbackLedgers, + chainSelector: s.chainSelector, + reportSizeLimit: s.reportSizeLimit, + transmissionScheduler: s.transmissionScheduler, + } + + result, err := wr.execute(ctx, metadata, input) + if err != nil { + return nil, GetError(err, s.isUserErrorWriteReport(err)) + } + + var meteringMeta capabilities.ResponseMetadata + if result.billNode && result.response.TransactionFee != nil { + meteringMeta = metering.GetResponseMetadataWriteReport(*result.response.TransactionFee, s.chainSelector) + } + + return &capabilities.ResponseAndMetadata[*stellarcap.WriteReportReply]{ + Response: result.response, + ResponseMetadata: meteringMeta, + }, nil +} + +func (wr *writeReport) execute( + ctx context.Context, + metadata capabilities.RequestMetadata, + request *stellarcap.WriteReportRequest, +) (writeReportExecuteResult, error) { + ctx = contexts.WithChainSelector(ctx, wr.chainSelector) + + transmissionID, workflowExecutionID, reportID, err := getTransmissionID(metadata.WorkflowExecutionID, request) + if err != nil { + return writeReportExecuteResult{}, err + } + + queuePosition := wr.transmissionScheduler.GetQueuePosition(hex.EncodeToString(transmissionID[:])) + wr.lggr = wr.lggr.With("queuePosition", queuePosition, "forwarder", wr.forwarderClient.ForwarderAddress(), "receiver", request.ContractId) + + txHashRetriever := NewTxHashRetriever( + wr.forwarderClient, + wr.lggr, + request.ContractId, + workflowExecutionID, + reportID, + ) + + info, err := wr.pollTransmissionInfo(ctx, request.ContractId, workflowExecutionID, reportID, queuePosition) + if err != nil { + return writeReportExecuteResult{}, fmt.Errorf("failed to get transmission info: %w", err) + } + + switch info.State { + case TransmissionStateSucceeded: + txHash, hashErr := txHashRetriever.GetSuccessfulTransmissionHash(ctx) + if hashErr != nil { + wr.lggr.Errorw("Returning without a transmission attempt - prior transmission succeeded, but failed to retrieve its tx hash", "error", hashErr) + return writeReportExecuteResult{}, hashErr + } + reply := wr.successReplyFromObservedState(info) + if err := wr.populateReplyFromTx(ctx, reply, txHash); err != nil { + return writeReportExecuteResult{}, err + } + return observedWriteReportResult(reply), nil + case TransmissionStateInvalidReceiver: + txHash, hashErr := txHashRetriever.GetFailedTransmissionHash(ctx) + if hashErr != nil { + wr.lggr.Errorw("Returning without a transmission attempt - prior transmission marked receiver invalid, but failed to retrieve its tx hash", "error", hashErr) + return writeReportExecuteResult{}, hashErr + } + reply := wr.revertedReplyFromObservedState(info, "receiver contract cannot accept reports: not a Wasm contract or missing on_report function") + if err := wr.populateReplyFromTx(ctx, reply, txHash); err != nil { + return writeReportExecuteResult{}, fmt.Errorf("%s receiver contract cannot accept reports: not a Wasm contract or missing on_report function: additional error while fetching tx details: %w", capcommon.UserError, err) + } + return observedWriteReportResult(reply), nil + case TransmissionStateFailed: + txHash, hashErr := txHashRetriever.GetFailedTransmissionHash(ctx) + if hashErr != nil { + if errors.Is(hashErr, ErrUnexpectedSuccessfulTransmission) { + wr.lggr.Errorw("Returning without a transmission attempt - unexpected successful transmission while state is failed", "error", hashErr) + } else { + wr.lggr.Errorw("Returning without a transmission attempt - prior transmission failed, but failed to retrieve its tx hash", "error", hashErr) + } + return writeReportExecuteResult{}, hashErr + } + reply := wr.revertedReplyFromObservedState(info, "receiver contract execution failed") + if err := wr.populateReplyFromTx(ctx, reply, txHash); err != nil { + return writeReportExecuteResult{}, fmt.Errorf("%s receiver contract execution failed: additional error while fetching tx details: %w", capcommon.UserError, err) + } + return observedWriteReportResult(reply), nil + case TransmissionStateNotAttempted: + default: + return writeReportExecuteResult{}, fmt.Errorf("unexpected transmission state: %d", info.State) + } + + if err := wr.reportSizeLimit.Check(ctx, commoncfg.SizeOf(request.Report.RawReport)); err != nil { + return writeReportExecuteResult{}, fmt.Errorf("%s report size exceeds limit: %w", capcommon.UserError, err) + } + + submitResp, err := wr.forwarderClient.InvokeOnReport(ctx, request.ContractId, request.Report) + if err != nil { + return writeReportExecuteResult{}, err + } + + // Poll for the canonical on-chain transmission state. The forwarder may record the + // outcome after the tx confirms; retry until it is visible or the context expires. + postInfo, pollErr := capcommon.WithPollingRetry(ctx, wr.lggr, func(ctx context.Context) (TransmissionInfo, error) { + readInfo, readErr := wr.forwarderClient.GetTransmissionInfo(ctx, request.ContractId, workflowExecutionID, reportID) + if readErr != nil { + return TransmissionInfo{}, readErr + } + if readInfo.State == TransmissionStateNotAttempted { + return TransmissionInfo{}, errors.New("tx submitted but transmission info not yet visible on-chain, retrying") + } + return readInfo, nil + }) + if pollErr != nil { + // TX was submitted but on-chain state is still not visible; use the TXM result directly. + wr.lggr.Warnw("Failed to poll transmission info after submit, returning reply from TX outcome", "error", pollErr) + return submittedWriteReportResult(wr.replyFromOwnTransaction(submitResp)), nil + } + + switch postInfo.State { + case TransmissionStateSucceeded: + reply := wr.successReplyFromObservedState(postInfo) + populateReplyFromSubmit(reply, submitResp) + return submittedWriteReportResult(reply), nil + case TransmissionStateInvalidReceiver: + reply := wr.revertedReplyFromObservedState(postInfo, "receiver contract cannot accept reports: not a Wasm contract or missing on_report function") + populateReplyFromSubmit(reply, submitResp) + return submittedWriteReportResult(reply), nil + case TransmissionStateFailed: + reply := wr.revertedReplyFromObservedState(postInfo, "receiver contract execution failed") + populateReplyFromSubmit(reply, submitResp) + return submittedWriteReportResult(reply), nil + default: + return submittedWriteReportResult(wr.replyFromOwnTransaction(submitResp)), nil + } +} + +func (s *Stellar) validateWriteReportInputs(metadata capabilities.RequestMetadata, request *stellarcap.WriteReportRequest) error { + if request == nil { + return errors.New("nil WriteReportRequest") + } + if request.Report == nil { + return errors.New("nil SignedReport in WriteReportRequest") + } + if request.ContractId == "" { + return errors.New("contractId is required") + } + if _, err := strkey.Decode(strkey.VersionByteContract, request.ContractId); err != nil { + return fmt.Errorf("%s invalid receiver contract address: %w", capcommon.UserError, err) + } + if len(request.Report.Sigs) == 0 { + return fmt.Errorf("%s signed report must contain at least one signature", capcommon.UserError) + } + for i, sig := range request.Report.Sigs { + if len(sig.GetSignature()) != ocrSignatureLen { + return fmt.Errorf("%s signature %d has invalid length: got %d, want %d", capcommon.UserError, i, len(sig.GetSignature()), ocrSignatureLen) + } + } + + reportMetadata, err := capcommon.DecodeReportMetadata(request.Report.RawReport) + if err != nil { + return fmt.Errorf("%s failed to decode report metadata: %w", capcommon.UserError, err) + } + if reportMetadata.Version != 1 { + return fmt.Errorf("%s unsupported report metadata version: %d", capcommon.UserError, reportMetadata.Version) + } + if reportMetadata.ExecutionID != metadata.WorkflowExecutionID { + return fmt.Errorf("%s report workflowExecutionID does not match request metadata", capcommon.UserError) + } + if !strings.EqualFold(reportMetadata.WorkflowOwner, metadata.WorkflowOwner) { + return fmt.Errorf("%s report workflowOwner does not match request metadata", capcommon.UserError) + } + expectedWorkflowName := metadata.WorkflowName + if len(expectedWorkflowName) < 20 { + expectedWorkflowName += strings.Repeat("0", 20-len(expectedWorkflowName)) + } + if !strings.EqualFold(reportMetadata.WorkflowName, expectedWorkflowName) { + return fmt.Errorf("%s report workflowName does not match request metadata", capcommon.UserError) + } + if reportMetadata.WorkflowID != metadata.WorkflowID { + return fmt.Errorf("%s report workflowID does not match request metadata", capcommon.UserError) + } + return nil +} + +func getTransmissionID(workflowExecutionID string, request *stellarcap.WriteReportRequest) ([32]byte, [32]byte, [2]byte, error) { + rawExecutionID, reportID, err := capcommon.ParseTransmissionComponents(workflowExecutionID, request.Report.RawReport) + if err != nil { + return [32]byte{}, [32]byte{}, [2]byte{}, err + } + + receiverBytes, err := strkey.Decode(strkey.VersionByteContract, request.ContractId) + if err != nil { + return [32]byte{}, [32]byte{}, [2]byte{}, fmt.Errorf("%s invalid receiver contract address: %w", capcommon.UserError, err) + } + + hash := sha256.New() + hash.Write(receiverBytes) + hash.Write(rawExecutionID[:]) + hash.Write(reportID[:]) + + var transmissionID [32]byte + copy(transmissionID[:], hash.Sum(nil)) + return transmissionID, rawExecutionID, reportID, nil +} + +// pollTransmissionInfo returns the forwarder transmission state at this point in the +// DON schedule. Nodes with queuePosition > 0 wait until queuePosition×DeltaStage before +// submitting, polling with exponential backoff so an earlier peer's success or terminal +// failure can be observed without spending fees on a duplicate submit. +func (wr *writeReport) pollTransmissionInfo( + ctx context.Context, + receiver string, + workflowExecutionID [32]byte, + reportID [2]byte, + queuePosition int, +) (lastValidInfo TransmissionInfo, err error) { + if queuePosition <= 0 { + return capcommon.WithQuickRetry(ctx, wr.lggr, func(ctx context.Context) (TransmissionInfo, error) { + return wr.forwarderClient.GetTransmissionInfo(ctx, receiver, workflowExecutionID, reportID) + }) + } + + delay := time.Duration(queuePosition) * wr.transmissionScheduler.DeltaStage + wr.lggr.Infow("Polling until slot or state change", "delay", delay, "deltaStage", wr.transmissionScheduler.DeltaStage) + + attempt := 0 + stageTimer := time.NewTimer(delay) + hadSuccessfulPoll := false + defer stageTimer.Stop() + + for { + if info, infoErr := wr.forwarderClient.GetTransmissionInfo(ctx, receiver, workflowExecutionID, reportID); infoErr != nil { + wr.lggr.Debugw("GetTransmissionInfo failed during polling", "error", infoErr, "attempt", attempt) + } else { + hadSuccessfulPoll = true + lastValidInfo = info + switch lastValidInfo.State { + case TransmissionStateSucceeded, TransmissionStateInvalidReceiver, TransmissionStateFailed: + return lastValidInfo, nil + case TransmissionStateNotAttempted: + default: + wr.lggr.Warnw("Unexpected transmission state during polling, continuing", "state", lastValidInfo.State) + } + } + + wait := (100 * time.Millisecond) << min(attempt, 5) + if wait > 2*time.Second { + wait = 2 * time.Second + } + attempt++ + + select { + case <-ctx.Done(): + return TransmissionInfo{}, fmt.Errorf("timed out waiting for transmission info") + case <-stageTimer.C: + if lastValidInfo.State == TransmissionStateNotAttempted { + if finalInfo, finalErr := wr.forwarderClient.GetTransmissionInfo(ctx, receiver, workflowExecutionID, reportID); finalErr == nil { + hadSuccessfulPoll = true + lastValidInfo = finalInfo + } else { + wr.lggr.Debugw("Final GetTransmissionInfo at delta stage boundary failed", "error", finalErr) + } + } + if !hadSuccessfulPoll { + wr.lggr.Errorw("All GetTransmissionInfo polls failed during delta stage window, cannot determine transmission state") + return TransmissionInfo{}, fmt.Errorf("all GetTransmissionInfo polls failed during delta stage window") + } + wr.lggr.Infow("Delta stage has passed, returning transmission info", "state", lastValidInfo.State, "attempts", attempt) + return lastValidInfo, nil + case <-time.After(wait): + } + } +} + +func observedWriteReportResult(reply *stellarcap.WriteReportReply) writeReportExecuteResult { + return writeReportExecuteResult{response: reply, billNode: false} +} + +func submittedWriteReportResult(reply *stellarcap.WriteReportReply) writeReportExecuteResult { + return writeReportExecuteResult{response: reply, billNode: true} +} + +func (wr *writeReport) populateReplyFromTx(ctx context.Context, reply *stellarcap.WriteReportReply, txHash string) error { + txResp, err := capcommon.WithQuickRetry(ctx, wr.lggr, func(ctx context.Context) (stellartypes.GetTransactionResponse, error) { + return wr.service.GetTransaction(ctx, stellartypes.GetTransactionRequest{TxHash: txHash}) + }) + if err != nil { + return fmt.Errorf("failed to get transaction for tx hash %s: %w", txHash, err) + } + reply.TxHash = capcommon.Ptr(txHash) + if txResp.FeeStroops > 0 { + fee := txResp.FeeStroops + reply.TransactionFee = &fee + } + if txResp.LedgerCloseTime > 0 { + ts := uint64(txResp.LedgerCloseTime) * 1_000_000 + reply.BlockTimestamp = &ts + } + if reply.LedgerSequence == nil && txResp.LedgerSequence > 0 { + reply.LedgerSequence = capcommon.Ptr(txResp.LedgerSequence) + } + return nil +} + +func (wr *writeReport) successReplyFromObservedState(info TransmissionInfo) *stellarcap.WriteReportReply { + reply := &stellarcap.WriteReportReply{ + TxStatus: stellarcap.TxStatus_TX_STATUS_SUCCESS, + } + status := stellarcap.ReceiverContractExecutionStatus_RECEIVER_CONTRACT_EXECUTION_STATUS_SUCCESS + reply.ReceiverContractExecutionStatus = &status + if info.LedgerSequence != 0 { + reply.LedgerSequence = capcommon.Ptr(info.LedgerSequence) + } + return reply +} + +func (wr *writeReport) revertedReplyFromObservedState(info TransmissionInfo, errorMsg string) *stellarcap.WriteReportReply { + reply := &stellarcap.WriteReportReply{ + TxStatus: stellarcap.TxStatus_TX_STATUS_REVERTED, + } + status := stellarcap.ReceiverContractExecutionStatus_RECEIVER_CONTRACT_EXECUTION_STATUS_REVERTED + reply.ReceiverContractExecutionStatus = &status + if info.LedgerSequence != 0 { + reply.LedgerSequence = capcommon.Ptr(info.LedgerSequence) + } + if errorMsg != "" { + reply.ErrorMessage = capcommon.Ptr(errorMsg) + } + return reply +} + +// replyFromOwnTransaction builds a WriteReportReply directly from a SubmitTransactionResponse +// when the post-submit transmission info poll fails or returns an unexpected state. +func (wr *writeReport) replyFromOwnTransaction(resp *stellartypes.SubmitTransactionResponse) *stellarcap.WriteReportReply { + reply := &stellarcap.WriteReportReply{} + if resp == nil { + reply.TxStatus = stellarcap.TxStatus_TX_STATUS_FATAL + return reply + } + populateReplyFromSubmit(reply, resp) + switch resp.TxStatus { + case stellartypes.TxSuccess: + reply.TxStatus = stellarcap.TxStatus_TX_STATUS_SUCCESS + status := stellarcap.ReceiverContractExecutionStatus_RECEIVER_CONTRACT_EXECUTION_STATUS_SUCCESS + reply.ReceiverContractExecutionStatus = &status + case stellartypes.TxFailed: + reply.TxStatus = stellarcap.TxStatus_TX_STATUS_REVERTED + status := stellarcap.ReceiverContractExecutionStatus_RECEIVER_CONTRACT_EXECUTION_STATUS_REVERTED + reply.ReceiverContractExecutionStatus = &status + if resp.Error != "" { + reply.ErrorMessage = capcommon.Ptr("on-chain transaction failed: " + resp.Error) + } + default: // TxFatal + reply.TxStatus = stellarcap.TxStatus_TX_STATUS_FATAL + } + return reply +} + +// populateReplyFromSubmit sets tx hash, fee, and block timestamp on the reply from a SubmitTransactionResponse. +// Ledger sequence on submit paths is populated from get_transmission_info (post-submit poll), not from ResultMetaXDR. +func populateReplyFromSubmit(reply *stellarcap.WriteReportReply, resp *stellartypes.SubmitTransactionResponse) { + if resp == nil { + return + } + if resp.TxHash != "" { + reply.TxHash = capcommon.Ptr(resp.TxHash) + } + if resp.TransactionFee != nil { + reply.TransactionFee = resp.TransactionFee + } + if resp.BlockTimestamp != nil { + reply.BlockTimestamp = resp.BlockTimestamp + } +} diff --git a/chain_capabilities/stellar/actions/write_report_test.go b/chain_capabilities/stellar/actions/write_report_test.go new file mode 100644 index 000000000..26ae9b6e8 --- /dev/null +++ b/chain_capabilities/stellar/actions/write_report_test.go @@ -0,0 +1,1150 @@ +package actions + +import ( + "context" + "encoding/hex" + "errors" + "fmt" + "strconv" + "sync/atomic" + "testing" + "time" + + "github.com/stellar/go-stellar-sdk/strkey" + "github.com/stellar/go-stellar-sdk/xdr" + + p2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + ocrtypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types" + stellarcap "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/stellar" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" + "github.com/smartcontractkit/chainlink-common/pkg/types" + stellartypes "github.com/smartcontractkit/chainlink-common/pkg/types/chains/stellar" + "github.com/smartcontractkit/chainlink-common/pkg/types/mocks" + workflowpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" + + commontest "github.com/smartcontractkit/capabilities/chain_capabilities/common/test" + ts "github.com/smartcontractkit/capabilities/chain_capabilities/common/transmission_schedule" + "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/metering" + "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/monitoring" +) + +// ─── constants ─────────────────────────────────────────────────────────────── + +const ( + testWRChainSelector = uint64(12345) + + // Valid C… StrKey Stellar contract address (56 chars). + testForwarderAddress = "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC" + // Different valid C… address used as the receiver contract. + testReceiverAddress = "CA7QYNF7SOWQ3GLR2BGMZEHXAVIRZA4KVWLTJJFC7MGXUA74P7UJUWDA" + // Valid G… StrKey Stellar account address (56 chars). + testNodeAddress = "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7" + + testTxHash = "abc123txhash" + testFee = uint64(5_000) // stroops + // Ledger close time in microseconds (1_700_000_000 unix seconds). + testBlockTimestamp = uint64(1_700_000_000_000_000) +) + +// ─── helper builders ───────────────────────────────────────────────────────── + +type writeReportHelper struct { + svc *mocks.StellarService + stellar *Stellar +} + +// newWriteReportHelper builds a single-node Stellar under test backed by a fresh StellarService mock. +func newWriteReportHelper(t *testing.T) *writeReportHelper { + t.Helper() + lggr := logger.Test(t) + mockSvc := mocks.NewStellarService(t) + + myPeerID := p2ptypes.PeerID{1} + scheduler := ts.NewTransmissionScheduler( + myPeerID, []p2ptypes.PeerID{myPeerID}, 100*time.Millisecond, 0, lggr) + + s := &Stellar{ + StellarService: mockSvc, + lggr: logger.Sugared(lggr), + chainSelector: testWRChainSelector, + forwarderClient: newForwarderClient(mockSvc, lggr, testForwarderAddress, 100), + forwarderLookbackLedgers: 100, + transmissionScheduler: scheduler, + messageBuilder: monitoring.NewMessageBuilder(types.ChainInfo{}, capabilities.CapabilityInfo{}, ""), + beholderProcessor: nopBeholderProcessor{}, + handler: testConsensusHandler{handle: runVolatileHashableHandle}, + } + require.NoError(t, s.initLimiters(limits.Factory{Logger: lggr})) + return &writeReportHelper{svc: mockSvc, stellar: s} +} + +func signingAccountResp() stellartypes.GetSigningAccountResponse { + return stellartypes.GetSigningAccountResponse{AccountAddress: testNodeAddress} +} + +func (h *writeReportHelper) expectSigningAccount(t *testing.T) { + t.Helper() + h.svc.EXPECT().GetSigningAccount(mock.Anything). + Return(signingAccountResp(), nil).Once() +} + +// newWRReportFixture generates a self-consistent (metadata, RequestMetadata, WriteReportRequest) triple. +func newWRReportFixture(t *testing.T) (ocrtypes.Metadata, capabilities.RequestMetadata, *stellarcap.WriteReportRequest) { + t.Helper() + rm := ocrtypes.Metadata{ + Version: 1, + ExecutionID: hex.EncodeToString(commontest.RandomBytes(32)), + Timestamp: 1000, + DONID: 10, + DONConfigVersion: 2, + WorkflowID: hex.EncodeToString(commontest.RandomBytes(32)), + WorkflowName: hex.EncodeToString(commontest.RandomBytes(10)), + WorkflowOwner: hex.EncodeToString(commontest.RandomBytes(20)), + ReportID: hex.EncodeToString(commontest.RandomBytes(2)), + } + encoded, err := rm.Encode() + require.NoError(t, err) + + reqMeta := capabilities.RequestMetadata{ + WorkflowID: rm.WorkflowID, + WorkflowOwner: rm.WorkflowOwner, + WorkflowName: rm.WorkflowName, + WorkflowDonID: rm.DONID, + WorkflowDonConfigVersion: rm.DONConfigVersion, + WorkflowExecutionID: rm.ExecutionID, + } + req := &stellarcap.WriteReportRequest{ + ContractId: testReceiverAddress, + Report: &workflowpb.ReportResponse{ + RawReport: encoded, + ReportContext: make([]byte, 96), + Sigs: wrTestSigs(), + }, + } + return rm, reqMeta, req +} + +func wrTestSigs() []*workflowpb.AttributedSignature { + sig := make([]byte, ocrSignatureLen) + sig[0] = 0xAB + return []*workflowpb.AttributedSignature{{Signature: sig}, {Signature: sig}} +} + +// ─── XDR helpers ───────────────────────────────────────────────────────────── + +// buildTransmissionInfoXDR returns a base64-encoded XDR ScVal{scvU32=state}. +// This is what the Stellar forwarder returns from get_transmission_info. +// We use the stellar SDK's MarshalBase64 so the encoding is verifiable by SafeUnmarshalBase64. +func buildTransmissionInfoXDR(t *testing.T, state TransmissionState) string { + t.Helper() + v := xdr.Uint32(state) + sv := xdr.ScVal{ + Type: xdr.ScValTypeScvU32, + U32: &v, + } + b64, err := xdr.MarshalBase64(sv) + require.NoError(t, err, "XDR encode transmission state") + return b64 +} + +// Convenience wrappers for common transmission states. +func notAttemptedXDR(t *testing.T) string { + return buildTransmissionInfoXDR(t, TransmissionStateNotAttempted) +} + +func succeededXDR(t *testing.T) string { + return buildTransmissionInfoXDR(t, TransmissionStateSucceeded) +} + +func invalidReceiverXDR(t *testing.T) string { + return buildTransmissionInfoXDR(t, TransmissionStateInvalidReceiver) +} + +func failedXDR(t *testing.T) string { + return buildTransmissionInfoXDR(t, TransmissionStateFailed) +} + +func transmissionResp(xdrResult string) stellartypes.SimulateTransactionResponse { + return stellartypes.SimulateTransactionResponse{Success: true, ReturnValueXDR: xdrResult, LedgerSequence: 100} +} + +func successSubmitResp() *stellartypes.SubmitTransactionResponse { + fee := testFee + ts := testBlockTimestamp + return &stellartypes.SubmitTransactionResponse{ + TxStatus: stellartypes.TxSuccess, + TxHash: testTxHash, + TransactionFee: &fee, + BlockTimestamp: &ts, + } +} + +func reportProcessedEventsForFixture(t *testing.T, rm ocrtypes.Metadata, receiver string, success bool) stellartypes.GetEventsResponse { + t.Helper() + execID, err := hex.DecodeString(rm.ExecutionID) + require.NoError(t, err) + require.Len(t, execID, 32) + reportID, err := hex.DecodeString(rm.ReportID) + require.NoError(t, err) + require.Len(t, reportID, 2) + + eventName := reportProcessedTopicPrefix + receiverVal, err := contractAddressToScVal(receiver) + require.NoError(t, err) + + return stellartypes.GetEventsResponse{ + Events: []stellartypes.EventInfo{{ + Ledger: 100, + TransactionHash: testTxHash, + Topics: []stellartypes.ScVal{ + {Type: stellartypes.ScValTypeSymbol, Symbol: &eventName}, + receiverVal, + {Type: stellartypes.ScValTypeBytes, Bytes: execID}, + {Type: stellartypes.ScValTypeBytes, Bytes: reportID}, + }, + Value: stellartypes.ScVal{Type: stellartypes.ScValTypeBool, Bool: &success}, + }}, + } +} + +func (h *writeReportHelper) expectObservedTxHashLookup(t *testing.T, rm ocrtypes.Metadata, receiver string, eventSuccess bool) { + t.Helper() + h.svc.EXPECT().GetLatestLedger(mock.Anything). + Return(stellartypes.GetLatestLedgerResponse{Sequence: 200}, nil).Once() + h.svc.EXPECT().GetEvents(mock.Anything, mock.Anything). + Return(reportProcessedEventsForFixture(t, rm, receiver, eventSuccess), nil).Once() + h.svc.EXPECT().GetTransaction(mock.Anything, stellartypes.GetTransactionRequest{TxHash: testTxHash}). + Return(stellartypes.GetTransactionResponse{ + FeeStroops: testFee, + LedgerSequence: 100, + LedgerCloseTime: int64(testBlockTimestamp / 1_000_000), + }, nil).Once() +} + +func requireReplyBlockTimestamp(t *testing.T, reply *stellarcap.WriteReportReply, expected uint64) { + t.Helper() + require.NotNil(t, reply.BlockTimestamp) + require.Equal(t, expected, *reply.BlockTimestamp) +} + +// ─── metering assertion ─────────────────────────────────────────────────────── + +func validateWRMetering(t *testing.T, meta capabilities.ResponseMetadata, chainSelector uint64, expectedStroops uint64) { + t.Helper() + require.Len(t, meta.Metering, 1) + m := meta.Metering[0] + require.Equal(t, fmt.Sprintf(metering.WriteReportSpendUnitFormat, chainSelector), m.SpendUnit) + require.Equal(t, fmt.Sprintf("%d", expectedStroops), m.SpendValue) + require.Empty(t, m.Peer2PeerID) +} + +// ─── Validation tests ───────────────────────────────────────────────────────── + +func TestWriteReport_Validation(t *testing.T) { + t.Parallel() + + t.Run("nil request", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, err := h.stellar.WriteReport(t.Context(), capabilities.RequestMetadata{}, nil) + require.NotNil(t, err) + require.Contains(t, err.Error(), "nil WriteReportRequest") + }) + + t.Run("nil report", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, err := h.stellar.WriteReport(t.Context(), capabilities.RequestMetadata{}, + &stellarcap.WriteReportRequest{}) + require.NotNil(t, err) + require.Contains(t, err.Error(), "nil SignedReport") + }) + + t.Run("empty contract_id", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, err := h.stellar.WriteReport(t.Context(), capabilities.RequestMetadata{}, + &stellarcap.WriteReportRequest{Report: &workflowpb.ReportResponse{Sigs: wrTestSigs()}}) + require.NotNil(t, err) + require.Contains(t, err.Error(), "contractId is required") + }) + + t.Run("invalid contract_id (G… account, not C… contract)", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, err := h.stellar.WriteReport(t.Context(), capabilities.RequestMetadata{}, + &stellarcap.WriteReportRequest{ + ContractId: testNodeAddress, // G… key — not a contract + Report: &workflowpb.ReportResponse{Sigs: wrTestSigs()}, + }) + require.NotNil(t, err) + require.Contains(t, err.Error(), "invalid receiver contract address") + }) + + t.Run("no signatures", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + req.Report.Sigs = nil + + _, err := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, err) + require.Contains(t, err.Error(), "signed report must contain at least one signature") + }) + + t.Run("invalid signature length", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + req.Report.Sigs = []*workflowpb.AttributedSignature{{Signature: make([]byte, 32)}} + + _, err := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, err) + require.Contains(t, err.Error(), "signature 0 has invalid length") + require.Contains(t, err.Error(), "want 65") + }) + + t.Run("report metadata cannot be decoded", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, _ := newWRReportFixture(t) + req := &stellarcap.WriteReportRequest{ + ContractId: testReceiverAddress, + Report: &workflowpb.ReportResponse{RawReport: []byte("garbage"), Sigs: wrTestSigs()}, + } + + _, err := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, err) + require.Contains(t, err.Error(), "failed to decode report metadata") + }) + + t.Run("WorkflowExecutionID mismatch", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + reqMeta.WorkflowExecutionID = hex.EncodeToString(commontest.RandomBytes(32)) + + _, err := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, err) + require.Contains(t, err.Error(), "workflowExecutionID does not match") + }) + + t.Run("WorkflowOwner mismatch", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + reqMeta.WorkflowOwner = hex.EncodeToString(commontest.RandomBytes(20)) + + _, err := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, err) + require.Contains(t, err.Error(), "workflowOwner does not match") + }) + + t.Run("WorkflowName mismatch", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + reqMeta.WorkflowName = "totally-different-name" + + _, err := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, err) + require.Contains(t, err.Error(), "workflowName does not match") + }) + + t.Run("WorkflowID mismatch", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + reqMeta.WorkflowID = hex.EncodeToString(commontest.RandomBytes(32)) + + _, err := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, err) + require.Contains(t, err.Error(), "workflowID does not match") + }) + + t.Run("report size exceeds limit", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + rm, reqMeta, _ := newWRReportFixture(t) + encoded, err2 := rm.Encode() + require.NoError(t, err2) + + // Pre-submit poll: NotAttempted so code reaches the size check. + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(notAttemptedXDR(t)), nil).Once() + + req := &stellarcap.WriteReportRequest{ + ContractId: testReceiverAddress, + Report: &workflowpb.ReportResponse{ + RawReport: append(encoded, make([]byte, 20_000)...), + ReportContext: make([]byte, 96), + Sigs: wrTestSigs(), + }, + } + + _, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, capErr) + require.Contains(t, capErr.Error(), "report size exceeds limit") + }) +} + +// ─── Early-return (observed-state before submit) tests ─────────────────────── + +func TestWriteReport_EarlyReturn(t *testing.T) { + t.Parallel() + + t.Run("already succeeded - returns success with no submit and no metering", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + rm, reqMeta, req := newWRReportFixture(t) + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(succeededXDR(t)), nil).Once() + h.expectObservedTxHashLookup(t, rm, req.ContractId, true) + + result, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.Nil(t, capErr) + require.Equal(t, stellarcap.TxStatus_TX_STATUS_SUCCESS, result.Response.TxStatus) + rcSuccess := stellarcap.ReceiverContractExecutionStatus_RECEIVER_CONTRACT_EXECUTION_STATUS_SUCCESS + require.Equal(t, &rcSuccess, result.Response.ReceiverContractExecutionStatus) + require.NotNil(t, result.Response.TxHash) + require.Equal(t, testTxHash, *result.Response.TxHash) + require.NotNil(t, result.Response.TransactionFee) + require.Equal(t, testFee, *result.Response.TransactionFee) + requireReplyBlockTimestamp(t, result.Response, testBlockTimestamp) + // No billing metering: this node observed, not submitted. + require.Empty(t, result.ResponseMetadata.Metering) + h.svc.AssertNotCalled(t, "SubmitTransaction", mock.Anything, mock.Anything) + }) + + t.Run("already failed - InvalidReceiver - terminal error message, no submit", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + rm, reqMeta, req := newWRReportFixture(t) + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(invalidReceiverXDR(t)), nil).Once() + h.expectObservedTxHashLookup(t, rm, req.ContractId, false) + + result, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.Nil(t, capErr) + require.Equal(t, stellarcap.TxStatus_TX_STATUS_REVERTED, result.Response.TxStatus) + rcReverted := stellarcap.ReceiverContractExecutionStatus_RECEIVER_CONTRACT_EXECUTION_STATUS_REVERTED + require.Equal(t, &rcReverted, result.Response.ReceiverContractExecutionStatus) + require.NotNil(t, result.Response.ErrorMessage) + require.Contains(t, *result.Response.ErrorMessage, "not a Wasm contract or missing on_report") + require.NotNil(t, result.Response.TxHash) + require.Equal(t, testTxHash, *result.Response.TxHash) + requireReplyBlockTimestamp(t, result.Response, testBlockTimestamp) + require.Empty(t, result.ResponseMetadata.Metering) + h.svc.AssertNotCalled(t, "SubmitTransaction", mock.Anything, mock.Anything) + }) + + t.Run("already failed - receiver revert - error message, no submit", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + rm, reqMeta, req := newWRReportFixture(t) + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(failedXDR(t)), nil).Once() + h.expectObservedTxHashLookup(t, rm, req.ContractId, false) + + result, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.Nil(t, capErr) + require.Equal(t, stellarcap.TxStatus_TX_STATUS_REVERTED, result.Response.TxStatus) + require.NotNil(t, result.Response.ErrorMessage) + require.Contains(t, *result.Response.ErrorMessage, "receiver contract execution failed") + require.NotNil(t, result.Response.TxHash) + require.Equal(t, testTxHash, *result.Response.TxHash) + requireReplyBlockTimestamp(t, result.Response, testBlockTimestamp) + require.Empty(t, result.ResponseMetadata.Metering) + h.svc.AssertNotCalled(t, "SubmitTransaction", mock.Anything, mock.Anything) + }) +} + +// ─── Happy-path and submit tests ───────────────────────────────────────────── + +func TestWriteReport_HappyPath(t *testing.T) { + t.Parallel() + + t.Run("fresh submit succeeds - reply contains hash, fee and metering", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + h.expectSigningAccount(t) + + // Call 1: pre-submit get_transmission_info → NotAttempted + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(notAttemptedXDR(t)), nil).Once() + // TXM submit → success with fee + h.svc.EXPECT().SubmitTransaction(mock.Anything, mock.Anything). + Return(successSubmitResp(), nil).Once() + // Call 3: post-submit get_transmission_info → Succeeded + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(succeededXDR(t)), nil).Once() + + result, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.Nil(t, capErr) + require.Equal(t, stellarcap.TxStatus_TX_STATUS_SUCCESS, result.Response.TxStatus) + rcSuccess := stellarcap.ReceiverContractExecutionStatus_RECEIVER_CONTRACT_EXECUTION_STATUS_SUCCESS + require.Equal(t, &rcSuccess, result.Response.ReceiverContractExecutionStatus) + require.NotNil(t, result.Response.TxHash) + require.Equal(t, testTxHash, *result.Response.TxHash) + require.NotNil(t, result.Response.TransactionFee) + require.Equal(t, testFee, *result.Response.TransactionFee) + requireReplyBlockTimestamp(t, result.Response, testBlockTimestamp) + // Billing metering is populated because this node submitted. + validateWRMetering(t, result.ResponseMetadata, testWRChainSelector, testFee) + }) +} + +// ─── Submit-path error and edge-case tests ──────────────────────────────────── + +func TestWriteReport_Submit(t *testing.T) { + t.Parallel() + + t.Run("SubmitTransaction RPC fails - error propagated", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + h.expectSigningAccount(t) + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(notAttemptedXDR(t)), nil).Once() + h.svc.EXPECT().SubmitTransaction(mock.Anything, mock.Anything). + Return(nil, errors.New("TXM: context deadline exceeded")).Once() + + _, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, capErr) + require.Contains(t, capErr.Error(), "failed to submit forwarder report transaction") + }) + + t.Run("post-submit poll times out - falls back to TXM reply", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + h.expectSigningAccount(t) + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(notAttemptedXDR(t)), nil).Once() + h.svc.EXPECT().SubmitTransaction(mock.Anything, mock.Anything). + Return(successSubmitResp(), nil).Once() + // Post-submit poll always returns NotAttempted → times out → fallback. + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(notAttemptedXDR(t)), nil) + + ctx, cancel := context.WithDeadline(t.Context(), time.Now().Add(400*time.Millisecond)) + defer cancel() + + result, capErr := h.stellar.WriteReport(ctx, reqMeta, req) + require.Nil(t, capErr) + // Reply comes from the TXM submit response. + require.Equal(t, stellarcap.TxStatus_TX_STATUS_SUCCESS, result.Response.TxStatus) + require.NotNil(t, result.Response.TxHash) + require.Equal(t, testTxHash, *result.Response.TxHash) + require.NotNil(t, result.Response.TransactionFee) + require.Equal(t, testFee, *result.Response.TransactionFee) + requireReplyBlockTimestamp(t, result.Response, testBlockTimestamp) + validateWRMetering(t, result.ResponseMetadata, testWRChainSelector, testFee) + }) + + t.Run("post-submit shows InvalidReceiver - reply with error message and hash from submit", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + h.expectSigningAccount(t) + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(notAttemptedXDR(t)), nil).Once() + h.svc.EXPECT().SubmitTransaction(mock.Anything, mock.Anything). + Return(successSubmitResp(), nil).Once() + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(invalidReceiverXDR(t)), nil).Once() + + result, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.Nil(t, capErr) + require.Equal(t, stellarcap.TxStatus_TX_STATUS_REVERTED, result.Response.TxStatus) + // hash and fee come from submit response via populateReplyFromSubmit. + require.NotNil(t, result.Response.TxHash) + require.Equal(t, testTxHash, *result.Response.TxHash) + require.NotNil(t, result.Response.TransactionFee) + require.Equal(t, testFee, *result.Response.TransactionFee) + requireReplyBlockTimestamp(t, result.Response, testBlockTimestamp) + require.NotNil(t, result.Response.ErrorMessage) + require.Contains(t, *result.Response.ErrorMessage, "not a Wasm contract or missing on_report") + // This node spent gas → billing metering is populated. + validateWRMetering(t, result.ResponseMetadata, testWRChainSelector, testFee) + }) + + t.Run("post-submit shows Failed - receiver revert - error message and hash from submit", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + h.expectSigningAccount(t) + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(notAttemptedXDR(t)), nil).Once() + h.svc.EXPECT().SubmitTransaction(mock.Anything, mock.Anything). + Return(successSubmitResp(), nil).Once() + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(failedXDR(t)), nil).Once() + + result, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.Nil(t, capErr) + require.Equal(t, stellarcap.TxStatus_TX_STATUS_REVERTED, result.Response.TxStatus) + require.NotNil(t, result.Response.TxHash) + require.Equal(t, testTxHash, *result.Response.TxHash) + require.NotNil(t, result.Response.ErrorMessage) + require.Contains(t, *result.Response.ErrorMessage, "receiver contract execution failed") + validateWRMetering(t, result.ResponseMetadata, testWRChainSelector, testFee) + }) + + t.Run("own TxFailed - on-chain error string appears in ErrorMessage", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + h.expectSigningAccount(t) + + fee := uint64(0) + failedResp := &stellartypes.SubmitTransactionResponse{ + TxStatus: stellartypes.TxFailed, + TxHash: testTxHash, + Error: "transaction result: InvokeHostFunctionTrapped", + TransactionFee: &fee, + } + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(notAttemptedXDR(t)), nil).Once() + h.svc.EXPECT().SubmitTransaction(mock.Anything, mock.Anything). + Return(failedResp, nil).Once() + // Post-submit poll stays NotAttempted → context deadline triggers fallback. + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(notAttemptedXDR(t)), nil) + + ctx, cancel := context.WithDeadline(t.Context(), time.Now().Add(400*time.Millisecond)) + defer cancel() + + result, capErr := h.stellar.WriteReport(ctx, reqMeta, req) + require.Nil(t, capErr) + // replyFromOwnTransaction maps TxFailed to REVERTED. + require.Equal(t, stellarcap.TxStatus_TX_STATUS_REVERTED, result.Response.TxStatus) + require.NotNil(t, result.Response.ErrorMessage) + require.Contains(t, *result.Response.ErrorMessage, "on-chain transaction failed") + require.Contains(t, *result.Response.ErrorMessage, "InvokeHostFunctionTrapped") + }) + + t.Run("submit superseded by prior success - post-submit succeeds", func(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + h.expectSigningAccount(t) + + fee := uint64(0) + // Our node's submission "fails" (another node already succeeded), but + // the post-submit poll shows Succeeded. + myResp := &stellartypes.SubmitTransactionResponse{ + TxStatus: stellartypes.TxFailed, + TxHash: "mytx", + Error: "Already processed", + TransactionFee: &fee, + } + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(notAttemptedXDR(t)), nil).Once() + h.svc.EXPECT().SubmitTransaction(mock.Anything, mock.Anything). + Return(myResp, nil).Once() + // Post-submit: another node already succeeded. + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(succeededXDR(t)), nil).Once() + + result, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.Nil(t, capErr) + require.Equal(t, stellarcap.TxStatus_TX_STATUS_SUCCESS, result.Response.TxStatus) + }) +} + +// ─── parseTransmissionInfo unit tests ──────────────────────────────────────── + +func TestParseTransmissionInfo(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + state TransmissionState + wantSuccess bool + wantInvalidRx bool + }{ + {"NotAttempted", TransmissionStateNotAttempted, false, false}, + {"Succeeded", TransmissionStateSucceeded, true, false}, + {"InvalidReceiver", TransmissionStateInvalidReceiver, false, true}, + {"Failed", TransmissionStateFailed, false, false}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + xdrResult := buildTransmissionInfoXDR(t, tc.state) + info, err := parseTransmissionInfo(xdrResult, 42) + require.NoError(t, err) + require.Equal(t, tc.state, info.State) + require.Equal(t, tc.wantSuccess, info.Success) + require.Equal(t, tc.wantInvalidRx, info.InvalidReceiver) + require.Equal(t, uint32(42), info.LedgerSequence) + }) + } + + t.Run("invalid base64 returns error", func(t *testing.T) { + t.Parallel() + _, err := parseTransmissionInfo("not-valid-xdr!!!", 0) + require.Error(t, err) + require.Contains(t, err.Error(), "decode transmission info result XDR") + }) +} + +// ─── Transmission ID tests ──────────────────────────────────────────────────── + +func TestGetTransmissionID_Determinism(t *testing.T) { + t.Parallel() + + _, _, req := newWRReportFixture(t) + execID := hex.EncodeToString(commontest.RandomBytes(32)) + + id1, _, _, err1 := getTransmissionID(execID, req) + id2, _, _, err2 := getTransmissionID(execID, req) + require.NoError(t, err1) + require.NoError(t, err2) + require.Equal(t, id1, id2, "transmission ID must be deterministic for identical inputs") +} + +func TestGetTransmissionID_DifferentInputsDifferentIDs(t *testing.T) { + t.Parallel() + + _, _, req1 := newWRReportFixture(t) + _, _, req2 := newWRReportFixture(t) + execID := hex.EncodeToString(commontest.RandomBytes(32)) + + id1, _, _, err1 := getTransmissionID(execID, req1) + id2, _, _, err2 := getTransmissionID(execID, req2) + require.NoError(t, err1) + require.NoError(t, err2) + require.NotEqual(t, id1, id2, "different receivers must produce different transmission IDs") +} + +func TestGetTransmissionID_InvalidReceiver(t *testing.T) { + t.Parallel() + + // Build a fixture so RawReport and execID are valid; only ContractId is wrong. + rm, reqMeta, req := newWRReportFixture(t) + _ = rm + req.ContractId = testNodeAddress // G… StrKey — not a contract address + + _, _, _, err := getTransmissionID(reqMeta.WorkflowExecutionID, req) + require.Error(t, err) + require.Contains(t, err.Error(), "invalid receiver contract address") +} + +func TestWriteReport_NoSigningAccount(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(notAttemptedXDR(t)), nil).Once() + h.svc.EXPECT().GetSigningAccount(mock.Anything). + Return(stellartypes.GetSigningAccountResponse{}, errors.New("keystore has no accounts")).Once() + + _, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, capErr) + require.Contains(t, capErr.Error(), "failed to resolve signing account") +} + +func TestWriteReport_UnsupportedReportMetadataVersion(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + rm, reqMeta, req := newWRReportFixture(t) + rm.Version = 2 + encoded, err := rm.Encode() + require.NoError(t, err) + req.Report.RawReport = encoded + + _, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, capErr) + require.Contains(t, capErr.Error(), "unsupported report metadata version") +} + +func TestGetTransmissionInfo(t *testing.T) { + t.Parallel() + + newWR := func(t *testing.T) (*writeReportHelper, CREForwarderClient, [32]byte, [2]byte) { + t.Helper() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + _, workflowExecutionID, reportID, err := getTransmissionID(reqMeta.WorkflowExecutionID, req) + require.NoError(t, err) + return h, h.stellar.forwarderClient, workflowExecutionID, reportID + } + + t.Run("empty result treated as not attempted", func(t *testing.T) { + t.Parallel() + h, fc, workflowExecutionID, reportID := newWR(t) + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(stellartypes.SimulateTransactionResponse{}, nil).Once() + + info, err := fc.GetTransmissionInfo(t.Context(), testReceiverAddress, workflowExecutionID, reportID) + require.NoError(t, err) + require.Equal(t, TransmissionStateNotAttempted, info.State) + }) + + t.Run("forwarder simulation error is propagated", func(t *testing.T) { + t.Parallel() + h, fc, workflowExecutionID, reportID := newWR(t) + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(stellartypes.SimulateTransactionResponse{Error: "contract trap"}, nil).Once() + + _, err := fc.GetTransmissionInfo(t.Context(), testReceiverAddress, workflowExecutionID, reportID) + require.Error(t, err) + require.Contains(t, err.Error(), "forwarder simulation failed") + }) +} + +func newPollTransmissionInfoHarness(t *testing.T, deltaStage time.Duration) ( + *writeReport, + *mocks.StellarService, + string, + [32]byte, + [2]byte, +) { + t.Helper() + lggr := logger.Test(t) + mockSvc := mocks.NewStellarService(t) + myPeerID := p2ptypes.PeerID{1} + scheduler := ts.NewTransmissionScheduler( + myPeerID, + []p2ptypes.PeerID{{1}, {2}, {3}}, + deltaStage, + 0, + lggr, + ) + wr := &writeReport{ + service: mockSvc, + forwarderClient: newForwarderClient(mockSvc, lggr, testForwarderAddress, 100), + lggr: logger.Sugared(lggr), + transmissionScheduler: scheduler, + } + _, reqMeta, req := newWRReportFixture(t) + _, workflowExecutionID, reportID, err := getTransmissionID(reqMeta.WorkflowExecutionID, req) + require.NoError(t, err) + return wr, mockSvc, req.ContractId, workflowExecutionID, reportID +} + +func expectTransmissionInfoPoll(mockSvc *mocks.StellarService, xdrResult string, err error) { + mockSvc.EXPECT(). + SimulateTransaction(mock.Anything, mock.MatchedBy(func(req stellartypes.SimulateTransactionRequest) bool { + return req.Function == forwarderGetTransmissionInfoFunction + })). + Return(transmissionResp(xdrResult), err). + Once() +} + +func expectTransmissionInfoPollMaybe(mockSvc *mocks.StellarService, xdrResult string, err error) { + mockSvc.EXPECT(). + SimulateTransaction(mock.Anything, mock.MatchedBy(func(req stellartypes.SimulateTransactionRequest) bool { + return req.Function == forwarderGetTransmissionInfoFunction + })). + Return(transmissionResp(xdrResult), err). + Maybe() +} + +func TestPollTransmissionInfo_QueuePositionScenarios(t *testing.T) { + t.Parallel() + ctx := t.Context() + + t.Run("terminal states return immediately without waiting for delta stage", func(t *testing.T) { + cases := []struct { + name string + state TransmissionState + xdr func(t *testing.T) string + }{ + {"succeeded", TransmissionStateSucceeded, succeededXDR}, + {"invalid receiver", TransmissionStateInvalidReceiver, invalidReceiverXDR}, + {"failed", TransmissionStateFailed, failedXDR}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + for _, queuePosition := range []int{1, 2, 3} { + t.Run("queue position "+strconv.Itoa(queuePosition), func(t *testing.T) { + t.Parallel() + wr, mockSvc, receiver, workflowExecutionID, reportID := newPollTransmissionInfoHarness(t, 5*time.Second) + expectTransmissionInfoPoll(mockSvc, tc.xdr(t), nil) + + start := time.Now() + info, err := wr.pollTransmissionInfo(ctx, receiver, workflowExecutionID, reportID, queuePosition) + require.NoError(t, err) + require.Equal(t, tc.state, info.State) + require.Less(t, time.Since(start), 500*time.Millisecond) + }) + } + }) + } + }) + + t.Run("not attempted waits until delta stage then returns", func(t *testing.T) { + t.Parallel() + const queuePosition = 2 + wr, mockSvc, receiver, workflowExecutionID, reportID := newPollTransmissionInfoHarness(t, 150*time.Millisecond) + expectTransmissionInfoPollMaybe(mockSvc, notAttemptedXDR(t), nil) + + start := time.Now() + info, err := wr.pollTransmissionInfo(ctx, receiver, workflowExecutionID, reportID, queuePosition) + require.NoError(t, err) + require.Equal(t, TransmissionStateNotAttempted, info.State) + require.GreaterOrEqual(t, time.Since(start), 100*time.Millisecond) + }) + + t.Run("position zero uses quick retry", func(t *testing.T) { + t.Parallel() + wr, mockSvc, receiver, workflowExecutionID, reportID := newPollTransmissionInfoHarness(t, 5*time.Second) + expectTransmissionInfoPoll(mockSvc, succeededXDR(t), nil) + + info, err := wr.pollTransmissionInfo(ctx, receiver, workflowExecutionID, reportID, 0) + require.NoError(t, err) + require.Equal(t, TransmissionStateSucceeded, info.State) + }) +} + +func TestPollTransmissionInfo_RaceConditions(t *testing.T) { + t.Parallel() + + t.Run("timer boundary read catches late success", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) + defer cancel() + + wr, mockSvc, receiver, workflowExecutionID, reportID := newPollTransmissionInfoHarness(t, 150*time.Millisecond) + var chainStateUpdated atomic.Bool + go func() { + time.Sleep(120 * time.Millisecond) + chainStateUpdated.Store(true) + }() + + mockSvc.EXPECT(). + SimulateTransaction(mock.Anything, mock.MatchedBy(func(req stellartypes.SimulateTransactionRequest) bool { + return req.Function == forwarderGetTransmissionInfoFunction + })). + RunAndReturn(func(context.Context, stellartypes.SimulateTransactionRequest) (stellartypes.SimulateTransactionResponse, error) { + if chainStateUpdated.Load() { + return transmissionResp(succeededXDR(t)), nil + } + return transmissionResp(notAttemptedXDR(t)), nil + }). + Maybe() + + info, err := wr.pollTransmissionInfo(ctx, receiver, workflowExecutionID, reportID, 1) + require.NoError(t, err) + require.True(t, chainStateUpdated.Load()) + require.Equal(t, TransmissionStateSucceeded, info.State) + }) + + t.Run("all rpc errors including boundary read return error", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) + defer cancel() + + wr, mockSvc, receiver, workflowExecutionID, reportID := newPollTransmissionInfoHarness(t, 50*time.Millisecond) + var rpcCalls atomic.Int64 + mockSvc.EXPECT(). + SimulateTransaction(mock.Anything, mock.MatchedBy(func(req stellartypes.SimulateTransactionRequest) bool { + return req.Function == forwarderGetTransmissionInfoFunction + })). + RunAndReturn(func(context.Context, stellartypes.SimulateTransactionRequest) (stellartypes.SimulateTransactionResponse, error) { + rpcCalls.Add(1) + return stellartypes.SimulateTransactionResponse{}, errors.New("rpc unavailable") + }). + Maybe() + + _, err := wr.pollTransmissionInfo(ctx, receiver, workflowExecutionID, reportID, 2) + require.Greater(t, rpcCalls.Load(), int64(0)) + require.Error(t, err) + require.Contains(t, err.Error(), "all GetTransmissionInfo polls failed during delta stage window") + }) + + t.Run("context cancel returns timeout error", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(t.Context()) + wr, mockSvc, receiver, workflowExecutionID, reportID := newPollTransmissionInfoHarness(t, 5*time.Second) + expectTransmissionInfoPollMaybe(mockSvc, notAttemptedXDR(t), nil) + + go func() { + time.Sleep(50 * time.Millisecond) + cancel() + }() + + _, err := wr.pollTransmissionInfo(ctx, receiver, workflowExecutionID, reportID, 2) + require.Error(t, err) + require.Contains(t, err.Error(), "timed out waiting for transmission info") + }) +} + +func TestParseFieldsIntoTransmissionInfo(t *testing.T) { + t.Parallel() + + t.Run("vec with state and transmitter", func(t *testing.T) { + t.Parallel() + state := xdr.Uint32(TransmissionStateSucceeded) + stateVal := xdr.ScVal{Type: xdr.ScValTypeScvU32, U32: &state} + accountID := xdr.MustAddress(testNodeAddress) + txrVal := xdr.ScVal{ + Type: xdr.ScValTypeScvAddress, + Address: &xdr.ScAddress{ + Type: xdr.ScAddressTypeScAddressTypeAccount, + AccountId: &accountID, + }, + } + vec := xdr.ScVec{stateVal, txrVal} + vecPtr := &vec + sv := xdr.ScVal{Type: xdr.ScValTypeScvVec, Vec: &vecPtr} + + info := TransmissionInfo{} + parseFieldsIntoTransmissionInfo(&info, sv) + require.Equal(t, TransmissionStateSucceeded, info.State) + require.Equal(t, testNodeAddress, info.Transmitter) + }) + + t.Run("map with state transmitter and flags", func(t *testing.T) { + t.Parallel() + state := xdr.Uint32(TransmissionStateInvalidReceiver) + stateVal := xdr.ScVal{Type: xdr.ScValTypeScvU32, U32: &state} + stateKey := xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: func() *xdr.ScSymbol { s := xdr.ScSymbol("state"); return &s }()} + invalid := true + invalidVal := xdr.ScVal{Type: xdr.ScValTypeScvBool, B: &invalid} + invalidKey := xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: func() *xdr.ScSymbol { s := xdr.ScSymbol("invalid_receiver"); return &s }()} + scMap := xdr.ScMap{ + {Key: stateKey, Val: stateVal}, + {Key: invalidKey, Val: invalidVal}, + } + mapPtr := &scMap + sv := xdr.ScVal{Type: xdr.ScValTypeScvMap, Map: &mapPtr} + + info := TransmissionInfo{} + parseFieldsIntoTransmissionInfo(&info, sv) + require.Equal(t, TransmissionStateInvalidReceiver, info.State) + require.True(t, info.InvalidReceiver) + }) +} + +func TestXDRExtractHelpers(t *testing.T) { + t.Parallel() + + t.Run("extractStringish symbol and string", func(t *testing.T) { + t.Parallel() + sym := xdr.ScSymbol("state") + symVal := xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &sym} + out, ok := extractStringish(symVal) + require.True(t, ok) + require.Equal(t, "state", out) + + str := xdr.ScString("transmitter") + strVal := xdr.ScVal{Type: xdr.ScValTypeScvString, Str: &str} + out, ok = extractStringish(strVal) + require.True(t, ok) + require.Equal(t, "transmitter", out) + }) + + t.Run("extractAddressString account and contract", func(t *testing.T) { + t.Parallel() + accountID := xdr.MustAddress(testNodeAddress) + accountVal := xdr.ScVal{ + Type: xdr.ScValTypeScvAddress, + Address: &xdr.ScAddress{ + Type: xdr.ScAddressTypeScAddressTypeAccount, + AccountId: &accountID, + }, + } + out, ok := extractAddressString(accountVal) + require.True(t, ok) + require.Equal(t, testNodeAddress, out) + + contractBytes, err := strkey.Decode(strkey.VersionByteContract, testForwarderAddress) + require.NoError(t, err) + var contractID xdr.ContractId + copy(contractID[:], contractBytes) + contractVal := xdr.ScVal{ + Type: xdr.ScValTypeScvAddress, + Address: &xdr.ScAddress{ + Type: xdr.ScAddressTypeScAddressTypeContract, + ContractId: &contractID, + }, + } + out, ok = extractAddressString(contractVal) + require.True(t, ok) + require.Equal(t, testForwarderAddress, out) + }) + + t.Run("extractBool", func(t *testing.T) { + t.Parallel() + b := true + val := xdr.ScVal{Type: xdr.ScValTypeScvBool, B: &b} + require.NotNil(t, extractBool(val)) + require.True(t, *extractBool(val)) + }) +} + +func TestReplyFromOwnTransaction(t *testing.T) { + t.Parallel() + wr := &writeReport{lggr: logger.Sugared(logger.Test(t))} + + t.Run("nil response is fatal", func(t *testing.T) { + t.Parallel() + reply := wr.replyFromOwnTransaction(nil) + require.Equal(t, stellarcap.TxStatus_TX_STATUS_FATAL, reply.TxStatus) + }) + + t.Run("tx fatal maps to fatal status", func(t *testing.T) { + t.Parallel() + reply := wr.replyFromOwnTransaction(&stellartypes.SubmitTransactionResponse{ + TxStatus: stellartypes.TxFatal, + TxHash: testTxHash, + }) + require.Equal(t, stellarcap.TxStatus_TX_STATUS_FATAL, reply.TxStatus) + require.NotNil(t, reply.TxHash) + }) +} + +func TestBuildForwarderReportArgs_InvalidTransmitter(t *testing.T) { + t.Parallel() + _, _, req := newWRReportFixture(t) + _, err := buildForwarderReportArgs("not-a-valid-address", testReceiverAddress, req.Report) + require.Error(t, err) + require.Contains(t, err.Error(), "transmitter") +} + +func TestWriteReport_TxFatalSubmitFallback(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + _, reqMeta, req := newWRReportFixture(t) + h.expectSigningAccount(t) + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(notAttemptedXDR(t)), nil).Once() + h.svc.EXPECT().SubmitTransaction(mock.Anything, mock.Anything). + Return(&stellartypes.SubmitTransactionResponse{ + TxStatus: stellartypes.TxFatal, + TxHash: testTxHash, + }, nil).Once() + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(notAttemptedXDR(t)), nil) + + ctx, cancel := context.WithDeadline(t.Context(), time.Now().Add(400*time.Millisecond)) + defer cancel() + + result, capErr := h.stellar.WriteReport(ctx, reqMeta, req) + require.Nil(t, capErr) + require.Equal(t, stellarcap.TxStatus_TX_STATUS_FATAL, result.Response.TxStatus) + require.NotNil(t, result.Response.TxHash) +} diff --git a/chain_capabilities/stellar/config/config.go b/chain_capabilities/stellar/config/config.go index 514e78684..fd55af917 100644 --- a/chain_capabilities/stellar/config/config.go +++ b/chain_capabilities/stellar/config/config.go @@ -1,25 +1,51 @@ package config import ( + "encoding/json" + "fmt" "time" + + "github.com/stellar/go-stellar-sdk/strkey" ) -// Config is the Stellar capability configuration. -// -// Stellar consensus reads are the only functionality wired today, so write/forwarder related -// fields present in other chain capabilities are intentionally omitted. type Config struct { - // Network is the relayer network identifier, e.g. "stellar". - Network string `json:"network"` - // ChainID is the Stellar network id (network passphrase hash) used to resolve the chain selector. - ChainID string `json:"chainId"` - // IsLocal runs against a local network (local CRE runs only); chain selector resolution is skipped. - IsLocal bool `json:"isLocal,omitempty"` + CREForwarderAddress string `json:"creForwarderAddress"` + // ForwarderLookbackLedgers is how many ledgers back to search for ReportProcessed events (default 100). + ForwarderLookbackLedgers int64 `json:"forwarderLookbackLedgers"` + DeltaStage time.Duration `json:"deltaStage"` + Network string `json:"network"` + ChainID string `json:"chainId"` + IsLocal bool `json:"isLocal,omitempty"` + ObservationPollerWorkersCount uint `json:"observationPollerWorkersCount"` + ObservationPollPeriod time.Duration `json:"observationPollPeriod"` + UnknownRequestsTTL time.Duration `json:"unknownRequestsTTL"` +} + +func (c *Config) UnmarshalJSON(bs []byte) error { + type config Config + var cfg config + if err := json.Unmarshal(bs, &cfg); err != nil { + return err + } + if cfg.Network == "" { + return fmt.Errorf("network is required") + } + if cfg.ChainID == "" { + return fmt.Errorf("chainId is required") + } + if cfg.CREForwarderAddress == "" { + return fmt.Errorf("creForwarderAddress is required") + } + if err := validateContractStrKey(cfg.CREForwarderAddress); err != nil { + return fmt.Errorf("creForwarderAddress: %w", err) + } + *c = Config(cfg) + return nil +} - // ObservationPollerWorkersCount is the number of concurrent observation pollers. - ObservationPollerWorkersCount uint `json:"observationPollerWorkersCount"` - // ObservationPollPeriod is how often a volatile request re-observes the chain. - ObservationPollPeriod time.Duration `json:"observationPollPeriod"` - // UnknownRequestsTTL is how long results for not-yet-tracked requests are cached. - UnknownRequestsTTL time.Duration `json:"unknownRequestsTTL"` +func validateContractStrKey(address string) error { + if _, err := strkey.Decode(strkey.VersionByteContract, address); err != nil { + return fmt.Errorf("invalid contract address %q: %w", address, err) + } + return nil } diff --git a/chain_capabilities/stellar/config/config_test.go b/chain_capabilities/stellar/config/config_test.go new file mode 100644 index 000000000..6ae0860a2 --- /dev/null +++ b/chain_capabilities/stellar/config/config_test.go @@ -0,0 +1,92 @@ +package config + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConfig_UnmarshalJSON(t *testing.T) { + t.Parallel() + + t.Run("happy path", func(t *testing.T) { + t.Parallel() + input := `{ + "chainId":"stellar-testnet", + "network":"stellar", + "creForwarderAddress":"CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC", + "deltaStage":1000000000, + "observationPollerWorkersCount":17, + "observationPollPeriod":2000000000, + "unknownRequestsTTL":4000000000, + "isLocal":true + }` + + var cfg Config + require.NoError(t, json.Unmarshal([]byte(input), &cfg)) + + assert.Equal(t, "stellar-testnet", cfg.ChainID) + assert.Equal(t, "stellar", cfg.Network) + assert.Equal(t, "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC", cfg.CREForwarderAddress) + assert.EqualValues(t, 17, cfg.ObservationPollerWorkersCount) + assert.Equal(t, time.Second, cfg.DeltaStage) + assert.Equal(t, 2*time.Second, cfg.ObservationPollPeriod) + assert.Equal(t, 4*time.Second, cfg.UnknownRequestsTTL) + assert.True(t, cfg.IsLocal) + }) + + t.Run("missing network", func(t *testing.T) { + t.Parallel() + input := `{"chainId":"stellar-testnet"}` + + var cfg Config + err := json.Unmarshal([]byte(input), &cfg) + require.Error(t, err) + require.Contains(t, err.Error(), "network is required") + }) + + t.Run("missing chainId", func(t *testing.T) { + t.Parallel() + input := `{"network":"stellar"}` + + var cfg Config + err := json.Unmarshal([]byte(input), &cfg) + require.Error(t, err) + require.Contains(t, err.Error(), "chainId is required") + }) + + t.Run("missing creForwarderAddress", func(t *testing.T) { + t.Parallel() + input := `{"network":"stellar","chainId":"stellar-testnet"}` + + var cfg Config + err := json.Unmarshal([]byte(input), &cfg) + require.Error(t, err) + require.Contains(t, err.Error(), "creForwarderAddress is required") + }) + + t.Run("invalid creForwarderAddress", func(t *testing.T) { + t.Parallel() + input := `{ + "network":"stellar", + "chainId":"stellar-testnet", + "creForwarderAddress":"GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7" + }` + + var cfg Config + err := json.Unmarshal([]byte(input), &cfg) + require.Error(t, err) + require.Contains(t, err.Error(), "creForwarderAddress") + require.Contains(t, err.Error(), "invalid contract address") + }) + + t.Run("invalid json", func(t *testing.T) { + t.Parallel() + var cfg Config + err := json.Unmarshal([]byte("{"), &cfg) + require.Error(t, err) + }) +} diff --git a/chain_capabilities/stellar/go.mod b/chain_capabilities/stellar/go.mod index 6ef043de8..076c7b67a 100644 --- a/chain_capabilities/stellar/go.mod +++ b/chain_capabilities/stellar/go.mod @@ -4,10 +4,14 @@ go 1.26.2 require ( github.com/smartcontractkit/capabilities/libs v0.0.0-20260609124022-2749e4a32bfb - github.com/smartcontractkit/chainlink-common v0.11.2-0.20260610122030-760451629cfd + github.com/smartcontractkit/chainlink-common v0.11.2-0.20260628120702-23aa6e0ed0bf + github.com/stellar/go-stellar-sdk v0.5.0 ) -require github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260401162955-be2bc6b5264b // indirect +require ( + github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260401162955-be2bc6b5264b // indirect + github.com/stellar/go-xdr v0.0.0-20260312225820-cc2b0611aabf // indirect +) require ( github.com/Masterminds/semver/v3 v3.4.0 // indirect @@ -77,7 +81,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.100 github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260601211238-9f526774fef0 // indirect github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243 - github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260609153034-c8423a41ef9a // indirect + github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260622152157-c8e129347b8b github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b // indirect github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b // indirect github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e // indirect diff --git a/chain_capabilities/stellar/go.sum b/chain_capabilities/stellar/go.sum index d0d49c824..20aee57b8 100644 --- a/chain_capabilities/stellar/go.sum +++ b/chain_capabilities/stellar/go.sum @@ -38,6 +38,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM= github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM= @@ -172,8 +174,14 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/oklog/run v1.2.0 h1:O8x3yXwah4A73hJdlrwo/2X6J62gE5qTMusH0dvz60E= github.com/oklog/run v1.2.0/go.mod h1:mgDbKRSwPhJfesJ4PntqFUbKQRZ50NgmZTSPlFA0YFk= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= +github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= @@ -208,8 +216,8 @@ github.com/smartcontractkit/capabilities/libs v0.0.0-20260609124022-2749e4a32bfb github.com/smartcontractkit/capabilities/libs v0.0.0-20260609124022-2749e4a32bfb/go.mod h1:LS7F8U2YZNc0Vt8f6SVWUUigGLxdxZMpyC7VCcUTagg= github.com/smartcontractkit/chain-selectors v1.0.100 h1:wpiSpmI/eFjY+wx/nPr5VuNF4hki0prIBMKEaQWn3g4= github.com/smartcontractkit/chain-selectors v1.0.100/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common v0.11.2-0.20260610122030-760451629cfd h1:kLxp57N/X7eZqnfpgeq2XxrtH23KtYfcH+Jk1sSpZ4w= -github.com/smartcontractkit/chainlink-common v0.11.2-0.20260610122030-760451629cfd/go.mod h1:GlEVw7ziizXoMfzl1onNSwansrVBLHhj5gUJlGQpb4I= +github.com/smartcontractkit/chainlink-common v0.11.2-0.20260628120702-23aa6e0ed0bf h1:BxwY8Qc2wetZAg15PXMuOScPfDZVhM0v7MmpApmL6IY= +github.com/smartcontractkit/chainlink-common v0.11.2-0.20260628120702-23aa6e0ed0bf/go.mod h1:wUK7w5xRrFPD2qQfdt1fLXzQzWSb4PaZaxa4nsqCWVs= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260601211238-9f526774fef0 h1:NExKM/D0HneOq/N5LGTbkV4VOa0UHCvfTNEb4GqYpto= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260601211238-9f526774fef0/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260401162955-be2bc6b5264b h1:L1So1EDBDRET3j/TdV1Gjv3qWARoa/NPRaU7k4r30yA= @@ -218,8 +226,8 @@ github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805- github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243/go.mod h1:7ketk4ischPQW/JQgmyHz6zdzLUJv1VC29SiSgosydQ= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260609153034-c8423a41ef9a h1:alnfvQgCKPFqsfijZQnr6Sbus2GT5YZ9BT/KzVrbszE= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260609153034-c8423a41ef9a/go.mod h1:vTFHTCbLui4Vn8fTmAadfE3rdnvfrDwOmMujmW857D0= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260622152157-c8e129347b8b h1:VDgJWDipihV9f7M5+d21d1RzSsg5rEv+iI12oN1VQbo= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260622152157-c8e129347b8b/go.mod h1:vTFHTCbLui4Vn8fTmAadfE3rdnvfrDwOmMujmW857D0= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b h1:36knUpKHHAZ86K4FGWXtx8i/EQftGdk2bqCoEu/Cha8= @@ -232,6 +240,10 @@ github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 h1:12i github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7/go.mod h1:FX7/bVdoep147QQhsOPkYsPEXhGZjeYx6lBSaSXtZOA= github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d h1:LokA9PoCNb8mm8mDT52c3RECPMRsGz1eCQORq+J3n74= github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d/go.mod h1:Acy3BTBxou83ooMESLO90s8PKSu7RvLCzwSTbxxfOK0= +github.com/stellar/go-stellar-sdk v0.5.0 h1:xpOO+ZTyvGz54wTm7pwl2Gf1e6lZl0ExrJ/tKb+Roj4= +github.com/stellar/go-stellar-sdk v0.5.0/go.mod h1:tLKAQPxa2I5UvGMabBbUXcY3fmgYnfDudrMeK7CDX4w= +github.com/stellar/go-xdr v0.0.0-20260312225820-cc2b0611aabf h1:GY1RVbX3Hg7poPXEf6yojjP0hyypvgUgZmCqQU9D0xg= +github.com/stellar/go-xdr v0.0.0-20260312225820-cc2b0611aabf/go.mod h1:If+U9Z1W5xU97VrOgJandQT+2dN7/iOpkCrxBJEyF80= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -252,6 +264,8 @@ github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/ github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +github.com/xdrpp/goxdr v0.1.1 h1:E1B2c6E8eYhOVyd7yEpOyopzTPirUeF6mVOfXfGyJyc= +github.com/xdrpp/goxdr v0.1.1/go.mod h1:dXo1scL/l6s7iME1gxHWo2XCppbHEKZS7m/KyYWkNzA= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= @@ -427,6 +441,8 @@ google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/chain_capabilities/stellar/main.go b/chain_capabilities/stellar/main.go index 763a796f5..c8e4ea78f 100644 --- a/chain_capabilities/stellar/main.go +++ b/chain_capabilities/stellar/main.go @@ -11,34 +11,35 @@ import ( chainselectors "github.com/smartcontractkit/chain-selectors" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - stellarcapserver "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/stellar/server" - "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink-common/pkg/loop" - "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" - "github.com/smartcontractkit/chainlink-common/pkg/types" - "github.com/smartcontractkit/chainlink-common/pkg/types/core" - "github.com/smartcontractkit/capabilities/libs/chainconsensus" consMetrics "github.com/smartcontractkit/capabilities/libs/chainconsensus/metrics" "github.com/smartcontractkit/capabilities/libs/chainconsensus/oracle" "github.com/smartcontractkit/capabilities/libs/chainconsensus/poller" "github.com/smartcontractkit/capabilities/libs/loopserver" + ts "github.com/smartcontractkit/capabilities/chain_capabilities/common/transmission_schedule" "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/actions" "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/config" "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/monitoring" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + stellarcapserver "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/stellar/server" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" ) -const ( - CapabilityName = "stellar" +const CapabilityName = "stellar" - // defaultObservationPollPeriod is tuned to roughly half of Stellar's 5-7s ledger close time so - // a volatile request makes an observation for (nearly) every ledger during its lifecycle. - defaultObservationPollPeriod = 3 * time.Second - defaultPollerWorkersCount = 10 - defaultUnknownRequestsTTL = 10 * time.Second +const ( + // Default values for optional Stellar consensus/read settings when not provided in config. + defaultObservationPollPeriod = 3 * time.Second + defaultPollerWorkersCount = 10 + defaultUnknownRequestsTTL = 10 * time.Second + defaultForwarderLookbackLedgers = int64(100) ) // capabilityGRPCService is the top-level server wrapping the Stellar capability. @@ -59,6 +60,12 @@ type capability struct { id string } +type closeFunc func() error + +func (f closeFunc) Close() error { + return f() +} + var _ stellarcapserver.ClientCapability = &capabilityGRPCService{} func main() { @@ -94,6 +101,9 @@ func (c *capabilityGRPCService) Close() error { if c.consensusHandler != nil { closers = append(closers, c.consensusHandler) } + if c.Stellar != nil { + closers = append(closers, c.Stellar) + } return services.CloseAll(closers...) } @@ -131,14 +141,16 @@ func (c *capabilityGRPCService) Initialise(ctx context.Context, dependencies cor if err != nil { return fmt.Errorf("failed to get stellar service: %w", err) } + if _, err = stellarService.GetSigningAccount(ctx); err != nil { + return fmt.Errorf("stellar relayer has no signing account: %w", err) + } if err = c.setSelector(cfg); err != nil { return err } - c.id = "stellar:ChainSelector:" + strconv.FormatUint(c.chainSelector, 10) + "@1.0.0" + c.id = CapabilityName + ":ChainSelector:" + strconv.FormatUint(c.chainSelector, 10) + "@1.0.0" var chainInfo types.ChainInfo - // protection for e2e tests when we run against a local network if !cfg.IsLocal { chainInfo, err = relayer.GetChainInfo(ctx) if err != nil { @@ -146,7 +158,19 @@ func (c *capabilityGRPCService) Initialise(ctx context.Context, dependencies cor } } - var toStart []interface{ Start(context.Context) error } + var scheduler ts.TransmissionScheduler + if cfg.DeltaStage > 0 { + myDON, err := ts.InitMyDON(ctx, dependencies.CapabilityRegistry, c.id, dependencies.CapabilityDonID, c.lggr, cfg.IsLocal) + if err != nil { + return fmt.Errorf("failed to init DON: %w", err) + } + c.DON = &myDON + scheduler, err = ts.InitialiseTransmissionScheduler(ctx, dependencies.CapabilityRegistry, cfg.DeltaStage, c.lggr, c.DON, cfg.IsLocal) + if err != nil { + return fmt.Errorf("failed to initialize transmission scheduler: %w", err) + } + } + consensusMetrics, err := consMetrics.NewConsensusMetrics(chainInfo) if err != nil { return fmt.Errorf("failed to create stellar consensus metrics: %w", err) @@ -169,7 +193,6 @@ func (c *capabilityGRPCService) Initialise(ctx context.Context, dependencies cor if err != nil { return fmt.Errorf("error when creating oracle: %w", err) } - toStart = append(toStart, c.requestPoller, c.consensusHandler, c.oracle) var nodeAddress string if localNode, lnErr := dependencies.CapabilityRegistry.LocalNode(ctx); lnErr != nil { @@ -188,12 +211,23 @@ func (c *capabilityGRPCService) Initialise(ctx context.Context, dependencies cor } messageBuilder := monitoring.NewMessageBuilder(chainInfo, c.CapabilityInfo, nodeAddress) - c.Stellar, err = actions.NewStellar(stellarService, c.lggr, c.chainSelector, c.consensusHandler, messageBuilder, processor) + c.Stellar, err = actions.NewStellar( + stellarService, + cfg.CREForwarderAddress, + cfg.ForwarderLookbackLedgers, + c.lggr, + c.limitsFactory, + scheduler, + c.chainSelector, + c.consensusHandler, + messageBuilder, + processor, + ) if err != nil { return err } - for _, service := range toStart { + for _, service := range []interface{ Start(context.Context) error }{c.requestPoller, c.consensusHandler, c.oracle} { if err = service.Start(ctx); err != nil { return err } @@ -204,8 +238,6 @@ func (c *capabilityGRPCService) Initialise(ctx context.Context, dependencies cor } func (c *capabilityGRPCService) setSelector(cfg *config.Config) error { - // When we run against a local network (e.g. local CRE) we can't resolve the chain selector - // since ChainID is environment-specific. if cfg.IsLocal { c.chainSelector = chainselectors.STELLAR_LOCALNET.Selector return nil @@ -237,12 +269,10 @@ func (c *capabilityGRPCService) unmarshalConfig(configStr string) (*config.Confi cfg.UnknownRequestsTTL = defaultUnknownRequestsTTL c.lggr.Infof("UnknownRequestsTTL is zero, setting to %s.", cfg.UnknownRequestsTTL) } + if cfg.ForwarderLookbackLedgers == 0 { + cfg.ForwarderLookbackLedgers = defaultForwarderLookbackLedgers + c.lggr.Infof("ForwarderLookbackLedgers is zero, setting to %d.", cfg.ForwarderLookbackLedgers) + } return &cfg, nil } - -type closeFunc func() error - -func (f closeFunc) Close() error { - return f() -} diff --git a/chain_capabilities/stellar/metering/metering.go b/chain_capabilities/stellar/metering/metering.go index 948e0f429..eae9dcd51 100644 --- a/chain_capabilities/stellar/metering/metering.go +++ b/chain_capabilities/stellar/metering/metering.go @@ -1,6 +1,9 @@ package metering import ( + "fmt" + "strconv" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" ) @@ -11,6 +14,9 @@ const ( // ReadContract is the placeholder spend value for a ReadContract (simulateTransaction) read. // TODO: PLEX-3022 - replace with actual values. ReadContract SpendValueCredits = "1" + + // WriteReportSpendUnitFormat is the spend unit for write operations, parameterised by chain selector. + WriteReportSpendUnitFormat = "STROOP.%d" ) // GetResponseMetadata returns the response metadata (metering detail) for a given read action. @@ -25,3 +31,17 @@ func GetResponseMetadata(action SpendValueCredits) capabilities.ResponseMetadata }, } } + +// GetResponseMetadataWriteReport returns billing ResponseMetadata for a completed write-report +// submission. feeStroops is the actual FeeCharged from the confirmed transaction. +func GetResponseMetadataWriteReport(feeStroops uint64, chainSelector uint64) capabilities.ResponseMetadata { + return capabilities.ResponseMetadata{ + Metering: []capabilities.MeteringNodeDetail{ + { + // Peer2PeerID is assigned by the engine, leaving it empty here. + SpendValue: strconv.FormatUint(feeStroops, 10), + SpendUnit: fmt.Sprintf(WriteReportSpendUnitFormat, chainSelector), + }, + }, + } +} diff --git a/chain_capabilities/stellar/monitoring/messages.go b/chain_capabilities/stellar/monitoring/messages.go index 82582a7c9..64cba7f1d 100644 --- a/chain_capabilities/stellar/monitoring/messages.go +++ b/chain_capabilities/stellar/monitoring/messages.go @@ -26,14 +26,14 @@ func NewMessageBuilder(chainInfo types.ChainInfo, capInfo capabilities.Capabilit } } -func (m *MessageBuilder) BuildReadContractInitiated(tc commonmon.TelemetryContext, req stellartypes.ReadContractRequest) *ReadContractInitiated { +func (m *MessageBuilder) BuildReadContractInitiated(tc commonmon.TelemetryContext, req stellartypes.SimulateTransactionRequest) *ReadContractInitiated { return &ReadContractInitiated{ Req: convertReadContractRequest(req), ExecutionContext: m.BuildExecutionContext(tc), } } -func (m *MessageBuilder) BuildReadContractSuccess(tc commonmon.TelemetryContext, req stellartypes.ReadContractRequest, resultLen uint64, ledgerSequence uint32) *ReadContractSuccess { +func (m *MessageBuilder) BuildReadContractSuccess(tc commonmon.TelemetryContext, req stellartypes.SimulateTransactionRequest, resultLen uint64, ledgerSequence uint32) *ReadContractSuccess { return &ReadContractSuccess{ Req: convertReadContractRequest(req), ResultLen: resultLen, @@ -42,7 +42,7 @@ func (m *MessageBuilder) BuildReadContractSuccess(tc commonmon.TelemetryContext, } } -func (m *MessageBuilder) BuildReadContractError(tc commonmon.TelemetryContext, req stellartypes.ReadContractRequest, summary string, err caperrors.Error) commonmon.ErrorMessage { +func (m *MessageBuilder) BuildReadContractError(tc commonmon.TelemetryContext, req stellartypes.SimulateTransactionRequest, summary string, err caperrors.Error) commonmon.ErrorMessage { return &ReadContractError{ Req: convertReadContractRequest(req), Summary: summary, @@ -54,7 +54,7 @@ func (m *MessageBuilder) BuildReadContractError(tc commonmon.TelemetryContext, r // convertReadContractRequest extracts the non-sensitive subset of the request for telemetry // (raw argument values are intentionally omitted; only the count is recorded). -func convertReadContractRequest(req stellartypes.ReadContractRequest) *ReadContractRequest { +func convertReadContractRequest(req stellartypes.SimulateTransactionRequest) *ReadContractRequest { return &ReadContractRequest{ ContractId: req.ContractID, Function: req.Function,