Skip to content

Commit

Permalink
Merge pull request #56 from jbreich/master
Browse files Browse the repository at this point in the history
Implement deleteStream command on client connection
  • Loading branch information
yutopp committed Jul 1, 2023
2 parents db41187 + 1d3bce5 commit 9a7a975
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 1 deletion.
26 changes: 26 additions & 0 deletions client_conn.go
Expand Up @@ -30,11 +30,13 @@ func newClientConnWithSetup(c net.Conn, config *ConnConfig) (*ClientConn, error)
if err := handshake.HandshakeWithServer(conn.rwc, conn.rwc, &handshake.Config{
SkipHandshakeVerification: conn.config.SkipHandshakeVerification,
}); err != nil {
_ = conn.Close()
return nil, errors.Wrap(err, "Failed to handshake")
}

ctrlStream, err := conn.streams.Create(ControlStreamID)
if err != nil {
_ = conn.Close()
return nil, errors.Wrap(err, "Failed to create control stream")
}
ctrlStream.handler.ChangeState(streamStateClientNotConnected)
Expand Down Expand Up @@ -106,6 +108,30 @@ func (cc *ClientConn) CreateStream(body *message.NetConnectionCreateStream, chun
return newStream, nil
}

func (cc *ClientConn) DeleteStream(body *message.NetStreamDeleteStream) error {
if err := cc.controllable(); err != nil {
return err
}

ctrlStream, err := cc.conn.streams.At(ControlStreamID)
if err != nil {
return err
}

// Check if stream id exists
_, err = cc.conn.streams.At(body.StreamID)
if err != nil {
return err
}

err = ctrlStream.DeleteStream(body)
if err != nil {
return err
}

return cc.conn.streams.Delete(body.StreamID)
}

func (cc *ClientConn) startHandleMessageLoop() {
if err := cc.conn.handleMessageLoop(); err != nil {
cc.setLastError(err)
Expand Down
5 changes: 4 additions & 1 deletion message/net_stream.go
Expand Up @@ -107,7 +107,10 @@ func (t *NetStreamDeleteStream) FromArgs(args ...interface{}) error {
}

func (t *NetStreamDeleteStream) ToArgs(ty EncodingType) ([]interface{}, error) {
panic("Not implemented")
return []interface{}{
nil, // no command object
t.StreamID,
}, nil
}

type NetStreamFCPublish struct {
Expand Down
16 changes: 16 additions & 0 deletions stream.go
Expand Up @@ -45,6 +45,10 @@ func newStream(streamID uint32, conn *Conn) *Stream {
return s
}

func (s *Stream) StreamID() uint32 {
return s.streamID
}

func (s *Stream) WriteWinAckSize(chunkStreamID int, timestamp uint32, msg *message.WinAckSize) error {
return s.Write(chunkStreamID, timestamp, msg)
}
Expand Down Expand Up @@ -189,6 +193,18 @@ func (s *Stream) CreateStream(body *message.NetConnectionCreateStream, chunkSize
//return nil, errors.New("Failed to get result")
}

func (s *Stream) DeleteStream(body *message.NetStreamDeleteStream) error {
chunkStreamID := 3 // TODO: fix

return s.writeCommandMessage(
chunkStreamID,
0,
"deleteStream",
0,
body,
)
}

func (s *Stream) ReplyCreateStream(
chunkStreamID int,
timestamp uint32,
Expand Down

0 comments on commit 9a7a975

Please sign in to comment.