// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package ocagent adds the ability to export all telemetry to an ocagent.
// This keeps the compile time dependencies to zero and allows the agent to
// have the exporters needed for telemetry aggregation and viewing systems.
8 | package ocagent |
9 | |
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
	"sync"
	"time"

	"golang.org/x/tools/internal/event"
	"golang.org/x/tools/internal/event/core"
	"golang.org/x/tools/internal/event/export"
	"golang.org/x/tools/internal/event/export/metric"
	"golang.org/x/tools/internal/event/export/ocagent/wire"
	"golang.org/x/tools/internal/event/keys"
	"golang.org/x/tools/internal/event/label"
)
29 | |
// Config describes one connection to an ocagent.
//
// Connect fills in every field that is left at its zero value and then uses
// the resolved Config as the key of its exporter cache, so all fields must
// remain comparable.
type Config struct {
	// Start is reported as the process start timestamp of the exported node
	// and is handed to the metric conversion. When zero, Connect stamps the
	// new exporter with time.Now().
	Start time.Time
	// Host is reported as the node's host name. When empty, Connect uses
	// os.Hostname().
	Host string
	// Process is reported as the node's pid. When zero, Connect uses
	// os.Getpid().
	Process uint32
	// Client performs the HTTP uploads. When nil, Connect uses
	// http.DefaultClient.
	Client *http.Client
	// Service is reported as the node's service name. When empty, Connect
	// uses the base name of os.Args[0].
	Service string
	// Address is the base URL of the agent; endpoint paths are appended to
	// it verbatim. The special value "off" makes Connect return nil.
	Address string
	// Rate is the interval between background flushes. When zero, Connect
	// uses two seconds.
	Rate time.Duration
}
39 | |
40 | var ( |
41 | connectMu sync.Mutex |
42 | exporters = make(map[Config]*Exporter) |
43 | ) |
44 | |
45 | // Discover finds the local agent to export to, it will return nil if there |
46 | // is not one running. |
47 | // TODO: Actually implement a discovery protocol rather than a hard coded address |
48 | func Discover() *Config { |
49 | return &Config{ |
50 | Address: "http://localhost:55678", |
51 | } |
52 | } |
53 | |
54 | type Exporter struct { |
55 | mu sync.Mutex |
56 | config Config |
57 | spans []*export.Span |
58 | metrics []metric.Data |
59 | } |
60 | |
61 | // Connect creates a process specific exporter with the specified |
62 | // serviceName and the address of the ocagent to which it will upload |
63 | // its telemetry. |
64 | func Connect(config *Config) *Exporter { |
65 | if config == nil || config.Address == "off" { |
66 | return nil |
67 | } |
68 | resolved := *config |
69 | if resolved.Host == "" { |
70 | hostname, _ := os.Hostname() |
71 | resolved.Host = hostname |
72 | } |
73 | if resolved.Process == 0 { |
74 | resolved.Process = uint32(os.Getpid()) |
75 | } |
76 | if resolved.Client == nil { |
77 | resolved.Client = http.DefaultClient |
78 | } |
79 | if resolved.Service == "" { |
80 | resolved.Service = filepath.Base(os.Args[0]) |
81 | } |
82 | if resolved.Rate == 0 { |
83 | resolved.Rate = 2 * time.Second |
84 | } |
85 | |
86 | connectMu.Lock() |
87 | defer connectMu.Unlock() |
88 | if exporter, found := exporters[resolved]; found { |
89 | return exporter |
90 | } |
91 | exporter := &Exporter{config: resolved} |
92 | exporters[resolved] = exporter |
93 | if exporter.config.Start.IsZero() { |
94 | exporter.config.Start = time.Now() |
95 | } |
96 | go func() { |
97 | for range time.Tick(exporter.config.Rate) { |
98 | exporter.Flush() |
99 | } |
100 | }() |
101 | return exporter |
102 | } |
103 | |
104 | func (e *Exporter) ProcessEvent(ctx context.Context, ev core.Event, lm label.Map) context.Context { |
105 | switch { |
106 | case event.IsEnd(ev): |
107 | e.mu.Lock() |
108 | defer e.mu.Unlock() |
109 | span := export.GetSpan(ctx) |
110 | if span != nil { |
111 | e.spans = append(e.spans, span) |
112 | } |
113 | case event.IsMetric(ev): |
114 | e.mu.Lock() |
115 | defer e.mu.Unlock() |
116 | data := metric.Entries.Get(lm).([]metric.Data) |
117 | e.metrics = append(e.metrics, data...) |
118 | } |
119 | return ctx |
120 | } |
121 | |
122 | func (e *Exporter) Flush() { |
123 | e.mu.Lock() |
124 | defer e.mu.Unlock() |
125 | spans := make([]*wire.Span, len(e.spans)) |
126 | for i, s := range e.spans { |
127 | spans[i] = convertSpan(s) |
128 | } |
129 | e.spans = nil |
130 | metrics := make([]*wire.Metric, len(e.metrics)) |
131 | for i, m := range e.metrics { |
132 | metrics[i] = convertMetric(m, e.config.Start) |
133 | } |
134 | e.metrics = nil |
135 | |
136 | if len(spans) > 0 { |
137 | e.send("/v1/trace", &wire.ExportTraceServiceRequest{ |
138 | Node: e.config.buildNode(), |
139 | Spans: spans, |
140 | //TODO: Resource? |
141 | }) |
142 | } |
143 | if len(metrics) > 0 { |
144 | e.send("/v1/metrics", &wire.ExportMetricsServiceRequest{ |
145 | Node: e.config.buildNode(), |
146 | Metrics: metrics, |
147 | //TODO: Resource? |
148 | }) |
149 | } |
150 | } |
151 | |
152 | func (cfg *Config) buildNode() *wire.Node { |
153 | return &wire.Node{ |
154 | Identifier: &wire.ProcessIdentifier{ |
155 | HostName: cfg.Host, |
156 | Pid: cfg.Process, |
157 | StartTimestamp: convertTimestamp(cfg.Start), |
158 | }, |
159 | LibraryInfo: &wire.LibraryInfo{ |
160 | Language: wire.LanguageGo, |
161 | ExporterVersion: "0.0.1", |
162 | CoreLibraryVersion: "x/tools", |
163 | }, |
164 | ServiceInfo: &wire.ServiceInfo{ |
165 | Name: cfg.Service, |
166 | }, |
167 | } |
168 | } |
169 | |
170 | func (e *Exporter) send(endpoint string, message interface{}) { |
171 | blob, err := json.Marshal(message) |
172 | if err != nil { |
173 | errorInExport("ocagent failed to marshal message for %v: %v", endpoint, err) |
174 | return |
175 | } |
176 | uri := e.config.Address + endpoint |
177 | req, err := http.NewRequest("POST", uri, bytes.NewReader(blob)) |
178 | if err != nil { |
179 | errorInExport("ocagent failed to build request for %v: %v", uri, err) |
180 | return |
181 | } |
182 | req.Header.Set("Content-Type", "application/json") |
183 | res, err := e.config.Client.Do(req) |
184 | if err != nil { |
185 | errorInExport("ocagent failed to send message: %v \n", err) |
186 | return |
187 | } |
188 | if res.Body != nil { |
189 | res.Body.Close() |
190 | } |
191 | } |
192 | |
// errorInExport is the single place where export failures are reported. It
// takes a format string and its arguments.
//
// It intentionally does nothing: failed exports are simply dropped. Add
// logging here when debugging the exporter.
func errorInExport(message string, args ...interface{}) {
}
197 | |
198 | func convertTimestamp(t time.Time) wire.Timestamp { |
199 | return t.Format(time.RFC3339Nano) |
200 | } |
201 | |
202 | func toTruncatableString(s string) *wire.TruncatableString { |
203 | if s == "" { |
204 | return nil |
205 | } |
206 | return &wire.TruncatableString{Value: s} |
207 | } |
208 | |
209 | func convertSpan(span *export.Span) *wire.Span { |
210 | result := &wire.Span{ |
211 | TraceID: span.ID.TraceID[:], |
212 | SpanID: span.ID.SpanID[:], |
213 | TraceState: nil, //TODO? |
214 | ParentSpanID: span.ParentID[:], |
215 | Name: toTruncatableString(span.Name), |
216 | Kind: wire.UnspecifiedSpanKind, |
217 | StartTime: convertTimestamp(span.Start().At()), |
218 | EndTime: convertTimestamp(span.Finish().At()), |
219 | Attributes: convertAttributes(span.Start(), 1), |
220 | TimeEvents: convertEvents(span.Events()), |
221 | SameProcessAsParentSpan: true, |
222 | //TODO: StackTrace? |
223 | //TODO: Links? |
224 | //TODO: Status? |
225 | //TODO: Resource? |
226 | } |
227 | return result |
228 | } |
229 | |
230 | func convertMetric(data metric.Data, start time.Time) *wire.Metric { |
231 | descriptor := dataToMetricDescriptor(data) |
232 | timeseries := dataToTimeseries(data, start) |
233 | |
234 | if descriptor == nil && timeseries == nil { |
235 | return nil |
236 | } |
237 | |
238 | // TODO: handle Histogram metrics |
239 | return &wire.Metric{ |
240 | MetricDescriptor: descriptor, |
241 | Timeseries: timeseries, |
242 | // TODO: attach Resource? |
243 | } |
244 | } |
245 | |
246 | func skipToValidLabel(list label.List, index int) (int, label.Label) { |
247 | // skip to the first valid label |
248 | for ; list.Valid(index); index++ { |
249 | l := list.Label(index) |
250 | if !l.Valid() || l.Key() == keys.Label { |
251 | continue |
252 | } |
253 | return index, l |
254 | } |
255 | return -1, label.Label{} |
256 | } |
257 | |
258 | func convertAttributes(list label.List, index int) *wire.Attributes { |
259 | index, l := skipToValidLabel(list, index) |
260 | if !l.Valid() { |
261 | return nil |
262 | } |
263 | attributes := make(map[string]wire.Attribute) |
264 | for { |
265 | if l.Valid() { |
266 | attributes[l.Key().Name()] = convertAttribute(l) |
267 | } |
268 | index++ |
269 | if !list.Valid(index) { |
270 | return &wire.Attributes{AttributeMap: attributes} |
271 | } |
272 | l = list.Label(index) |
273 | } |
274 | } |
275 | |
276 | func convertAttribute(l label.Label) wire.Attribute { |
277 | switch key := l.Key().(type) { |
278 | case *keys.Int: |
279 | return wire.IntAttribute{IntValue: int64(key.From(l))} |
280 | case *keys.Int8: |
281 | return wire.IntAttribute{IntValue: int64(key.From(l))} |
282 | case *keys.Int16: |
283 | return wire.IntAttribute{IntValue: int64(key.From(l))} |
284 | case *keys.Int32: |
285 | return wire.IntAttribute{IntValue: int64(key.From(l))} |
286 | case *keys.Int64: |
287 | return wire.IntAttribute{IntValue: int64(key.From(l))} |
288 | case *keys.UInt: |
289 | return wire.IntAttribute{IntValue: int64(key.From(l))} |
290 | case *keys.UInt8: |
291 | return wire.IntAttribute{IntValue: int64(key.From(l))} |
292 | case *keys.UInt16: |
293 | return wire.IntAttribute{IntValue: int64(key.From(l))} |
294 | case *keys.UInt32: |
295 | return wire.IntAttribute{IntValue: int64(key.From(l))} |
296 | case *keys.UInt64: |
297 | return wire.IntAttribute{IntValue: int64(key.From(l))} |
298 | case *keys.Float32: |
299 | return wire.DoubleAttribute{DoubleValue: float64(key.From(l))} |
300 | case *keys.Float64: |
301 | return wire.DoubleAttribute{DoubleValue: key.From(l)} |
302 | case *keys.Boolean: |
303 | return wire.BoolAttribute{BoolValue: key.From(l)} |
304 | case *keys.String: |
305 | return wire.StringAttribute{StringValue: toTruncatableString(key.From(l))} |
306 | case *keys.Error: |
307 | return wire.StringAttribute{StringValue: toTruncatableString(key.From(l).Error())} |
308 | case *keys.Value: |
309 | return wire.StringAttribute{StringValue: toTruncatableString(fmt.Sprint(key.From(l)))} |
310 | default: |
311 | return wire.StringAttribute{StringValue: toTruncatableString(fmt.Sprintf("%T", key))} |
312 | } |
313 | } |
314 | |
315 | func convertEvents(events []core.Event) *wire.TimeEvents { |
316 | //TODO: MessageEvents? |
317 | result := make([]wire.TimeEvent, len(events)) |
318 | for i, event := range events { |
319 | result[i] = convertEvent(event) |
320 | } |
321 | return &wire.TimeEvents{TimeEvent: result} |
322 | } |
323 | |
324 | func convertEvent(ev core.Event) wire.TimeEvent { |
325 | return wire.TimeEvent{ |
326 | Time: convertTimestamp(ev.At()), |
327 | Annotation: convertAnnotation(ev), |
328 | } |
329 | } |
330 | |
331 | func getAnnotationDescription(ev core.Event) (string, int) { |
332 | l := ev.Label(0) |
333 | if l.Key() != keys.Msg { |
334 | return "", 0 |
335 | } |
336 | if msg := keys.Msg.From(l); msg != "" { |
337 | return msg, 1 |
338 | } |
339 | l = ev.Label(1) |
340 | if l.Key() != keys.Err { |
341 | return "", 1 |
342 | } |
343 | if err := keys.Err.From(l); err != nil { |
344 | return err.Error(), 2 |
345 | } |
346 | return "", 2 |
347 | } |
348 | |
349 | func convertAnnotation(ev core.Event) *wire.Annotation { |
350 | description, index := getAnnotationDescription(ev) |
351 | if _, l := skipToValidLabel(ev, index); !l.Valid() && description == "" { |
352 | return nil |
353 | } |
354 | return &wire.Annotation{ |
355 | Description: toTruncatableString(description), |
356 | Attributes: convertAttributes(ev, index), |
357 | } |
358 | } |
359 |
Members