Using `mysql-connector-python` 8.4.0 with Python 3.11 and Django 4.2
I have a model `Job` and when I try to create an instance, I get the following error which seems to suggest that the connector is not able to handle the Django enumeration type as described here...
https://docs.djangoproject.com/en/3.2/ref/models/fields/#enumeration-types
What am I missing?
Rob
```Error
Traceback (most recent call last):
File "/Users/robertcollins/github/mtm-crm-patch/venv11/lib/python3.11/site-packages/mysql/connector/cursor.py", line 564, in _process_params
res = [to_mysql(value) for value in res]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/robertcollins/github/mtm-crm-patch/venv11/lib/python3.11/site-packages/mysql/connector/cursor.py", line 564, in <listcomp>
res = [to_mysql(value) for value in res]
^^^^^^^^^^^^^^^
File "/Users/robertcollins/github/mtm-crm-patch/venv11/lib/python3.11/site-packages/mysql/connector/conversion.py", line 237, in to_mysql
raise TypeError(
TypeError: Python 'states' cannot be converted to a MySQL type
```
The Django Model definition is:-
```
class Job(models.Model):
class STATES(TextChoices):
NEW = "NEW"
READY = "READY"
PROCESSING = "PROCESSING"
STOPPING = "STOPPING"
FAILED = "FAILED"
COMPLETE = "COMPLETE"
id = UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
created = models.DateTimeField(auto_now_add=True, db_index=True)
modified = models.DateTimeField(auto_now=True)
name = models.CharField(max_length=100)
state = models.CharField(
max_length=20, choices=STATES.choices, default=STATES.NEW, db_index=True
)
next_task = models.CharField(max_length=100, blank=True)
workspace = JSONField(null=True)
queue_name = models.CharField(max_length=20, default="default", db_index=True)
priority = models.SmallIntegerField(default=0, db_index=True)
run_after = models.DateTimeField(null=True, db_index=True)
class Meta:
ordering = ["-priority", "created"]
objects = JobManager()
def save(self, *args, **kwargs):
if self._state.adding:
self.next_task = get_next_task_name(self.name)
self.workspace = self.workspace or {}
try:
self.run_creation_hook()
except Exception as exception: # noqa
logger.exception(
"Failed to create new job, creation hook raised an exception"
)
return # cancel the save
return super().save(*args, **kwargs)
def update_next_task(self):
self.next_task = get_next_task_name(self.name, self.next_task) or ""
def get_failure_hook_name(self):
return get_failure_hook_name(self.name)
def get_creation_hook_name(self):
return get_creation_hook_name(self.name)
def run_creation_hook(self):
creation_hook_name = self.get_creation_hook_name()
if creation_hook_name:
logger.info("Running creation hook %s for new job", creation_hook_name)
creation_hook_function = import_string(creation_hook_name)
creation_hook_function(self)
@staticmethod
def get_queue_depths():
annotation_dicts = (
Job.objects.filter(state__in=(Job.STATES.READY, Job.STATES.NEW))
.values("queue_name")
.order_by("queue_name")
.annotate(Count("queue_name"))
)
return {
annotation_dict["queue_name"]: annotation_dict["queue_name__count"]
for annotation_dict in annotation_dicts
}
```
The Job create is...
```
context = {
"postal_code": self.applicant_postal_code,
"application_id": self.id,
}
try:
Job.objects.create(
name="locate_application_form_job",
workspace=context,
queue_name="mapit_queue",
)
except Exception as e:
logger.error(
f"Cannot create background locate_application_form_job for application"
f":{self.id}\n{e}"
)
```