Skip to content

Commit

Permalink
refactor(transport): handle transport error, no more log.Fatal (#395)
Browse files Browse the repository at this point in the history
  • Loading branch information
canstand committed Dec 19, 2023
1 parent 3319f69 commit 968ab19
Show file tree
Hide file tree
Showing 14 changed files with 301 additions and 143 deletions.
4 changes: 2 additions & 2 deletions browser.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (b *browserImpl) NewContext(options ...BrowserNewContextOptions) (BrowserCo
}
channel, err := b.channel.Send("newContext", options, overrides)
if err != nil {
return nil, fmt.Errorf("could not send message: %w", err)
return nil, err
}
context := fromChannel(channel).(*browserContextImpl)
context.browser = b
Expand Down Expand Up @@ -108,7 +108,7 @@ func (b *browserImpl) NewPage(options ...BrowserNewPageOptions) (Page, error) {
func (b *browserImpl) NewBrowserCDPSession() (CDPSession, error) {
channel, err := b.channel.Send("newBrowserCDPSession")
if err != nil {
return nil, fmt.Errorf("could not send message: %w", err)
return nil, err
}

cdpSession := fromChannel(channel).(*cdpSessionImpl)
Expand Down
9 changes: 6 additions & 3 deletions browser_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (b *browserContextImpl) NewCDPSession(page interface{}) (CDPSession, error)

channel, err := b.channel.Send("newCDPSession", params)
if err != nil {
return nil, fmt.Errorf("could not send message: %w", err)
return nil, err
}

cdpSession := fromChannel(channel).(*cdpSessionImpl)
Expand All @@ -93,7 +93,7 @@ func (b *browserContextImpl) NewPage() (Page, error) {
}
channel, err := b.channel.Send("newPage")
if err != nil {
return nil, fmt.Errorf("could not send message: %w", err)
return nil, err
}
return fromChannel(channel).(*pageImpl), nil
}
Expand All @@ -103,7 +103,7 @@ func (b *browserContextImpl) Cookies(urls ...string) ([]Cookie, error) {
"urls": urls,
})
if err != nil {
return nil, fmt.Errorf("could not send message: %w", err)
return nil, err
}
cookies := make([]Cookie, len(result.([]interface{})))
for i, item := range result.([]interface{}) {
Expand Down Expand Up @@ -369,6 +369,9 @@ func (b *browserContextImpl) Close(options ...BrowserContextCloseOptions) error
_, err = b.channel.Send("close", map[string]interface{}{
"reason": b.closeReason,
})
if err != nil {
return err
}
<-b.closed
return err
}
Expand Down
29 changes: 12 additions & 17 deletions browser_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (b *browserTypeImpl) Launch(options ...BrowserTypeLaunchOptions) (Browser,
}
channel, err := b.channel.Send("launch", options, overrides)
if err != nil {
return nil, fmt.Errorf("could not send message: %w", err)
return nil, err
}
browser := fromChannel(channel).(*browserImpl)
b.didLaunchBrowser(browser)
Expand Down Expand Up @@ -81,7 +81,7 @@ func (b *browserTypeImpl) LaunchPersistentContext(userDataDir string, options ..
}
channel, err := b.channel.Send("launchPersistentContext", options, overrides)
if err != nil {
return nil, fmt.Errorf("could not send message: %w", err)
return nil, err
}
context := fromChannel(channel).(*browserContextImpl)
b.didCreateContext(context, option, tracesDir)
Expand All @@ -97,9 +97,15 @@ func (b *browserTypeImpl) Connect(wsEndpoint string, options ...BrowserTypeConne
return nil, err
}
jsonPipe := fromChannel(pipe.(map[string]interface{})["pipe"]).(*jsonPipe)
connection := newConnection(jsonPipe.Close, localUtils)
connection.isRemote = true
var browser *browserImpl
connection := newConnection(jsonPipe, localUtils)

playwright, err := connection.Start()
if err != nil {
return nil, err
}
playwright.setSelectors(b.playwright.Selectors)
browser := fromChannel(playwright.initializer["preLaunchedBrowser"]).(*browserImpl)
browser.shouldCloseConnectionOnClose = true
pipeClosed := func() {
for _, context := range browser.Contexts() {
pages := context.Pages()
Expand All @@ -112,18 +118,7 @@ func (b *browserTypeImpl) Connect(wsEndpoint string, options ...BrowserTypeConne
connection.cleanup()
}
jsonPipe.On("closed", pipeClosed)
connection.onmessage = func(message map[string]interface{}) error {
if err := jsonPipe.Send(message); err != nil {
pipeClosed()
return err
}
return nil
}
jsonPipe.On("message", connection.Dispatch)
playwright := connection.Start()
playwright.setSelectors(b.playwright.Selectors)
browser = fromChannel(playwright.initializer["preLaunchedBrowser"]).(*browserImpl)
browser.shouldCloseConnectionOnClose = true

b.didLaunchBrowser(browser)
return browser, nil
}
Expand Down
71 changes: 42 additions & 29 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package playwright
import (
"errors"
"fmt"
"log"
"reflect"
"regexp"
"strconv"
Expand All @@ -26,6 +25,7 @@ type result struct {
}

type connection struct {
transport transport
apiZone sync.Map
objects map[string]*channelOwner
lastID int
Expand All @@ -34,30 +34,39 @@ type connection struct {
callbacks sync.Map
afterClose func()
onClose func() error
onmessage func(map[string]interface{}) error
isRemote bool
localUtils *localUtilsImpl
tracingCount atomic.Int32
abort chan struct{}
closedError atomic.Value
abortOnce sync.Once
closedError *safeValue[error]
}

func (c *connection) Start() *Playwright {
playwright := make(chan *Playwright, 1)
func (c *connection) Start() (*Playwright, error) {
go func() {
pw, err := c.rootObject.initialize()
if err != nil {
log.Fatal(err)
return
for {
msg, err := c.transport.Poll()
if err != nil {
_ = c.transport.Close()
c.cleanup(err)
return
}
c.Dispatch(msg)
}
playwright <- pw
}()
return <-playwright

c.onClose = func() error {
if err := c.transport.Close(); err != nil {
return err
}
return nil
}

return c.rootObject.initialize()
}

func (c *connection) Stop() error {
err := c.onClose()
if err != nil {
if err := c.onClose(); err != nil {
return err
}
c.cleanup()
Expand All @@ -66,22 +75,24 @@ func (c *connection) Stop() error {

func (c *connection) cleanup(cause ...error) {
if len(cause) > 0 {
c.closedError.Store(fmt.Errorf("%w: %w", ErrTargetClosed, cause[0]))
c.closedError.Set(fmt.Errorf("%w: %w", ErrTargetClosed, cause[0]))
} else {
c.closedError.Store(ErrTargetClosed)
c.closedError.Set(ErrTargetClosed)
}
if c.afterClose != nil {
c.afterClose()
}
select {
case <-c.abort:
default:
close(c.abort)
}
c.abortOnce.Do(func() {
select {
case <-c.abort:
default:
close(c.abort)
}
})
}

func (c *connection) Dispatch(msg *message) {
if c.closedError.Load() != nil {
if c.closedError.Get() != nil {
return
}
method := msg.Method
Expand Down Expand Up @@ -208,8 +219,8 @@ func (c *connection) replaceGuidsWithChannels(payload interface{}) interface{} {
}

func (c *connection) sendMessageToServer(object *channelOwner, method string, params interface{}, noReply bool) (*protocolCallback, error) {
if e := c.closedError.Load(); e != nil {
return nil, e.(error)
if err := c.closedError.Get(); err != nil {
return nil, err
}
if object.wasCollected {
return nil, errors.New("The object has been collected to prevent unbounded heap growth.")
Expand Down Expand Up @@ -243,7 +254,7 @@ func (c *connection) sendMessageToServer(object *channelOwner, method string, pa
c.LocalUtils().AddStackToTracingNoReply(id, stack)
}

if err := c.onmessage(message); err != nil {
if err := c.transport.Send(message); err != nil {
return nil, fmt.Errorf("could not send message: %w", err)
}

Expand Down Expand Up @@ -317,15 +328,17 @@ func serializeCallLocation(caller stack.Call) map[string]interface{} {
}
}

func newConnection(onClose func() error, localUtils ...*localUtilsImpl) *connection {
func newConnection(transport transport, localUtils ...*localUtilsImpl) *connection {
connection := &connection{
abort: make(chan struct{}, 1),
objects: make(map[string]*channelOwner),
onClose: onClose,
isRemote: false,
abort: make(chan struct{}, 1),
objects: make(map[string]*channelOwner),
transport: transport,
isRemote: false,
closedError: &safeValue[error]{},
}
if len(localUtils) > 0 {
connection.localUtils = localUtils[0]
connection.isRemote = true
}
connection.rootObject = newRootChannelOwner(connection)
return connection
Expand Down
2 changes: 1 addition & 1 deletion element_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (e *elementHandleImpl) Screenshot(options ...ElementHandleScreenshotOptions
}
data, err := e.channel.Send("screenshot", options, overrides)
if err != nil {
return nil, fmt.Errorf("could not send message :%w", err)
return nil, err
}
image, err := base64.StdEncoding.DecodeString(data.(string))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (r *apiRequestImpl) NewContext(options ...APIRequestNewContextOptions) (API

channel, err := r.channel.Send("newRequest", options, overrides)
if err != nil {
return nil, fmt.Errorf("could not send message: %w", err)
return nil, err
}
return fromChannel(channel).(*apiRequestContextImpl), nil
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/go-stack/stack v1.8.1
github.com/gorilla/websocket v1.5.1
github.com/h2non/filetype v1.1.3
github.com/mitchellh/go-ps v1.0.0
github.com/stretchr/testify v1.8.4
github.com/tidwall/gjson v1.17.0
go.uber.org/multierr v1.11.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc=
github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
17 changes: 17 additions & 0 deletions helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,20 @@ func prepareRecordHarOptions(option recordHarInputOptions) recordHarOptions {
}
return out
}

type safeValue[T any] struct {
sync.Mutex
v T
}

func (s *safeValue[T]) Set(v T) {
s.Lock()
defer s.Unlock()
s.v = v
}

func (s *safeValue[T]) Get() T {
s.Lock()
defer s.Unlock()
return s.v
}
40 changes: 31 additions & 9 deletions jsonPipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package playwright

import (
"encoding/json"
"log"
"errors"
"fmt"
)

type jsonPipe struct {
channelOwner
msgChan chan *message
}

func (j *jsonPipe) Send(message map[string]interface{}) error {
Expand All @@ -19,23 +21,43 @@ func (j *jsonPipe) Close() error {
_, err := j.channel.Send("close")
return err
}

func (j *jsonPipe) Poll() (*message, error) {
msg := <-j.msgChan
if msg == nil {
return nil, errors.New("jsonPipe closed")
}
return msg, nil
}

func newJsonPipe(parent *channelOwner, objectType string, guid string, initializer map[string]interface{}) *jsonPipe {
j := &jsonPipe{}
j := &jsonPipe{
msgChan: make(chan *message, 2),
}
j.createChannelOwner(j, parent, objectType, guid, initializer)
j.channel.On("message", func(ev map[string]interface{}) {
var msg message
m, err := json.Marshal(ev["message"])
if err != nil {
log.Fatal(err)
if err == nil {
err = json.Unmarshal(m, &msg)
}
var msg message
err = json.Unmarshal(m, &msg)
if err != nil {
log.Fatal(err)
msg = message{
Error: &struct {
Error Error "json:\"error\""
}{
Error: Error{
Name: "Error",
Message: fmt.Sprintf("jsonPipe: could not decode message: %s", err.Error()),
},
},
}
}
j.Emit("message", &msg)
j.msgChan <- &msg
})
j.channel.On("closed", func() {
j.channel.Once("closed", func() {
j.Emit("closed")
close(j.msgChan)
})
return j
}
4 changes: 2 additions & 2 deletions page.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (p *pageImpl) Screenshot(options ...PageScreenshotOptions) ([]byte, error)
}
data, err := p.channel.Send("screenshot", options, overrides)
if err != nil {
return nil, fmt.Errorf("could not send message :%w", err)
return nil, err
}
image, err := base64.StdEncoding.DecodeString(data.(string))
if err != nil {
Expand All @@ -363,7 +363,7 @@ func (p *pageImpl) PDF(options ...PagePdfOptions) ([]byte, error) {
}
data, err := p.channel.Send("pdf", options)
if err != nil {
return nil, fmt.Errorf("could not send message :%w", err)
return nil, err
}
pdf, err := base64.StdEncoding.DecodeString(data.(string))
if err != nil {
Expand Down

0 comments on commit 968ab19

Please sign in to comment.