X-Git-Url: https://git.arvados.org/arvados-workbench2.git/blobdiff_plain/3f7e1a8afad27920adf8f03ce82eeb1ae58aa84f..6796b44d4934ddff098e1bfcf7b842ec11e4e210:/src/store/process-logs-panel/process-logs-panel-actions.ts diff --git a/src/store/process-logs-panel/process-logs-panel-actions.ts b/src/store/process-logs-panel/process-logs-panel-actions.ts index d4f5ab59..87a2fa12 100644 --- a/src/store/process-logs-panel/process-logs-panel-actions.ts +++ b/src/store/process-logs-panel/process-logs-panel-actions.ts @@ -3,28 +3,36 @@ // SPDX-License-Identifier: AGPL-3.0 import { unionize, ofType, UnionOf } from "common/unionize"; -import { ProcessLogs, getProcessLogsPanelCurrentUuid } from './process-logs-panel'; +import { ProcessLogs } from './process-logs-panel'; import { LogEventType } from 'models/log'; import { RootState } from 'store/store'; import { ServiceRepository } from 'services/services'; import { Dispatch } from 'redux'; -import { groupBy } from 'lodash'; -import { LogResource } from 'models/log'; -import { LogService } from 'services/log-service/log-service'; -import { ResourceEventMessage } from 'websocket/resource-event-message'; -import { getProcess } from 'store/processes/process'; -import { FilterBuilder } from "services/api/filter-builder"; -import { OrderBuilder } from "services/api/order-builder"; +import { LogFragment, LogService, logFileToLogType } from 'services/log-service/log-service'; +import { Process, getProcess } from 'store/processes/process'; import { navigateTo } from 'store/navigation/navigation-action'; import { snackbarActions, SnackbarKind } from 'store/snackbar/snackbar-actions'; +import { CollectionFile, CollectionFileType } from "models/collection-file"; +import { ContainerRequestResource } from "models/container-request"; + +const SNIPLINE = `================ ✀ ================ ✀ ========= Some log(s) were skipped ========= ✀ ================ ✀ ================`; +const LOG_TIMESTAMP_PATTERN = /^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{9}Z/; export const processLogsPanelActions = unionize({ RESET_PROCESS_LOGS_PANEL: ofType<{}>(), INIT_PROCESS_LOGS_PANEL: ofType<{ filters: string[], logs: ProcessLogs }>(), SET_PROCESS_LOGS_PANEL_FILTER: ofType(), - ADD_PROCESS_LOGS_PANEL_ITEM: ofType<{ logType: string, log: string }>(), + ADD_PROCESS_LOGS_PANEL_ITEM: ofType(), }); +// Max size of logs to fetch in bytes +const maxLogFetchSize: number = 128 * 1000; + +type FileWithProgress = { + file: CollectionFile; + lastByte: number; +} + export type ProcessLogsPanelAction = UnionOf; export const setProcessLogsPanelFilter = (filter: string) => @@ -32,83 +40,242 @@ export const setProcessLogsPanelFilter = (filter: string) => export const initProcessLogsPanel = (processUuid: string) => async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => { - dispatch(processLogsPanelActions.RESET_PROCESS_LOGS_PANEL()); - const process = getProcess(processUuid)(getState().resources); - if (process && process.container) { - const logResources = await loadContainerLogs(process.container.uuid, logService); - const initialState = createInitialLogPanelState(logResources); + try { + dispatch(processLogsPanelActions.RESET_PROCESS_LOGS_PANEL()); + const process = getProcess(processUuid)(getState().resources); + if (process?.containerRequest?.uuid) { + // Get log file size info + const logFiles = await loadContainerLogFileList(process.containerRequest, logService); + + // Populate lastbyte 0 for each file + const filesWithProgress = logFiles.map((file) => ({file, lastByte: 0})); + + // Fetch array of LogFragments + const logLines = await loadContainerLogFileContents(filesWithProgress, logService, process); + + // Populate initial state with filters + const initialState = createInitialLogPanelState(logFiles, logLines); + dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState)); + } + } catch(e) { + // On error, populate empty state to allow polling to start + const initialState = createInitialLogPanelState([], []); dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState)); + // Only show toast on errors other than 404 since 404 is expected when logs do not exist yet + if (e.status !== 404) { + dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Could not load process logs', hideDuration: 2000, kind: SnackbarKind.ERROR })); + } } }; -export const addProcessLogsPanelItem = (message: ResourceEventMessage<{ text: string }>) => +export const pollProcessLogs = (processUuid: string) => async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => { - if (PROCESS_PANEL_LOG_EVENT_TYPES.indexOf(message.eventType) > -1) { - const uuid = getProcessLogsPanelCurrentUuid(getState().router); - if (!uuid) { return } - const process = getProcess(uuid)(getState().resources); - if (!process) { return } - const { containerRequest, container } = process; - if (message.objectUuid === containerRequest.uuid - || (container && message.objectUuid === container.uuid)) { - dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM({ - logType: ALL_FILTER_TYPE, - log: message.properties.text - })); - dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM({ - logType: message.eventType, - log: message.properties.text - })); - if (MAIN_EVENT_TYPES.indexOf(message.eventType) > -1) { - dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM({ - logType: MAIN_FILTER_TYPE, - log: message.properties.text - })); + try { + // Get log panel state and process from store + const currentState = getState().processLogsPanel; + const process = getProcess(processUuid)(getState().resources); + + // Check if container request is present and initial logs state loaded + if (process?.containerRequest?.uuid && Object.keys(currentState.logs).length > 0) { + const logFiles = await loadContainerLogFileList(process.containerRequest, logService); + + // Determine byte to fetch from while filtering unchanged files + const filesToUpdateWithProgress = logFiles.reduce((acc, updatedFile) => { + // Fetch last byte or 0 for new log files + const currentStateLogLastByte = currentState.logs[logFileToLogType(updatedFile)]?.lastByte || 0; + + const isNew = !Object.keys(currentState.logs).find((currentStateLogName) => (updatedFile.name.startsWith(currentStateLogName))); + const isChanged = !isNew && currentStateLogLastByte < updatedFile.size; + + if (isNew || isChanged) { + return acc.concat({file: updatedFile, lastByte: currentStateLogLastByte}); + } else { + return acc; + } + }, [] as FileWithProgress[]); + + // Perform range request(s) for each file + const logFragments = await loadContainerLogFileContents(filesToUpdateWithProgress, logService, process); + + if (logFragments.length) { + // Convert LogFragments to ProcessLogs with All/Main sorting & line-merging + const groupedLogs = groupLogs(logFiles, logFragments); + await dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM(groupedLogs)); } } + return Promise.resolve(); + } catch (e) { + // Remove log when polling error is handled in some way instead of being ignored + console.error("Error occurred in pollProcessLogs:", e); + return Promise.reject(); } }; -const loadContainerLogs = async (containerUuid: string, logService: LogService) => { - const requestFilters = new FilterBuilder() - .addEqual('object_uuid', containerUuid) - .addIn('event_type', PROCESS_PANEL_LOG_EVENT_TYPES) - .getFilters(); - const requestOrder = new OrderBuilder() - .addAsc('eventAt') - .getOrder(); - const requestParams = { - limit: MAX_AMOUNT_OF_LOGS, - filters: requestFilters, - order: requestOrder, - }; - const { items } = await logService.list(requestParams); - return items; -}; +const loadContainerLogFileList = async (containerRequest: ContainerRequestResource, logService: LogService) => { + const logCollectionContents = await logService.listLogFiles(containerRequest); -const createInitialLogPanelState = (logResources: LogResource[]) => { - const allLogs = logsToLines(logResources); - const mainLogs = logsToLines(logResources.filter( - e => MAIN_EVENT_TYPES.indexOf(e.eventType) > -1 + // Filter only root directory files matching log event types which have bytes + return logCollectionContents.filter((file): file is CollectionFile => ( + file.type === CollectionFileType.FILE && + PROCESS_PANEL_LOG_EVENT_TYPES.indexOf(logFileToLogType(file)) > -1 && + file.size > 0 )); - const groupedLogResources = groupBy(logResources, log => log.eventType); - const groupedLogs = Object - .keys(groupedLogResources) - .reduce((grouped, key) => ({ - ...grouped, - [key]: logsToLines(groupedLogResources[key]) - }), {}); - const filters = [MAIN_FILTER_TYPE, ALL_FILTER_TYPE, ...Object.keys(groupedLogs)]; - const logs = { - [MAIN_FILTER_TYPE]: mainLogs, - [ALL_FILTER_TYPE]: allLogs, - ...groupedLogs - }; +}; + +/** + * Loads the contents of each file from each file's lastByte simultaneously + * while respecting the maxLogFetchSize by requesting the start and end + * of the desired block and inserting a snipline. + * @param logFilesWithProgress CollectionFiles with the last byte previously loaded + * @param logService + * @param process + * @returns LogFragment[] containing a single LogFragment corresponding to each input file + */ +const loadContainerLogFileContents = async (logFilesWithProgress: FileWithProgress[], logService: LogService, process: Process) => ( + (await Promise.allSettled(logFilesWithProgress.filter(({file}) => file.size > 0).map(({file, lastByte}) => { + const requestSize = file.size - lastByte; + if (requestSize > maxLogFetchSize) { + const chunkSize = Math.floor(maxLogFetchSize / 2); + const firstChunkEnd = lastByte+chunkSize-1; + return Promise.all([ + logService.getLogFileContents(process.containerRequest, file, lastByte, firstChunkEnd), + logService.getLogFileContents(process.containerRequest, file, file.size-chunkSize, file.size-1) + ] as Promise<(LogFragment)>[]); + } else { + return Promise.all([logService.getLogFileContents(process.containerRequest, file, lastByte, file.size-1)]); + } + })).then((res) => { + if (res.length && res.every(promiseResult => (promiseResult.status === 'rejected'))) { + // Since allSettled does not pass promise rejection we throw an + // error if every request failed + const error = res.find( + (promiseResult): promiseResult is PromiseRejectedResult => promiseResult.status === 'rejected' + )?.reason; + return Promise.reject(error); + } + return res.filter((promiseResult): promiseResult is PromiseFulfilledResult => ( + // Filter out log files with rejected promises + // (Promise.all rejects on any failure) + promiseResult.status === 'fulfilled' && + // Filter out files where any fragment is empty + // (prevent incorrect snipline generation or an un-resumable situation) + !!promiseResult.value.every(logFragment => logFragment.contents.length) + )).map(one => one.value) + })).map((logResponseSet)=> { + // For any multi fragment response set, modify the last line of non-final chunks to include a line break and snip line + // Don't add snip line as a separate line so that sorting won't reorder it + for (let i = 1; i < logResponseSet.length; i++) { + const fragment = logResponseSet[i-1]; + const lastLineIndex = fragment.contents.length-1; + const lastLineContents = fragment.contents[lastLineIndex]; + const newLastLine = `${lastLineContents}\n${SNIPLINE}`; + + logResponseSet[i-1].contents[lastLineIndex] = newLastLine; + } + + // Merge LogFragment Array (representing multiple log line arrays) into single LogLine[] / LogFragment + return logResponseSet.reduce((acc, curr: LogFragment) => ({ + logType: curr.logType, + contents: [...(acc.contents || []), ...curr.contents] + }), {} as LogFragment); + }) +); + +const createInitialLogPanelState = (logFiles: CollectionFile[], logFragments: LogFragment[]): {filters: string[], logs: ProcessLogs} => { + const logs = groupLogs(logFiles, logFragments); + const filters = Object.keys(logs); return { filters, logs }; +} + +/** + * Converts LogFragments into ProcessLogs, grouping and sorting All/Main logs + * @param logFiles + * @param logFragments + * @returns ProcessLogs for the store + */ +const groupLogs = (logFiles: CollectionFile[], logFragments: LogFragment[]): ProcessLogs => { + const sortableLogFragments = mergeMultilineLoglines(logFragments); + + const allLogs = mergeSortLogFragments(sortableLogFragments); + const mainLogs = mergeSortLogFragments(sortableLogFragments.filter((fragment) => (MAIN_EVENT_TYPES.includes(fragment.logType)))); + + const groupedLogs = logFragments.reduce((grouped, fragment) => ({ + ...grouped, + [fragment.logType as string]: {lastByte: fetchLastByteNumber(logFiles, fragment.logType), contents: fragment.contents} + }), {}); + + return { + [MAIN_FILTER_TYPE]: {lastByte: undefined, contents: mainLogs}, + [ALL_FILTER_TYPE]: {lastByte: undefined, contents: allLogs}, + ...groupedLogs, + } +}; + +/** + * Checks for non-timestamped log lines and merges them with the previous line, assumes they are multi-line logs + * If there is no previous line (first line has no timestamp), the line is deleted. + * Only used for combined logs that need sorting by timestamp after merging + * @param logFragments + * @returns Modified LogFragment[] + */ +const mergeMultilineLoglines = (logFragments: LogFragment[]) => ( + logFragments.map((fragment) => { + // Avoid altering the original fragment copy + let fragmentCopy: LogFragment = { + logType: fragment.logType, + contents: [...fragment.contents], + } + // Merge any non-timestamped lines in sortable log types with previous line + if (fragmentCopy.contents.length && !NON_SORTED_LOG_TYPES.includes(fragmentCopy.logType)) { + for (let i = 0; i < fragmentCopy.contents.length; i++) { + const lineContents = fragmentCopy.contents[i]; + if (!lineContents.match(LOG_TIMESTAMP_PATTERN)) { + // Partial line without timestamp detected + if (i > 0) { + // If not first line, copy line to previous line + const previousLineContents = fragmentCopy.contents[i-1]; + const newPreviousLineContents = `${previousLineContents}\n${lineContents}`; + fragmentCopy.contents[i-1] = newPreviousLineContents; + } + // Delete the current line and prevent iterating + fragmentCopy.contents.splice(i, 1); + i--; + } + } + } + return fragmentCopy; + }) +); + +/** + * Merges log lines of different types and sorts types that contain timestamps (are sortable) + * @param logFragments + * @returns string[] of merged and sorted log lines + */ +const mergeSortLogFragments = (logFragments: LogFragment[]): string[] => { + const sortableLines = fragmentsToLines(logFragments + .filter((fragment) => (!NON_SORTED_LOG_TYPES.includes(fragment.logType)))); + + const nonSortableLines = fragmentsToLines(logFragments + .filter((fragment) => (NON_SORTED_LOG_TYPES.includes(fragment.logType))) + .sort((a, b) => (a.logType.localeCompare(b.logType)))); + + return [...nonSortableLines, ...sortableLines.sort(sortLogLines)] }; -const logsToLines = (logs: LogResource[]) => - logs.map(({ properties }) => properties.text); +const sortLogLines = (a: string, b: string) => { + return a.localeCompare(b); +}; + +const fragmentsToLines = (fragments: LogFragment[]): string[] => ( + fragments.reduce((acc, fragment: LogFragment) => ( + acc.concat(...fragment.contents) + ), [] as string[]) +); + +const fetchLastByteNumber = (logFiles: CollectionFile[], key: string) => { + return logFiles.find((file) => (file.name.startsWith(key)))?.size +}; export const navigateToLogCollection = (uuid: string) => async (dispatch: Dispatch, getState: () => RootState, services: ServiceRepository) => { @@ -120,8 +287,6 @@ export const navigateToLogCollection = (uuid: string) => } }; -const MAX_AMOUNT_OF_LOGS = 10000; - const ALL_FILTER_TYPE = 'All logs'; const MAIN_FILTER_TYPE = 'Main logs'; @@ -143,3 +308,8 @@ const PROCESS_PANEL_LOG_EVENT_TYPES = [ LogEventType.CONTAINER, LogEventType.KEEPSTORE, ]; + +const NON_SORTED_LOG_TYPES = [ + LogEventType.NODE_INFO, + LogEventType.CONTAINER, +];