/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.rdf.simple.experimental;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

import org.apache.commons.rdf.api.Dataset;
import org.apache.commons.rdf.api.Graph;
import org.apache.commons.rdf.api.IRI;
import org.apache.commons.rdf.api.Quad;
import org.apache.commons.rdf.api.RDFSyntax;
import org.apache.commons.rdf.api.RDF;
import org.apache.commons.rdf.experimental.RDFParser;
import org.apache.commons.rdf.simple.SimpleRDF;

/**
 * Abstract RDFParser
 * <p>
 * This abstract class keeps the parser settings in private fields, exposed
 * through {@link Optional}-returning getters like {@link #getSourceFile()}.
 * Some basic checking like {@link #checkIsAbsolute(IRI)} is performed.
 * <p>
 * This class and its subclasses are {@link Cloneable}; each call to option
 * methods like {@link #contentType(String)} or {@link #source(IRI)} leaves
 * this instance untouched and returns a cloned, modified copy.
 * <p>
 * By default, parsing is done by the abstract method
 * {@link #parseSynchronusly()} - which is executed in a cloned snapshot made
 * by {@link #prepareForParsing()}. The default {@link #parse()} submits that
 * snapshot to a thread pool whose threads belong to {@link #threadGroup} - but
 * implementations can override {@link #parse()} (e.g. because it has its own
 * threading model or use asynchronous remote execution).
 */
public abstract class AbstractRDFParser<T extends AbstractRDFParser<T>> implements RDFParser, Cloneable {

    public static final ThreadGroup threadGroup = new ThreadGroup("Commons RDF parsers");
    private static final ExecutorService threadpool = Executors.newCachedThreadPool(r -> new Thread(threadGroup, r));

    // Basically only used for creating IRIs
    private static final RDF internalRdfTermFactory = new SimpleRDF();

    /**
     * Get the set {@link RDF}, if any.
     *
     * @return The {@link RDF} to use, or {@link Optional#empty()} if it has not
     *         been set
     */
    public Optional<RDF> getRdfTermFactory() {
        return rdfTermFactory;
    }

    /**
     * Get the set content-type {@link RDFSyntax}, if any.
     * <p>
     * If this is {@link Optional#isPresent()}, then {@link #getContentType()}
     * contains the value of {@link RDFSyntax#mediaType}.
     *
     * @return The {@link RDFSyntax} of the content type, or
     *         {@link Optional#empty()} if it has not been set
     */
    public Optional<RDFSyntax> getContentTypeSyntax() {
        return contentTypeSyntax;
    }

    /**
     * Get the set content-type String, if any.
     * <p>
     * If this is {@link Optional#isPresent()} and is recognized by
     * {@link RDFSyntax#byMediaType(String)}, then the corresponding
     * {@link RDFSyntax} is set on {@link #getContentTypeSyntax()}, otherwise
     * that is {@link Optional#empty()}.
     *
     * @return The Content-Type IANA media type, e.g. <code>text/turtle</code>,
     *         or {@link Optional#empty()} if it has not been set
     */
    public final Optional<String> getContentType() {
        return contentType;
    }

    /**
     * Get the target to consume parsed Quads.
     * <p>
     * From the call to {@link #parseSynchronusly()}, this will be a
     * non-<code>null</code> value (as a target is a required setting).
     *
     * @return The target consumer of {@link Quad}s, or <code>null</code> if it
     *         has not yet been set.
     *
     */
    public Consumer<Quad> getTarget() {
        return target;
    }

    /**
     * Get the target dataset as set by {@link #target(Dataset)}.
     * <p>
     * The return value is {@link Optional#isPresent()} if and only if
     * {@link #target(Dataset)} has been set, meaning that the implementation
     * may choose to append parsed quads to the {@link Dataset} directly instead
     * of relying on the generated {@link #getTarget()} consumer.
     * <p>
     * If this value is present, then {@link #getTargetGraph()} MUST be
     * {@link Optional#empty()}.
     *
     * @return The target Dataset, or {@link Optional#empty()} if another kind
     *         of target (or no target at all) has been set.
     */
    public Optional<Dataset> getTargetDataset() {
        return targetDataset;
    }

