| 1 | // Copyright 2019 The Go Authors. All rights reserved. |
|---|---|
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | package jsonrpc2 |
| 6 | |
| 7 | import ( |
| 8 | "context" |
| 9 | "fmt" |
| 10 | "sync" |
| 11 | |
| 12 | "golang.org/x/tools/internal/event" |
| 13 | ) |
| 14 | |
| 15 | // Handler is invoked to handle incoming requests. |
| 16 | // The Replier sends a reply to the request and must be called exactly once. |
| 17 | type Handler func(ctx context.Context, reply Replier, req Request) error |
| 18 | |
// Replier is the callback handed to a Handler for sending the response to a
// request. A non-nil err takes precedence: result is ignored when err is set.
type Replier func(ctx context.Context, result interface{}, err error) error
| 22 | |
| 23 | // MethodNotFound is a Handler that replies to all call requests with the |
| 24 | // standard method not found response. |
| 25 | // This should normally be the final handler in a chain. |
| 26 | func MethodNotFound(ctx context.Context, reply Replier, req Request) error { |
| 27 | return reply(ctx, nil, fmt.Errorf("%w: %q", ErrMethodNotFound, req.Method())) |
| 28 | } |
| 29 | |
| 30 | // MustReplyHandler creates a Handler that panics if the wrapped handler does |
| 31 | // not call Reply for every request that it is passed. |
| 32 | func MustReplyHandler(handler Handler) Handler { |
| 33 | return func(ctx context.Context, reply Replier, req Request) error { |
| 34 | called := false |
| 35 | err := handler(ctx, func(ctx context.Context, result interface{}, err error) error { |
| 36 | if called { |
| 37 | panic(fmt.Errorf("request %q replied to more than once", req.Method())) |
| 38 | } |
| 39 | called = true |
| 40 | return reply(ctx, result, err) |
| 41 | }, req) |
| 42 | if !called { |
| 43 | panic(fmt.Errorf("request %q was never replied to", req.Method())) |
| 44 | } |
| 45 | return err |
| 46 | } |
| 47 | } |
| 48 | |
| 49 | // CancelHandler returns a handler that supports cancellation, and a function |
| 50 | // that can be used to trigger canceling in progress requests. |
| 51 | func CancelHandler(handler Handler) (Handler, func(id ID)) { |
| 52 | var mu sync.Mutex |
| 53 | handling := make(map[ID]context.CancelFunc) |
| 54 | wrapped := func(ctx context.Context, reply Replier, req Request) error { |
| 55 | if call, ok := req.(*Call); ok { |
| 56 | cancelCtx, cancel := context.WithCancel(ctx) |
| 57 | ctx = cancelCtx |
| 58 | mu.Lock() |
| 59 | handling[call.ID()] = cancel |
| 60 | mu.Unlock() |
| 61 | innerReply := reply |
| 62 | reply = func(ctx context.Context, result interface{}, err error) error { |
| 63 | mu.Lock() |
| 64 | delete(handling, call.ID()) |
| 65 | mu.Unlock() |
| 66 | return innerReply(ctx, result, err) |
| 67 | } |
| 68 | } |
| 69 | return handler(ctx, reply, req) |
| 70 | } |
| 71 | return wrapped, func(id ID) { |
| 72 | mu.Lock() |
| 73 | cancel, found := handling[id] |
| 74 | mu.Unlock() |
| 75 | if found { |
| 76 | cancel() |
| 77 | } |
| 78 | } |
| 79 | } |
| 80 | |
| 81 | // AsyncHandler returns a handler that processes each request goes in its own |
| 82 | // goroutine. |
| 83 | // The handler returns immediately, without the request being processed. |
| 84 | // Each request then waits for the previous request to finish before it starts. |
| 85 | // This allows the stream to unblock at the cost of unbounded goroutines |
| 86 | // all stalled on the previous one. |
| 87 | func AsyncHandler(handler Handler) Handler { |
| 88 | nextRequest := make(chan struct{}) |
| 89 | close(nextRequest) |
| 90 | return func(ctx context.Context, reply Replier, req Request) error { |
| 91 | waitForPrevious := nextRequest |
| 92 | nextRequest = make(chan struct{}) |
| 93 | unlockNext := nextRequest |
| 94 | innerReply := reply |
| 95 | reply = func(ctx context.Context, result interface{}, err error) error { |
| 96 | close(unlockNext) |
| 97 | return innerReply(ctx, result, err) |
| 98 | } |
| 99 | _, queueDone := event.Start(ctx, "queued") |
| 100 | go func() { |
| 101 | <-waitForPrevious |
| 102 | queueDone() |
| 103 | if err := handler(ctx, reply, req); err != nil { |
| 104 | event.Error(ctx, "jsonrpc2 async message delivery failed", err) |
| 105 | } |
| 106 | }() |
| 107 | return nil |
| 108 | } |
| 109 | } |
| 110 |
Members