diff --git a/api-select.go b/api-select.go index a9b6f17ca..10e1d47d6 100644 --- a/api-select.go +++ b/api-select.go @@ -191,13 +191,20 @@ type StatsMessage struct { BytesReturned int64 } +// messageType represents the type of message. +type messageType string + +const ( + errorMsg messageType = "error" + commonMsg = "event" +) + // eventType represents the type of event. type eventType string // list of event-types returned by Select API. const ( endEvent eventType = "End" - errorEvent = "Error" recordsEvent = "Records" progressEvent = "Progress" statsEvent = "Stats" @@ -314,53 +321,58 @@ func (s *SelectResults) start(pipeWriter *io.PipeWriter) { // bytes can be read or parsed. payloadLen := prelude.PayloadLen() - // Get content-type of the payload. - c := contentType(headers.Get("content-type")) - - // Get event type of the payload. - e := eventType(headers.Get("event-type")) + m := messageType(headers.Get("message-type")) - // Handle all supported events. - switch e { - case endEvent: - pipeWriter.Close() - closeResponse(s.resp) - return - case errorEvent: + switch m { + case errorMsg: pipeWriter.CloseWithError(errors.New("Error Type of " + headers.Get("error-type") + " " + headers.Get("error-message"))) closeResponse(s.resp) return - case recordsEvent: - if _, err = io.Copy(pipeWriter, io.LimitReader(crcReader, payloadLen)); err != nil { - pipeWriter.CloseWithError(err) + case commonMsg: + // Get content-type of the payload. + c := contentType(headers.Get("content-type")) + + // Get event type of the payload. + e := eventType(headers.Get("event-type")) + + // Handle all supported events. + switch e { + case endEvent: + pipeWriter.Close() closeResponse(s.resp) return - } - case progressEvent: - switch c { - case xmlContent: - if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.progress); err != nil { + case recordsEvent: + if _, err = io.Copy(pipeWriter, io.LimitReader(crcReader, payloadLen)); err != nil { pipeWriter.CloseWithError(err) closeResponse(s.resp) return } - default: - pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, progressEvent)) - closeResponse(s.resp) - return - } - case statsEvent: - switch c { - case xmlContent: - if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.stats); err != nil { - pipeWriter.CloseWithError(err) + case progressEvent: + switch c { + case xmlContent: + if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.progress); err != nil { + pipeWriter.CloseWithError(err) + closeResponse(s.resp) + return + } + default: + pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, progressEvent)) + closeResponse(s.resp) + return + } + case statsEvent: + switch c { + case xmlContent: + if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.stats); err != nil { + pipeWriter.CloseWithError(err) + closeResponse(s.resp) + return + } + default: + pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, statsEvent)) closeResponse(s.resp) return } - default: - pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, statsEvent)) - closeResponse(s.resp) - return } }