    /**
     * Get the target graph as set by {@link #target(Graph)}.
     * <p>
     * The return value is {@link Optional#isPresent()} if and only if
     * {@link #target(Graph)} has been set, meaning that the implementation may
     * choose to append parsed triples to the {@link Graph} directly instead of
     * relying on the generated {@link #getTarget()} consumer.
     * <p>
     * If this value is present, then {@link #getTargetDataset()} MUST be
     * {@link Optional#empty()}.
     *
     * @return The target Graph, or {@link Optional#empty()} if another kind of
     *         target (or no target at all) has been set.
     */
    public Optional<Graph> getTargetGraph() {
        return targetGraph;
    }

    /**
     * Get the set base {@link IRI}, if present.
     *
     * @return The base {@link IRI}, or {@link Optional#empty()} if it has not
     *         been set
     */
    public Optional<IRI> getBase() {
        return base;
    }

    /**
     * Get the set source {@link InputStream}.
     * <p>
     * If this is {@link Optional#isPresent()}, then {@link #getSourceFile()}
     * and {@link #getSourceIri()} are {@link Optional#empty()}.
     *
     * @return The source {@link InputStream}, or {@link Optional#empty()} if it
     *         has not been set
     */
    public Optional<InputStream> getSourceInputStream() {
        return sourceInputStream;
    }

    /**
     * Get the set source {@link Path}.
     * <p>
     * If this is {@link Optional#isPresent()}, then
     * {@link #getSourceInputStream()} and {@link #getSourceIri()} are
     * {@link Optional#empty()}.
     *
     * @return The source {@link Path}, or {@link Optional#empty()} if it has
     *         not been set
     */
    public Optional<Path> getSourceFile() {
        return sourceFile;
    }

    /**
     * Get the set source {@link IRI}.
     * <p>
     * If this is {@link Optional#isPresent()}, then
     * {@link #getSourceInputStream()} and {@link #getSourceFile()} are
     * {@link Optional#empty()}.
     *
     * @return The source {@link IRI}, or {@link Optional#empty()} if it has not
     *         been set
     */
    public Optional<IRI> getSourceIri() {
        return sourceIri;
    }

    private Optional<RDF> rdfTermFactory = Optional.empty();
    private Optional<RDFSyntax> contentTypeSyntax = Optional.empty();
    private Optional<String> contentType = Optional.empty();
    private Optional<IRI> base = Optional.empty();
    private Optional<InputStream> sourceInputStream = Optional.empty();
    private Optional<Path> sourceFile = Optional.empty();
    private Optional<IRI> sourceIri = Optional.empty();
    // The consumer is the only setting kept as a plain nullable reference;
    // checkTarget() rejects null before parsing.
    private Consumer<Quad> target;
    // Start out empty (never null) so the getters honour their Optional
    // contract even before any target(..) method has been called.
    private Optional<Dataset> targetDataset = Optional.empty();
    private Optional<Graph> targetGraph = Optional.empty();

    /**
     * Make a shallow copy of this parser, used by every option method so that
     * the receiver itself is never modified.
     *
     * @return A clone of this parser, cast to the concrete parser type
     */
    @SuppressWarnings("unchecked")
    @Override
    public T clone() {
        try {
            return (T) super.clone();
        } catch (final CloneNotSupportedException e) {
            // Not expected as this class implements Cloneable
            throw new RuntimeException(e);
        }
    }

    /**
     * View this instance as the concrete parser type <code>T</code>.
     *
     * @return <code>this</code>, cast to <code>T</code>
     */
    @SuppressWarnings("unchecked")
    protected T asT() {
        return (T) this;
    }

    @Override
    public T rdfTermFactory(final RDF rdfTermFactory) {
        final AbstractRDFParser<T> c = clone();
        c.rdfTermFactory = Optional.ofNullable(rdfTermFactory);
        return c.asT();
    }

    @Override
    public T contentType(final RDFSyntax rdfSyntax) throws IllegalArgumentException {
        final AbstractRDFParser<T> c = clone();
        c.contentTypeSyntax = Optional.ofNullable(rdfSyntax);
        // Keep the string form in sync with the syntax
        c.contentType = c.contentTypeSyntax.map(syntax -> syntax.mediaType());
        return c.asT();
    }

    @Override
    public T contentType(final String contentType) throws IllegalArgumentException {
        final AbstractRDFParser<T> c = clone();
        c.contentType = Optional.ofNullable(contentType);
        // Unrecognized media types leave the syntax empty
        c.contentTypeSyntax = c.contentType.flatMap(RDFSyntax::byMediaType);
        return c.asT();
    }

    @Override
    public T base(final IRI base) {
        final AbstractRDFParser<T> c = clone();
        c.base = Optional.ofNullable(base);
        c.base.ifPresent(this::checkIsAbsolute);
        return c.asT();
    }

    @Override
    public T base(final String base) throws IllegalArgumentException {
        return base(internalRdfTermFactory.createIRI(base));
    }

    @Override
    public T source(final InputStream inputStream) {
        final AbstractRDFParser<T> c = clone();
        c.resetSource();
        c.sourceInputStream = Optional.ofNullable(inputStream);
        return c.asT();
    }

    @Override
    public T source(final Path file) {
        final AbstractRDFParser<T> c = clone();
        c.resetSource();
        c.sourceFile = Optional.ofNullable(file);
        return c.asT();
    }

    @Override
    public T source(final IRI iri) {
        final AbstractRDFParser<T> c = clone();
        c.resetSource();
        c.sourceIri = Optional.ofNullable(iri);
        c.sourceIri.ifPresent(this::checkIsAbsolute);
        return c.asT();
    }

    /**
     * Set the source as an IRI given in string form.
     * <p>
     * The string is converted to an {@link IRI} and handed to
     * {@link #source(IRI)}, which resets any other source and checks that the
     * IRI is absolute. A <code>null</code> string is passed on as a
     * <code>null</code> IRI, giving a copy without any source set.
     *
     * @param iri
     *            Absolute IRI of the source, or <code>null</code>
     * @return A cloned parser with the source IRI set
     * @throws IllegalArgumentException
     *             If the IRI is not absolute
     */
    @Override
    public T source(final String iri) throws IllegalArgumentException {
        // Convert once and delegate, so the clone/reset/check logic lives only
        // in source(IRI); null is not handed to createIRI.
        final IRI sourceAsIri = iri == null ? null : internalRdfTermFactory.createIRI(iri);
        return source(sourceAsIri);
    }

    /**
     * Check if an iri is absolute.
     * <p>
     * Used by {@link #source(String)} and {@link #base(String)}.
     *
     * @param iri
     *            IRI to check
     * @throws IllegalArgumentException
     *             If the IRI is not absolute
     */
    protected void checkIsAbsolute(final IRI iri) throws IllegalArgumentException {
        if (!URI.create(iri.getIRIString()).isAbsolute()) {
            throw new IllegalArgumentException("IRI is not absolute: " + iri);
        }
    }

    /**
     * Check that one and only one source is present and valid.
     * <p>
     * Used by {@link #parse()}.
     * <p>
     * Subclasses might override this method, e.g. to support other source
     * combinations, or to check if the sourceIri is resolvable.
     *
     * @throws IOException
     *             If a source file can't be read
     */
    protected void checkSource() throws IOException {
        if (!sourceFile.isPresent() && !sourceInputStream.isPresent() && !sourceIri.isPresent()) {
            throw new IllegalStateException("No source has been set");
        }
        if (sourceIri.isPresent() && sourceInputStream.isPresent()) {
            throw new IllegalStateException("Both sourceIri and sourceInputStream have been set");
        }
        if (sourceIri.isPresent() && sourceFile.isPresent()) {
            throw new IllegalStateException("Both sourceIri and sourceFile have been set");
        }
        if (sourceInputStream.isPresent() && sourceFile.isPresent()) {
            throw new IllegalStateException("Both sourceInputStream and sourceFile have been set");
        }
        if (sourceFile.isPresent() && !Files.isReadable(sourceFile.get())) {
            // Report the path itself rather than its Optional wrapper
            throw new IOException("Can't read file: " + sourceFile.get());
        }
    }

    /**
     * Check if base is required.
     * <p>
     * A base is demanded when the source is an {@link InputStream} and the
     * content type syntax is not known to be {@link RDFSyntax#NQUADS} or
     * {@link RDFSyntax#NTRIPLES}.
     *
     * @throws IllegalStateException
     *             if base is required, but not set.
     */
    protected void checkBaseRequired() throws IllegalStateException {
        if (!base.isPresent() && sourceInputStream.isPresent()
                && !contentTypeSyntax.filter(t -> t == RDFSyntax.NQUADS || t == RDFSyntax.NTRIPLES).isPresent()) {
            throw new IllegalStateException("base iri required for inputstream source");
        }
    }

    /**
     * Reset all source* fields to Optional.empty()
     * <p>
     * Subclasses should override this and call <code>super.resetSource()</code>
     * if they need to reset any additional source* fields.
     *
     */
    protected void resetSource() {
        sourceInputStream = Optional.empty();
        sourceIri = Optional.empty();
        sourceFile = Optional.empty();
    }

    /**
     * Reset all optional target* fields to {@link Optional#empty()}.
     * <p>
     * Note that the consumer set for {@link #getTarget()} is
     * <strong>not</strong> reset.
     * <p>
     * Subclasses should override this and call <code>super.resetTarget()</code>
     * if they need to reset any additional target* fields.
     *
     */
    protected void resetTarget() {
        targetDataset = Optional.empty();
        targetGraph = Optional.empty();
    }

    /**
     * Parse {@link #getSourceInputStream()}, {@link #getSourceFile()} or
     * {@link #getSourceIri()}.
     * <p>
     * One of the source fields MUST be present, as checked by
     * {@link #checkSource()}.
     * <p>
     * {@link #checkBaseRequired()} is called to verify if {@link #getBase()} is
     * required.
     *
     * @throws IOException
     *             If the source could not be read
     * @throws RDFParseException
     *             If the source could not be parsed (e.g. a .ttl file was not
     *             valid Turtle)
     */
    protected abstract void parseSynchronusly() throws IOException, RDFParseException;

    /**
     * Prepare a clone of this RDFParser which have been checked and completed.
     * <p>
     * The returned clone will always have {@link #getTarget()} and
     * {@link #getRdfTermFactory()} present.
     * <p>
     * If the {@link #getSourceFile()} is present, but the {@link #getBase()} is
     * not present, the base will be set to the <code>file:///</code> IRI for
     * the Path's real path (e.g. resolving any symbolic links).
     *
     * @return A completed and checked clone of this RDFParser
     * @throws IOException
     *             If the source was not accessible (e.g. a file was not found)
     * @throws IllegalStateException
     *             If the parser was not in a compatible setting (e.g.
     *             contentType was an invalid string)
     */
    protected T prepareForParsing() throws IOException, IllegalStateException {
        checkSource();
        checkBaseRequired();
        checkContentType();
        checkTarget();

        // We'll make a clone of our current state which will be passed to
        // parseSynchronusly()
        final AbstractRDFParser<T> c = clone();

        // No factory configured? Ask createRDFTermFactory() for one
        if (!c.rdfTermFactory.isPresent()) {
            c.rdfTermFactory = Optional.of(createRDFTermFactory());
        }
        // sourceFile, but no base? Let's follow any symlinks and use
        // the file:/// URI
        if (c.sourceFile.isPresent() && !c.base.isPresent()) {
            final URI baseUri = c.sourceFile.get().toRealPath().toUri();
            c.base = Optional.of(internalRdfTermFactory.createIRI(baseUri.toString()));
        }

        return c.asT();
    }

    /**
     * Subclasses can override this method to check the target is valid.
     * <p>
     * The default implementation throws an IllegalStateException if the target
     * has not been set, or if both a target graph and a target dataset are
     * present.
     */
    protected void checkTarget() {
        if (target == null) {
            throw new IllegalStateException("target has not been set");
        }
        if (targetGraph.isPresent() && targetDataset.isPresent()) {
            // This should not happen as each target(..) method resets the
            // optionals
            throw new IllegalStateException("targetGraph and targetDataset can't both be set");
        }
    }

    /**
     * Subclasses can override this method to check compatibility with the
     * contentType setting.
     * <p>
     * The default implementation accepts any setting.
     *
     * @throws IllegalStateException
     *             if the {@link #getContentType()} or
     *             {@link #getContentTypeSyntax()} is not compatible or invalid
     */
    protected void checkContentType() throws IllegalStateException {
    }

    /**
     * Guess RDFSyntax from a local file's extension.
     * <p>
     * This method can be used by subclasses if {@link #getContentType()} is not
     * present and {@link #getSourceFile()} is set.
     *
     * @param path
     *            Path which extension should be checked
     * @return The {@link RDFSyntax} which has a matching
     *         {@link RDFSyntax#fileExtension}, otherwise
     *         {@link Optional#empty()}.
     */
    protected static Optional<RDFSyntax> guessRDFSyntax(final Path path) {
        return fileExtension(path).flatMap(RDFSyntax::byFileExtension);
    }

    /**
     * Return the file extension of a Path - if any.
     * <p>
     * The returned file extension includes the leading <code>.</code>
     * <p>
     * Note that this only returns the last extension, e.g. the file extension
     * for <code>archive.tar.gz</code> would be <code>.gz</code>
     *
     * @param path
     *            Path which filename might contain an extension
     * @return File extension (including the leading <code>.</code>), or
     *         {@link Optional#empty()} if the path has no extension
     */
    private static Optional<String> fileExtension(final Path path) {
        final Path fileName = path.getFileName();
        if (fileName == null) {
            // Path has no name elements
            return Optional.empty();
        }
        final String filenameStr = fileName.toString();
        final int last = filenameStr.lastIndexOf(".");
        if (last > -1) {
            return Optional.of(filenameStr.substring(last));
        }
        return Optional.empty();
    }

    /**
     * Create a new {@link RDF} for a parse session.
     * <p>
     * This is called by {@link #prepareForParsing()} (and thereby
     * {@link #parse()}) to set {@link #rdfTermFactory(RDF)} if it is
     * {@link Optional#empty()}.
     * <p>
     * As parsed blank nodes might be made with
     * {@link RDF#createBlankNode(String)}, each call to this method SHOULD
     * return a new RDF instance.
     *
     * @return A new {@link RDF}
     */
    protected RDF createRDFTermFactory() {
        return new SimpleRDF();
    }

    /**
     * Check the settings, then parse asynchronously on the shared thread pool.
     * <p>
     * The checks of {@link #prepareForParsing()} run in the calling thread;
     * {@link #parseSynchronusly()} is then run on the prepared clone in a pool
     * thread. The returned {@link Future} completes with <code>null</code> on
     * success.
     */
    @Override
    public Future<ParseResult> parse() throws IOException, IllegalStateException {
        final AbstractRDFParser<T> c = prepareForParsing();
        return threadpool.submit(() -> {
            c.parseSynchronusly();
            return null;
        });
    }

    @Override
    public T target(final Consumer<Quad> consumer) {
        final AbstractRDFParser<T> c = clone();
        c.resetTarget();
        c.target = consumer;
        return c.asT();
    }

    @Override
    public T target(final Dataset dataset) {
        // The interface default is expected to come back through our own
        // target(Consumer), i.e. to return a clone of this parser
        @SuppressWarnings("unchecked")
        final AbstractRDFParser<T> c = (AbstractRDFParser<T>) RDFParser.super.target(dataset);
        c.resetTarget();
        c.targetDataset = Optional.of(dataset);
        return c.asT();
    }

    @Override
    public T target(final Graph graph) {
        // The interface default is expected to come back through our own
        // target(Consumer), i.e. to return a clone of this parser
        @SuppressWarnings("unchecked")
        final AbstractRDFParser<T> c = (AbstractRDFParser<T>) RDFParser.super.target(graph);
        c.resetTarget();
        c.targetGraph = Optional.of(graph);
        return c.asT();
    }

}