
When a process worker goes down, mark running tasks/flows as "Crashed" #16746

Open
leesavoie-voltaiq opened this issue Jan 16, 2025 · 0 comments
Labels
enhancement An improvement of an existing feature

Describe the current behavior

When a process worker is shut down, any flows it was running at the time stay in the "Running" state indefinitely. If the work pool has a concurrency limit, that limit is effectively reduced by the number of hanging flows.

To reproduce:

  1. Install Docker and Docker Compose
  2. Copy the files below into the same directory
  3. Run docker compose up. If this fails, the database probably didn't have enough time to initialize; wait a few seconds and run docker compose up again.
  4. Wait for a flow run to be triggered and for the worker to start it.
  5. While the flow is still running, hit Ctrl-C to stop the containers.
  6. Run docker compose up again and wait for everything to initialize.
  7. View the Prefect server UI at http://localhost:4200. You will see a flow run stuck in the "Running" state that never ends. Over time, new flow runs will be scheduled, but they will never start because the work pool has a concurrency limit of 1.

Files used (put these in the same directory):

docker-compose.yml:

version: '3.7'
services:
    prefect_test_db:
        image: postgres:17.0-alpine
        volumes:
            - prefect_test_db_data:/var/lib/postgresql/data
        environment:
            - POSTGRES_NAME=postgres
            - POSTGRES_USER=postgres
            - POSTGRES_PASSWORD=postgres
            - POSTGRES_DB=prefect

    prefect_test_server:
        image: prefecthq/prefect:3-python3.9
        environment:
            - PREFECT_HOME=/data
            - PREFECT_API_DATABASE_CONNECTION_URL=postgresql+asyncpg://postgres:postgres@prefect_test_db/prefect
        command: prefect server start --host 0.0.0.0
        volumes:
            - prefect_test_data:/data
        ports:
            - 4200:4200
        depends_on:
            - prefect_test_db

    prefect_test_worker:
        image: prefecthq/prefect:3-python3.9
        environment:
            - PREFECT_API_URL=http://prefect_test_server:4200/api
            - PREFECT_LOGGING_LOG_PRINTS=true
        command: >
            sh -c "sleep 5 &&
                   prefect work-pool create test_pool --type process &&
                   prefect work-pool set-concurrency-limit test_pool 1 &&
                   prefect deploy --prefect-file /data/prefect.yaml &&
                   prefect worker start --pool test_pool --type process"
        volumes:
            - .:/data
        depends_on:
            - prefect_test_server

volumes:
    prefect_test_db_data:
    prefect_test_data:

prefect.yaml:

# Welcome to your prefect.yaml file! You can use this file for storing and managing
# configuration for deploying your flows. We recommend committing this file to source
# control along with your flow code.

# Generic metadata about this project
name: prefect_deployments
prefect-version: 3.1.12

# build section allows you to manage and build docker images
build: null

# push section allows you to manage if and how this project is uploaded to remote locations
push: null

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /data

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: "test_deployment"
  version: "1"
  tags: []
  description: "Run the test flow"
  schedule:
    interval: 20
  flow_name: "test_flow"
  entrypoint: /data/flows.py:test_flow
  parameters: {}
  work_pool:
    name: "test_pool"
    work_queue_name: "test_pool"
    job_variables: {}

flows.py:

from prefect import flow
from time import sleep


@flow
def test_flow():
    print("Running flow")
    sleep(15)

Describe the proposed behavior

When a process worker crashes or is shut down gracefully, mark any flows it was running as "Crashed".

Example Use

This is a common problem during development when using docker compose to bring up and shut down a development stack with long-running processes. Anything still running when docker compose down is executed will hang indefinitely.

Additional context

Some context from @cicdw on Slack:

Our current recommendation for these situations is to setup a zombie-killer automation as described here: https://docs.prefect.io/v3/automate/events/automations-triggers#detect-and-respond-to-zombie-flows
It was actually an intentional design decision to not couple submitted work to worker health as a form of fault tolerance, but honestly this decision doesn't make much sense for the Process Worker specifically because the work is in fact coupled; I think we can look to add an attempt to cancel subprocesses when that particular worker shuts down gracefully.
