Hi!
I have an issue with python threading when I run the code as Google Function. It looks like “wait()” for “threading.Event” doesn’t work. In addition, I observe a lack of synchronization between threads. In my little sample code, thread-safe python’s queue isn’t synchronized.
The bug isn’t reproducible locally on the same python version (3.11).
Sample code:
[details=Show More]
import functools
import logging
import queue
import threading
from typing import Optional
from flask import Flask, Request, request
logger: logging.Logger = logging.getLogger('thread_test')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s [%(levelname)s] "
"%(filename)s:%(lineno)s %(funcName)s(): "
"'%(message)s'")
handler.setFormatter(formatter)
logger.addHandler(handler)
class Scheduler:
class ScheduleThread(threading.Thread):
def __init__(self, interval: float = 10.0):
super().__init__()
self.name += " (Scheduler)"
self._interval = interval
self.job_queue = queue.Queue()
self.event_job = threading.Event()
self.event_stop = threading.Event()
def run(self):
while not self.event_stop.is_set():
if self.event_job.wait(self._interval):
self.event_job.clear()
logger.debug(id(self.job_queue))
logger.debug(self.job_queue)
logger.debug(self.job_queue.__dict__)
while not self.job_queue.empty():
logger.debug("")
job = self.job_queue.get(False)
logger.debug(job)
if isinstance(job, functools.partial):
logger.debug("")
job()
logger.debug("")
def add_job(self, job_func, *args, **kwargs):
logger.debug("")
job = functools.partial(job_func, *args, **kwargs)
logger.debug(id(self._thread.job_queue))
logger.debug(self._thread.job_queue)
logger.debug(self._thread.job_queue.__dict__)
self._thread.job_queue.put(job)
logger.debug(self._thread.job_queue.__dict__)
self._thread.event_job.set()
logger.debug("")
def __init__(self):
self._thread = self.ScheduleThread()
self._thread.start()
def __del__(self):
self._thread.event_stop.set()
flask_app = Flask("Thread Test")
scheduler: Optional[Scheduler] = None
with flask_app.app_context():
scheduler = Scheduler()
def test_func():
logger.debug("Thread: %s", threading.current_thread().name)
# Cloud Functions
def app_test(req: Request):
"""HTTP Cloud Function.
Args:
req (flask.Request): The request object.
Log (Google Function):
[details=Show More]
```html
2023-07-02 17:43:29,951 [DEBUG] main.py:34 run(): '68299283007952'
2023-07-02 17:43:29,951 [DEBUG] main.py:35 run(): '
Log (local):
[details=Show More]
```html
2023-07-02 20:41:01,127 [DEBUG] main.py:34 run(): '4394904080'
2023-07-02 20:41:01,128 [DEBUG] main.py:35 run(): '