Skip to content
This repository was archived by the owner on Mar 11, 2019. It is now read-only.

Commit 45d82c8

Browse files
huertasmcolmant
authored andcommitted
feature(fuse): implement the FUSE reporter
1 parent d85e870 commit 45d82c8

File tree

3 files changed

+393
-0
lines changed

3 files changed

+393
-0
lines changed

powerapi-core/lib/fusejna.jar

872 KB
Binary file not shown.
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
/*
2+
* This software is licensed under the GNU Affero General Public License, quoted below.
3+
*
4+
* This file is a part of PowerAPI.
5+
*
6+
* Copyright (C) 2011-2015 Inria, University of Lille 1.
7+
*
8+
* PowerAPI is free software: you can redistribute it and/or modify
9+
* it under the terms of the GNU Affero General Public License as
10+
* published by the Free Software Foundation, either version 3 of
11+
* the License, or (at your option) any later version.
12+
*
13+
* PowerAPI is distributed in the hope that it will be useful,
14+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
15+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16+
* GNU Affero General Public License for more details.
17+
*
18+
* You should have received a copy of the GNU Affero General Public License
19+
* along with PowerAPI.
20+
*
21+
* If not, please consult http://www.gnu.org/licenses/agpl-3.0.html.
22+
*/
23+
package org.powerapi.reporter
24+
25+
import java.io.File
26+
import java.nio.ByteBuffer
27+
import java.util.UUID
28+
29+
import scalax.file.Path
30+
import scala.concurrent.Await
31+
import scala.concurrent.duration.{ DurationLong, FiniteDuration }
32+
33+
import akka.actor.{ Actor, ActorLogging, ActorRef, PoisonPill, Props }
34+
import akka.event.LoggingReceive
35+
import akka.pattern.ask
36+
37+
import net.fusejna.DirectoryFiller
38+
import net.fusejna.ErrorCodes
39+
import net.fusejna.FuseException
40+
import net.fusejna.StructFuseFileInfo.FileInfoWrapper
41+
import net.fusejna.StructStat.StatWrapper
42+
import net.fusejna.types.TypeMode.{ ModeWrapper, NodeType }
43+
import net.fusejna.util.FuseFilesystemAdapterFull
44+
45+
import org.powerapi.{ PowerMeter, PowerMonitoring }
46+
import org.powerapi.core.{APIComponent, ConfigValue, Configuration}
47+
import org.powerapi.core.power._
48+
import org.powerapi.core.target._
49+
import org.powerapi.module.PowerChannel.AggregatePowerReport
50+
51+
52+
/**
53+
* Main Configuration
54+
*/
55+
class FuseReporterConfiguration extends Configuration(None) {
56+
lazy val fuseFileName: String = load { _.getString("powerapi.fuse.filename") } match {
57+
case ConfigValue(value) => value
58+
case _ => "./test"
59+
}
60+
}
61+
62+
case class StartProcessMonitoring(frequency: FiniteDuration, targets: Set[String])
63+
case class StopProcessMonitoring(targets: Set[String])
64+
65+
/**
66+
* Dedicated type of message to get each monitored processes with
67+
* it corresponding energy consumption.
68+
*/
69+
object GetEnergyInfo
70+
71+
object StopFuse
72+
73+
/**
74+
* Display power information into a virtual file system using FUSE tool.
75+
*
76+
* @author Loïc Huertas <l.huertas.pro@gmail.com>
77+
*/
78+
class FuseReporter(pm: PowerMeter) extends FuseReporterConfiguration with APIComponent {
79+
private lazy val powerAPIFuse = new PowerAPIFuse(self, fuseFileName)
80+
private var powerAPIFuseThread: Option[Thread] = None
81+
82+
// monitored processes with it corresponding energy consumption.
83+
// The value is compute from aggregate power reports.
84+
// [process -> (timestamp, energy, power, muid)]
85+
lazy val energyInfo = collection.mutable.HashMap[Set[Target], (Long, Double, Double, Option[UUID])]()
86+
87+
88+
override def preStart() {
89+
powerAPIFuseThread = Some(new Thread(powerAPIFuse))
90+
powerAPIFuseThread.get.start
91+
}
92+
93+
def receive: PartialFunction[Any, Unit] = LoggingReceive {
94+
case msg: AggregatePowerReport => report(msg)
95+
case msg: StartProcessMonitoring => start(msg)
96+
case msg: StopProcessMonitoring => stop(msg)
97+
case StopFuse => stopAll
98+
case GetEnergyInfo => getEnergyInfo
99+
} orElse default
100+
101+
def report(aggPowerReport: AggregatePowerReport): Unit = {
102+
if (energyInfo contains aggPowerReport.targets) {
103+
energyInfo(aggPowerReport.targets) = (aggPowerReport.tick.timestamp,
104+
aggPowerReport.power.toWatts,
105+
energyInfo(aggPowerReport.targets)._3 + aggPowerReport.power.toWatts,
106+
Some(aggPowerReport.muid))
107+
}
108+
}
109+
110+
def start(msg: StartProcessMonitoring): Unit = {
111+
val targets: Set[Target] = for (target <- msg.targets) yield {
112+
if (target forall Character.isDigit) Process(target.toInt) else Application(target)
113+
}
114+
if (!energyInfo.contains(targets))
115+
pm.monitor(msg.frequency)(targets.toSeq:_*) to self
116+
}
117+
118+
def stop(msg: StopProcessMonitoring): Unit = {
119+
val targets: Set[Target] = for (target <- msg.targets) yield {
120+
if (target forall Character.isDigit) Process(target.toInt) else Application(target)
121+
}
122+
if (energyInfo contains targets)
123+
energyInfo(targets)._4 match {
124+
case Some(muid) => pm.stopMonitor(muid)
125+
case None => log.warning("target(s) {} doesn't exists", targets)
126+
}
127+
}
128+
129+
def stopAll: Unit = {
130+
powerAPIFuseThread match {
131+
case Some(thread) => {
132+
powerAPIFuse.unmount
133+
thread.interrupt
134+
}
135+
case _ => log.error("The FUSE thread is not created")
136+
}
137+
self ! PoisonPill
138+
}
139+
140+
def getEnergyInfo: Unit = {
141+
val currentProcesses = pm.getMonitoredProcesses.toSet
142+
val oldProcesses = energyInfo.keySet -- currentProcesses
143+
energyInfo --= oldProcesses
144+
val newProcesses = currentProcesses -- energyInfo.keySet
145+
newProcesses.foreach(targets =>
146+
energyInfo += (targets -> energyInfo.getOrElse(targets, (0, 0.0, 0.0, None)))
147+
)
148+
sender ! energyInfo.clone
149+
}
150+
}
151+
152+
class PowerAPIFuse(reporter: ActorRef, fuseFileName: String) extends FuseFilesystemAdapterFull with Runnable {
153+
// ------------------
154+
// --- Thread part --------------------------------------------------
155+
156+
def run = this.log(false).mount(mountPoint)
157+
158+
// ---------------------
159+
// --- FUSE-jna part --------------------------------------------------
160+
161+
lazy val mountPoint = {
162+
Path.fromString(fuseFileName).createDirectory(createParents=false, failIfExists=false)
163+
fuseFileName
164+
}
165+
166+
val pidsFileName = "pids"
167+
lazy val conf = collection.mutable.HashMap[String, String](
168+
("frequency" -> "1")
169+
)
170+
lazy val Dir = """/|/energy""".r
171+
lazy val EnergyPidFileFormat = """/energy/([\w+(,)?]+)/(\w+)""".r
172+
lazy val EnergyPidDirFormat = """/energy/([\w+(,)?]+)""".r
173+
174+
175+
override def getattr(path: String, stat: StatWrapper) =
176+
path match {
177+
case Dir() => {
178+
stat.setMode(NodeType.DIRECTORY)
179+
0
180+
}
181+
case EnergyPidDirFormat(pid) if energyInfo contains pid => {
182+
stat.setMode(NodeType.DIRECTORY)
183+
0
184+
}
185+
case EnergyPidFileFormat(pid, file) if ((energyInfo contains pid) && (conf contains file)) => {
186+
stat.setMode(NodeType.FILE).size(conf(file).length+1)
187+
0
188+
}
189+
case EnergyPidFileFormat(pid, file) if ((energyInfo contains pid) && (file == "energy")) => {
190+
while (energyInfo.apply(pid)._1 <= 0) Thread.sleep(1000)
191+
val v = energyInfo.apply(pid)
192+
stat.setMode(NodeType.FILE).size((pid + " " + v._3 + "\n").length+1)
193+
0
194+
}
195+
case EnergyPidFileFormat(pid, file) if ((energyInfo contains pid) && (file == "power")) => {
196+
while (energyInfo.apply(pid)._1 <= 0) Thread.sleep(1000)
197+
val v = energyInfo.apply(pid)
198+
stat.setMode(NodeType.FILE).size((pid + " " + v._1 + " " + v._2).length+1)
199+
0
200+
}
201+
case _ => -ErrorCodes.ENOENT()
202+
}
203+
204+
override def read(path: String, buffer: ByteBuffer, size: Long, offset: Long, info: FileInfoWrapper) =
205+
{
206+
val content = path match {
207+
case EnergyPidFileFormat(pid, file) if ((energyInfo contains pid) && (conf contains file)) => conf(file) + "\n"
208+
case EnergyPidFileFormat(pid, file) if ((energyInfo contains pid) && (file == "energy")) => {
209+
while (energyInfo.apply(pid)._1 <= 0) Thread.sleep(1000)
210+
val v = energyInfo.apply(pid)
211+
pid + " " + v._3 + "\n"
212+
}
213+
case EnergyPidFileFormat(pid, file) if ((energyInfo contains pid) && (file == "power")) => {
214+
while (energyInfo.apply(pid)._1 <= 0) Thread.sleep(1000)
215+
val v = energyInfo.apply(pid)
216+
pid + " " + v._1 + " " + v._2 + "\n"
217+
}
218+
case _ => ""
219+
}
220+
221+
val s = content.substring(offset.asInstanceOf[Int],
222+
Math.max(offset,
223+
Math.min(content.length() - offset, offset + size)
224+
).asInstanceOf[Int])
225+
buffer.put(s.getBytes())
226+
s.getBytes().length
227+
}
228+
229+
override def readdir(path: String, filler: DirectoryFiller) =
230+
path match {
231+
case File.separator => {
232+
filler.add("energy")
233+
0
234+
}
235+
case "/energy" => {
236+
energyInfo.keySet.foreach(pid => filler.add(pid))
237+
0
238+
}
239+
case EnergyPidDirFormat(pid) if energyInfo contains pid => {
240+
conf.keySet.foreach(confFile => filler.add(confFile))
241+
filler.add("power")
242+
filler.add("energy")
243+
0
244+
}
245+
case _ => -ErrorCodes.ENOENT()
246+
}
247+
248+
override def mkdir(path: String, mode: ModeWrapper) =
249+
path match {
250+
case EnergyPidDirFormat(pid) if !(energyInfo contains pid) => {
251+
reporter ! StartProcessMonitoring(conf("frequency").toLong.seconds, pid.split(",").toSet)
252+
0
253+
}
254+
case _ => -ErrorCodes.ENOENT()
255+
}
256+
257+
override def rmdir(path: String) =
258+
path match {
259+
case EnergyPidDirFormat(pid) if energyInfo contains pid => {
260+
reporter ! StopProcessMonitoring(pid.split(",").toSet)
261+
0
262+
}
263+
case _ => -ErrorCodes.ENOENT()
264+
}
265+
266+
private def energyInfo = Await.result(
267+
reporter.ask(GetEnergyInfo)(5.seconds), 5.seconds
268+
).asInstanceOf[collection.mutable.HashMap[Set[Target], (Long, Double, Double, Option[UUID])]].map(
269+
entry => (entry._1.mkString(",") -> entry._2)
270+
)
271+
}
272+
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* This software is licensed under the GNU Affero General Public License, quoted below.
3+
*
4+
* This file is a part of PowerAPI.
5+
*
6+
* Copyright (C) 2011-2015 Inria, University of Lille 1.
7+
*
8+
* PowerAPI is free software: you can redistribute it and/or modify
9+
* it under the terms of the GNU Affero General Public License as
10+
* published by the Free Software Foundation, either version 3 of
11+
* the License, or (at your option) any later version.
12+
*
13+
* PowerAPI is distributed in the hope that it will be useful,
14+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
15+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16+
* GNU Affero General Public License for more details.
17+
*
18+
* You should have received a copy of the GNU Affero General Public License
19+
* along with PowerAPI.
20+
*
21+
* If not, please consult http://www.gnu.org/licenses/agpl-3.0.html.
22+
*/
23+
package org.powerapi.reporter
24+
25+
import com.typesafe.config.ConfigFactory
26+
27+
import scala.concurrent.duration.{FiniteDuration, Duration, DurationInt}
28+
import scalax.io.Resource
29+
import scalax.file.Path
30+
31+
import akka.actor.{ActorSystem, Props}
32+
import akka.testkit.TestKit
33+
import akka.util.Timeout
34+
35+
import org.powerapi.{PowerMeter, UnitTest}
36+
import org.powerapi.module.cpu.simple.ProcFSCpuSimpleModule
37+
38+
39+
class FuseReporterSuite(system: ActorSystem) extends UnitTest(system) {
40+
41+
implicit val timeout = Timeout(1.seconds)
42+
43+
def this() = this(ActorSystem("FuseReporterSuite"))
44+
45+
val eventListener = ConfigFactory.parseResources("test.conf")
46+
47+
override def afterAll() = {
48+
TestKit.shutdownActorSystem(system)
49+
}
50+
51+
"A FUSE reporter" should "display a list of monitored processes in a virtual file system" ignore {
52+
val _system = ActorSystem("FuseReporterSuiteTest1", eventListener)
53+
54+
val cpu_simple = new PowerMeter(_system, Seq(ProcFSCpuSimpleModule()))
55+
val fuse = _system.actorOf(Props(classOf[FuseReporter], cpu_simple))
56+
57+
val configuration = new FuseReporterConfiguration {}
58+
configuration.fuseFileName should equal("./PowerAPI")
59+
60+
val powerAPIFile = configuration.fuseFileName
61+
62+
//cpu_simple.waitFor(1.hour)
63+
Thread.sleep((1.second).toMillis)
64+
65+
Resource.fromInputStream(Runtime.getRuntime.exec(Array("ls","-l",powerAPIFile+"/")).getInputStream).lines()
66+
.toList(1).split("\\s").last should equal ("energy")
67+
Resource.fromInputStream(Runtime.getRuntime.exec(Array("ls","-l",powerAPIFile+"/energy/")).getInputStream).lines()
68+
.toList should have size 1
69+
70+
Runtime.getRuntime.exec(Array("mkdir",powerAPIFile+"/energy/1234"))
71+
72+
//Thread.sleep((1.second).toMillis)
73+
74+
Resource.fromInputStream(Runtime.getRuntime.exec(Array("ls","-l",powerAPIFile+"/energy")).getInputStream).lines()
75+
.toList.map(_.split("\\s").last) should contain ("1234")
76+
val pidDir = Resource.fromInputStream(Runtime.getRuntime.exec(Array("ls","-l",powerAPIFile+"/energy/1234")).getInputStream).lines()
77+
.toList.map(_.split("\\s").last)
78+
pidDir should have size 4
79+
pidDir should contain ("frequency")
80+
pidDir should contain ("energy")
81+
pidDir should contain ("power")
82+
83+
Resource.fromInputStream(Runtime.getRuntime.exec(Array("cat",powerAPIFile+"/energy/1234/frequency")).getInputStream).lines()
84+
.toList(0).size should be > 0
85+
Resource.fromInputStream(Runtime.getRuntime.exec(Array("cat",powerAPIFile+"/energy/1234/energy")).getInputStream).lines()
86+
.toList(0).size should be > 0
87+
Resource.fromInputStream(Runtime.getRuntime.exec(Array("cat",powerAPIFile+"/energy/1234/power")).getInputStream).lines()
88+
.toList(0).size should be > 0
89+
90+
Runtime.getRuntime.exec(Array("rmdir",powerAPIFile+"/energy/1234"))
91+
92+
Thread.sleep((1.second).toMillis)
93+
94+
Resource.fromInputStream(Runtime.getRuntime.exec(Array("ls","-l",powerAPIFile+"/energy/")).getInputStream).lines()
95+
.toList should have size 1
96+
97+
Runtime.getRuntime.exec(Array("mkdir",powerAPIFile+"/energy/firefox"))
98+
99+
Resource.fromInputStream(Runtime.getRuntime.exec(Array("cat",powerAPIFile+"/energy/firefox/power")).getInputStream).lines()
100+
.toList(0).size should be > 0
101+
102+
Runtime.getRuntime.exec(Array("rmdir",powerAPIFile+"/energy/firefox"))
103+
104+
Thread.sleep((1.second).toMillis)
105+
106+
Runtime.getRuntime.exec(Array("mkdir",powerAPIFile+"/energy/123,456"))
107+
108+
Resource.fromInputStream(Runtime.getRuntime.exec(Array("cat",powerAPIFile+"/energy/123,456/power")).getInputStream).lines()
109+
.toList(0).size should be > 0
110+
111+
Runtime.getRuntime.exec(Array("rmdir",powerAPIFile+"/energy/123,456"))
112+
113+
Thread.sleep((1.second).toMillis)
114+
115+
cpu_simple.shutdown
116+
fuse ! StopFuse
117+
_system.shutdown()
118+
_system.awaitTermination(timeout.duration)
119+
}
120+
}
121+

0 commit comments

Comments
 (0)