Commit 3e3457ee authored by jcorvi's avatar jcorvi Committed by javi
Browse files

Bugfix for the case where there are fewer files to process than threads

parents 8b0e9be6 20e710c2
......@@ -4,12 +4,21 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
......@@ -20,17 +29,11 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.ArrayUtils;
import org.apache.maven.shared.utils.io.FileUtils;
import gate.Corpus;
import gate.Document;
import gate.Factory;
import gate.FeatureMap;
import gate.Gate;
import gate.LanguageAnalyser;
import gate.ProcessingResource;
import gate.creole.Plugin;
import gate.creole.SerialAnalyserController;
import gate.util.ExtensionFileFilter;
import gate.util.GateException;
/**
......@@ -96,6 +99,10 @@ public class App {
workdir.setRequired(false);
options.addOption(workdir);
Option threads_option = new Option("t", "threads", true, "Threads to be used. The documents are splited for data parallelization");
threads_option.setRequired(false);
options.addOption(threads_option);
CommandLineParser parser = new DefaultParser();
HelpFormatter formatter = new HelpFormatter();
CommandLine cmd = null;
......@@ -114,6 +121,7 @@ public class App {
String workdirPath = cmd.getOptionValue("workdir");
String listsDefinitionsPath = cmd.getOptionValue("listsURL");
String japeMainPath = cmd.getOptionValue("jape_main");
String threads_str = cmd.getOptionValue("threads");
if (!java.nio.file.Files.isDirectory(Paths.get(cmd.getOptionValue("input")))) {
System.out.println(" Please set the inputDirectoryPath ");
......@@ -228,6 +236,22 @@ public class App {
outputDirectory.mkdirs();
parameters.put("outputDirectory", outputFilePath);
Set<String> processedFiles = null;
try {
processedFiles = getFiles(outputFilePath);
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
Integer threads = 1;
if(threads_str!=null) {
try {
threads = Integer.parseInt(threads_str.trim());
} catch (NumberFormatException nfe) {
System.out.println("NumberFormatException: " + nfe.getMessage());
}
}
try {
Gate.init();
......@@ -238,7 +262,7 @@ public class App {
}
try {
process(parameters);
process(threads, parameters, processedFiles);
} catch (GateException e) {
// TODO Auto-generated catch block
e.printStackTrace();
......@@ -256,98 +280,78 @@ public class App {
* @throws GateException
* @throws IOException
*/
private static void process(Map<String,String> parameters) throws GateException, IOException {
private static void process(Integer threads, Map<String,String> parameters,Set<String> processedFiles) throws GateException, IOException {
try {
System.out.println("App :: main :: INIT PROCESS");
Corpus corpus = Factory.newCorpus("My Files");
File directory = new File(parameters.get("inputFilePath"));
ExtensionFileFilter filter = new ExtensionFileFilter("Txt files", new String[]{"txt","xml"});
URL url = directory.toURL();
corpus.populate(url, filter, null, false);
Plugin anniePlugin = new Plugin.Maven("uk.ac.gate.plugins", "annie", "8.6");
Gate.getCreoleRegister().registerPlugin(anniePlugin);
// create a serial analyser controller to run ANNIE with
SerialAnalyserController annieController = (SerialAnalyserController) Factory.createResource("gate.creole.SerialAnalyserController",
Factory.newFeatureMap(), Factory.newFeatureMap(), "ANNIE");
annieController.setCorpus(corpus);
if(parameters.get("listsURL")!=null) {
//Basic Gazetter
//Gazetter parameters
FeatureMap params = Factory.newFeatureMap();
params.put("listsURL", new File(parameters.get("listsURL")).toURL());
params.put("gazetteerFeatureSeparator", parameters.get("gazetteerFeatureSeparator"));
params.put("caseSensitive",parameters.get("caseSensitive"));
params.put("longestMatchOnly",parameters.get("longestMatchOnly"));
ProcessingResource pr_basic_gazetter = (ProcessingResource) Factory.createResource("gate.creole.gazetteer.DefaultGazetteer", params);
//Flexible Gazetter
if(parameters.get("gazetter_type").equals("flexible")) {
Plugin toolsPlugin = new Plugin.Maven("uk.ac.gate.plugins", "tools", "8.6");
Gate.getCreoleRegister().registerPlugin(toolsPlugin);
FeatureMap params2 = Factory.newFeatureMap();
ProcessingResource pr_flexi_gazetter = (ProcessingResource) Factory.createResource("gate.creole.gazetteer.FlexibleGazetteer", params2);
pr_flexi_gazetter.setParameterValue("inputASName", parameters.get("inputASName"));
if (parameters.get("inputFeatureNames")==null) {
System.out.println("No inputFeatureNames defined, Token.root and Token.word as default");
ArrayList<String> l = new ArrayList<String>(Arrays.asList("Token.root","Token.word"));
pr_flexi_gazetter.setParameterValue("inputFeatureNames", l);
}else {
ArrayList<String> l = new ArrayList<String>(Arrays.asList(parameters.get("inputFeatureNames").toString().split(",")));
pr_flexi_gazetter.setParameterValue("inputFeatureNames", l);
}
pr_flexi_gazetter.setParameterValue("gazetteerInst", pr_basic_gazetter);
pr_flexi_gazetter.setParameterValue("outputASName", parameters.get("outputASName"));
annieController.add(pr_flexi_gazetter);
}else { //Default Gazetter
pr_basic_gazetter.setParameterValue("annotationSetName", parameters.get("outputASName"));
annieController.add(pr_basic_gazetter);
}
}
LanguageAnalyser jape = null;
if(parameters.get("japeMainPath")!=null) {
jape = (LanguageAnalyser)gate.Factory.createResource("gate.creole.Transducer", gate.Utils.featureMap(
"grammarURL", new File(parameters.get("japeMainPath")).toURI().toURL(),"encoding", "UTF-8"));
jape.setParameterValue("inputASName", parameters.get("inputASName"));
jape.setParameterValue("outputASName", parameters.get("outputASName"));
annieController.add(jape);
File[] files = directory.listFiles();
int len = files.length;
if(len<threads) {
System.out.println("App :: process :: total files less than threads numbers, only one thread will be launch ");
threads = 1;
}
// execute controller
annieController.execute();
//free resources
// if(pr_gazetter!=null) {
// Factory.deleteResource(pr_gazetter);
// }
if(jape!=null) {
Factory.deleteResource(jape);
}
Factory.deleteResource(annieController);
Gate.removeKnownPlugin(anniePlugin);
//Save documents in different output
for (Document document : corpus) {
String nameOutput = "";
if(document.getName().indexOf(".txt")!=-1) {
nameOutput = document.getName().substring(0, document.getName().indexOf(".txt")+4).replace(".txt", ".xml");
}else {
nameOutput = document.getName().substring(0, document.getName().indexOf(".xml")+4);
}
java.io.Writer out = new java.io.BufferedWriter(new java.io.OutputStreamWriter(new FileOutputStream(new File(parameters.get("outputDirectory") + File.separator + nameOutput), false)));
out.write(document.toXml());
out.close();
int x = len/threads;
System.out.println("Total files : " + len);
System.out.println("Files already processed : " + processedFiles.size());
List<File[]> list = new ArrayList<File[]>();
for (int i = 0; i < len - x + 1; i += x)
list.add(Arrays.copyOfRange(files, i, i + x));
if (len % x != 0) {
File[] second = Arrays.copyOfRange(files, len - len % x, len);
File[] first = list.get(threads-1);
File[] the_two = (File[])ArrayUtils.addAll(first, second);
list.remove(threads-1);
list.add(the_two);
}
long begTest = new java.util.Date().getTime();
List<Future> futuresList = new ArrayList<Future>();
ExecutorService eservice = Executors.newFixedThreadPool(10);
for(int index = 0; index < threads; index++)
futuresList.add(eservice.submit(new Process(index, list.get(index), processedFiles, parameters)));
Object taskResult;
for(Future future:futuresList) {
try {
taskResult = future.get();
}catch (InterruptedException e) {}
catch (ExecutionException e) {}
}
Double secs = new Double((new java.util.Date().getTime() - begTest)*0.001);
System.out.println("Tiempo de ejecucion " + secs + " segundos");
Gate.removeKnownPlugin(anniePlugin);
}catch(Exception e) {
System.out.println("App :: main :: ERROR ");
e.printStackTrace();
System.exit(1);
}
System.out.println("App :: main :: END PROCESS");
System.exit(0);
}
/**
 * Collects the extension-less names of the non-directory entries found directly
 * inside a directory (sub-directories are not descended into).
 *
 * @param dir path of the directory to list
 * @return a mutable set with one entry per distinct file base name; empty when the
 *         directory holds no files
 * @throws IOException if the directory cannot be opened for listing
 */
public static Set<String> getFiles(String dir) throws IOException {
    final Path folder = Paths.get(dir);
    final Set<String> baseNames = new HashSet<>();
    try (DirectoryStream<Path> entries = Files.newDirectoryStream(folder)) {
        for (Path entry : entries) {
            // Only files contribute a name; nested directories are ignored.
            if (Files.isDirectory(entry)) {
                continue;
            }
            String fileName = entry.getFileName().toString();
            baseNames.add(FileUtils.removeExtension(fileName));
        }
    }
    return baseNames;
}
/**
* Basic unzipping folder method
......
package es.bsc.inb.nlp.gate.generic.component.main;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import org.apache.maven.shared.utils.io.FileUtils;
import gate.Corpus;
import gate.Factory;
import gate.FeatureMap;
import gate.Gate;
import gate.LanguageAnalyser;
import gate.ProcessingResource;
import gate.creole.ExecutionException;
import gate.creole.Plugin;
import gate.creole.ResourceInstantiationException;
import gate.creole.SerialAnalyserController;
import gate.util.GateException;
import gate.util.InvalidOffsetException;
/**
 * Worker that runs a GATE pipeline over one slice of the input files.
 *
 * <p>Each instance builds its own {@link SerialAnalyserController} (optionally with a
 * gazetteer and a JAPE transducer, depending on the supplied parameters), processes its
 * files one document at a time, and writes each annotated document as GATE XML into the
 * configured output directory. Files whose base name is already listed in
 * {@code processedFiles} are skipped.
 *
 * @author javi
 */
public class Process implements Runnable {

    /** Numeric identifier of this worker; only used in log output. */
    int name;
    /** Slice of input files assigned to this worker. */
    File[] files = null;
    /** Pipeline configuration, keyed by option name (e.g. "listsURL", "outputDirectory"). */
    Map<String,String> parameters;
    /** Base names (no extension) of files that must be skipped. */
    Set<String> processedFiles = null;

    /**
     * Creates a worker.
     *
     * @param name identifier printed in the log output
     * @param files input files assigned to this worker
     * @param processedFiles base names (without extension) of files to skip; may be {@code null}
     * @param parameters pipeline configuration
     */
    public Process(int name, File[] files, Set<String> processedFiles,Map<String,String> parameters) {
        super();
        this.name=name;
        this.files = files;
        this.processedFiles = processedFiles;
        this.parameters = parameters;
    }

    /**
     * Builds the pipeline, processes every pending ".txt"/".xml" file of this worker and
     * finally releases the pipeline resources, even when the setup or a document fails.
     */
    @Override
    public void run() {
        // Tolerate a missing slice: treat it as "nothing to do" instead of failing with an NPE.
        File[] pending = (files != null) ? files : new File[0];
        SerialAnalyserController annieController = null;
        LanguageAnalyser jape = null;
        try {
            System.out.println("Thread " + name + " -- Total files to process : " + pending.length);
            // create a serial analyser controller to run ANNIE with
            annieController = (SerialAnalyserController) Factory.createResource("gate.creole.SerialAnalyserController",
                    Factory.newFeatureMap(), Factory.newFeatureMap(), "ANNIE");
            if(parameters.get("listsURL")!=null) {
                //Basic Gazetter
                //Gazetter parameters
                FeatureMap params = Factory.newFeatureMap();
                // File.toURL() is deprecated (it does not escape illegal URL characters); go through the URI.
                params.put("listsURL", new File(parameters.get("listsURL")).toURI().toURL());
                params.put("gazetteerFeatureSeparator", parameters.get("gazetteerFeatureSeparator"));
                params.put("caseSensitive",parameters.get("caseSensitive"));
                params.put("longestMatchOnly",parameters.get("longestMatchOnly"));
                ProcessingResource pr_basic_gazetter = (ProcessingResource) Factory.createResource("gate.creole.gazetteer.DefaultGazetteer", params);
                //Flexible Gazetter
                // Constant-first comparison: a missing "gazetter_type" falls back to the default gazetteer.
                if("flexible".equals(parameters.get("gazetter_type"))) {
                    // NOTE(review): every worker registers the plugin on the shared CREOLE register - confirm this is safe to do concurrently.
                    Plugin toolsPlugin = new Plugin.Maven("uk.ac.gate.plugins", "tools", "8.6");
                    Gate.getCreoleRegister().registerPlugin(toolsPlugin);
                    FeatureMap params2 = Factory.newFeatureMap();
                    ProcessingResource pr_flexi_gazetter = (ProcessingResource) Factory.createResource("gate.creole.gazetteer.FlexibleGazetteer", params2);
                    pr_flexi_gazetter.setParameterValue("inputASName", parameters.get("inputASName"));
                    ArrayList<String> featureNames;
                    if (parameters.get("inputFeatureNames")==null) {
                        System.out.println("No inputFeatureNames defined, Token.root and Token.word as default");
                        featureNames = new ArrayList<String>(Arrays.asList("Token.root","Token.word"));
                    }else {
                        featureNames = new ArrayList<String>(Arrays.asList(parameters.get("inputFeatureNames").split(",")));
                    }
                    pr_flexi_gazetter.setParameterValue("inputFeatureNames", featureNames);
                    pr_flexi_gazetter.setParameterValue("gazetteerInst", pr_basic_gazetter);
                    pr_flexi_gazetter.setParameterValue("outputASName", parameters.get("outputASName"));
                    annieController.add(pr_flexi_gazetter);
                }else { //Default Gazetter
                    pr_basic_gazetter.setParameterValue("annotationSetName", parameters.get("outputASName"));
                    annieController.add(pr_basic_gazetter);
                }
            }
            if(parameters.get("japeMainPath")!=null) {
                jape = (LanguageAnalyser)gate.Factory.createResource("gate.creole.Transducer", gate.Utils.featureMap(
                        "grammarURL", new File(parameters.get("japeMainPath")).toURI().toURL(),"encoding", "UTF-8"));
                jape.setParameterValue("inputASName", parameters.get("inputASName"));
                jape.setParameterValue("outputASName", parameters.get("outputASName"));
                annieController.add(jape);
            }
            for (File file : pending) {
                String inputName = file.getName();
                boolean isTxt = inputName.endsWith(".txt");
                if (!isTxt && !inputName.endsWith(".xml")) {
                    continue;
                }
                // The caller may hand over a null set when listing the output directory failed.
                if (processedFiles != null && processedFiles.contains(FileUtils.removeExtension(inputName))) {
                    continue;
                }
                try {
                    // Swap only the trailing extension; String.replace would also rewrite
                    // any ".txt" occurring in the middle of the name.
                    String fileOutPutName = isTxt
                            ? inputName.substring(0, inputName.length() - ".txt".length()) + ".xml"
                            : inputName;
                    File outputGATEFile = new File (parameters.get("outputDirectory") + File.separator + fileOutPutName);
                    processDocument(annieController, file, outputGATEFile);
                } catch (ResourceInstantiationException | MalformedURLException | InvalidOffsetException e) {
                    System.out.println("App::process :: error with document " + file.getAbsolutePath());
                    e.printStackTrace();
                }
            }
        } catch (MalformedURLException | GateException e1) {
            e1.printStackTrace();
        } finally {
            //free resources, also when the pipeline setup failed half-way
            if(jape!=null) {
                Factory.deleteResource(jape);
            }
            if(annieController!=null) {
                Factory.deleteResource(annieController);
            }
        }
    }

    /**
     * Runs the pipeline on a single document and writes the result as GATE XML.
     *
     * <p>Failures are reported on standard output and do not propagate, so one bad
     * document does not stop the worker. The document and its temporary corpus are
     * always released afterwards.
     *
     * @param annieController pipeline to execute
     * @param inputFile document to load (read as UTF-8)
     * @param outputGATEFile destination file; overwritten if it exists
     * @throws ResourceInstantiationException declared for callers; currently handled internally
     * @throws MalformedURLException declared for callers; currently handled internally
     * @throws InvalidOffsetException declared for callers; currently handled internally
     */
    private static void processDocument(SerialAnalyserController annieController, File inputFile, File outputGATEFile) throws ResourceInstantiationException, MalformedURLException, InvalidOffsetException {
        gate.Document gateDocument = null;
        Corpus corpus = null;
        try {
            gateDocument = Factory.newDocument(inputFile.toURI().toURL(), "UTF-8");
            corpus = Factory.newCorpus("My XML Files");
            corpus.add(gateDocument);
            annieController.setCorpus(corpus);
            annieController.execute();
            // try-with-resources closes (and flushes) the writer even if the write fails.
            // The charset is explicit so the output does not depend on the platform default;
            // the input above is read as UTF-8 as well.
            try (java.io.Writer out = new java.io.BufferedWriter(new java.io.OutputStreamWriter(
                    new FileOutputStream(outputGATEFile, false), java.nio.charset.StandardCharsets.UTF_8))) {
                out.write(gateDocument.toXml());
            }
        } catch (IOException e) {
            System.out.println("App :: processDocument :: IOException ");
            e.printStackTrace();
        } catch (Exception e) {
            System.out.println("App :: processDocument :: Exception ");
            e.printStackTrace();
        } finally {
            // Release through the Factory rather than calling cleanup() directly, so the
            // resources are also dropped from GATE's bookkeeping; otherwise every processed
            // document stays referenced for the lifetime of the JVM.
            if (gateDocument != null) {
                Factory.deleteResource(gateDocument);
            }
            if (corpus != null) {
                corpus.clear();
                Factory.deleteResource(corpus);
            }
        }
    }
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment