Apify

Apify and Crawlee Official Forum

b
F
A
J
A

How to do multi-task crawling with crawlee? (I have searched many days and can't get the answer)

This post has been deleted.
h
A
5 comments
Here is my APP's code.

index.ts
Plain Text
import { ipcMain } from 'electron'
import { CrawlerTask, TaskStatus } from '../../types'
import { createCrawler } from './crawlerFactory'
import { PlaywrightCrawler } from 'crawlee'
import { runTaskCrawler } from './crawlerRunner'

const taskCrawler: Record<
  number,
  {
    taskInfo: CrawlerTask
    crawlPagesTotal: number
    crawler: PlaywrightCrawler
  }
> = {}

export function getTaskCrawler(taskId: number) {
  return taskCrawler[taskId]
}

export function registeIpc() {
  ipcMain.on('startCrawler', async (_event, task: CrawlerTask) => {
    console.log(`\r\n start crawling,task name is 【${task.taskName}】`)
    const crawler = createCrawler(task)
    taskCrawler[task.taskId] = {
      crawler,
      taskInfo: task,
      crawlPagesTotal: 0
    }
    task.status = TaskStatus.RUNING
    await runTaskCrawler(crawler, task)
    task.status = TaskStatus.COMPLETED
    console.log(`task【${task.taskName}】done`)
  })

  ipcMain.on('pauseCrawler', (_event, taskId: number) => {
    taskCrawler[taskId]?.crawler.autoscaledPool?.pause()
    taskCrawler[taskId].taskInfo.status = TaskStatus.PAUSED
    console.log(`task[${taskId}] crawler paused`)
  })

  ipcMain.on('resumeCrawler', (_event, taskId: number) => {
    taskCrawler[taskId]?.crawler.autoscaledPool?.resume()
    taskCrawler[taskId].taskInfo.status = TaskStatus.RUNING
    console.log(`task[${taskId}] crawler resumed`)
  })
}
routerFactory.ts

Plain Text
import { createPlaywrightRouter } from 'crawlee'
import { CrawlerTask, SkipOperator } from '../../types'
import { getTaskCrawler } from '.'

export function routerFactory() {
  const router = createPlaywrightRouter()
  router.addDefaultHandler(async ctx => {
    const userData = ctx.request.userData
    console.log(userData)
    const task = userData.task as CrawlerTask
    const depth = userData.depth + 1
    const limitCrawlDepth = task.limitCrawlDepth
    const crawlPagesTotal = ++getTaskCrawler(task.taskId).crawlPagesTotal
    const limitCrawlPagesTotal = task.limitCrawlPagesTotal as number

    await collectData(ctx)
    await ctx.enqueueLinks({
      strategy: 'all',
      userData: {
        task,
        depth
      },
      transformRequestFunction(req) {
        const url = req.url
        const skipOperator = task.skipOperator

        if (task.skipType === 1) {
          const skipKeywords = task.skipKeywords
            .split('|')
            .map(k => k.trim())
            .filter(Boolean)
          if (skipOperator === SkipOperator.INCLUDE) {
            if (skipKeywords.some(k => url.includes(k))) return false
          } else {
            if (skipKeywords.every(k => !url.includes(k))) return false
          }
        } else if (task.skipType === 2) {
          const skipRegex = new RegExp(task.skipUrlRegex)
        }
        return req
      }
    })
  })
  return router
}

async function collectData({ request, page, log }) {
  log.info('current URL:' + request.url)
  const title = await page.title()
  let links = await page.$$eval('a', anchors => anchors.map(anchor => anchor.href))
  links = Array.from(new Set(links.filter(Boolean).map(l => new URL(l).hostname)))
  if (!title || links.length === 0) return

  log.info('current page crawl success', {
    url: request.url,
    did: request.userData.did,
    title,
    links
  })
}
@hunterleung. just advanced to level 3! Thanks for your contributions! 🎉
crawlerRunner.ts

Plain Text
import { PlaywrightCrawler } from 'crawlee'
import { CrawlerTask, CrawlerType } from '../../types'

export async function runTaskCrawler(crawler: PlaywrightCrawler, task: CrawlerTask) {
  switch (task.taskType) {
    case CrawlerType.WEBSITE:
      return await runWebsiteTaskCrawler(crawler, task)
    default:
      throw new Error('Invalid crawler type')
  }
}

async function runWebsiteTaskCrawler(crawler: PlaywrightCrawler, task: CrawlerTask) {
  console.log(task.sourceUrl)
  await crawler.run([
    {
      url: task.sourceUrl,
      userData: {
        task,
        depth: 0
      }
    }
  ])
}
async function runSerpsTaskCrawler(crawler: PlaywrightCrawler, task: CrawlerTask) {
  console.log(crawler, task)
}
async function runLinksTaskCrawler(crawler: PlaywrightCrawler, task: CrawlerTask) {
  console.log(crawler, task)
}
crawlerFactory.ts

Plain Text
import { routerFactory } from './routerFactory'
import { CrawlerTask, CrawlerType } from '../../types'
import { Configuration, PlaywrightCrawler, ProxyConfiguration } from 'crawlee'

export function createCrawler(task: CrawlerTask) {
  switch (task.taskType) {
    case CrawlerType.WEBSITE:
      return createWebsiteCrawler(task)

    default:
      throw new Error('Invalid crawler type')
  }
}

function createWebsiteCrawler(task: CrawlerTask) {
  let proxyConfiguration

  if (task.proxyType === 1 && task.proxyRule) {
    proxyConfiguration = new ProxyConfiguration({
      proxyUrls: [task.proxyRule]
    })
  }
  const crawler = new PlaywrightCrawler({
    headless: true,
    maxRequestRetries: 2,
    sessionPoolOptions: {
      maxPoolSize: 1000,
      blockedStatusCodes: [429]
    },
    proxyConfiguration,
    requestHandler: routerFactory(),
    maxConcurrency: task.maxWorkerThreads
  })
  return crawler
}
Add a reply
Sign up and join the conversation on Discord
Join