File: //opt/go/pkg/mod/github.com/aws/
[email protected]/private/model/api/eventstream_tmpl.go
//go:build codegen
// +build codegen
package api
import (
"fmt"
"io"
"strings"
"text/template"
)
func renderEventStreamAPI(w io.Writer, op *Operation) error {
// Imports needed by the EventStream APIs.
op.API.AddImport("fmt")
op.API.AddImport("bytes")
op.API.AddImport("io")
op.API.AddImport("time")
op.API.AddSDKImport("aws")
op.API.AddSDKImport("aws/awserr")
op.API.AddSDKImport("aws/request")
op.API.AddSDKImport("private/protocol/eventstream")
op.API.AddSDKImport("private/protocol/eventstream/eventstreamapi")
// usages of these imports are conditional - generate a compile-only
// reference to avoid potential unused imports:
// - awserr is only used for input streams or json protocols
// - time is only used for input streams or if an event payload has a
// timestamp field
w.Write([]byte(`
var _ awserr.Error
var _ time.Time
`))
return eventStreamAPITmpl.Execute(w, op)
}
// Template for an EventStream API Shape that will provide read/writing events
// across the EventStream. This is a special shape that's only public members
// are the Events channel and a Close and Err method.
//
// Executed in the context of a Shape.
var eventStreamAPITmpl = template.Must(
template.New("eventStreamAPITmplDef").
Funcs(template.FuncMap{
"unexported": func(v string) string {
return strings.ToLower(string(v[0])) + v[1:]
},
}).
Parse(eventStreamAPITmplDef),
)
const eventStreamAPITmplDef = `
{{- $esapi := $.EventStreamAPI }}
{{- $outputStream := $esapi.OutputStream }}
{{- $inputStream := $esapi.InputStream }}
// {{ $esapi.Name }} provides the event stream handling for the {{ $.ExportedName }}.
//
// For testing and mocking the event stream this type should be initialized via
// the New{{ $esapi.Name }} constructor function. Using the functional options
// to pass in nested mock behavior.
type {{ $esapi.Name }} struct {
{{- if $inputStream }}
// Writer is the EventStream writer for the {{ $inputStream.Name }}
// events. This value is automatically set by the SDK when the API call is made
// Use this member when unit testing your code with the SDK to mock out the
// EventStream Writer.
//
// Must not be nil.
Writer {{ $inputStream.StreamWriterAPIName }}
inputWriter io.WriteCloser
{{- if eq .API.Metadata.Protocol "json" }}
input {{ $.InputRef.GoType }}
{{- end }}
{{- end }}
{{- if $outputStream }}
// Reader is the EventStream reader for the {{ $outputStream.Name }}
// events. This value is automatically set by the SDK when the API call is made
// Use this member when unit testing your code with the SDK to mock out the
// EventStream Reader.
//
// Must not be nil.
Reader {{ $outputStream.StreamReaderAPIName }}
outputReader io.ReadCloser
{{- if eq .API.Metadata.Protocol "json" }}
output {{ $.OutputRef.GoType }}
{{- end }}
{{- end }}
{{- if $esapi.Legacy }}
// StreamCloser is the io.Closer for the EventStream connection. For HTTP
// EventStream this is the response Body. The stream will be closed when
// the Close method of the EventStream is called.
StreamCloser io.Closer
{{- end }}
done chan struct{}
closeOnce sync.Once
err *eventstreamapi.OnceError
}
// New{{ $esapi.Name }} initializes an {{ $esapi.Name }}.
// This function should only be used for testing and mocking the {{ $esapi.Name }}
// stream within your application.
{{- if $inputStream }}
//
// The Writer member must be set before writing events to the stream.
{{- end }}
{{- if $outputStream }}
//
// The Reader member must be set before reading events from the stream.
{{- end }}
{{- if $esapi.Legacy }}
//
// The StreamCloser member should be set to the underlying io.Closer,
// (e.g. http.Response.Body), that will be closed when the stream Close method
// is called.
{{- end }}
//
// es := New{{ $esapi.Name }}(func(o *{{ $esapi.Name}}){
{{- if $inputStream }}
// es.Writer = myMockStreamWriter
{{- end }}
{{- if $outputStream }}
// es.Reader = myMockStreamReader
{{- end }}
{{- if $esapi.Legacy }}
// es.StreamCloser = myMockStreamCloser
{{- end }}
// })
func New{{ $esapi.Name }}(opts ...func(*{{ $esapi.Name}})) *{{ $esapi.Name }} {
es := &{{ $esapi.Name }} {
done: make(chan struct{}),
err: eventstreamapi.NewOnceError(),
}
for _, fn := range opts {
fn(es)
}
return es
}
{{- if $esapi.Legacy }}
func (es *{{ $esapi.Name }}) setStreamCloser(r *request.Request) {
es.StreamCloser = r.HTTPResponse.Body
}
{{- end }}
func (es *{{ $esapi.Name }}) runOnStreamPartClose(r *request.Request) {
if es.done == nil {
return
}
go es.waitStreamPartClose()
}
func (es *{{ $esapi.Name }}) waitStreamPartClose() {
{{- if $inputStream }}
var inputErrCh <-chan struct{}
if v, ok := es.Writer.(interface{ErrorSet() <-chan struct{}}); ok {
inputErrCh = v.ErrorSet()
}
{{- end }}
{{- if $outputStream }}
var outputErrCh <-chan struct{}
if v, ok := es.Reader.(interface{ErrorSet() <-chan struct{}}); ok {
outputErrCh = v.ErrorSet()
}
var outputClosedCh <- chan struct{}
if v, ok := es.Reader.(interface{Closed() <-chan struct{}}); ok {
outputClosedCh = v.Closed()
}
{{- end }}
select {
case <-es.done:
{{- if $inputStream }}
case <-inputErrCh:
es.err.SetError(es.Writer.Err())
es.Close()
{{- end }}
{{- if $outputStream }}
case <-outputErrCh:
es.err.SetError(es.Reader.Err())
es.Close()
case <-outputClosedCh:
if err := es.Reader.Err(); err != nil {
es.err.SetError(es.Reader.Err())
}
es.Close()
{{- end }}
}
}
{{- if $inputStream }}
{{- if eq .API.Metadata.Protocol "json" }}
func {{ $esapi.StreamInputEventTypeGetterName }}(event {{ $inputStream.EventGroupName }}) (string, error) {
if _, ok := event.({{ $.InputRef.GoType }}); ok {
return "initial-request", nil
}
return {{ $inputStream.StreamEventTypeGetterName }}(event)
}
{{- end }}
func (es *{{ $esapi.Name }}) setupInputPipe(r *request.Request) {
inputReader, inputWriter := io.Pipe()
r.SetStreamingBody(inputReader)
es.inputWriter = inputWriter
}
// Closes the input-pipe writer
func (es *{{ $esapi.Name }}) closeInputPipe() error {
if es.inputWriter != nil {
return es.inputWriter.Close()
}
return nil
}
// Send writes the event to the stream blocking until the event is written.
// Returns an error if the event was not written.
//
// These events are:
// {{ range $_, $event := $inputStream.Events }}
// * {{ $event.Shape.ShapeName }}
{{- end }}
func (es *{{ $esapi.Name }}) Send(ctx aws.Context, event {{ $inputStream.EventGroupName }}) error {
return es.Writer.Send(ctx, event)
}
func (es *{{ $esapi.Name }}) runInputStream(r *request.Request) {
var opts []func(*eventstream.Encoder)
if r.Config.Logger != nil && r.Config.LogLevel.Matches(aws.LogDebugWithEventStreamBody) {
opts = append(opts, eventstream.EncodeWithLogger(r.Config.Logger))
}
var encoder eventstreamapi.Encoder = eventstream.NewEncoder(es.inputWriter, opts...)
var closer aws.MultiCloser
{{- if $.ShouldSignRequestBody }}
{{- $_ := $.API.AddSDKImport "aws/signer/v4" }}
sigSeed, err := v4.GetSignedRequestSignature(r.HTTPRequest)
if err != nil {
r.Error = awserr.New(request.ErrCodeSerialization,
"unable to get initial request's signature", err)
return
}
signer := eventstreamapi.NewSignEncoder(
v4.NewStreamSigner(r.ClientInfo.SigningRegion, r.ClientInfo.SigningName,
sigSeed, r.Config.Credentials),
encoder,
)
encoder = signer
closer = append(closer, signer)
{{- end }}
closer = append(closer, es.inputWriter)
eventWriter := eventstreamapi.NewEventWriter(encoder,
protocol.HandlerPayloadMarshal{
Marshalers: r.Handlers.BuildStream,
},
{{- if eq .API.Metadata.Protocol "json" }}
{{ $esapi.StreamInputEventTypeGetterName }},
{{- else }}
{{ $inputStream.StreamEventTypeGetterName }},
{{- end }}
)
es.Writer = &{{ $inputStream.StreamWriterImplName }}{
StreamWriter: eventstreamapi.NewStreamWriter(eventWriter, closer),
}
}
{{- if eq .API.Metadata.Protocol "json" }}
func (es *{{ $esapi.Name }}) sendInitialEvent(r *request.Request) {
if err := es.Send(es.input); err != nil {
r.Error = err
}
}
{{- end }}
{{- end }}
{{- if $outputStream }}
{{- if eq .API.Metadata.Protocol "json" }}
type {{ $esapi.StreamOutputUnmarshalerForEventName }} struct {
unmarshalerForEvent func(string) (eventstreamapi.Unmarshaler, error)
output {{ $.OutputRef.GoType }}
}
func (e {{ $esapi.StreamOutputUnmarshalerForEventName }}) UnmarshalerForEventName(eventType string) (eventstreamapi.Unmarshaler, error) {
if eventType == "initial-response" {
return e.output, nil
}
return e.unmarshalerForEvent(eventType)
}
{{- end }}
// Events returns a channel to read events from.
//
// These events are:
// {{ range $_, $event := $outputStream.Events }}
// * {{ $event.Shape.ShapeName }}
{{- end }}
// * {{ $outputStream.StreamUnknownEventName }}
func (es *{{ $esapi.Name }}) Events() <-chan {{ $outputStream.EventGroupName }} {
return es.Reader.Events()
}
func (es *{{ $esapi.Name }}) runOutputStream(r *request.Request) {
var opts []func(*eventstream.Decoder)
if r.Config.Logger != nil && r.Config.LogLevel.Matches(aws.LogDebugWithEventStreamBody) {
opts = append(opts, eventstream.DecodeWithLogger(r.Config.Logger))
}
unmarshalerForEvent := {{ $outputStream.StreamUnmarshalerForEventName }}{
metadata: protocol.ResponseMetadata{
StatusCode: r.HTTPResponse.StatusCode,
RequestID: r.RequestID,
},
}.UnmarshalerForEventName
{{- if eq .API.Metadata.Protocol "json" }}
unmarshalerForEvent = {{ $esapi.StreamOutputUnmarshalerForEventName }}{
unmarshalerForEvent: unmarshalerForEvent,
output: es.output,
}.UnmarshalerForEventName
{{- end }}
decoder := eventstream.NewDecoder(r.HTTPResponse.Body, opts...)
eventReader := eventstreamapi.NewEventReader(decoder,
protocol.HandlerPayloadUnmarshal{
Unmarshalers: r.Handlers.UnmarshalStream,
},
unmarshalerForEvent,
)
es.outputReader = r.HTTPResponse.Body
es.Reader = {{ $outputStream.StreamReaderImplConstructorName }}(eventReader)
}
{{- if eq .API.Metadata.Protocol "json" }}
func (es *{{ $esapi.Name }}) recvInitialEvent(r *request.Request) {
// Wait for the initial response event, which must be the first
// event to be received from the API.
select {
case event, ok := <- es.Events():
if !ok {
return
}
v, ok := event.({{ $.OutputRef.GoType }})
if !ok || v == nil {
r.Error = awserr.New(
request.ErrCodeSerialization,
fmt.Sprintf("invalid event, %T, expect %T, %v",
event, ({{ $.OutputRef.GoType }})(nil), v),
nil,
)
return
}
*es.output = *v
es.output.{{ $.EventStreamAPI.OutputMemberName }} = es
}
}
{{- end }}
{{- end }}
// Close closes the stream. This will also cause the stream to be closed.
// Close must be called when done using the stream API. Not calling Close
// may result in resource leaks.
{{- if $inputStream }}
//
// Will close the underlying EventStream writer, and no more events can be
// sent.
{{- end }}
{{- if $outputStream }}
//
// You can use the closing of the Reader's Events channel to terminate your
// application's read from the API's stream.
{{- end }}
//
func (es *{{ $esapi.Name }}) Close() (err error) {
es.closeOnce.Do(es.safeClose)
return es.Err()
}
func (es *{{ $esapi.Name }}) safeClose() {
if es.done != nil {
close(es.done)
}
{{- if $inputStream }}
t := time.NewTicker(time.Second)
defer t.Stop()
writeCloseDone := make(chan error)
go func() {
if err := es.Writer.Close(); err != nil {
es.err.SetError(err)
}
close(writeCloseDone)
}()
select {
case <-t.C:
case <-writeCloseDone:
}
if err := es.closeInputPipe(); err != nil {
es.err.SetError(err)
}
{{- end }}
{{- if $outputStream }}
es.Reader.Close()
if es.outputReader != nil {
es.outputReader.Close()
}
{{- end }}
{{- if $esapi.Legacy }}
es.StreamCloser.Close()
{{- end }}
}
// Err returns any error that occurred while reading or writing EventStream
// Events from the service API's response. Returns nil if there were no errors.
func (es *{{ $esapi.Name }}) Err() error {
if err := es.err.Err(); err != nil {
return err
}
{{- if $inputStream }}
if err := es.Writer.Err(); err != nil {
return err
}
{{- end }}
{{- if $outputStream }}
if err := es.Reader.Err(); err != nil {
return err
}
{{- end }}
return nil
}
`
func renderEventStreamShape(w io.Writer, s *Shape) error {
// Imports needed by the EventStream APIs.
s.API.AddImport("fmt")
s.API.AddImport("bytes")
s.API.AddImport("io")
s.API.AddImport("sync")
s.API.AddSDKImport("aws")
s.API.AddSDKImport("aws/awserr")
s.API.AddSDKImport("private/protocol/eventstream")
s.API.AddSDKImport("private/protocol/eventstream/eventstreamapi")
return eventStreamShapeTmpl.Execute(w, s)
}
var eventStreamShapeTmpl = func() *template.Template {
t := template.Must(
template.New("eventStreamShapeTmplDef").
Parse(eventStreamShapeTmplDef),
)
template.Must(
t.AddParseTree(
"eventStreamShapeWriterTmpl", eventStreamShapeWriterTmpl.Tree),
)
template.Must(
t.AddParseTree(
"eventStreamShapeReaderTmpl", eventStreamShapeReaderTmpl.Tree),
)
return t
}()
const eventStreamShapeTmplDef = `
{{- $eventStream := $.EventStream }}
{{- $eventStreamEventGroup := printf "%sEvent" $eventStream.Name }}
// {{ $eventStreamEventGroup }} groups together all EventStream
// events writes for {{ $eventStream.Name }}.
//
// These events are:
// {{ range $_, $event := $eventStream.Events }}
// * {{ $event.Shape.ShapeName }}
{{- end }}
type {{ $eventStreamEventGroup }} interface {
event{{ $eventStream.Name }}()
eventstreamapi.Marshaler
eventstreamapi.Unmarshaler
}
{{- if $.IsInputEventStream }}
{{- template "eventStreamShapeWriterTmpl" $ }}
{{- end }}
{{- if $.IsOutputEventStream }}
{{- template "eventStreamShapeReaderTmpl" $ }}
{{- end }}
`
// EventStreamHeaderTypeMap provides the mapping of a EventStream Header's
// Value type to the shape reference's member type.
type EventStreamHeaderTypeMap struct {
Header string
Member string
}
// Returns if the event has any members which are not the event's blob payload,
// nor a header.
func eventHasNonBlobPayloadMembers(s *Shape) bool {
num := len(s.MemberRefs)
for _, ref := range s.MemberRefs {
if ref.IsEventHeader || (ref.IsEventPayload && (ref.Shape.Type == "blob" || ref.Shape.Type == "string")) {
num--
}
}
return num > 0
}
func setEventHeaderValueForType(s *Shape, memVar string) string {
switch s.Type {
case "blob":
return fmt.Sprintf("eventstream.BytesValue(%s)", memVar)
case "string":
return fmt.Sprintf("eventstream.StringValue(*%s)", memVar)
case "boolean":
return fmt.Sprintf("eventstream.BoolValue(*%s)", memVar)
case "byte":
return fmt.Sprintf("eventstream.Int8Value(int8(*%s))", memVar)
case "short":
return fmt.Sprintf("eventstream.Int16Value(int16(*%s))", memVar)
case "integer":
return fmt.Sprintf("eventstream.Int32Value(int32(*%s))", memVar)
case "long":
return fmt.Sprintf("eventstream.Int64Value(*%s)", memVar)
case "float":
return fmt.Sprintf("eventstream.Float32Value(float32(*%s))", memVar)
case "double":
return fmt.Sprintf("eventstream.Float64Value(*%s)", memVar)
case "timestamp":
return fmt.Sprintf("eventstream.TimestampValue(*%s)", memVar)
default:
panic(fmt.Sprintf("value type %s not supported for event headers, %s", s.Type, s.ShapeName))
}
}
func shapeMessageType(s *Shape) string {
if s.Exception {
return "eventstreamapi.ExceptionMessageType"
}
return "eventstreamapi.EventMessageType"
}
var eventStreamEventShapeTmplFuncs = template.FuncMap{
"EventStreamHeaderTypeMap": func(ref *ShapeRef) EventStreamHeaderTypeMap {
switch ref.Shape.Type {
case "boolean":
return EventStreamHeaderTypeMap{Header: "bool", Member: "bool"}
case "byte":
return EventStreamHeaderTypeMap{Header: "int8", Member: "int64"}
case "short":
return EventStreamHeaderTypeMap{Header: "int16", Member: "int64"}
case "integer":
return EventStreamHeaderTypeMap{Header: "int32", Member: "int64"}
case "long":
return EventStreamHeaderTypeMap{Header: "int64", Member: "int64"}
case "timestamp":
return EventStreamHeaderTypeMap{Header: "time.Time", Member: "time.Time"}
case "blob":
return EventStreamHeaderTypeMap{Header: "[]byte", Member: "[]byte"}
case "string":
return EventStreamHeaderTypeMap{Header: "string", Member: "string"}
case "uuid":
return EventStreamHeaderTypeMap{Header: "[]byte", Member: "[]byte"}
default:
panic("unsupported EventStream header type, " + ref.Shape.Type)
}
},
"EventHeaderValueForType": setEventHeaderValueForType,
"ShapeMessageType": shapeMessageType,
"HasNonBlobPayloadMembers": eventHasNonBlobPayloadMembers,
}
// Template for an EventStream Event shape. This is a normal API shape that is
// decorated as an EventStream Event.
//
// Executed in the context of a Shape.
var eventStreamEventShapeTmpl = template.Must(template.New("eventStreamEventShapeTmpl").
Funcs(eventStreamEventShapeTmplFuncs).Parse(`
{{ range $_, $eventStream := $.EventFor }}
// The {{ $.ShapeName }} is and event in the {{ $eventStream.Name }} group of events.
func (s *{{ $.ShapeName }}) event{{ $eventStream.Name }}() {}
{{ end }}
// UnmarshalEvent unmarshals the EventStream Message into the {{ $.ShapeName }} value.
// This method is only used internally within the SDK's EventStream handling.
func (s *{{ $.ShapeName }}) UnmarshalEvent(
payloadUnmarshaler protocol.PayloadUnmarshaler,
msg eventstream.Message,
) error {
{{- range $memName, $memRef := $.MemberRefs }}
{{- if $memRef.IsEventHeader }}
if hv := msg.Headers.Get("{{ $memName }}"); hv != nil {
{{ $types := EventStreamHeaderTypeMap $memRef -}}
v := hv.Get().({{ $types.Header }})
{{- if ne $types.Header $types.Member }}
m := {{ $types.Member }}(v)
s.{{ $memName }} = {{ if $memRef.UseIndirection }}&{{ end }}m
{{- else }}
s.{{ $memName }} = {{ if $memRef.UseIndirection }}&{{ end }}v
{{- end }}
}
{{- else if (and ($memRef.IsEventPayload) (eq $memRef.Shape.Type "blob")) }}
s.{{ $memName }} = make([]byte, len(msg.Payload))
copy(s.{{ $memName }}, msg.Payload)
{{- else if (and ($memRef.IsEventPayload) (eq $memRef.Shape.Type "string")) }}
s.{{ $memName }} = aws.String(string(msg.Payload))
{{- end }}
{{- end }}
{{- if HasNonBlobPayloadMembers $ }}
if err := payloadUnmarshaler.UnmarshalPayload(
bytes.NewReader(msg.Payload), s,
); err != nil {
return err
}
{{- end }}
return nil
}
// MarshalEvent marshals the type into an stream event value. This method
// should only used internally within the SDK's EventStream handling.
func (s *{{ $.ShapeName}}) MarshalEvent(pm protocol.PayloadMarshaler) (msg eventstream.Message, err error) {
msg.Headers.Set(eventstreamapi.MessageTypeHeader, eventstream.StringValue({{ ShapeMessageType $ }}))
{{- range $memName, $memRef := $.MemberRefs }}
{{- if $memRef.IsEventHeader }}
{{ $memVar := printf "s.%s" $memName -}}
{{ $typedMem := EventHeaderValueForType $memRef.Shape $memVar -}}
msg.Headers.Set("{{ $memName }}", {{ $typedMem }})
{{- else if (and ($memRef.IsEventPayload) (eq $memRef.Shape.Type "blob")) }}
msg.Headers.Set(":content-type", eventstream.StringValue("application/octet-stream"))
msg.Payload = s.{{ $memName }}
{{- else if (and ($memRef.IsEventPayload) (eq $memRef.Shape.Type "string")) }}
msg.Payload = []byte(aws.StringValue(s.{{ $memName }}))
{{- end }}
{{- end }}
{{- if HasNonBlobPayloadMembers $ }}
var buf bytes.Buffer
if err = pm.MarshalPayload(&buf, s); err != nil {
return eventstream.Message{}, err
}
msg.Payload = buf.Bytes()
{{- end }}
return msg, err
}
`))