File: //opt/go/pkg/mod/github.com/aws/
[email protected]/private/model/api/eventstream.go
//go:build codegen
// +build codegen
package api
import (
"bytes"
"fmt"
"text/template"
)
// EventStreamAPI provides details about the event stream async API and
// associated EventStream shapes.
type EventStreamAPI struct {
API *API
Operation *Operation
Name string
InputStream *EventStream
OutputStream *EventStream
RequireHTTP2 bool
// The eventstream generated code was generated with an older model that
// does not scale with bi-directional models. This drives the need to
// expose the output shape's event stream member as an exported member.
Legacy bool
}
func (es *EventStreamAPI) StreamInputEventTypeGetterName() string {
return "eventTypeFor" + es.Name + "InputEvent"
}
func (es *EventStreamAPI) StreamOutputUnmarshalerForEventName() string {
return "eventTypeFor" + es.Name + "OutputEvent"
}
// EventStream represents a single eventstream group (input/output) and the
// modeled events that are known for the stream.
type EventStream struct {
Name string
Shape *Shape
Events []*Event
Exceptions []*Event
}
func (es *EventStream) EventGroupName() string {
return es.Name + "Event"
}
func (es *EventStream) StreamWriterAPIName() string {
return es.Name + "Writer"
}
func (es *EventStream) StreamWriterImplName() string {
return "write" + es.Name
}
func (es *EventStream) StreamEventTypeGetterName() string {
return "eventTypeFor" + es.Name + "Event"
}
func (es *EventStream) StreamReaderAPIName() string {
return es.Name + "Reader"
}
func (es *EventStream) StreamReaderImplName() string {
return "read" + es.Name
}
func (es *EventStream) StreamReaderImplConstructorName() string {
return "newRead" + es.Name
}
func (es *EventStream) StreamUnmarshalerForEventName() string {
return "unmarshalerFor" + es.Name + "Event"
}
func (es *EventStream) StreamUnknownEventName() string {
return es.Name + "UnknownEvent"
}
// Event is a single EventStream event that can be sent or received in an
// EventStream.
type Event struct {
Name string
Shape *Shape
For *EventStream
Private bool
}
// ShapeDoc returns the docstring for the EventStream API.
func (esAPI *EventStreamAPI) ShapeDoc() string {
tmpl := template.Must(template.New("eventStreamShapeDoc").Parse(`
{{- $.Name }} provides handling of EventStreams for
the {{ $.Operation.ExportedName }} API.
{{- if $.OutputStream }}
Use this type to receive {{ $.OutputStream.Name }} events. The events
can be read from the stream.
The events that can be received are:
{{- range $_, $event := $.OutputStream.Events }}
* {{ $event.Shape.ShapeName }}
{{- end }}
{{- end }}
{{- if $.InputStream }}
Use this type to send {{ $.InputStream.Name }} events. The events
can be written to the stream.
The events that can be sent are:
{{ range $_, $event := $.InputStream.Events -}}
* {{ $event.Shape.ShapeName }}
{{- end }}
{{- end }}`))
var w bytes.Buffer
if err := tmpl.Execute(&w, esAPI); err != nil {
panic(fmt.Sprintf("failed to generate eventstream shape template for %v, %v",
esAPI.Operation.ExportedName, err))
}
return commentify(w.String())
}
func hasEventStream(topShape *Shape) bool {
for _, ref := range topShape.MemberRefs {
if ref.Shape.IsEventStream {
return true
}
}
return false
}
func eventStreamAPIShapeRefDoc(refName string) string {
return commentify(fmt.Sprintf("Use %s to use the API's stream.", refName))
}
func (a *API) setupEventStreams() error {
streams := EventStreams{}
for opName, op := range a.Operations {
inputRef := getEventStreamMember(op.InputRef.Shape)
outputRef := getEventStreamMember(op.OutputRef.Shape)
if inputRef == nil && outputRef == nil {
continue
}
if inputRef != nil && outputRef == nil {
return fmt.Errorf("event stream input only stream not supported for protocol %s, %s, %v",
a.NiceName(), opName, a.Metadata.Protocol)
}
switch a.Metadata.Protocol {
case `rest-json`, `rest-xml`, `json`:
default:
return UnsupportedAPIModelError{
Err: fmt.Errorf("EventStream not supported for protocol %s, %s, %v",
a.NiceName(), opName, a.Metadata.Protocol),
}
}
var inputStream *EventStream
if inputRef != nil {
inputStream = streams.GetStream(op.InputRef.Shape, inputRef.Shape)
inputStream.Shape.IsInputEventStream = true
}
var outputStream *EventStream
if outputRef != nil {
outputStream = streams.GetStream(op.OutputRef.Shape, outputRef.Shape)
outputStream.Shape.IsOutputEventStream = true
}
requireHTTP2 := op.API.Metadata.ProtocolSettings.HTTP2 == "eventstream" &&
inputStream != nil && outputStream != nil
a.HasEventStream = true
op.EventStreamAPI = &EventStreamAPI{
API: a,
Operation: op,
Name: op.ExportedName + "EventStream",
InputStream: inputStream,
OutputStream: outputStream,
Legacy: isLegacyEventStream(op),
RequireHTTP2: requireHTTP2,
}
op.OutputRef.Shape.OutputEventStreamAPI = op.EventStreamAPI
if s, ok := a.Shapes[op.EventStreamAPI.Name]; ok {
newName := op.EventStreamAPI.Name + "Data"
if _, ok := a.Shapes[newName]; ok {
panic(fmt.Sprintf(
"%s: attempting to rename %s to %s, but shape with that name already exists",
a.NiceName(), op.EventStreamAPI.Name, newName))
}
s.Rename(newName)
}
}
return nil
}
// EventStreams is a map of streams for the API shared across all operations.
// Ensurs that no stream is duplicated.
type EventStreams map[*Shape]*EventStream
// GetStream returns an EventStream for the operations top level shape, and
// member reference to the stream shape.
func (es *EventStreams) GetStream(topShape *Shape, streamShape *Shape) *EventStream {
var stream *EventStream
if v, ok := (*es)[streamShape]; ok {
stream = v
} else {
stream = setupEventStream(streamShape)
(*es)[streamShape] = stream
}
if topShape.API.Metadata.Protocol == "json" {
if topShape.EventFor == nil {
topShape.EventFor = map[string]*EventStream{}
}
topShape.EventFor[stream.Name] = stream
}
return stream
}
var legacyEventStream = map[string]map[string]struct{}{
"s3": {
"SelectObjectContent": struct{}{},
},
"kinesis": {
"SubscribeToShard": struct{}{},
},
}
func isLegacyEventStream(op *Operation) bool {
if s, ok := legacyEventStream[op.API.PackageName()]; ok {
if _, ok = s[op.ExportedName]; ok {
return true
}
}
return false
}
func (e EventStreamAPI) OutputMemberName() string {
if e.Legacy {
return "EventStream"
}
return "eventStream"
}
func getEventStreamMember(topShape *Shape) *ShapeRef {
for _, ref := range topShape.MemberRefs {
if !ref.Shape.IsEventStream {
continue
}
return ref
}
return nil
}
func setupEventStream(s *Shape) *EventStream {
eventStream := &EventStream{
Name: s.ShapeName,
Shape: s,
}
s.EventStream = eventStream
for _, eventRefName := range s.MemberNames() {
eventRef := s.MemberRefs[eventRefName]
if !(eventRef.Shape.IsEvent || eventRef.Shape.Exception) {
panic(fmt.Sprintf("unexpected non-event member reference %s.%s",
s.ShapeName, eventRefName))
}
updateEventPayloadRef(eventRef.Shape)
if eventRef.Shape.EventFor == nil {
eventRef.Shape.EventFor = map[string]*EventStream{}
}
eventRef.Shape.EventFor[eventStream.Name] = eventStream
eventName := eventRefName
if v := eventRef.LocationName; v != "" {
eventName = v
}
// Exceptions and events are two different lists to allow the SDK
// to easily generate code with the two handled differently.
event := &Event{
Name: eventName,
Shape: eventRef.Shape,
For: eventStream,
}
if eventRef.Shape.Exception {
eventStream.Exceptions = append(eventStream.Exceptions, event)
} else {
eventStream.Events = append(eventStream.Events, event)
}
}
return eventStream
}
func updateEventPayloadRef(parent *Shape) {
refName := parent.PayloadRefName()
if len(refName) == 0 {
return
}
payloadRef := parent.MemberRefs[refName]
if payloadRef.Shape.Type == "blob" {
return
}
if len(payloadRef.LocationName) != 0 {
return
}
payloadRef.LocationName = refName
}