001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.commons.rdf.simple.experimental;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.net.URI;
023import java.nio.file.Files;
024import java.nio.file.Path;
025import java.util.Optional;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.Future;
029import java.util.function.Consumer;
030
031import org.apache.commons.rdf.api.Dataset;
032import org.apache.commons.rdf.api.Graph;
033import org.apache.commons.rdf.api.IRI;
034import org.apache.commons.rdf.api.Quad;
035import org.apache.commons.rdf.api.RDFSyntax;
036import org.apache.commons.rdf.api.RDF;
037import org.apache.commons.rdf.experimental.RDFParser;
038import org.apache.commons.rdf.simple.SimpleRDF;
039
040/**
041 * Abstract RDFParser
042 * <p>
043 * This abstract class keeps the properties in protected fields like
044 * {@link #sourceFile} using {@link Optional}. Some basic checking like
045 * {@link #checkIsAbsolute(IRI)} is performed.
046 * <p>
047 * This class and its subclasses are {@link Cloneable}, immutable and
048 * (therefore) thread-safe - each call to option methods like
049 * {@link #contentType(String)} or {@link #source(IRI)} will return a cloned,
050 * mutated copy.
051 * <p>
052 * By default, parsing is done by the abstract method
053 * {@link #parseSynchronusly()} - which is executed in a cloned snapshot - hence
054 * multiple {@link #parse()} calls are thread-safe. The default {@link #parse()}
055 * uses a thread pool in {@link #threadGroup} - but implementations can override
056 * {@link #parse()} (e.g. because it has its own threading model or use
057 * asynchronous remote execution).
058 */
059public abstract class AbstractRDFParser<T extends AbstractRDFParser<T>> implements RDFParser, Cloneable {
060
061    public static final ThreadGroup threadGroup = new ThreadGroup("Commons RDF parsers");
062    private static final ExecutorService threadpool = Executors.newCachedThreadPool(r -> new Thread(threadGroup, r));
063
064    // Basically only used for creating IRIs
065    private static RDF internalRdfTermFactory = new SimpleRDF();
066
067    /**
068     * Get the set {@link RDF}, if any.
069     *
070     * @return The {@link RDF} to use, or {@link Optional#empty()} if it has not
071     *         been set
072     */
073    public Optional<RDF> getRdfTermFactory() {
074        return rdfTermFactory;
075    }
076
077    /**
078     * Get the set content-type {@link RDFSyntax}, if any.
079     * <p>
080     * If this is {@link Optional#isPresent()}, then {@link #getContentType()}
081     * contains the value of {@link RDFSyntax#mediaType}.
082     *
083     * @return The {@link RDFSyntax} of the content type, or
084     *         {@link Optional#empty()} if it has not been set
085     */
086    public Optional<RDFSyntax> getContentTypeSyntax() {
087        return contentTypeSyntax;
088    }
089
090    /**
091     * Get the set content-type String, if any.
092     * <p>
093     * If this is {@link Optional#isPresent()} and is recognized by
094     * {@link RDFSyntax#byMediaType(String)}, then the corresponding
095     * {@link RDFSyntax} is set on {@link #getContentType()}, otherwise that is
096     * {@link Optional#empty()}.
097     *
098     * @return The Content-Type IANA media type, e.g. <code>text/turtle</code>,
099     *         or {@link Optional#empty()} if it has not been set
100     */
101    public final Optional<String> getContentType() {
102        return contentType;
103    }
104
105    /**
106     * Get the target to consume parsed Quads.
107     * <p>
108     * From the call to {@link #parseSynchronusly()}, this will be a
109     * non-<code>null</code> value (as a target is a required setting).
110     *
111     * @return The target consumer of {@link Quad}s, or <code>null</code> if it
112     *         has not yet been set.
113     *
114     */
115    public Consumer<Quad> getTarget() {
116        return target;
117    }
118
119    /**
120     * Get the target dataset as set by {@link #target(Dataset)}.
121     * <p>
122     * The return value is {@link Optional#isPresent()} if and only if
123     * {@link #target(Dataset)} has been set, meaning that the implementation
124     * may choose to append parsed quads to the {@link Dataset} directly instead
125     * of relying on the generated {@link #getTarget()} consumer.
126     * <p>
127     * If this value is present, then {@link #getTargetGraph()} MUST be
128     * {@link Optional#empty()}.
129     *
130     * @return The target Dataset, or {@link Optional#empty()} if another kind
131     *         of target has been set.
132     */
133    public Optional<Dataset> getTargetDataset() {
134        return targetDataset;
135    }
136
137    /**
138     * Get the target graph as set by {@link #target(Graph)}.
139     * <p>
140     * The return value is {@link Optional#isPresent()} if and only if
141     * {@link #target(Graph)} has been set, meaning that the implementation may
142     * choose to append parsed triples to the {@link Graph} directly instead of
143     * relying on the generated {@link #getTarget()} consumer.
144     * <p>
145     * If this value is present, then {@link #getTargetDataset()} MUST be
146     * {@link Optional#empty()}.
147     *
148     * @return The target Graph, or {@link Optional#empty()} if another kind of
149     *         target has been set.
150     */
151    public Optional<Graph> getTargetGraph() {
152        return targetGraph;
153    }
154
155    /**
156     * Get the set base {@link IRI}, if present.
157     *
158     * @return The base {@link IRI}, or {@link Optional#empty()} if it has not
159     *         been set
160     */
161    public Optional<IRI> getBase() {
162        return base;
163    }
164
165    /**
166     * Get the set source {@link InputStream}.
167     * <p>
168     * If this is {@link Optional#isPresent()}, then {@link #getSourceFile()}
169     * and {@link #getSourceIri()} are {@link Optional#empty()}.
170     *
171     * @return The source {@link InputStream}, or {@link Optional#empty()} if it
172     *         has not been set
173     */
174    public Optional<InputStream> getSourceInputStream() {
175        return sourceInputStream;
176    }
177
178    /**
179     * Get the set source {@link Path}.
180     * <p>
181     * If this is {@link Optional#isPresent()}, then
182     * {@link #getSourceInputStream()} and {@link #getSourceIri()} are
183     * {@link Optional#empty()}.
184     *
185     * @return The source {@link Path}, or {@link Optional#empty()} if it has
186     *         not been set
187     */
188    public Optional<Path> getSourceFile() {
189        return sourceFile;
190    }
191
192    /**
193     * Get the set source {@link Path}.
194     * <p>
195     * If this is {@link Optional#isPresent()}, then
196     * {@link #getSourceInputStream()} and {@link #getSourceInputStream()} are
197     * {@link Optional#empty()}.
198     *
199     * @return The source {@link IRI}, or {@link Optional#empty()} if it has not
200     *         been set
201     */
202    public Optional<IRI> getSourceIri() {
203        return sourceIri;
204    }
205
206    private Optional<RDF> rdfTermFactory = Optional.empty();
207    private Optional<RDFSyntax> contentTypeSyntax = Optional.empty();
208    private Optional<String> contentType = Optional.empty();
209    private Optional<IRI> base = Optional.empty();
210    private Optional<InputStream> sourceInputStream = Optional.empty();
211    private Optional<Path> sourceFile = Optional.empty();
212    private Optional<IRI> sourceIri = Optional.empty();
213    private Consumer<Quad> target;
214    private Optional<Dataset> targetDataset;
215    private Optional<Graph> targetGraph;
216
217    @SuppressWarnings("unchecked")
218    @Override
219    public T clone() {
220        try {
221            return (T) super.clone();
222        } catch (final CloneNotSupportedException e) {
223            throw new RuntimeException(e);
224        }
225    }
226
227    @SuppressWarnings("unchecked")
228    protected T asT() {
229        return (T) this;
230    }
231
232    @Override
233    public T rdfTermFactory(final RDF rdfTermFactory) {
234        final AbstractRDFParser<T> c = clone();
235        c.rdfTermFactory = Optional.ofNullable(rdfTermFactory);
236        return c.asT();
237    }
238
239    @Override
240    public T contentType(final RDFSyntax rdfSyntax) throws IllegalArgumentException {
241        final AbstractRDFParser<T> c = clone();
242        c.contentTypeSyntax = Optional.ofNullable(rdfSyntax);
243        c.contentType = c.contentTypeSyntax.map(syntax -> syntax.mediaType());
244        return c.asT();
245    }
246
247    @Override
248    public T contentType(final String contentType) throws IllegalArgumentException {
249        final AbstractRDFParser<T> c = clone();
250        c.contentType = Optional.ofNullable(contentType);
251        c.contentTypeSyntax = c.contentType.flatMap(RDFSyntax::byMediaType);
252        return c.asT();
253    }
254
255    @Override
256    public T base(final IRI base) {
257        final AbstractRDFParser<T> c = clone();
258        c.base = Optional.ofNullable(base);
259        c.base.ifPresent(i -> checkIsAbsolute(i));
260        return c.asT();
261    }
262
263    @Override
264    public T base(final String base) throws IllegalArgumentException {
265        return base(internalRdfTermFactory.createIRI(base));
266    }
267
268    @Override
269    public T source(final InputStream inputStream) {
270        final AbstractRDFParser<T> c = clone();
271        c.resetSource();
272        c.sourceInputStream = Optional.ofNullable(inputStream);
273        return c.asT();
274    }
275
276    @Override
277    public T source(final Path file) {
278        final AbstractRDFParser<T> c = clone();
279        c.resetSource();
280        c.sourceFile = Optional.ofNullable(file);
281        return c.asT();
282    }
283
284    @Override
285    public T source(final IRI iri) {
286        final AbstractRDFParser<T> c = clone();
287        c.resetSource();
288        c.sourceIri = Optional.ofNullable(iri);
289        c.sourceIri.ifPresent(i -> checkIsAbsolute(i));
290        return c.asT();
291    }
292
293    @Override
294    public T source(final String iri) throws IllegalArgumentException {
295        final AbstractRDFParser<T> c = clone();
296        c.resetSource();
297        c.sourceIri = Optional.ofNullable(iri).map(internalRdfTermFactory::createIRI);
298        c.sourceIri.ifPresent(i -> checkIsAbsolute(i));
299        return source(internalRdfTermFactory.createIRI(iri));
300    }
301
302    /**
303     * Check if an iri is absolute.
304     * <p>
305     * Used by {@link #source(String)} and {@link #base(String)}.
306     *
307     * @param iri
308     *            IRI to check
309     * @throws IllegalArgumentException
310     *             If the IRI is not absolute
311     */
312    protected void checkIsAbsolute(final IRI iri) throws IllegalArgumentException {
313        if (!URI.create(iri.getIRIString()).isAbsolute()) {
314            throw new IllegalArgumentException("IRI is not absolute: " + iri);
315        }
316    }
317
318    /**
319     * Check that one and only one source is present and valid.
320     * <p>
321     * Used by {@link #parse()}.
322     * <p>
323     * Subclasses might override this method, e.g. to support other source
324     * combinations, or to check if the sourceIri is resolvable.
325     *
326     * @throws IOException
327     *             If a source file can't be read
328     */
329    protected void checkSource() throws IOException {
330        if (!sourceFile.isPresent() && !sourceInputStream.isPresent() && !sourceIri.isPresent()) {
331            throw new IllegalStateException("No source has been set");
332        }
333        if (sourceIri.isPresent() && sourceInputStream.isPresent()) {
334            throw new IllegalStateException("Both sourceIri and sourceInputStream have been set");
335        }
336        if (sourceIri.isPresent() && sourceFile.isPresent()) {
337            throw new IllegalStateException("Both sourceIri and sourceFile have been set");
338        }
339        if (sourceInputStream.isPresent() && sourceFile.isPresent()) {
340            throw new IllegalStateException("Both sourceInputStream and sourceFile have been set");
341        }
342        if (sourceFile.isPresent() && !sourceFile.filter(Files::isReadable).isPresent()) {
343            throw new IOException("Can't read file: " + sourceFile);
344        }
345    }
346
347    /**
348     * Check if base is required.
349     *
350     * @throws IllegalStateException
351     *             if base is required, but not set.
352     */
353    protected void checkBaseRequired() throws IllegalStateException {
354        if (!base.isPresent() && sourceInputStream.isPresent()
355                && !contentTypeSyntax.filter(t -> t == RDFSyntax.NQUADS || t == RDFSyntax.NTRIPLES).isPresent()) {
356            throw new IllegalStateException("base iri required for inputstream source");
357        }
358    }
359
360    /**
361     * Reset all source* fields to Optional.empty()
362     * <p>
363     * Subclasses should override this and call <code>super.resetSource()</code>
364     * if they need to reset any additional source* fields.
365     *
366     */
367    protected void resetSource() {
368        sourceInputStream = Optional.empty();
369        sourceIri = Optional.empty();
370        sourceFile = Optional.empty();
371    }
372
373    /**
374     * Reset all optional target* fields to {@link Optional#empty()}.
375     * <p>
376     * Note that the consumer set for {@link #getTarget()} is
377     * <strong>note</strong> reset.
378     * <p>
379     * Subclasses should override this and call <code>super.resetTarget()</code>
380     * if they need to reset any additional target* fields.
381     *
382     */
383    protected void resetTarget() {
384        targetDataset = Optional.empty();
385        targetGraph = Optional.empty();
386    }
387
388    /**
389     * Parse {@link #sourceInputStream}, {@link #sourceFile} or
390     * {@link #sourceIri}.
391     * <p>
392     * One of the source fields MUST be present, as checked by
393     * {@link #checkSource()}.
394     * <p>
395     * {@link #checkBaseRequired()} is called to verify if {@link #getBase()} is
396     * required.
397     *
398     * @throws IOException
399     *             If the source could not be read
400     * @throws RDFParseException
401     *             If the source could not be parsed (e.g. a .ttl file was not
402     *             valid Turtle)
403     */
404    protected abstract void parseSynchronusly() throws IOException, RDFParseException;
405
406    /**
407     * Prepare a clone of this RDFParser which have been checked and completed.
408     * <p>
409     * The returned clone will always have {@link #getTarget()} and
410     * {@link #getRdfTermFactory()} present.
411     * <p>
412     * If the {@link #getSourceFile()} is present, but the {@link #getBase()} is
413     * not present, the base will be set to the <code>file:///</code> IRI for
414     * the Path's real path (e.g. resolving any symbolic links).
415     *
416     * @return A completed and checked clone of this RDFParser
417     * @throws IOException
418     *             If the source was not accessible (e.g. a file was not found)
419     * @throws IllegalStateException
420     *             If the parser was not in a compatible setting (e.g.
421     *             contentType was an invalid string)
422     */
423    protected T prepareForParsing() throws IOException, IllegalStateException {
424        checkSource();
425        checkBaseRequired();
426        checkContentType();
427        checkTarget();
428
429        // We'll make a clone of our current state which will be passed to
430        // parseSynchronously()
431        final AbstractRDFParser<T> c = clone();
432
433        // Use a fresh SimpleRDF for each parse
434        if (!c.rdfTermFactory.isPresent()) {
435            c.rdfTermFactory = Optional.of(createRDFTermFactory());
436        }
437        // sourceFile, but no base? Let's follow any symlinks and use
438        // the file:/// URI
439        if (c.sourceFile.isPresent() && !c.base.isPresent()) {
440            final URI baseUri = c.sourceFile.get().toRealPath().toUri();
441            c.base = Optional.of(internalRdfTermFactory.createIRI(baseUri.toString()));
442        }
443
444        return c.asT();
445    }
446
447    /**
448     * Subclasses can override this method to check the target is valid.
449     * <p>
450     * The default implementation throws an IllegalStateException if the target
451     * has not been set.
452     */
453    protected void checkTarget() {
454        if (target == null) {
455            throw new IllegalStateException("target has not been set");
456        }
457        if (targetGraph.isPresent() && targetDataset.isPresent()) {
458            // This should not happen as each target(..) method resets the
459            // optionals
460            throw new IllegalStateException("targetGraph and targetDataset can't both be set");
461        }
462    }
463
464    /**
465     * Subclasses can override this method to check compatibility with the
466     * contentType setting.
467     *
468     * @throws IllegalStateException
469     *             if the {@link #getContentType()} or
470     *             {@link #getContentTypeSyntax()} is not compatible or invalid
471     */
472    protected void checkContentType() throws IllegalStateException {
473    }
474
475    /**
476     * Guess RDFSyntax from a local file's extension.
477     * <p>
478     * This method can be used by subclasses if {@link #getContentType()} is not
479     * present and {@link #getSourceFile()} is set.
480     *
481     * @param path
482     *            Path which extension should be checked
483     * @return The {@link RDFSyntax} which has a matching
484     *         {@link RDFSyntax#fileExtension}, otherwise
485     *         {@link Optional#empty()}.
486     */
487    protected static Optional<RDFSyntax> guessRDFSyntax(final Path path) {
488        return fileExtension(path).flatMap(RDFSyntax::byFileExtension);
489    }
490
491    /**
492     * Return the file extension of a Path - if any.
493     * <p>
494     * The returned file extension includes the leading <code>.</code>
495     * <p>
496     * Note that this only returns the last extension, e.g. the file extension
497     * for <code>archive.tar.gz</code> would be <code>.gz</code>
498     *
499     * @param path
500     *            Path which filename might contain an extension
501     * @return File extension (including the leading <code>.</code>, or
502     *         {@link Optional#empty()} if the path has no extension
503     */
504    private static Optional<String> fileExtension(final Path path) {
505        final Path fileName = path.getFileName();
506        if (fileName == null) {
507            return Optional.empty();
508        }
509        final String filenameStr = fileName.toString();
510        final int last = filenameStr.lastIndexOf(".");
511        if (last > -1) {
512            return Optional.of(filenameStr.substring(last));
513        }
514        return Optional.empty();
515    }
516
517    /**
518     * Create a new {@link RDF} for a parse session.
519     * <p>
520     * This is called by {@link #parse()} to set {@link #rdfTermFactory(RDF)} if
521     * it is {@link Optional#empty()}.
522     * <p>
523     * As parsed blank nodes might be made with
524     * {@link RDF#createBlankNode(String)}, each call to this method SHOULD
525     * return a new RDF instance.
526     *
527     * @return A new {@link RDF}
528     */
529    protected RDF createRDFTermFactory() {
530        return new SimpleRDF();
531    }
532
533    @Override
534    public Future<ParseResult> parse() throws IOException, IllegalStateException {
535        final AbstractRDFParser<T> c = prepareForParsing();
536        return threadpool.submit(() -> {
537            c.parseSynchronusly();
538            return null;
539        });
540    }
541
542    @Override
543    public T target(final Consumer<Quad> consumer) {
544        final AbstractRDFParser<T> c = clone();
545        c.resetTarget();
546        c.target = consumer;
547        return c.asT();
548    }
549
550    @Override
551    public T target(final Dataset dataset) {
552        @SuppressWarnings({ "rawtypes", "unchecked" })
553        final
554        AbstractRDFParser<T> c = (AbstractRDFParser) RDFParser.super.target(dataset);
555        c.resetTarget();
556        c.targetDataset = Optional.of(dataset);
557        return c.asT();
558    }
559
560    @Override
561    public T target(final Graph graph) {
562        @SuppressWarnings({ "rawtypes", "unchecked" }) // super calls our
563        final
564                                                       // .clone()
565        AbstractRDFParser<T> c = (AbstractRDFParser) RDFParser.super.target(graph);
566        c.resetTarget();
567        c.targetGraph = Optional.of(graph);
568        return c.asT();
569    }
570
571}