scheduler

class pytq.scheduler.BaseDBTableBackedScheduler(logger=None)

Scheduler that uses a database table as its backend storage.

  • Task.id is used as the primary key.
  • Other columns / fields are used for input_data and output_data storage.

class pytq.scheduler.BaseScheduler(logger=None)

All schedulers have to inherit from this base class.

Workflow:

Step 1. Generate the task queue:

Generate the task queue; it is a list of input_data.

Step 2. Pre-process the input data:

  1. Remove duplicates.
  2. Generate Task instances.

The _default_batch_pre_process() method is called by default.

Step 3. For each task, the _process() method is called, which includes:

  1. pre_process
  2. user_process, input_data -> output_data
  3. post_process
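The three-step workflow above can be sketched without pytq itself; all names below are illustrative stand-ins, not part of the pytq API:

```python
# A minimal, pytq-independent sketch of the three-step workflow above.
# All names here are illustrative; none of them are part of the pytq API.

class MiniScheduler:
    def __init__(self):
        self.storage = {}  # fingerprint -> output_data

    def hash_input(self, input_data):
        return hash(input_data)  # toy fingerprint

    def process(self, input_data):
        return input_data * 2  # user-defined logic (user_process)

    def do(self, input_data_queue):
        # Step 2: pre-process -- drop duplicates (already-finished items
        # and repeats within this batch), then build the task queue.
        seen, task_queue = set(), []
        for input_data in input_data_queue:
            fp = self.hash_input(input_data)
            if fp in self.storage or fp in seen:
                continue
            seen.add(fp)
            task_queue.append(input_data)
        # Step 3: process each task and persist its output.
        for input_data in task_queue:
            self.storage[self.hash_input(input_data)] = self.process(input_data)
```

Calling `do([1, 2, 2, 3])` processes each distinct item once and stores the results keyed by fingerprint; a second `do()` call skips anything already in storage.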
clear_all()

Clear all data, resetting the duplicate filter to its initial state.

do(input_data_queue, pre_process=None, multiprocess=False, processes=None, ignore_error=True)

Process all input_data:

  1. Pre-process all input data, packing it into a task_queue.
  2. Process the tasks either single-threaded or in parallel.

Parameters:
  • input_data_queue – list of input data (or a generator).
  • pre_process – a callable that takes input_data_queue, pre-processes it, and returns a task_queue (an iterable object whose items are Task instances).
  • multiprocess – trigger to use multiprocessing.

get(id)

Get finished output data directly by the fingerprint of its input_data.

get_output(input_data)

Get the finished output data for the given input_data.
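get_output(input_data) is presumably the composition of input hashing and the fingerprint lookup that get() performs; a self-contained sketch under that assumption (this is not the pytq implementation, and `storage` stands in for the backend table):

```python
# Toy backend: fingerprint -> output_data, standing in for the DB table.
storage = {}

def hash_input(input_data):
    return "fp-%s" % input_data  # toy fingerprint

def get(fingerprint):
    # Look up finished output data by the fingerprint of its input.
    return storage[fingerprint]

def get_output(input_data):
    # Convenience wrapper: fingerprint the input, then look it up.
    return get(hash_input(input_data))

storage[hash_input("hello")] = "HELLO"
```

With that seed entry, `get_output("hello")` and `get("fp-hello")` both return `"HELLO"`.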

user_batch_pre_process(input_data_queue)

A method called to pre-process the task queue before any real per-task processing is done. Typical uses are duplicate filtering and statistics checks.

Parameters: input_data_queue
Returns: task_queue, an iterable object whose items must be Task instances. It is recommended to populate the task.nth_counter and task.left_counter variables.

user_hash_input(input_data)

(Optional) Get a deterministic fingerprint for the input data.

Returns: fingerprint for input_data
Return type: string or integer, depending on the use case.

If this method is not overridden, BaseScheduler._default_hash_input() is used.
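One plausible override hashes the JSON serialization of the input; this is only a sketch of what a user-defined fingerprint might look like, not the behavior of _default_hash_input():

```python
import hashlib
import json

def user_hash_input(input_data):
    # Deterministic fingerprint: md5 hex digest of the JSON-serialized
    # input. sort_keys makes dicts with equal content hash identically.
    serialized = json.dumps(input_data, sort_keys=True)
    return hashlib.md5(serialized.encode("utf-8")).hexdigest()
```

Because of sort_keys, `{"b": 2, "a": 1}` and `{"a": 1, "b": 2}` yield the same fingerprint, which is exactly the property a duplicate filter needs.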

user_is_duplicate(task)

(Optional) Check whether a task is a duplicate.

Returns: True when the task is a duplicate item.
Return type: boolean.

Warning

If you customize this method, you usually also need to implement the user_batch_pre_process() method, because the default batch pre-process includes a duplicate filter.
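A custom duplicate check might, for example, consult a set of already-finished fingerprints. Everything below is illustrative: the Task class is a minimal stand-in for pytq.task.Task, and `finished_fingerprints` is an assumed store, not a pytq name:

```python
# A tiny stand-in for pytq.task.Task; only the id field matters here.
class Task:
    def __init__(self, id):
        self.id = id

# Assumed store of fingerprints already processed (not a pytq name).
finished_fingerprints = {"fp-1", "fp-2"}

def user_is_duplicate(task):
    # A task is a duplicate when its fingerprint was already finished.
    return task.id in finished_fingerprints
```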

user_post_process(task)

(Optional) Defines the action taken after BaseScheduler.user_process() has been called. It is invoked when pytq.task.Task.post_process() is not defined.

Warning

When you customize this method, you usually also need to update the get() method, because post-processing is typically used to write output_data to the data persistence layer: if you change the way you store it, you have to change the way you read it.

Parameters: task – a pytq.task.Task instance.
user_pre_process(task)

(Optional) Defines the action taken before BaseScheduler.user_process() is called. It is invoked when pytq.task.Task.pre_process() is not defined.

Parameters: task – a pytq.task.Task instance.
user_process(input_data)

(Required) Defines the logic that processes the input_data and returns the output_data.

Parameters: input_data
Returns: the output_data
class pytq.scheduler.StatusFlag

MongoDB-collection-backed scheduler.

Features:

  1. There is a pre-defined integer, duplicate_flag, which is stored in
    the status column / field.
  2. There is an edit_at datetime field representing the
    last time the document was edited.

Note

A status value greater than or equal to duplicate_flag, combined with an edit_at time less than update_interval seconds ago, marks the item as a duplicate.

Parameters:
  • duplicate_flag – int, a status code representing finished / duplicate.
  • update_interval – int, the need-to-update interval (unit: seconds).
  • status_key – str.
  • edit_key – str.
duplicate_flag

An integer value marking an item as a duplicate: any status value greater than or equal to this is a duplicate item, otherwise it is not.

You can set this value when you instantiate the scheduler.

pre_process_duplicate_flag_and_update_interval(duplicate_flag, update_interval)

Bind the duplicate_flag and update_interval settings.

update_interval

If an item was finished more than update_interval seconds ago, it should be re-done, and it is NOT a duplicate item.

You can set this value when you instantiate the scheduler.
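The duplicate rule combining duplicate_flag and update_interval can be sketched as follows. This is a minimal illustration, not the library's code; the "status" and "edit_at" document keys assume the defaults implied by status_key and edit_key:

```python
from datetime import datetime, timedelta

def is_duplicate(doc, duplicate_flag, update_interval):
    # Duplicate = status reached the finished flag AND the document was
    # edited within the last update_interval seconds; older finished
    # items are due for re-processing and are NOT duplicates.
    finished = doc["status"] >= duplicate_flag
    fresh = datetime.utcnow() - doc["edit_at"] < timedelta(seconds=update_interval)
    return finished and fresh
```

So a document with status 50 edited just now is a duplicate against `duplicate_flag=50`, while the same document edited two hours ago (with `update_interval=3600`) is due for re-processing.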