scheduler
class pytq.scheduler.BaseDBTableBackedScheduler(logger=None)
    Scheduler that uses a database table as its backend storage.

    - Task.id is the primary key.
    - Other columns / fields store input_data and output_data.
class pytq.scheduler.BaseScheduler(logger=None)
    All schedulers have to inherit from this base class.

    Workflow:

    Step 1. Generate the task queue; it is a list of input_data.

    Step 2. Pre-process the input data:

    - remove duplicates.
    - generate Task instances.

    The _default_batch_pre_process() method will be called by default.

    Step 3. For each task, the _process() method will be called, which
    includes:

    - pre_process
    - user_process: input_data -> output_data
    - post_process
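The three-step workflow can be sketched in plain Python. This is a minimal stand-in, not pytq's actual implementation: the Task stand-in, the "url" input shape, and the dedupe-by-fingerprint logic are all assumptions for illustration.

```python
from dataclasses import dataclass

# Hypothetical stand-in for pytq.task.Task, just for illustration.
@dataclass
class Task:
    id: str               # fingerprint of the input data
    input_data: dict
    output_data: dict = None

def batch_pre_process(input_data_queue, seen_ids):
    """Step 2: remove duplicates and wrap each input_data into a Task."""
    task_queue = []
    for input_data in input_data_queue:
        fingerprint = str(input_data["url"])  # assumed fingerprint rule
        if fingerprint in seen_ids:
            continue  # duplicate, skip it
        seen_ids.add(fingerprint)
        task_queue.append(Task(id=fingerprint, input_data=input_data))
    return task_queue

def process(task, user_process, storage):
    """Step 3: user_process turns input_data into output_data,
    then a post_process step persists it."""
    task.output_data = user_process(task.input_data)
    storage[task.id] = task.output_data  # post_process: persist result

# Step 1: the task queue starts as a list of input_data.
input_data_queue = [{"url": "a"}, {"url": "b"}, {"url": "a"}]
storage = {}
tasks = batch_pre_process(input_data_queue, seen_ids=set(storage))
for task in tasks:
    process(task, user_process=lambda d: {"html": d["url"] * 2}, storage=storage)
print(storage)  # {'a': {'html': 'aa'}, 'b': {'html': 'bb'}}
```

Note that the duplicate "a" never reaches the per-task processing stage.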
clear_all()
    Clear all data and reset the duplicate filter to its initial state.
do(input_data_queue, pre_process=None, multiprocess=False, processes=None, ignore_error=True)
    Process all input_data: pre-process all data, packing it into a
    task_queue, then process it either single-threaded or with multiple
    processes.

    Parameters:

    - input_data_queue – list of input data (or a generator).
    - pre_process – a callable that takes input_data_queue, pre-processes
      it, and returns a task_queue (an iterable whose items are Task
      instances).
    - multiprocess – trigger to use multiprocessing.
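The dispatch logic of do() can be illustrated by the following sketch, covering only the sequential branch and the ignore_error behavior. The function and the simplified task representation are assumptions for illustration, not pytq's actual internals.

```python
def do(input_data_queue, process_one, pre_process=None, ignore_error=True):
    """Sketch of the dispatch: pack inputs into a task queue,
    then process each task, optionally swallowing per-task errors."""
    # Pack input_data into a task queue; here a "task" is just the input itself.
    if pre_process is not None:
        task_queue = pre_process(input_data_queue)
    else:
        task_queue = list(input_data_queue)

    results = []
    for task in task_queue:
        try:
            results.append(process_one(task))
        except Exception:
            if not ignore_error:
                raise             # surface the failure immediately
            results.append(None)  # skip the failed task, keep going
    return results

out = do([1, 2, 0, 4], process_one=lambda x: 10 // x)
print(out)  # [10, 5, None, 2] -- the division by zero is swallowed
```

With ignore_error=False the same input would raise on the third item instead of continuing.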
get(id)
    Get the already-completed output data by the fingerprint of its
    input_data.
get_output(input_data)
    Get the already-completed output data for the given input_data.
user_batch_pre_process(input_data_queue)
    A method called to pre-process the task queue before doing any real
    per-task processing. It is usually used for duplicate filtering or
    statistics checks.

    Parameters: input_data_queue – list of input data (or a generator).
    Returns: task_queue, an iterable object whose items have to be Task
    instances. It is recommended to implement the task.nth_counter and
    task.left_counter variables.
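A custom batch pre-process might look like the sketch below, combining a duplicate filter with the recommended nth_counter / left_counter progress variables. The dict-based task representation and the "id" field are assumptions; in pytq the items would be Task instances.

```python
def user_batch_pre_process(input_data_queue, finished_ids):
    """Sketch: filter out already-finished items and attach
    nth_counter / left_counter progress variables to each task."""
    fresh = [d for d in input_data_queue if d["id"] not in finished_ids]
    total = len(fresh)
    task_queue = []
    for nth, input_data in enumerate(fresh, start=1):
        task_queue.append({
            "input_data": input_data,
            "nth_counter": nth,           # this task's position, 1-based
            "left_counter": total - nth,  # tasks remaining after this one
        })
    return task_queue

queue = [{"id": 1}, {"id": 2}, {"id": 3}]
tasks = user_batch_pre_process(queue, finished_ids={2})
print([(t["nth_counter"], t["left_counter"]) for t in tasks])  # [(1, 1), (2, 0)]
```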
user_hash_input(input_data)
    (Optional) Get an identifying fingerprint for the input data.

    Returns: fingerprint for input_data.
    Return type: string or integer, depending on the use case.

    If this method is not defined, the
    BaseScheduler._default_hash_input() method is used instead.
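A user_hash_input override might look like this sketch. The "url" field is an assumed input shape, and hashing only that field (so retries of the same URL share a fingerprint) is a design choice for illustration, not pytq's default behavior.

```python
import hashlib

def user_hash_input(input_data):
    """Sketch: fingerprint is the md5 hex digest of the URL field only,
    so the same URL always maps to the same fingerprint regardless of
    the other fields."""
    return hashlib.md5(input_data["url"].encode("utf-8")).hexdigest()

fp = user_hash_input({"url": "https://example.com", "retry": 3})
# Same URL, different retry count -> same fingerprint.
assert fp == user_hash_input({"url": "https://example.com", "retry": 0})
```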
user_is_duplicate(task)
    (Optional) Check whether a task is a duplicate.

    Returns: True when it is a duplicate item.
    Return type: boolean.

    Warning: if you customize this method, you usually also need to
    implement the user_batch_pre_process() method, because the default
    batch pre-process includes the duplicate filter.
user_post_process(task)
    (Optional) Defines the action taken after
    BaseScheduler.user_process() has been called. Will be called when
    pytq.task.Task.post_process() is not defined.

    Warning: when you customize this method, you usually also need to
    update the get() method, because post-processing is usually used to
    write output_data to the data persistence layer. If you change the
    way you store it, you have to change the way you read it.

    Parameters: task – pytq.task.Task instance.
user_pre_process(task)
    (Optional) Defines the action taken before
    BaseScheduler.user_process() is called. Will be called when
    pytq.task.Task.pre_process() is not defined.

    Parameters: task – pytq.task.Task instance.
user_process(input_data)
    (Required) Defines the logic that processes the input_data and
    returns the output_data.

    Parameters: input_data – the input data.
    Returns: the output_data.
class pytq.scheduler.StatusFlag
    MongoDB collection backed scheduler.

    Features:

    - There is a pre-defined integer, duplicate_flag, which is stored in
      the status column / field.
    - There is an edit_at datetime field, representing the last time the
      document was edited.

    Note: any status value greater than or equal to duplicate_flag,
    where the edit_at time is less than update_interval seconds ago,
    means the item is a duplicate.

    Parameters:

    - duplicate_flag – int, a status code representing finished /
      duplicate.
    - update_interval – int, the need-to-update interval (unit:
      seconds).
    - status_key – str.
    - edit_key – str.
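The duplicate rule stated in the note can be sketched as a stand-alone predicate. The function name is hypothetical; the parameter names follow the parameters listed above.

```python
from datetime import datetime, timedelta

def is_duplicate(status, edit_at, duplicate_flag, update_interval, now=None):
    """Sketch of the rule: a document is a duplicate when its status
    reached duplicate_flag AND it was edited within the last
    update_interval seconds (i.e. it is still fresh)."""
    if now is None:
        now = datetime.utcnow()
    finished = status >= duplicate_flag
    fresh = (now - edit_at) < timedelta(seconds=update_interval)
    return finished and fresh

now = datetime(2024, 1, 1, 12, 0, 0)
# Finished 10 seconds ago, update_interval = 3600 -> still fresh: duplicate.
print(is_duplicate(50, now - timedelta(seconds=10), 50, 3600, now=now))  # True
# Finished 2 hours ago -> stale, should be re-done: NOT a duplicate.
print(is_duplicate(50, now - timedelta(hours=2), 50, 3600, now=now))     # False
# Status below the flag -> never a duplicate, however recent.
print(is_duplicate(10, now - timedelta(seconds=10), 50, 3600, now=now))  # False
```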
duplicate_flag
    An integer value marking an item as a duplicate. Any status value
    greater than or equal to this marks a duplicate item; otherwise it
    is not one.

    You can define this when you initiate the scheduler.
pre_process_duplicate_flag_and_update_interval(duplicate_flag, update_interval)
    Bind the duplicate_flag and update_interval settings to the
    scheduler.
update_interval
    If an item was finished more than update_interval seconds ago, it
    should be re-done, and it is NOT a duplicate item.

    You can define this when you initiate the scheduler.