Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AQ not enqueued even if transaction is completed? #331

Open
aca opened this issue Apr 3, 2024 · 12 comments
Open

AQ not enqueued even if transaction is completed? #331

aca opened this issue Apr 3, 2024 · 12 comments

Comments

@aca
Copy link

aca commented Apr 3, 2024

Hi, I was having issue with #93 (comment), but besides connection pool issue, I also found wierd behavior on AQ with the same code.
I'm sorry but I'm not familiar with AQ. Please correct me if I'm doing something wrong...

Describe the bug

I tried to write code that just enqueue data and dequeue data one by one 10 times. But even though enqueue transaction is completed, dequeue fails sometimes.

I enqueued 10 data but dequeue success only 7 times.
And I rerun the same code it acts like it never enqueued. (If is enqueued correctly queue should have 3 message)

To Reproduce

Environment and code is basically same as #93. But I set standaloneConnection=true.

package main

import (
	"context"
	"database/sql"
	"log"

	"github.com/godror/godror"
)

var db *sql.DB

func main() {
	var err error
	db, err = sql.Open("godror", `user="SYSTEM" password="password" charset=KO16MSWIN949 connectString="localhost:1521/FREEPDB1" poolMaxSessions=3 standaloneConnection=true`)
	if err != nil {
		panic(err)
	}

	for i := 0; i < 10; i++ {
		err := enqueue("AQ_XXX_DATA")
		if err != nil {
			panic(err)
		}

		err = dequeue("AQ_XXX_DATA")
		if err != nil {
			panic(err)
		}
	}

}

func dequeue(dbQueue string) error {
	log.Println("dequeue")
	ctx := context.Background()
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}

	defer tx.Rollback()

	q, err := godror.NewQueue(ctx, tx, dbQueue, "QUEUE_MESSAGE_TYPE")
	if err != nil {
		return err
	}
	defer q.Close()

	msgs := make([]godror.Message, 1)
	n, err := q.Dequeue(msgs)
	if err != nil {
		return err
	}
	if n == 0 {
		log.Println("no message found")
		return nil
	}

	for _, m := range msgs[:n] {
		var data godror.Data
		if m.Object == nil {
			panic("nil")
		}
		if err := m.Object.GetAttribute(&data, "DATA"); err != nil {
			panic("object not found")
		}
		m.Object.Close()
		log.Println("dequeued: ", string(data.GetBytes()))
		err = m.Object.Close()
		if err != nil {
			return err
		}
	}

	return tx.Commit()
}

func enqueue(dbQueue string) error {
	log.Println("enqueue")
	ctx := context.Background()
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	q, err := godror.NewQueue(ctx, tx, dbQueue, "QUEUE_MESSAGE_TYPE", godror.WithEnqOptions(godror.EnqOptions{
		Visibility:   godror.VisibleOnCommit,
		DeliveryMode: godror.DeliverPersistent,
	}))
	if err != nil {
		return err
	}
	defer q.Close()

	payloadObject, err := q.PayloadObjectType.NewObject()
	if err != nil {
		panic(err)
	}

	err = payloadObject.Set("data", "hello from godror")
	if err != nil {
		panic(err)
	}

	messages := []godror.Message{{
		Object: payloadObject,
	}}

	err = q.Enqueue(messages)
	if err != nil {
		panic(err)
	}

	return tx.Commit()
}

Error output

2024/04/04 01:45:09 enqueue
2024/04/04 01:45:09 dequeue
2024/04/04 01:45:09 dequeued:  hello from godror
2024/04/04 01:45:09 enqueue
2024/04/04 01:45:09 dequeue
2024/04/04 01:45:09 no message found
2024/04/04 01:45:09 enqueue
2024/04/04 01:45:09 dequeue
2024/04/04 01:45:09 dequeued:  hello from godror
2024/04/04 01:45:09 enqueue
2024/04/04 01:45:09 dequeue
2024/04/04 01:45:09 dequeued:  hello from godror
2024/04/04 01:45:09 enqueue
2024/04/04 01:45:09 dequeue
2024/04/04 01:45:09 no message found

Expected behavior
It should not print no message found

Your oracle client version
21.10.0.0.0

Your godror version
v0.41.1-0.20240114194139-fdee3321b7db

Your go version
1.22

Your gcc version
12.3.0

Machine (please complete the following information):

  • OS: [e.g. win] Linux, NixOS
  • Architecture: [e.g. x86_64] x86_64
  • Version: [e.g. 10] 23.11
@sudarshan12s
Copy link
Collaborator

sudarshan12s commented Apr 5, 2024

Can you increase the Wait (in seconds) Deq options

q, err := godror.NewQueue(ctx, tx, dbQueue, "QUEUE_MESSAGE_TYPE",
                godror.WithDeqOptions(godror.DeqOptions{
                        Mode:       godror.DeqBrowse,
                        Visibility: godror.VisibleOnCommit,
                        Navigation: godror.NavFirst,
                        Wait:       1,
                })) 

@aca
Copy link
Author

aca commented Apr 8, 2024

I get same result with your options, I'm using latest version of this library.

If I try

  • enqueue 10 messages and exit the process
  • start new process try dequeue 10 messages
    there's no message to dequeue.

I think there's issue with enqueue..

@aca
Copy link
Author

aca commented Apr 8, 2024

enqueueQry string = `
DECLARE
	EnqOpt dbms_aq.enqueue_options_t;
	MsgProperty dbms_aq.message_properties_t;
	MsgHandle RAW(16);
	EnqMsg queue_message_type;
	Recipients dbms_aq.aq$_recipient_list_t;
  BEGIN
	EnqOpt.visibility := DBMS_AQ.ON_COMMIT;
	MsgProperty.correlation := :1;
	EnqMsg := queue_message_type(0, :2);
     
	DBMS_AQ.ENQUEUE(
        queue_name => :3,
        enqueue_options => EnqOpt,
        message_properties => MsgProperty,
        payload => EnqMsg,
        msgid => MsgHandle
    );
  END;
`

