ServiceProcessManager.java
001 /* This file is part of the project "Hilbert II" - http://www.qedeq.org
002  *
003  * Copyright 2000-2013,  Michael Meyling <mime@qedeq.org>.
004  *
005  * "Hilbert II" is free software; you can redistribute
006  * it and/or modify it under the terms of the GNU General Public
007  * License as published by the Free Software Foundation; either
008  * version 2 of the License, or (at your option) any later version.
009  *
010  * This program is distributed in the hope that it will be useful,
011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013  * GNU General Public License for more details.
014  */
015 
016 package org.qedeq.kernel.bo.service;
017 
018 import java.util.ArrayList;
019 import java.util.List;
020 
021 import org.qedeq.base.io.Parameters;
022 import org.qedeq.base.trace.Trace;
023 import org.qedeq.kernel.bo.KernelContext;
024 import org.qedeq.kernel.bo.common.PluginCall;
025 import org.qedeq.kernel.bo.common.ServiceProcess;
026 import org.qedeq.kernel.bo.log.QedeqLog;
027 import org.qedeq.kernel.bo.module.InternalServiceProcess;
028 import org.qedeq.kernel.bo.module.KernelQedeqBo;
029 import org.qedeq.kernel.bo.module.PluginBo;
030 import org.qedeq.kernel.bo.module.PluginExecutor;
031 import org.qedeq.kernel.se.common.Plugin;
032 import org.qedeq.kernel.se.visitor.InterruptException;
033 
034 /**
035  * Manage all known processes.
036  */
037 public class ServiceProcessManager {
038 
039     /** This class. */
040     private static final Class CLASS = ServiceProcessManager.class;
041 
042     /** Stores all running processes. */
043     private final List processes = new ArrayList();
044 
045     /** Stores some finished processes. FIXME 20130408 m31: use! */
046     private final List finished = new ArrayList();
047 
048     /** Stores all calls. */
049     private final List calls = new ArrayList();
050 
051     /** Manage all known plugins. */
052     private final PluginManager pluginManager;
053 
054     /** Manage synchronized module access. */
055     private final ModuleArbiter arbiter;
056 
057     /**
058      * Constructor.
059      *
060      @param   pluginManager   Collects process information.
061      @param   arbiter         For module access synchronization.
062      */
063     public ServiceProcessManager(final PluginManager pluginManager, final ModuleArbiter arbiter) {
064         this.pluginManager = pluginManager;
065         this.arbiter = arbiter;
066     }
067 
068 
069     /**
070      * Get all service processes.
071      *
072      @return  All service processes.
073      */
074     public synchronized ServiceProcess[] getServiceProcesses() {
075         return (ServiceProcess[]) processes.toArray(new ServiceProcess[] {});
076     }
077 
078     /**
079      * Get all running service processes. But remember a running process might currently
080      * be blocked.
081      *
082      @return  All service running processes.
083      */
084     public synchronized ServiceProcess[] getRunningServiceProcesses() {
085         final ArrayList result = new ArrayList(processes);
086         for (int i = 0; i < result.size()) {
087             if (!((ServiceProcessresult.get(i)).isRunning()) {
088                 result.remove(i);
089             else {
090                 i++;
091             }
092         }
093         return (ServiceProcess[]) result.toArray(new ServiceProcess[] {});
094     }
095 
096     /**
097      * Create service process.
098      *
099      @param   service     The service that runs in current thread.
100      @param   qedeq       QEDEQ module for service.
101      @param   parameters  Parameter for the service.
102      @param   process     We run in this process.
103      @param   parent      Parent process that creates a new one.
104      @return  Created process.
105      */
106     public synchronized PluginCallImpl createPluginCall(final Plugin service,
107             final KernelQedeqBo qedeq, final Parameters parameters,
108             final InternalServiceProcess process, final PluginCall parent) {
109         final PluginCallImpl call = new PluginCallImpl(service, qedeq, parameters, process, parent);
110         calls.add(call);
111         return call;
112     }
113 
114     /**
115      * Remove all service processes. All processes are also terminated via interruption.
116      */
117     public synchronized void terminateAndRemoveAllServiceProcesses() {
118         terminateAllServiceProcesses();
119         processes.clear();
120     }
121 
122     /**
123      * Terminate all service processes.
124      */
125     public synchronized void terminateAllServiceProcesses() {
126         for (int i = 0; i < processes.size(); i++) {
127             final ServiceProcess proc = (ServiceProcessprocesses.get(i);
128             proc.interrupt();
129         }
130     }
131 
132 
133     /**
134      * Execute a plugin on an QEDEQ module.
135      *
136      @param   id          Plugin to use.
137      @param   qedeq       QEDEQ module to work on.
138      @param   data        Process parameters.
139      @param   proc        Process. Might be <code>null</code>.
140      @return  Plugin specific result object. Might be <code>null</code>.
141      @throws  RuntimeException    Plugin unknown.
142      */
143     Object executePlugin(final String id, final KernelQedeqBo qedeq, final Object data,
144             final InternalServiceProcess proc) {
145         final String method = "executePlugin(String, KernelQedeqBo, Object)";
146         final PluginBo plugin = pluginManager.getPlugin(id);
147         if (plugin == null) {
148             final String message = "Kernel does not know about plugin: ";
149             final RuntimeException e = new RuntimeException(message + id);
150             Trace.fatal(CLASS, this, method, message + id,
151                 e);
152             throw e;
153         }
154         final Parameters parameters = KernelContext.getInstance().getConfig().getPluginEntries(plugin);
155         InternalServiceProcess process = null;
156         if (proc != null) {
157             if (!proc.isRunning()) {
158                 return null;
159             }
160             process = proc;
161         else {
162             process = new ServiceProcessImpl(plugin.getPluginActionName());
163             synchronized (this) {
164                 processes.add(process);
165             }
166         }
167         process.setBlocked(true);
168         final PluginCallImpl call = new PluginCallImpl(plugin, qedeq, parameters, process,
169             process.getPluginCall());
170         process.setPluginCall(call);
171         boolean newBlockedModule = false;
172         try {
173             newBlockedModule = arbiter.lockRequiredModule(process, qedeq);
174         catch (InterruptException e) {
175             final String msg = plugin.getPluginActionName() " was interrupted.";
176             Trace.fatal(CLASS, this, method, msg, e);
177             QedeqLog.getInstance().logFailureReply(msg, qedeq.getUrl(), e.getMessage());
178             call.setFailureState();
179             process.setFailureState();
180             return null;
181         }
182 //        synchronized (qedeq) {
183             process.setBlocked(false);
184 // FIXME            final boolean newBlockedModule = !process.getBlockedModules().contains(qedeq);
185             try {
186                 process.addBlockedModule(qedeq);
187                 final PluginExecutor exe = plugin.createExecutor(qedeq, parameters);
188                 qedeq.setCurrentlyRunningPlugin(plugin);
189                 final Object result = exe.executePlugin(process, data);
190                 if (exe.getInterrupted()) {
191                     call.setFailureState();
192                     process.setFailureState();
193                 else {
194                     call.setSuccessState();
195                 }
196                 return result;
197             catch (final RuntimeException e) {
198                 final String msg = plugin.getPluginActionName() " failed with a runtime exception.";
199                 Trace.fatal(CLASS, this, method, msg, e);
200                 QedeqLog.getInstance().logFailureReply(msg, qedeq.getUrl(), e.getMessage());
201                 call.setFailureState();
202                 process.setFailureState();
203                 return null;
204             finally {
205                 if (newBlockedModule) {
206                     arbiter.unlockRequiredModule(process, qedeq);
207                     process.removeBlockedModule(qedeq);
208                 }
209                 // remove old executor
210                 call.setExecutor(null);
211                 qedeq.setCurrentlyRunningPlugin(null);
212                 // if we created the process we also close it
213                 if (proc == null) {
214                     if (process.isRunning()) {
215                         process.setSuccessState();
216                     }
217                 }
218             }
219 //        }
220     }
221 
222 
223 }