diff --git a/client_conn.go b/client_conn.go index 7adb7e6..2bd631e 100644 --- a/client_conn.go +++ b/client_conn.go @@ -30,11 +30,13 @@ func newClientConnWithSetup(c net.Conn, config *ConnConfig) (*ClientConn, error) if err := handshake.HandshakeWithServer(conn.rwc, conn.rwc, &handshake.Config{ SkipHandshakeVerification: conn.config.SkipHandshakeVerification, }); err != nil { + _ = conn.Close() return nil, errors.Wrap(err, "Failed to handshake") } ctrlStream, err := conn.streams.Create(ControlStreamID) if err != nil { + _ = conn.Close() return nil, errors.Wrap(err, "Failed to create control stream") } ctrlStream.handler.ChangeState(streamStateClientNotConnected) @@ -106,6 +108,30 @@ func (cc *ClientConn) CreateStream(body *message.NetConnectionCreateStream, chun return newStream, nil } +func (cc *ClientConn) DeleteStream(body *message.NetStreamDeleteStream) error { + if err := cc.controllable(); err != nil { + return err + } + + ctrlStream, err := cc.conn.streams.At(ControlStreamID) + if err != nil { + return err + } + + // Check if stream id exists + _, err = cc.conn.streams.At(body.StreamID) + if err != nil { + return err + } + + err = ctrlStream.DeleteStream(body) + if err != nil { + return err + } + + return cc.conn.streams.Delete(body.StreamID) +} + func (cc *ClientConn) startHandleMessageLoop() { if err := cc.conn.handleMessageLoop(); err != nil { cc.setLastError(err) diff --git a/message/net_stream.go b/message/net_stream.go index ab7449f..5157e0f 100644 --- a/message/net_stream.go +++ b/message/net_stream.go @@ -107,7 +107,10 @@ func (t *NetStreamDeleteStream) FromArgs(args ...interface{}) error { } func (t *NetStreamDeleteStream) ToArgs(ty EncodingType) ([]interface{}, error) { - panic("Not implemented") + return []interface{}{ + nil, // no command object + t.StreamID, + }, nil } type NetStreamFCPublish struct { diff --git a/stream.go b/stream.go index 3ba061c..01f1ec4 100644 --- a/stream.go +++ b/stream.go @@ -45,6 +45,10 @@ func newStream(streamID uint32, conn *Conn) *Stream { return s } +func (s *Stream) StreamID() uint32 { + return s.streamID +} + func (s *Stream) WriteWinAckSize(chunkStreamID int, timestamp uint32, msg *message.WinAckSize) error { return s.Write(chunkStreamID, timestamp, msg) } @@ -189,6 +193,18 @@ func (s *Stream) CreateStream(body *message.NetConnectionCreateStream, chunkSize //return nil, errors.New("Failed to get result") } +func (s *Stream) DeleteStream(body *message.NetStreamDeleteStream) error { + chunkStreamID := 3 // TODO: fix + + return s.writeCommandMessage( + chunkStreamID, + 0, + "deleteStream", + 0, + body, + ) +} + func (s *Stream) ReplyCreateStream( chunkStreamID int, timestamp uint32,