<?php
/**
* Perforce Swarm
*
* @copyright 2012 Perforce Software. All rights reserved.
* @license Please see LICENSE.txt in top-level folder of this distribution.
* @version <release>/<patch>
*/
namespace Activity;
use Activity\Model\Activity;
use P4\Key\Key;
use P4\Spec\Definition as SpecDefinition;
use P4\Spec\Job;
use Projects\Model\Project;
use Reviews\Model\Review;
use Users\Model\Config as UserConfig;
use Users\Model\Group;
use Zend\Db\Sql\Sql;
use Zend\Mvc\MvcEvent;
class Module
{
/**
* Connect to queue events to record activity data.
*
* @param Event $event the bootstrap event
* @return void
*/
public function onBootstrap(MvcEvent $event)
{
$application = $event->getApplication();
$services = $application->getServiceManager();
$manager = $services->get('queue');
$events = $manager->getEventManager();
// connect to all tasks and write activity data
// we do this late (low-priority) so all handlers have
// a chance to influence the activity model.
$events->attach(
'*',
function ($event) use ($services) {
$model = $event->getParam('activity');
if (!$model instanceof Activity) {
return;
}
// ignore 'quiet' events.
$data = (array) $event->getParam('data') + array('quiet' => null);
$quiet = $event->getParam('quiet', $data['quiet']);
if ($quiet === true || in_array('activity', (array) $quiet)) {
return;
}
// don't record activity by users we ignore.
$config = $services->get('config');
$ignore = isset($config['activity']['ignored_users'])
? (array) $config['activity']['ignored_users']
: array();
if (in_array($model->get('user'), $ignore)) {
return;
}
// all activity should appear in the activity streams
// of the user that initiated the activity.
$model->addStream('user-' . $model->get('user'))
->addStream('personal-' . $model->get('user'));
// add anyone who follows the user that initiated this activity
$p4Admin = $services->get('p4_admin');
$model->addFollowers(
UserConfig::fetchFollowerIds($model->get('user'), 'user', $p4Admin)
);
// projects that are affected should also get the activity
// and, by extension, project members should see it too.
if ($model->getProjects()) {
$groups = Group::getCachedData($p4Admin);
$projects = Project::fetchAll(
array(Project::FETCH_BY_IDS => array_keys($model->getProjects())),
$p4Admin
);
foreach ($projects as $project) {
$model->addStream('project-' . $project->getId());
foreach ($project->getAllMembers(false, $groups) as $member) {
$model->addFollowers($member);
}
}
}
// activity related to a review should include review participants
// and should appear in the activity stream for the review itself
$review = $event->getParam('review');
if ($review instanceof Review) {
$model->addFollowers($review->getParticipants());
$model->addStream('review-' . $review->getId());
}
// ensure all 'followers' have this event on their personal stream
foreach ($model->getFollowers() as $follower) {
$model->addStream('personal-' . $follower);
}
try {
$model->setConnection($p4Admin)->save();
} catch (\Exception $e) {
$services->get('logger')->err($e);
}
},
-100
);
// connect to worker startup to check if we need to prime activity
// data (ie. this is a first run against an existing server).
$events->attach(
'worker.startup',
function ($event) use ($services, $manager, $events) {
// only run for the first worker.
if ($event->getParam('slot') !== 1) {
return;
}
// if we already have an event counter, nothing to do.
$p4Admin = $services->get('p4_admin');
if (Key::exists(Activity::KEY_COUNT, $p4Admin)) {
return;
}
// initialize count to zero so we exit early next time.
$key = new Key($p4Admin);
$key->setId(Activity::KEY_COUNT)
->set(0);
// looks like we're going to do the initial import, tie up as many
// worker slots as we can to minimize concurrency/out-of-order issues
// (if other workers were already running, we won't get all the slots)
// release these slots on shutdown - only really needed when testing
$slots = array();
while ($slot = $manager->getWorkerSlot()) {
$slots[] = $slot;
}
$events->attach(
'worker.shutdown',
function () use ($slots, $manager) {
foreach ($slots as $slot) {
$manager->releaseWorkerSlot($slot);
}
}
);
// grab the last 10k changes and get ready to queue them.
$queue = array();
$changes = $p4Admin->run('changes', array('-m10000', '-s', 'submitted'));
foreach ($changes->getData() as $change) {
$queue[] = array(
'type' => 'commit',
'id' => $change['change'],
'time' => (int) $change['time']
);
}
// grab the last 10k jobs and get ready to queue them.
// note, jobspec is mutable so we get the date via its code
try {
// use modified date field if available, falling-back to the default date field.
// often this will be the same field, by default the date field is a modified date.
$job = new Job($p4Admin);
$spec = SpecDefinition::fetch('job', $p4Admin);
$date = $job->hasModifiedDateField()
? $job->getModifiedDateField()
: $spec->fieldCodeToName(104);
$jobs = $p4Admin->run('jobs', array('-m10000', '-r'));
foreach ($jobs->getData() as $job) {
if (isset($job[$date])) {
$queue[] = array(
'type' => 'job',
'id' => $job['Job'],
'time' => strtotime($job[$date])
);
}
}
} catch (\Exception $e) {
$services->get('logger')->err($e);
}
// sort items by time so they are processed in order
// if other workers are already pulling tasks from the queue.
usort(
$queue,
function ($a, $b) {
return $a['time'] - $b['time'];
}
);
// we don't want to duplicate activity
// it's possible there are already tasks in the queue
// (imagine the trigger was running, but the workers were not),
// if there are >10k abort; else fetch them so we can skip them.
if ($manager->getTaskCount() > 10000) {
return;
}
$skip = array();
foreach ($manager->getTaskFiles() as $file) {
$task = $manager->parseTaskFile($file);
if ($task) {
$skip[$task['type'] . ',' . $task['id']] = true;
}
}
// again, we don't want to duplicate activity
// if there is any activity at this point, abort.
if (Key::fetch(Activity::KEY_COUNT, $p4Admin)->get()) {
return;
}
// add jobs and changes to the queue
foreach ($queue as $task) {
if (!isset($skip[$task['type'] . ',' . $task['id']])) {
$manager->addTask($task['type'], $task['id'], null, $task['time']);
}
}
}
);
}
public function getConfig()
{
return include __DIR__ . '/config/module.config.php';
}
public function getAutoloaderConfig()
{
return array(
'Zend\Loader\StandardAutoloader' => array(
'namespaces' => array(
__NAMESPACE__ => __DIR__ . '/src/' . __NAMESPACE__,
),
),
);
}
}