result, err := q.db.ExecContext(ctx, enqueueQry,
	arg.RCPName,
	string(arg.Data),
	arg.QueueName,
)
if err != nil {
	return throw.Wrap(err)
}

Suprisingly, if I write this kind of go-native sql queries, data is enqueued as expected and dequeue on other process.

@tgulacsi
Copy link
Contributor

tgulacsi commented Apr 8, 2024

That Wait is a time.Duration, so 1 is very small - use 1*time.Second if you want a second.

@aca
Copy link
Author

aca commented Apr 8, 2024

I tried with 1 second now it returns 10/10, it still fails sometimes (9/10).
with 0.5 second, still misses 3 messages..

I think the problem is "enqueue", as I've stated above, there's 0 message enqueued if I just try enqueue 10 message and exit.

Am I missing something?

@sudarshan12s
Copy link
Collaborator

I tried with 1 second now it returns 10/10, it still fails sometimes (9/10). with 0.5 second, still misses 3 messages..

I think the problem is "enqueue", as I've stated above, there's 0 message enqueued if I just try enqueue 10 message and exit.

Am I missing something?

For the failed case, Can you share the logs printed on console after setting the env DPI_DEBUG_LEVEL=92 . We will check on the enqueue part..

@aca
Copy link
Author

aca commented Apr 9, 2024

Here's a reproducible environment with oracle container and godror driver in nix (non-nix requires env settings like ORACLE_HOME)

Here's the code and debug log.

https://github.com/aca/godror-issue-331/blob/c6fb69d4ee8cfbf167947ac334e34683dcd1fa26/main.go#L22
https://github.com/aca/godror-issue-331/blob/main/debug.log

@tgulacsi
Copy link
Contributor

tgulacsi commented Apr 9, 2024

You should create a queue and use it, don't recreate it for each message!
Also, take care of all objects' lifetime - Close everything ASAP! (In Enqueue, you leave that NewObject lingering).

This AQ code does work in production for years now.
I even have an RPC dispatcher on it: https://github.com/UNO-SOFT/aqdispatch/blob/master/aqdispatch.go
it receives messages, calls the given webservice, then puts the answer into the queue.

@aca
Copy link
Author

aca commented Apr 10, 2024

Thanks! that fix issue on connection leak. It works fine. And thanks for your code.

But this issue remains..
I saw your code, and when it enqueue it set visibility to eOpts.Visibility = godror.VisibleImmediate.
So there seems no need to create new queue object.

I wish I can re-use the queue object.
But in my case I'm trying to wrap AQ queries (visibility set to oncommit) in transaction with other normal queries, how can I re-use the
queue object? I passed go db transaction object when I create queue, and there seems no way to change the
underlying that transaction object after queue created.

q, err := godror.NewQueue(ctx, tx /*this one*/, dbQueue, "QUEUE_MESSAGE_TYPE")

And.. even using single queue doesn't make enqueue work.

  1. Enqueue 1 data, exit
  2. Start new process and try dequeue 1 data from other process.
    And there's no data to dequeue.

Question here..

  1. Is it okay to pass 'go db transaction object' as a exer in godror.NewQueue? Am I doing something wrong?
  2. Is there a way to re-use queue with different execer(transaction) ?
  3. Is it okay to use this? AQ not enqueued even if transaction is completed? #331 (comment)
  4. Creating new queue for every enqueue might not be good, but shouldn't it return error at least?

@tgulacsi
Copy link
Contributor

It's not an error to create a new Queue object every time, just a performance penalty.

  1. Yes, that can be used.
    I'll try to look into this issue.

@tgulacsi
Copy link
Contributor

TestQueue (testQueue) in queue_test.go test right this scenario: enqueueing than dequeueing, in separate transactions, with freshly created Queue objects.
So, can you try go test -run=Queue -v in your env?
If that does work, then can you add another subtest to TestQueue that represents your case?

@aca
Copy link
Author

aca commented Apr 15, 2024

https://pastebin.com/j34JJ2YX

Test results OK, but not sure what this means..
I've only set GODROR_TEST_DSN
Please tell me if there's other setup I need to do for proper testing.

    queue_test.go:283: "BEGIN SYS.DBMS_AQADM.stop_queue(SYSTEM.TEST_QOBJ); END;": ORA-06550: line 1, column 33:
        PLS-00201: identifier 'SYSTEM.TEST_QOBJ' must be declared
        ORA-06550: line 1, column 7:
        PL/SQL: Statement ignored
    queue_test.go:283: "BEGIN SYS.DBMS_AQADM.drop_queue(SYSTEM.TEST_QOBJ); END;": ORA-06550: line 1, column 33:
        PLS-00201: identifier 'SYSTEM.TEST_QOBJ' must be declared
        ORA-06550: line 1, column 7:
        PL/SQL: Statement ignored
    queue_test.go:283: "BEGIN SYS.DBMS_AQADM.drop_queue_table(SYSTEM.TEST_QOBJ_TBL, TRUE); END;": ORA-06550: line 1, column 39:
        PLS-00201: identifier 'SYSTEM.TEST_QOBJ_TBL' must be declared
        ORA-06550: line 1, column 7:
        PL/SQL: Statement ignored
    queue_test.go:283: "DROP TABLE SYSTEM.TEST_QOBJ_TBL": ORA-00942: table or view does not exist

Okay I'll try to add some subtest when I can.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants