001 /* This file is part of the project "Hilbert II" - http://www.qedeq.org
002 *
003 * Copyright 2000-2013, Michael Meyling <mime@qedeq.org>.
004 *
005 * "Hilbert II" is free software; you can redistribute
006 * it and/or modify it under the terms of the GNU General Public
007 * License as published by the Free Software Foundation; either
008 * version 2 of the License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU General Public License for more details.
014 */
015
016 package org.qedeq.kernel.bo.service;
017
018 import java.util.ArrayList;
019 import java.util.List;
020
021 import org.qedeq.base.io.Parameters;
022 import org.qedeq.base.trace.Trace;
023 import org.qedeq.kernel.bo.KernelContext;
024 import org.qedeq.kernel.bo.common.PluginCall;
025 import org.qedeq.kernel.bo.common.ServiceProcess;
026 import org.qedeq.kernel.bo.log.QedeqLog;
027 import org.qedeq.kernel.bo.module.InternalServiceProcess;
028 import org.qedeq.kernel.bo.module.KernelQedeqBo;
029 import org.qedeq.kernel.bo.module.PluginBo;
030 import org.qedeq.kernel.bo.module.PluginExecutor;
031 import org.qedeq.kernel.se.common.Plugin;
032 import org.qedeq.kernel.se.visitor.InterruptException;
033
034 /**
035 * Manage all known processes.
036 */
037 public class ServiceProcessManager {
038
039 /** This class. */
040 private static final Class CLASS = ServiceProcessManager.class;
041
042 /** Stores all running processes. */
043 private final List processes = new ArrayList();
044
045 /** Stores some finished processes. FIXME 20130408 m31: use! */
046 private final List finished = new ArrayList();
047
048 /** Stores all calls. */
049 private final List calls = new ArrayList();
050
051 /** Manage all known plugins. */
052 private final PluginManager pluginManager;
053
054 /** Manage synchronized module access. */
055 private final ModuleArbiter arbiter;
056
057 /**
058 * Constructor.
059 *
060 * @param pluginManager Collects process information.
061 * @param arbiter For module access synchronization.
062 */
063 public ServiceProcessManager(final PluginManager pluginManager, final ModuleArbiter arbiter) {
064 this.pluginManager = pluginManager;
065 this.arbiter = arbiter;
066 }
067
068
069 /**
070 * Get all service processes.
071 *
072 * @return All service processes.
073 */
074 public synchronized ServiceProcess[] getServiceProcesses() {
075 return (ServiceProcess[]) processes.toArray(new ServiceProcess[] {});
076 }
077
078 /**
079 * Get all running service processes. But remember a running process might currently
080 * be blocked.
081 *
082 * @return All service running processes.
083 */
084 public synchronized ServiceProcess[] getRunningServiceProcesses() {
085 final ArrayList result = new ArrayList(processes);
086 for (int i = 0; i < result.size(); ) {
087 if (!((ServiceProcess) result.get(i)).isRunning()) {
088 result.remove(i);
089 } else {
090 i++;
091 }
092 }
093 return (ServiceProcess[]) result.toArray(new ServiceProcess[] {});
094 }
095
096 /**
097 * Create service process.
098 *
099 * @param service The service that runs in current thread.
100 * @param qedeq QEDEQ module for service.
101 * @param parameters Parameter for the service.
102 * @param process We run in this process.
103 * @param parent Parent process that creates a new one.
104 * @return Created process.
105 */
106 public synchronized PluginCallImpl createPluginCall(final Plugin service,
107 final KernelQedeqBo qedeq, final Parameters parameters,
108 final InternalServiceProcess process, final PluginCall parent) {
109 final PluginCallImpl call = new PluginCallImpl(service, qedeq, parameters, process, parent);
110 calls.add(call);
111 return call;
112 }
113
114 /**
115 * Remove all service processes. All processes are also terminated via interruption.
116 */
117 public synchronized void terminateAndRemoveAllServiceProcesses() {
118 terminateAllServiceProcesses();
119 processes.clear();
120 }
121
122 /**
123 * Terminate all service processes.
124 */
125 public synchronized void terminateAllServiceProcesses() {
126 for (int i = 0; i < processes.size(); i++) {
127 final ServiceProcess proc = (ServiceProcess) processes.get(i);
128 proc.interrupt();
129 }
130 }
131
132
133 /**
134 * Execute a plugin on an QEDEQ module.
135 *
136 * @param id Plugin to use.
137 * @param qedeq QEDEQ module to work on.
138 * @param data Process parameters.
139 * @param proc Process. Might be <code>null</code>.
140 * @return Plugin specific result object. Might be <code>null</code>.
141 * @throws RuntimeException Plugin unknown.
142 */
143 Object executePlugin(final String id, final KernelQedeqBo qedeq, final Object data,
144 final InternalServiceProcess proc) {
145 final String method = "executePlugin(String, KernelQedeqBo, Object)";
146 final PluginBo plugin = pluginManager.getPlugin(id);
147 if (plugin == null) {
148 final String message = "Kernel does not know about plugin: ";
149 final RuntimeException e = new RuntimeException(message + id);
150 Trace.fatal(CLASS, this, method, message + id,
151 e);
152 throw e;
153 }
154 final Parameters parameters = KernelContext.getInstance().getConfig().getPluginEntries(plugin);
155 InternalServiceProcess process = null;
156 if (proc != null) {
157 if (!proc.isRunning()) {
158 return null;
159 }
160 process = proc;
161 } else {
162 process = new ServiceProcessImpl(plugin.getPluginActionName());
163 synchronized (this) {
164 processes.add(process);
165 }
166 }
167 process.setBlocked(true);
168 final PluginCallImpl call = new PluginCallImpl(plugin, qedeq, parameters, process,
169 process.getPluginCall());
170 process.setPluginCall(call);
171 boolean newBlockedModule = false;
172 try {
173 newBlockedModule = arbiter.lockRequiredModule(process, qedeq);
174 } catch (InterruptException e) {
175 final String msg = plugin.getPluginActionName() + " was interrupted.";
176 Trace.fatal(CLASS, this, method, msg, e);
177 QedeqLog.getInstance().logFailureReply(msg, qedeq.getUrl(), e.getMessage());
178 call.setFailureState();
179 process.setFailureState();
180 return null;
181 }
182 // synchronized (qedeq) {
183 process.setBlocked(false);
184 // FIXME final boolean newBlockedModule = !process.getBlockedModules().contains(qedeq);
185 try {
186 process.addBlockedModule(qedeq);
187 final PluginExecutor exe = plugin.createExecutor(qedeq, parameters);
188 qedeq.setCurrentlyRunningPlugin(plugin);
189 final Object result = exe.executePlugin(process, data);
190 if (exe.getInterrupted()) {
191 call.setFailureState();
192 process.setFailureState();
193 } else {
194 call.setSuccessState();
195 }
196 return result;
197 } catch (final RuntimeException e) {
198 final String msg = plugin.getPluginActionName() + " failed with a runtime exception.";
199 Trace.fatal(CLASS, this, method, msg, e);
200 QedeqLog.getInstance().logFailureReply(msg, qedeq.getUrl(), e.getMessage());
201 call.setFailureState();
202 process.setFailureState();
203 return null;
204 } finally {
205 if (newBlockedModule) {
206 arbiter.unlockRequiredModule(process, qedeq);
207 process.removeBlockedModule(qedeq);
208 }
209 // remove old executor
210 call.setExecutor(null);
211 qedeq.setCurrentlyRunningPlugin(null);
212 // if we created the process we also close it
213 if (proc == null) {
214 if (process.isRunning()) {
215 process.setSuccessState();
216 }
217 }
218 }
219 // }
220 }
221
222
223 }
|