-
Notifications
You must be signed in to change notification settings - Fork 380
feat: make both feed versions race together #5287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
67efc3b
4a65577
e3cb3a3
166edba
feed2b4
5a4d6cd
bca89ea
0abde88
32370e6
909173c
4cd9bd5
cd4da09
080de51
21d788f
615d3ee
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -337,15 +337,7 @@ func (s *Service) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) { | |
| paths.Path = strings.TrimRight(paths.Path, "/") + "/" // NOTE: leave one slash if there was some. | ||
| } | ||
|
|
||
| queries := struct { | ||
| FeedLegacyResolve bool `map:"swarm-feed-legacy-resolve"` | ||
| }{} | ||
| if response := s.mapStructure(r.URL.Query(), &queries); response != nil { | ||
| response("invalid query params", logger, w) | ||
| return | ||
| } | ||
|
|
||
| s.serveReference(logger, address, paths.Path, w, r, false, queries.FeedLegacyResolve) | ||
| s.serveReference(logger, address, paths.Path, w, r, false) | ||
| } | ||
|
|
||
| func (s *Service) bzzHeadHandler(w http.ResponseWriter, r *http.Request) { | ||
|
|
@@ -360,14 +352,6 @@ func (s *Service) bzzHeadHandler(w http.ResponseWriter, r *http.Request) { | |
| return | ||
| } | ||
|
|
||
| queries := struct { | ||
| FeedLegacyResolve bool `map:"swarm-feed-legacy-resolve"` | ||
| }{} | ||
| if response := s.mapStructure(r.URL.Query(), &queries); response != nil { | ||
| response("invalid query params", logger, w) | ||
| return | ||
| } | ||
|
|
||
| address := paths.Address | ||
| if v := getAddressFromContext(r.Context()); !v.IsZero() { | ||
| address = v | ||
|
|
@@ -377,10 +361,119 @@ func (s *Service) bzzHeadHandler(w http.ResponseWriter, r *http.Request) { | |
| paths.Path = strings.TrimRight(paths.Path, "/") + "/" // NOTE: leave one slash if there was some. | ||
| } | ||
|
|
||
| s.serveReference(logger, address, paths.Path, w, r, true, queries.FeedLegacyResolve) | ||
| s.serveReference(logger, address, paths.Path, w, r, true) | ||
| } | ||
|
|
||
| func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathVar string, w http.ResponseWriter, r *http.Request, headerOnly bool, feedLegacyResolve bool) { | ||
| type getWrappedResult struct { | ||
| ch swarm.Chunk | ||
| v1 bool // indicates whether the feed that was resolved is v1. false if v2 | ||
| err error | ||
| } | ||
|
|
||
| // resolveFeed races the resolution of both types of feeds. | ||
| // figure out if its a v1 or v2 chunk. | ||
| // it returns the first correct feed found, the type found ("v1" or "v2") or an error. | ||
| func (s *Service) resolveFeed(ctx context.Context, getter storage.Getter, ch swarm.Chunk) (swarm.Chunk, string, error) { | ||
| innerCtx, cancel := context.WithCancel(ctx) | ||
| defer cancel() | ||
| getWrapped := func(v1 bool) chan getWrappedResult { | ||
| ret := make(chan getWrappedResult) | ||
| go func() { | ||
| wc, err := feeds.GetWrappedChunk(innerCtx, getter, ch, v1) | ||
| if err != nil { | ||
| select { | ||
| case ret <- getWrappedResult{nil, v1, err}: | ||
| return | ||
| case <-innerCtx.Done(): | ||
| return | ||
| } | ||
| } | ||
|
|
||
| // here we just check whether the address is retrievable. | ||
| // if it returns an error we send that over the channel, otherwise | ||
| // we send the wc chunk back to the caller so that the feed can be | ||
| // dereferenced. | ||
| _, err = getter.Get(innerCtx, wc.Address()) | ||
| if err != nil { | ||
| select { | ||
| case ret <- getWrappedResult{wc, v1, err}: | ||
| return | ||
| case <-innerCtx.Done(): | ||
| return | ||
| } | ||
| } | ||
| select { | ||
| case ret <- getWrappedResult{wc, v1, nil}: | ||
| return | ||
| case <-innerCtx.Done(): | ||
| return | ||
| } | ||
| }() | ||
| return ret | ||
| } | ||
| isV1, err := feeds.IsV1Payload(ch) | ||
| if err != nil { | ||
| return nil, "", err | ||
| } | ||
| // if we have v1 length, it means there's ambiguity so we | ||
| // should fetch both feed versions. if the length isn't v1 | ||
| // then we should only try to fetch v2. | ||
| var ( | ||
| v1C, v2C chan getWrappedResult | ||
| both = false | ||
| ) | ||
| if isV1 { | ||
| both = true | ||
| v1C = getWrapped(true) | ||
| v2C = getWrapped(false) | ||
| } else { | ||
| v2C = getWrapped(false) | ||
| } | ||
|
|
||
| // closure to handle processing one channel then the other. | ||
| // the "resolving" parameter is meant to tell the closure which feed type is in the result struct | ||
| // which in turns allows it to return which feed type was resolved. | ||
| processChanOutput := func(resolving string, result getWrappedResult, other chan getWrappedResult) (swarm.Chunk, string, error) { | ||
| defer cancel() | ||
| if !both { | ||
| if resolving == "v2" { | ||
| return result.ch, resolving, nil | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In practice we can also have here result.err != nil , right?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's the problem here, the answer unfortunately is "no". Have a look at the PR description - this issue is documented thoroughly. The problem is the v2 can be either a manifest address OR actual data. This means that you can't "lookup what v2 references" because it may unmarshal as junk in case it is 48/80 bytes long data verbatim which is allowed. |
||
| } | ||
| return result.ch, resolving, result.err | ||
| } | ||
| // both are being checked. if there's no err return the chunk | ||
| // otherwise wait for the other channel | ||
| if result.err == nil { | ||
| return result.ch, resolving, nil | ||
| } | ||
| if resolving == "v1" { | ||
| resolving = "v2" | ||
| } else { | ||
| resolving = "v1" | ||
| } | ||
| // wait for the other one | ||
| select { | ||
| case result := <-other: | ||
| if !result.v1 { | ||
| // resolving v2 | ||
| return result.ch, resolving, nil | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here, in practice we can also have here result.err != nil , right?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here |
||
| } | ||
| return result.ch, resolving, result.err | ||
| case <-innerCtx.Done(): | ||
| return nil, "", ctx.Err() | ||
| } | ||
| } | ||
| select { | ||
| case v1r := <-v1C: | ||
| return processChanOutput("v1", v1r, v2C) | ||
| case v2r := <-v2C: | ||
| return processChanOutput("v2", v2r, v1C) | ||
| case <-innerCtx.Done(): | ||
| return nil, "", ctx.Err() | ||
| } | ||
| } | ||
|
|
||
| func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathVar string, w http.ResponseWriter, r *http.Request, headerOnly bool) { | ||
| loggerV1 := logger.V(1).Build() | ||
|
|
||
| headers := struct { | ||
|
|
@@ -415,7 +508,6 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV | |
| jsonhttp.BadRequest(w, "could not parse headers") | ||
| return | ||
| } | ||
|
|
||
| FETCH: | ||
| // read manifest entry | ||
| m, err := manifest.NewDefaultManifestReference( | ||
|
|
@@ -449,7 +541,8 @@ FETCH: | |
| jsonhttp.NotFound(w, "no update found") | ||
| return | ||
| } | ||
| wc, err := feeds.GetWrappedChunk(ctx, s.storer.Download(cache), ch, feedLegacyResolve) | ||
|
|
||
| wc, feedVer, err := s.resolveFeed(ctx, s.storer.Download(cache), ch) | ||
| if err != nil { | ||
| if errors.Is(err, feeds.ErrNotLegacyPayload) { | ||
| logger.Debug("bzz: download: feed is not a legacy payload") | ||
|
|
@@ -468,10 +561,10 @@ FETCH: | |
| jsonhttp.InternalServerError(w, "mapStructure feed update") | ||
| return | ||
| } | ||
|
|
||
| address = wc.Address() | ||
| // modify ls and init with non-existing wrapped chunk | ||
| ls = loadsave.NewReadonlyWithRootCh(s.storer.Download(cache), s.storer.Cache(), wc, rLevel) | ||
|
|
||
| feedDereferenced = true | ||
| curBytes, err := cur.MarshalBinary() | ||
| if err != nil { | ||
|
|
@@ -482,6 +575,7 @@ FETCH: | |
| } | ||
|
|
||
| w.Header().Set(SwarmFeedIndexHeader, hex.EncodeToString(curBytes)) | ||
| w.Header().Set(SwarmFeedResolvedVersionHeader, feedVer) | ||
| // this header might be overriding others. handle with care. in the future | ||
| // we should implement an append functionality for this specific header, | ||
| // since different parts of handlers might be overriding others' values | ||
|
|
@@ -490,7 +584,6 @@ FETCH: | |
| goto FETCH | ||
| } | ||
| } | ||
|
|
||
| if pathVar == "" { | ||
| loggerV1.Debug("bzz download: handle empty path", "address", address) | ||
|
|
||
|
|
@@ -505,6 +598,7 @@ FETCH: | |
| return | ||
| } | ||
| } | ||
|
|
||
| logger.Debug("bzz download: address not found or incorrect", "address", address, "path", pathVar) | ||
| logger.Error(nil, "address not found or incorrect") | ||
| jsonhttp.NotFound(w, "address not found or incorrect") | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this redundant, as in
GetWrappedChunkwe have agetter.Get(...)that returnsnil, erriferr, orwc, nil?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately no.
GetWrappedChunkonly callsGeton the legacy payload, not on a v2 type chunk. Also here, notice that we send the error on the channel, but also whether it is a v1 or v2 payload. Because v2 cannot technically error, we must short-circuit the error and return the payload anyway.