Improving multithreaded indexing with Lucene

2022-01-15 00:00:00 indexing multithreading lucene java

I am trying to build my indexes in Lucene with multiple threads, so I wrote the following code. First I find the files, and for each file I create a thread to index it. After that I join the threads and optimize the index. It works, but I'm not sure... can I trust it at a large scale? Is there any way to improve it?

import java.io.File;
import java.io.FileFilter;
import java.io.FileReader;
import java.io.IOException;
import java.io.BufferedReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Document;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.analysis.StopAnalyzer;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;
import org.apache.lucene.index.TermFreqVector;

public class mIndexer extends Thread {

    private File ifile;
    private static IndexWriter writer;

    public mIndexer(File f) {
        ifile = f.getAbsoluteFile();
    }

    public static void main(String args[]) throws Exception {
        System.out.println("here...");

        String indexDir;
        String dataDir;
        if (args.length != 2) {
            dataDir = "/home/omid/Ranking/docs/";
            indexDir = "/home/omid/Ranking/indexes/";
        } else {
            dataDir = args[0];
            indexDir = args[1];
        }

        long start = System.currentTimeMillis();

        Directory dir = FSDirectory.open(new File(indexDir));
        writer = new IndexWriter(dir,
                new StopAnalyzer(Version.LUCENE_34, new File("/home/omid/Desktop/stopwords.txt")),
                true,
                IndexWriter.MaxFieldLength.UNLIMITED);
        int numIndexed = 0;
        try {
            numIndexed = index(dataDir, new TextFilesFilter());
        } finally {
            long end = System.currentTimeMillis();
            System.out.println("Indexing " + numIndexed + " files took " + (end - start) + " milliseconds");
            writer.optimize();
            System.out.println("Optimization took place in " + (System.currentTimeMillis() - end) + " milliseconds");
            writer.close();
        }
        System.out.println("Enjoy your day/night");
    }

    public static int index(String dataDir, FileFilter filter) throws Exception {
        File[] dires = new File(dataDir).listFiles();
        for (File d : dires) {
            if (d.isDirectory()) {
                File[] files = new File(d.getAbsolutePath()).listFiles();
                for (File f : files) {
                    if (!f.isDirectory() &&
                            !f.isHidden() &&
                            f.exists() &&
                            f.canRead() &&
                            (filter == null || filter.accept(f))) {
                        Thread t = new mIndexer(f);
                        t.start();
                        t.join();
                    }
                }
            }
        }
        return writer.numDocs();
    }

    private static class TextFilesFilter implements FileFilter {
        public boolean accept(File path) {
            return path.getName().toLowerCase().endsWith(".txt");
        }
    }

    protected Document getDocument() throws Exception {
        Document doc = new Document();
        if (ifile.exists()) {
            doc.add(new Field("contents", new FileReader(ifile), Field.TermVector.YES));
            doc.add(new Field("path", ifile.getAbsolutePath(), Field.Store.YES, Field.Index.NOT_ANALYZED));
            String cat = "WIR";
            cat = ifile.getAbsolutePath().substring(0, ifile.getAbsolutePath().length() - ifile.getName().length() - 1);
            cat = cat.substring(cat.lastIndexOf('/') + 1, cat.length());
            //doc.add(new Field("category", cat.subSequence(0, cat.length()), Field.Store.YES));
            //System.out.println(cat.subSequence(0, cat.length()));
        }
        return doc;
    }

    public void run() {
        try {
            System.out.println("Indexing " + ifile.getAbsolutePath());
            Document doc = getDocument();
            writer.addDocument(doc);
        } catch (Exception e) {
            System.out.println(e.toString());
        }
    }
}

Any help is appreciated.

Recommended Answer

If you want to parallelize indexing, there are two things you can do:

  • parallelizing calls to addDocument,
  • increasing the maximum number of threads of your merge scheduler.

You are on the right track in parallelizing calls to addDocument, but spawning one thread per document will not scale as the number of documents you need to index grows. You should rather use a fixed-size ThreadPoolExecutor. Since this task is mainly CPU-intensive (depending on your analyzer and on the way you retrieve your data), setting the maximum number of threads to the number of CPUs of your machine might be a good start.
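A minimal sketch of that approach, assuming the same Lucene 3.x API as in the question; buildDocument is a hypothetical helper standing in for the question's getDocument() logic, and the one-hour wait is an arbitrary illustrative timeout:

import java.io.File;
import java.io.FileReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;

public class PooledIndexer {

    // Index all files on a fixed-size pool instead of one thread per file.
    // IndexWriter is thread-safe, so every task can share the same instance.
    public static void indexAll(final IndexWriter writer, Iterable<File> files)
            throws InterruptedException {
        int nThreads = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);

        for (final File f : files) {
            pool.submit(new Runnable() {
                public void run() {
                    try {
                        writer.addDocument(buildDocument(f));
                    } catch (Exception e) {
                        System.err.println("Failed to index " + f + ": " + e);
                    }
                }
            });
        }

        pool.shutdown();                            // stop accepting new tasks
        pool.awaitTermination(1, TimeUnit.HOURS);   // wait for queued tasks before optimizing/closing
    }

    // Hypothetical helper: builds the same fields as the question's getDocument().
    static Document buildDocument(File f) throws Exception {
        Document doc = new Document();
        doc.add(new Field("contents", new FileReader(f), Field.TermVector.YES));
        doc.add(new Field("path", f.getAbsolutePath(), Field.Store.YES, Field.Index.NOT_ANALYZED));
        return doc;
    }
}

The calling thread can then run writer.optimize() and writer.close() after indexAll returns, just as the original main method does after joining its threads.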

Regarding the merge scheduler, you can increase the maximum number of threads it may use with the setMaxThreadCount method of ConcurrentMergeScheduler. Beware that disks are much better at sequential reads/writes than at random reads/writes; as a consequence, setting too high a maximum thread count on your merge scheduler is more likely to slow indexing down than to speed it up.
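A small sketch of that configuration, assuming the Lucene 3.4 IndexWriterConfig API; the analyzer choice and the value 2 are illustrative, not recommendations:

import java.io.File;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class MergeSchedulerSetup {
    public static IndexWriter openWriter(File indexDir) throws Exception {
        // Allow at most 2 concurrent merge threads; on a single spinning disk,
        // more merge threads tends to hurt rather than help.
        ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
        cms.setMaxThreadCount(2);

        IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_34,
                new StandardAnalyzer(Version.LUCENE_34));
        conf.setMergeScheduler(cms);

        return new IndexWriter(FSDirectory.open(indexDir), conf);
    }
}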

But before trying to parallelize your indexing process, you should probably try to find out where the bottleneck is. If your disk is too slow, the bottleneck is likely to be the flush and merge steps; in that case, parallelizing calls to addDocument (which essentially consists of analyzing a document and buffering the result of the analysis in memory) will not improve indexing speed at all.
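One rough, hypothetical way to check where the time goes is to measure the addDocument loop separately from the flush; the profile method below is an illustrative sketch, not part of the question's code:

import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;

public class BottleneckCheck {
    // Times in-memory analysis/buffering separately from the flush to disk.
    public static void profile(IndexWriter writer, Iterable<Document> docs) throws Exception {
        long t0 = System.currentTimeMillis();
        for (Document doc : docs) {
            writer.addDocument(doc);     // analysis + buffering in RAM
        }
        long t1 = System.currentTimeMillis();
        writer.commit();                 // flush buffered documents to disk
        long t2 = System.currentTimeMillis();
        System.out.println("addDocument: " + (t1 - t0) + " ms, flush: " + (t2 - t1) + " ms");
    }
}

If the flush dominates, faster disks or tuning the flush/merge settings will help more than adding indexing threads.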

A few side notes:

  • There is some ongoing work in the development version of Lucene to improve indexing parallelism (especially the flushing part; this blog entry explains how it works).

  • Lucene has a nice wiki page on how to improve indexing speed, where you will find other ways to speed up indexing.
