Skip to content

Add QueueBundle.Remove() to stop and remove queues#1235

Merged
brandur merged 1 commit intoriverqueue:masterfrom
tigrato:master
May 5, 2026
Merged

Add QueueBundle.Remove() to stop and remove queues#1235
brandur merged 1 commit intoriverqueue:masterfrom
tigrato:master

Conversation

@tigrato
Copy link
Copy Markdown
Contributor

@tigrato tigrato commented Apr 29, 2026

Adds dynamic queue removal at runtime. When removed, the queue's producer is stopped and jobs can no longer be worked on that queue. Queues can be re-added after removal.

@tigrato
Copy link
Copy Markdown
Contributor Author

tigrato commented Apr 30, 2026

cc @brandur @magaldima

@brandur
Copy link
Copy Markdown
Contributor

brandur commented May 1, 2026

Cool! Thx. A few small asks on this one:

  • Can you add some additional validation similar to what's found in the Add function? For example, check that the client will run jobs:

    if !b.clientWillExecuteJobs {
            return errors.New("client is not configured to execute jobs, cannot add queue")
    }

    And also, return an error in case the queue doesn't exist.

  • Can you make it explicit in the dockblock that the function will block until all jobs in the given queue have been worked? And furthermore that this blocking may affect other operations like shutdown.

  • Can you drop the trivial comments found in the test suite? e.g.

    // Remove the queue
    err = client.Queues().Remove(queueName)
  • Just since it looks like the code is LLM-generated anyway, could you see if you can get it to write the tests without the sleeps?

Anything else you want to add @bgentry?

brandur added a commit that referenced this pull request May 1, 2026
While doing a little work on #1235, my LLM flagged that the way that
we're accessing `producersByQueueName` today is already a little unsafe
as it may be read in multiple places concurrently. There's a mutex to
protect it somewhat in the `QueueBundle`, but it may still race between
a change there a "notify producer" send.

Here, add one additional `RWMutex` that makes sure to synchronize read
access on the map.
brandur added a commit that referenced this pull request May 1, 2026
While doing a little work on #1235, my LLM flagged that the way that
we're accessing `producersByQueueName` today is already a little unsafe
as it may be read in multiple places concurrently. There's a mutex to
protect it somewhat in the `QueueBundle`, but it may still race between
a change there a "notify producer" send.

Here, add one additional `RWMutex` that makes sure to synchronize read
access on the map.
@brandur
Copy link
Copy Markdown
Contributor

brandur commented May 1, 2026

And actually, while looking at this discovered a minor concurrency bug around producer map access. I patched that in #1236 — can you rebase, and make sure to lock that mutex in the removeProducer function in Client? (Thanks!)

@tigrato
Copy link
Copy Markdown
Contributor Author

tigrato commented May 3, 2026

@brandur thank your for the review. changes pushed.
the tests were indeed LLM generated but the code lives in one of our forks since 2022. It's now time to push it upstream to reduce the maintaining cost

Adds dynamic queue removal at runtime. When removed, the queue's
producer is stopped and jobs can no longer be worked on that queue.
Queues can be re-added after removal.

Signed-off-by: Tiago Silva <3629062+tigrato@users.noreply.github.com>
@brandur
Copy link
Copy Markdown
Contributor

brandur commented May 4, 2026

@tigrato Thanks! Going to pull this, but I'm going to make a few tweaks on top of it before release. I think the biggest one is that since QueuesBundle.Remove is a blocking operation with an uncertain deadline, it should take a context like Stop does already which allows it to be cancelled out of. Hopefully this won't be too big of a problem in terms of an update on your end!

@brandur
Copy link
Copy Markdown
Contributor

brandur commented May 4, 2026

@tigrato Actually, just before I do, would you mind signing our CLA over at https://github.com/riverqueue/rivercla?

@brandur brandur merged commit 2109122 into riverqueue:master May 5, 2026
12 checks passed
brandur added a commit that referenced this pull request May 5, 2026
This builds on #1235 to bring in a few tweaks:

* Removing a queue is a blocking operation because it needs to wait for
  the producer to finish up its jobs and shut down. It'd be better to
  provide a way for this not to block forever, so here we add a context
  parameter to `QueueBundle.Remove` similar to the one taken by
  `Client.Stop`. If the client becomes done before the producer
  resolves, `QueueBundle.Remove` falls through with the error.

* Add a "stress" test case for `QueueBundle.Remove`. It's meant to
  detect a deadlock or other concurrency bug in case there is one and
  gives us a little more confidence that what we have here is right.

* Renamed `addProducer` and `removeProducer` to `producerAdd` and
  `producerRemove` so they sort more nicely against each other.

* Add changelogentry.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants