Django Celery Task: AttributeError: type object ‘Command’ has no attribute ‘__annotations__’

I’m trying to set up a Celery task that runs a script to deactivate a set of facilities listed in a JSON file. I used to run the script manually and it worked fine, but now I get the following error when Celery tries to run it as a task:

AttributeError: type object 'Command' has no attribute '__annotations__'

Can anyone tell me why it is not working?
This is the code I am using:

tasks.py

import requests
import json

from project.users.models import Facility
from django.core.management.base import BaseCommand
from config import celery_app

IMPORT_URL = 'https://example.com/file.json'

@celery_app.task()
class Command(BaseCommand):
    def delete_facility(self, data):
        if Facility.objects.filter(UUID=data, ReportedStatusField='closed'):
            print("Facility status already set to closed")
        else:
            facility, facility_closed = Facility.objects.update_or_create(
                UUID=data,
                defaults={'ReportedStatusField': 'closed'},
            )
            print("Facility status set to closed")

    def handle(self, *args, **options):

        headers = {'Content-Type': 'application/json'}
        response = requests.get(
            url=IMPORT_URL,
            headers=headers,
        )

        response.raise_for_status()
        data = response.json()


        for data, data_object in data.items():
            if data in ["deleted"]:
                for data in data_object:
                    self.delete_facility(data)

Answers:

Method 1

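I think I figured it out; it is running properly now. The `@celery_app.task()` decorator is meant to wrap a function, not a `BaseCommand` subclass, which is presumably why the original version failed. Instead of putting the command code directly into tasks.py, I keep it in its own management command file and call that command from tasks.py: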

from celery import shared_task
from django.core.management import call_command

@shared_task
def start_import():
    call_command("import_from_url")
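
With the command in its own module, the nested loop in `handle` (which reuses the name `data` for three different things) can be reduced to a small helper that is easy to test without Django or Celery. The following is only a sketch: the function name `uuids_to_close` and the sample payload are hypothetical, and it assumes the "deleted" key of the JSON file holds a list of UUIDs, which the question does not confirm.

```python
from typing import Any, Dict, List


def uuids_to_close(payload: Dict[str, Any]) -> List[str]:
    """Return the entries listed under the "deleted" key, or an empty list."""
    deleted = payload.get("deleted", [])
    # Assumption: "deleted" is a list of UUIDs; any other value is ignored
    # so that callers can always iterate over the result safely.
    return list(deleted) if isinstance(deleted, list) else []


# Hypothetical payload; the real structure of file.json is not shown in the question.
sample = {"deleted": ["uuid-1", "uuid-2"], "updated": ["uuid-3"]}
closed = uuids_to_close(sample)
```

Inside the management command, `handle` could then loop over `uuids_to_close(response.json())` and call `self.delete_facility(uuid)` for each entry.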


All methods were sourced from stackoverflow.com or stackexchange.com and are licensed under CC BY-SA 2.5, CC BY-SA 3.0, and CC BY-SA 4.0.
